Source code for utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu May  5 23:28:03 2022

@author: hill103

this script stores utils functions
"""



import os
import numpy as np
import pandas as pd
from config import print, diagnosis_path
import scanpy as sc
sc.settings.verbosity = 0  # verbosity: errors (0), warnings (1), info (2), hints (3)



[docs] def calcRMSE(truth, predicted): ''' calculate RMSE Parameters ---------- truth : 1-D numpy array true values. predicted : 1-D numpy array predictions. Returns ------- float RMSE ''' return np.sqrt(((predicted - truth) ** 2).mean())
[docs] def reportRMSE(true_theta, pred_theta): ''' calculate the RMSE of theta (celltype proportion) across all spots we first calculate the RMSE of each spot, then calculate the MEAN of RMSE values across all spots Parameters ---------- true_theta : 2-D numpy array (spots * celltypes) true values. pred_theta : 2-D numpy array (spots * celltypes) predictions. Returns ------- float RMSE across all spots ''' return np.array([calcRMSE(true_theta[i,], pred_theta[i,]) for i in range(true_theta.shape[0])]).mean()
[docs] def reparameterTheta(theta, e_alpha): ''' re-parametrization w = e^alpha * theta Parameters ---------- theta : 3-D numpy array (#spot * #celltype * 1) theta (celltype proportion). e_alpha : 1-D numpy array e^alpha (spot-specific effect). Returns ------- w : 3-D numpy array (#spot * #celltype * 1) re-parametrization w = e^alpha * theta. ''' return e_alpha[:, None, None] * theta
[docs] def read_spatial_data(spatial_file, filter_gene, n_hv_gene=0): ''' read spatial data saved as a CSV file by Scanpy Parameters ---------- spatial_file : string full path of input csv file of raw nUMI counts in spatial transcriptomic data (spots * genes). filter_gene : bool whether to filter genes before DE. n_hv_gene : int number of highly variable genes to be kept in spatial data. If equals 0, all genes are kept. Returns ------- spatial_spot_obj : a AnnData object AnnData object of spatial spot data. tmp_df : dataframe dataframe of raw nUMI of spatial data (spots * genes). N : series sequencing depth per spot. ''' # Read spatial spot-level data spatial_spot_obj = sc.read_csv(spatial_file) print(f'read spatial data from file {spatial_file}') print(f'total {spatial_spot_obj.n_obs} spots; {spatial_spot_obj.n_vars} genes\n') # check whether cell name and gene name are unique if len(set(spatial_spot_obj.obs_names.tolist())) < spatial_spot_obj.n_obs: raise Exception('spot barcodes in spatial data are not unique!') if len(set(spatial_spot_obj.var_names.tolist())) < spatial_spot_obj.n_vars: raise Exception('gene names in spatial data are not unique!') # note for spatial data, we do not filter out spots if filter_gene: # Remove genes present in <3 cells pre_n_gene = spatial_spot_obj.n_vars sc.pp.filter_genes(spatial_spot_obj, min_cells=3) if pre_n_gene > spatial_spot_obj.n_vars: print(f'filtering genes present in <3 spots: {pre_n_gene-spatial_spot_obj.n_vars} genes removed\n') else: print('filtering genes present in <3 spots: No genes removed\n') # make a DEEP COPY of raw nUMI count spatial_spot_obj.layers['raw_nUMI'] = spatial_spot_obj.X.copy() # calculate sequencing depth per cell, note currently X is nUMI, we make a deep copy to avoid dataframe change tmp_df = spatial_spot_obj.to_df().copy() N = tmp_df.sum(axis=1) # sum also works on sparse dataframe # Normalize each cell by total counts over ALL genes sc.pp.normalize_total(spatial_spot_obj, target_sum=1, inplace=True) # identify highly variable genes in spatial data, select TOP X HV genes # no need to consider highly variable genes in spatial data, as for cell-type deconvolution, we work on each spot independently if n_hv_gene >= spatial_spot_obj.n_vars: print(f'\n[WARNING] use all {spatial_spot_obj.n_vars} genes for downstream analysis as available genes in spatial data <= specified highly varabile gene number {n_hv_gene}') elif n_hv_gene > 0: print(f'\n[WARNING] identify {n_hv_gene} highly variable genes from spatial data and keep those genes only...') spatial_hv_genes = sc.pp.highly_variable_genes(spatial_spot_obj, layer='raw_nUMI', flavor='seurat_v3', n_top_genes=n_hv_gene, inplace=False) spatial_hv_genes = spatial_hv_genes.loc[spatial_hv_genes['highly_variable']==True].index.to_list() spatial_spot_obj = spatial_spot_obj[:, spatial_hv_genes].copy() tmp_df = tmp_df[spatial_hv_genes] # after sequencing depth calculation and normalization we remove mitochondrial genes non_mt_genes = [gene for gene in spatial_spot_obj.var_names if not gene.startswith('MT-') ] n_mt_genes = spatial_spot_obj.n_vars - len(non_mt_genes) print(f'filtering {n_mt_genes} mitochondrial genes\n') if n_mt_genes > 0: # the same filtering will be applied to all layers spatial_spot_obj = spatial_spot_obj[:, non_mt_genes].copy() tmp_df = tmp_df[non_mt_genes] print(f'finally remain {spatial_spot_obj.n_obs} spots; {spatial_spot_obj.n_vars} genes for downstream analysis\n') return spatial_spot_obj, tmp_df, N
[docs] def read_scRNA_data(ref_file, ref_anno_file, filter_cell, filter_gene): ''' read scRNA-seq data saved as a CSV file by Scanpy, also read cell-type annotation, then subset cells with cell-type annotation Parameters ---------- ref_file : string full path of input csv file of raw nUMI counts in scRNA-seq data (cells * genes). ref_anno_file : string full path of input csv file of cell-type annotations for all cells in scRNA-seq data. filter_cell : bool whether to filter cells before DE. filter_gene : bool whether to filter genes before DE. Returns ------- a AnnData object ''' # Read scRNA cell-level data and cell-type annotation scrna_obj = sc.read_csv(ref_file) print(f'read scRNA-seq data from file {ref_file}') print(f'total {scrna_obj.n_obs} cells; {scrna_obj.n_vars} genes') # check whether cell name and gene name are unique if len(set(scrna_obj.obs_names.tolist())) < scrna_obj.n_obs: raise Exception('spot barcodes in spatial data are not unique!') if len(set(scrna_obj.var_names.tolist())) < scrna_obj.n_vars: raise Exception('gene names in spatial data are not unique!') scrna_celltype = pd.read_csv(ref_anno_file, index_col=0) print(f'read scRNA-seq cell-type annotation from file {ref_anno_file}') print(f'total {len(set(scrna_celltype.iloc[:,0]))} cell-types') # check whether cell name are unique if len(set(scrna_celltype.index.to_list())) < scrna_celltype.shape[0]: raise Exception('cell barcodes in scRNA-seq cell-type annotation are not unique!') # check overlap of cells in gene expression and cell-type annotation overlap_cells = sorted(list(set(scrna_celltype.index.to_list()) & set(scrna_obj.obs_names))) if len(overlap_cells) < scrna_celltype.shape[0]: print(f'[WARNING] {scrna_celltype.shape[0]-len(overlap_cells)} cells in cell-type annotation but not found in nUMI matrix') # only keep cells with cell-type annotations scrna_obj = scrna_obj[overlap_cells, ].copy() assert((scrna_obj.obs_names == overlap_cells).all()) print(f'subset cells with cell-type annotation, finally keep {scrna_obj.n_obs} cells; {scrna_obj.n_vars} genes\n') # add cell-type annotation to metadata scrna_celltype = scrna_celltype.loc[overlap_cells, :] assert((scrna_obj.obs_names == scrna_celltype.index).all()) scrna_obj.obs['celltype'] = pd.Categorical(scrna_celltype.iloc[:,0]) # Categoricals are preferred for efficiency if filter_cell: # Remove cells with <200 genes pre_n_cell = scrna_obj.n_obs sc.pp.filter_cells(scrna_obj, min_genes=200) if pre_n_cell > scrna_obj.n_obs: print(f'filtering cells with <200 genes: {pre_n_cell-scrna_obj.n_obs} cells removed\n') else: print('filtering cells with <200 genes: No cells removed\n') if filter_gene: # Remove genes present in <10 cells pre_n_gene = scrna_obj.n_vars sc.pp.filter_genes(scrna_obj, min_cells=10) if pre_n_gene > scrna_obj.n_vars: print(f'filtering genes present in <10 cells: {pre_n_gene-scrna_obj.n_vars} genes removed\n') else: print('filtering genes present in <10 cells: No genes removed\n') # make a DEEP COPY of raw nUMI count scrna_obj.layers['raw_nUMI'] = scrna_obj.X.copy() # Normalize each cell by total counts over ALL genes sc.pp.normalize_total(scrna_obj, target_sum=1, inplace=True) # after sequencing depth calculation and normalization we remove mitochondrial genes non_mt_genes = [gene for gene in scrna_obj.var_names if not gene.startswith('MT-') ] n_mt_genes = scrna_obj.n_vars - len(non_mt_genes) print(f'filtering {n_mt_genes} mitochondrial genes\n') if n_mt_genes > 0: # the same filtering will be applied to all layers scrna_obj = scrna_obj[:, non_mt_genes].copy() print(f'finally remain {scrna_obj.n_obs} cells; {scrna_obj.n_vars} genes for downstream analysis\n') return scrna_obj
[docs] def run_DE(sc_obj, n_marker_per_cmp, use_fdr, p_val_cutoff, fc_cutoff, pct1_cutoff, pct2_cutoff, sortby_fc, save_result=False, save_file_name=None): ''' differential on cell-types in scRNA-seq data. we compare each cell-type with another one cell-type at a time. only choose TOP X marker genes for one comparison with one cell-type vs another one cell-type, with filtering (the FDR adjusted p value <= 0.05 + fold change >= 1.2 + pct.1 >= 0.3 + pct.2 <= 0.1, and sorting by fold change (by default). Then combine all marker genes from all comparisons. Note: the genes in object are overlapped genes with spatial data only. cell-type annotation is saved in object metadata <celltype> gene expression in AnnData object has already been normalized by sequencing depth Parameters ---------- sc_obj : AnnData object scRNA-seq data object. n_marker_per_cmp : int number of TOP marker genes for each comparison in DE use_fdr : bool whether to use FDR adjusted p value for filtering and sorting p_val_cutoff : float threshold of p value (or FDR if --use_fdr is true) in marker genes filtering fc_cutoff : float threshold of fold change (without log transform!) in marker genes filtering pct1_cutoff : float threshold of pct.1 in marker genes filtering pct2_cutoff : float threshold of pct.2 in marker genes filtering sortby_fc : bool whether to sort marker genes by fold change save_result : bool if true, save dataframe of DE result to csv file save_file_name : string file name (without path) for saving DE result Returns ------- marker_gene_list : list identified cell-type specific marker gene list ''' def calc_pct(sc_obj, celltype): ''' calculate pct of genes expressed in cells within one celltype. raw nUMI count saved in layer <raw_nUMI> Parameters ---------- sc_obj : AnnData object scRNA-seq data object. celltype : string name of one cell-type. Returns ------- pct_df : Series with genes as index A Series including genes and corresponding pcts. ''' sub_obj = sc_obj[sc_obj.obs['celltype'] == celltype] # get raw nUMI count (cells * genes) sub_df = sc.get.obs_df(sub_obj, layer='raw_nUMI', keys=sub_obj.var_names.to_list()) assert sub_df.shape[0] > 0, f'Error! there is no cell for cell-type `{celltype}`' # column sum divided by number of rows return ((sub_df>0).sum(axis=0)) / (sub_df.shape[0]) print('Differential analysis across cell-types on scRNA-seq data...') celltypes = sorted(list(set(sc_obj.obs['celltype']))) scrna_marker_genes = list() de_result_list = [] if use_fdr: pval_col = 'pvals_adj' else: pval_col = 'pvals' # first calculate pct for each cell-type pct_dict = {} for this_celltype in celltypes: pct_dict[this_celltype] = calc_pct(sc_obj, this_celltype) # perform test for t, this_celltype in enumerate(celltypes): print(f'{t/len(celltypes):.0%}...', end='') for other_celltype in celltypes: if this_celltype == other_celltype: continue # compare one cell-type against another cell-type sc.tl.rank_genes_groups(sc_obj, groupby='celltype', use_raw=False, corr_method='benjamini-hochberg', method='wilcoxon', groups=[this_celltype], reference=other_celltype) tmp_df = sc.get.rank_genes_groups_df(sc_obj, group=None) # add pcts tmp_df = tmp_df.merge(pct_dict[this_celltype].rename('pct1'), left_on='names', right_index=True, validate='one_to_one') tmp_df = tmp_df.merge(pct_dict[other_celltype].rename('pct2'), left_on='names', right_index=True, validate='one_to_one') # filter genes tmp_df = tmp_df.loc[(tmp_df[pval_col]<=p_val_cutoff) & (tmp_df['logfoldchanges']>=np.log2(fc_cutoff)) & (tmp_df['pct1']>=pct1_cutoff) & (tmp_df['pct2']<=pct2_cutoff)] # add cell-types tmp_df['celltype1'] = this_celltype tmp_df['celltype2'] = other_celltype if tmp_df.shape[0] <= n_marker_per_cmp: if tmp_df.shape[0] < n_marker_per_cmp: print(f'\n[WARNING] only {tmp_df.shape[0]} genes passing filtering (<{n_marker_per_cmp}) for {this_celltype} vs {other_celltype}') # no need to further rank, directly select all available genes scrna_marker_genes += tmp_df['names'].to_list() # combine DE result tmp_df['selected'] = 1 else: ''' # rank of pct.1/pct.2 tmp_df['pct_divide'] = tmp_df['pct1'] / tmp_df['pct2'] tmp_df.sort_values(by='pct_divide', ascending=False, inplace=True) tmp_df['pct_rank'] = range(tmp_df.shape[0]) # rank of log fold change tmp_df.sort_values(by='logfoldchanges', ascending=False, inplace=True) tmp_df['logfc_rank'] = range(tmp_df.shape[0]) tmp_df['comb_rank'] = tmp_df['pct_rank'] + tmp_df['logfc_rank'] tmp_df.sort_values(by=['comb_rank', 'logfoldchanges'], ascending=[True, False], inplace=True) ''' # sort by fold change or p value if sortby_fc: tmp_df.sort_values(by=['logfoldchanges', pval_col], ascending=[False, True], inplace=True) else: tmp_df.sort_values(by=[pval_col, 'logfoldchanges'], ascending=[True, False], inplace=True) # select top X marker genes scrna_marker_genes += tmp_df['names'].to_list()[:n_marker_per_cmp] # combine DE result tmp_df['selected'] = 0 tmp_df.loc[tmp_df.index.to_list()[:n_marker_per_cmp], 'selected'] = 1 tmp_df.rename(columns={'names': 'gene'}, inplace=True) de_result_list.append(tmp_df.loc[:, ['gene', 'logfoldchanges', 'pvals', 'pvals_adj', 'pct1', 'pct2', 'celltype1', 'celltype2', 'selected']].copy()) print('100%') scrna_marker_genes = sorted(list(set(scrna_marker_genes))) print(f'finally selected {len(scrna_marker_genes)} cell-type marker genes\n') if save_result: os.makedirs(os.path.join(diagnosis_path, 'celltype_markers'), exist_ok=True) pd.concat(de_result_list).to_csv(os.path.join(diagnosis_path, 'celltype_markers', save_file_name)+'.gz', index=False, compression='gzip') return scrna_marker_genes
[docs] def run_DE_only(ref_file, ref_anno_file, spatial_genes, n_marker_per_cmp, use_fdr, p_val_cutoff, fc_cutoff, pct1_cutoff, pct2_cutoff, sortby_fc, save_result=False, filter_cell=True, filter_gene=True): ''' read scRNA-seq raw nUMI and cell-type annotation, then perform DE analysis. Note: the genes in scRNA-seq data need to be subsetted to overlapped genes with spatial data only. Parameters ---------- ref_file : string full path of input csv file of raw nUMI counts in scRNA-seq data (cells * genes). ref_anno_file : string full path of input csv file of cell-type annotations for all cells in scRNA-seq data. spatial_genes : list genes included in spatial dataset. n_marker_per_cmp : int number of TOP marker genes for each comparison in DE use_fdr : bool whether to use FDR adjusted p value for filtering and sorting p_val_cutoff : float threshold of p value (or FDR if --use_fdr is true) in marker genes filtering fc_cutoff : float threshold of fold change (without log transform!) in marker genes filtering pct1_cutoff : float threshold of pct.1 in marker genes filtering pct2_cutoff : float threshold of pct.2 in marker genes filtering sortby_fc : bool whether to sort marker genes by fold change save_result : bool if true, save dataframe of DE result to csv file filter_cell : bool whether to filter cells before DE filter_gene : bool whether to filter genes before DE Returns ------- scrna_obj : AnnData object a AnnData object for scRNA-seq data marker_gene_profile : DataFrame average gene expressions of identified cell-type specific marker genes from refer scRNA-seq data ''' scrna_obj = read_scRNA_data(ref_file, ref_anno_file, filter_cell, filter_gene) # subset genes overlap_genes = list(set(spatial_genes).intersection(set(scrna_obj.var_names))) print(f'get {len(overlap_genes)} overlapped genes between spatial data and reference scRNA-seq data\n') #if len(overlap_genes) < len(spatial_genes): #print(f'{len(spatial_genes)-len(overlap_genes)} genes in spatial data but not found in scRNA-seq data: {", ".join(set(spatial_genes).difference(set(overlap_genes)))}\n') scrna_obj = scrna_obj[:, overlap_genes].copy() # DE marker_genes = run_DE(scrna_obj, n_marker_per_cmp, use_fdr, p_val_cutoff, fc_cutoff, pct1_cutoff, pct2_cutoff, sortby_fc, save_result, 'DE_celltype_markers.csv') # generate average gene expressions (gene signature) for cell-types based on normalized values tmp_df = sc.get.obs_df(scrna_obj, keys=marker_genes) tmp_df['celltype'] = scrna_obj.obs['celltype'] tmp_df = tmp_df.groupby(['celltype']).mean() return scrna_obj, tmp_df
[docs] def rerun_DE(scRNA_df, scRNA_celltype, n_marker_per_cmp, use_fdr, p_val_cutoff, fc_cutoff, pct1_cutoff, pct2_cutoff, sortby_fc, save_result=False, filter_gene=True): ''' rerun DE on CVAE transformed scRNA-seq data genes are only overlapped genes between spatial and scRNA-seq data gene expression values are already normalized by sequencing depth Parameters ---------- scRNA_df : dataframe normalized gene expression after CVAE tranforming on scRNA-seq data (cells * genes). scRNA_celltype : dataframe cell-type annotations for cells in scRNA-seq data. Only 1 column named <celltype> n_marker_per_cmp : int number of TOP marker genes for each comparison in DE use_fdr : bool whether to use FDR adjusted p value for filtering and sorting p_val_cutoff : float threshold of p value (or FDR if --use_fdr is true) in marker genes filtering fc_cutoff : float threshold of fold change (without log transform!) in marker genes filtering pct1_cutoff : float threshold of pct.1 in marker genes filtering pct2_cutoff : float threshold of pct.2 in marker genes filtering sortby_fc : bool whether to sort marker genes by fold change save_result : bool if true, save dataframe of DE result to csv file filter_gene : bool whether to filter genes before DE. Returns ------- marker_gene_list : list a list of cell-type specific marker genes based on DE on CVAE tranformed gene expressions. ''' # first build a AnnData object and replace the normalized data with CVAE transformed gene expressions scrna_obj = sc.AnnData(scRNA_df) # make a DEEP COPY of raw nUMI count, though it's actually normalized values scrna_obj.layers['raw_nUMI'] = scrna_obj.X.copy() assert((scrna_obj.obs_names == scRNA_celltype.index).all()) # add cell-type annotation to metadata scrna_obj.obs['celltype'] = pd.Categorical(scRNA_celltype.iloc[:,0]) # Categoricals are preferred for efficiency if filter_gene: # Remove genes present in <10 cells pre_n_gene = scrna_obj.n_vars sc.pp.filter_genes(scrna_obj, min_cells=10) if pre_n_gene > scrna_obj.n_vars: print(f'filtering genes present in <10 cells: {pre_n_gene-scrna_obj.n_vars} genes removed\n') else: print('filtering genes present in <10 cells: No genes removed\n') # do not normalize by sequencing depth as it's already normalized # so directly run DE on values in AnnData.X return run_DE(scrna_obj, n_marker_per_cmp, use_fdr, p_val_cutoff, fc_cutoff, pct1_cutoff, pct2_cutoff, sortby_fc, save_result, 'redo_DE_celltype_markers.csv')
# check total size of a Python object such as a Dictionary # ref https://code.activestate.com/recipes/577504/
[docs] def total_size(o, handlers={}, verbose=False): """ Returns the approximate memory footprint of an object and all of its contents. Automatically finds the contents of several built-in containers and their subclasses, including tuple, list, deque, dict, set, and frozenset. To search other containers, add handlers that iterate over their contents. Parameters ---------- o : object The object whose memory footprint is to be calculated. handlers : dict, optional A dictionary of handler functions keyed by container types. These handlers are used to iterate over container contents. Defaults to an empty dictionary. verbose : bool, optional If True, the function prints more information about what it's doing. Defaults to False. Returns ------- obj_size : int An approximation of the total memory footprint of the object in bytes. """ from sys import getsizeof, stderr from itertools import chain from collections import deque try: from reprlib import repr except ImportError: pass dict_handler = lambda d: chain.from_iterable(d.items()) all_handlers = {tuple: iter, list: iter, deque: iter, dict: dict_handler, set: iter, frozenset: iter, } all_handlers.update(handlers) # user handlers take precedence seen = set() # track which object id's have already been seen default_size = getsizeof(0) # estimate sizeof object without __sizeof__ def sizeof(o): if id(o) in seen: # do not double count the same object return 0 seen.add(id(o)) s = getsizeof(o, default_size) if verbose: print(s, type(o), repr(o), file=stderr) for typ, handler in all_handlers.items(): if isinstance(o, typ): s += sum(map(sizeof, handler(o))) break return s return sizeof(o)
[docs] def check_decoder(cvae, decoder, data, labels): ''' since we first create a decoder then update its weights based on the corresponding weights in CVAE, we need to double check the weights are updated correctly, and the decoded output matchs the CVAE output Parameters ---------- cvae : Keras model already trained CVAE model decoder : Keras model a separated decoder whose weights are already updated, i.e. it should give the same decoded output with CVAE data : 2-D numpy array data used for checking decoder (columns are genes, rows are cells, spatial spots or pseudo-spots) labels : 1-D numpy array corresponding conditional variables for each row in data Returns ------- None. ''' from tensorflow.keras.models import Model # a tmp model to get the embedding after sampling and decoder output at the same time tmp_model = Model([cvae.get_layer('encoder_input').input, cvae.get_layer('cond_input').input], [cvae.get_layer('z').output, cvae.get_layer('decoder_output_act').output], name='tmp_model') # the preditions of embedding and decoder output [tmp_embedding, tmp_output] = tmp_model.predict([data, labels]) # feed the embedding to the new decoder tmp_output2 = decoder.predict([tmp_embedding, labels]) # are them the same? tmp = np.all((tmp_output-tmp_output2)<1e-12) assert(tmp==True)
[docs] def numba_info(): ''' this function checks the Numba information, especially the Threading Layer Information It turns out TBB or OMP gives similiar running speed Parameters ---------- None. Returns ------- None. ''' from numba.misc import numba_sysinfo info = numba_sysinfo.get_sysinfo() # returns a dict of sections -> lines print('\nchecking Numba Threading Layer Options') # If you want a boolean for each layer: tbb_ok = info['TBB Threading'] print(f"TBB Threading Layer Available : {tbb_ok}") omp_ok = info['OpenMP Threading'] print(f"OpenMP Threading Layer Available : {omp_ok}") workqueue_ok = info['Workqueue Threading'] print(f"Workqueue Threading Layer Available : {workqueue_ok}") ''' if tbb_ok: os.environ.setdefault("NUMBA_THREADING_LAYER", "tbb") # choose TBB print("[HIGHLIGHT] Set Threading Layer to TBB") else: print("[WARNING] TBB not found. Install TBB beforehand; runtime may increase substantially if a different Threading Layer is used.") '''
[docs] def check_numba(): ''' this function checks the active threading layer and thread count in Numba Parameters ---------- None. Returns ------- None. ''' import numba as na import time @na.jit(nopython=True, parallel=True, fastmath=False) def tiny_kernel(x): acc = 0.0 for i in na.prange(x.size): # forces Numba to pick a threading layer acc += x[i] * 0.5 + 1.0 return acc # small input so this is instant on any machine x = np.arange(200000, dtype=np.float64) # Compile + first run t0 = time.perf_counter() _ = tiny_kernel(x) # triggers compilation and backend init t1 = time.perf_counter() # Hot run (no compile) t2 = time.perf_counter() res2 = tiny_kernel(x) t3 = time.perf_counter() print("Active threading layer:", na.threading_layer()) print("Active Numba threads:", na.get_num_threads()) print("Compiled signatures:", tiny_kernel.signatures) print(f"Elapsed (compile + 1st run): {t1 - t0:.4f}s") print(f"Elapsed (hot run): {t3 - t2:.4f}s") print(f"Result checksum (should be 10000150000.000): {res2:.3f}")