De principiante a profesional — con ejemplos prácticos
Numba es un compilador JIT (Just-In-Time) para Python que traduce funciones Python a código máquina nativo usando LLVM. Funciona mejor con código numérico que usa arrays NumPy y loops. Imagina que tienes un loop que procesa 100 millones de datos y tarda 15 segundos en Python puro; con Numba, agregas @njit encima de tu función y baja a 0.05 segundos. Es especialmente brutal en cálculos numéricos masivos (simulaciones, procesamiento de señales, machine learning, y trading algorítmico) donde millones de operaciones por segundo marcan la diferencia entre una estrategia viable y una idea teórica
Cuando decoras una función con @jit, Numba hace lo siguiente:
# Con conda (recomendado)
$ conda install numba
# Con pip
$ pip install numba
# Extras para máximo rendimiento
$ conda install intel-cmplr-lib-rt # Intel SVML para funciones matemáticas rápidas
$ pip install scipy # Para numpy.linalg optimizado
from numba import jit
import numpy as np
@jit # Numba compila esto a código máquina
def suma_cuadrados(arr):
total = 0.0
for i in range(len(arr)):
total += arr[i] ** 2
return total
# Primera llamada: compila + ejecuta (más lenta)
arr = np.arange(1_000_000, dtype=np.float64)
resultado = suma_cuadrados(arr) # ~compilación: 0.5s
# Segunda llamada: solo ejecuta (rapidísima)
resultado = suma_cuadrados(arr) # ~ejecución: 0.002s vs ~0.5s en Python puro
int64 y luego con float64, se compilan dos versiones distintas.
| Modo | Descripción | Velocidad |
|---|---|---|
| nopython (default) | Código 100% nativo, sin usar el intérprete Python | ⚡⚡⚡ Máxima |
| object mode | Fallback que usa objetos Python (casi no acelera) | ⚡ Mínima |
@jit es equivalente a @jit(nopython=True) y a @njit. El modo object ya no es el fallback por defecto. Si tu código no es compatible con nopython mode, Numba lanzará un error en lugar de compilar silenciosamente en object mode.
Numba infiere los tipos automáticamente en la primera llamada:
from numba import jit
@jit
def f(x, y):
return x + y
f(1, 2) # Compila versión int64
f(1j, 2) # Compila OTRA versión para complex128
f(1.0, 2.0) # Compila OTRA versión para float64
Defines exactamente qué tipos acepta. Más rápido en la primera llamada (no hay inferencia):
from numba import jit, int32, float64
@jit(int32(int32, int32)) # retorno(arg1, arg2)
def add_int(x, y):
return x + y
# Múltiples signatures
@jit([int32(int32, int32),
float64(float64, float64)])
def add_multi(x, y):
return x + y
cache=True — Evita recompilar@jit(cache=True)
def heavy_computation(x):
# Se guarda en disco tras la primera compilación
# Las siguientes ejecuciones del script no recompilan
return np.sum(np.sqrt(x))
nogil=True — Libera el GIL@jit(nogil=True)
def process_chunk(data):
# Se puede ejecutar concurrentemente con otros threads
# ¡Ideal para multithreading real en Python!
result = 0.0
for i in range(len(data)):
result += data[i] ** 2
return result
fastmath=True — Matemáticas más rápidas# Relaja IEEE 754 para mayor velocidad
@jit(fastmath=True)
def do_sum_fast(A):
acc = 0.
for x in A:
acc += np.sqrt(x)
return acc # ~2x más rápido que sin fastmath
# Control granular con flags LLVM específicas
@jit(fastmath={'reassoc', 'nsz'}) # solo reasociación y no-signed-zero
def add_assoc(x, y):
return (x - y) + y
@jit
def square(x):
return x ** 2
@jit
def hypot(x, y):
return math.sqrt(square(x) + square(y)) # LLVM puede inlinear square()
@jit las funciones auxiliares que llamas desde código Numba. Si no lo haces, Numba genera código mucho más lento al tener que "salir" al intérprete Python.
| Tipo Numba | Equivalente | Uso típico |
|---|---|---|
int8, int16, int32, int64 | Enteros con signo | Índices, contadores |
uint8, uint16, uint32, uint64 | Enteros sin signo | Datos binarios |
float32, float64 | Punto flotante | Cálculos numéricos |
complex64, complex128 | Complejos | Señales, FFT |
boolean | Booleano | Flags |
intp, uintp | Tamaño de puntero | Índices de arrays |
void | None | Funciones sin retorno |
from numba import float64, int32
# Indexas el tipo escalar para crear tipo de array
float64[:] # Array 1D de float64
int32[:, :] # Array 2D de int32
float64[:, :, :] # Array 3D de float64
# En signatures completas
@jit(float64[:](float64[:], float64[:]))
def add_arrays(a, b):
return a + b
# Arrays C-contiguous vs Fortran-contiguous
from numba import types
types.Array(types.float64, 2, 'C') # 2D, C-order (row-major)
types.Array(types.float64, 2, 'F') # 2D, Fortran-order (column-major)
types.Array(types.float64, 2, 'A') # 2D, cualquier layout
from numba import types
from numba.typed import Dict, List
# Listas tipadas (se pueden usar en @jit)
@jit
def usar_lista():
lst = List()
lst.append(1)
lst.append(2)
lst.append(3)
return lst
# Diccionarios tipados
@jit
def usar_dict():
d = Dict()
d[1] = 10.0
d[2] = 20.0
return d
# Tipos explícitos
types.DictType(types.int64, types.float64)
types.ListType(types.float64)
numba.typed.List y numba.typed.Dict.
Escribes la operación escalar y Numba genera el loop automáticamente, con todas las features de un ufunc NumPy (broadcasting, reduce, accumulate).
from numba import vectorize, float64, int64
# Eager: defines los tipos explícitamente
@vectorize([float64(float64, float64)])
def clip_value(x, threshold):
if x > threshold:
return threshold
elif x < -threshold:
return -threshold
return x
# Ahora funciona como cualquier ufunc de NumPy
arr = np.random.randn(1000)
result = clip_value(arr, 1.5) # Broadcasting automático
# ¡También reduce y accumulate!
@vectorize([float64(float64, float64)])
def add(x, y):
return x + y
a = np.arange(12).reshape(3, 4)
add.reduce(a, axis=0) # Suma por columnas
add.accumulate(a, axis=1) # Suma acumulada por filas
| Target | Cuándo usar | Overhead |
|---|---|---|
target='cpu' | Datos pequeños (< 1KB), baja intensidad | Mínimo |
target='parallel' | Datos medianos (< 1MB) | Bajo (threading) |
target='cuda' | Datos grandes (> 1MB), alta intensidad | Alto (transferencia GPU) |
# Paralelizado automáticamente en todos los cores
@vectorize([float64(float64, float64)], target='parallel')
def add_parallel(x, y):
return x + y
Cuando necesitas operar sobre porciones de arrays, no solo escalares. Defines un layout simbólico que indica las dimensiones de entrada/salida.
from numba import guvectorize, float64
# Media móvil: recibe array 1D, devuelve escalar por cada "fila"
@guvectorize([(float64[:], float64[:])], '(n)->()')
def moving_mean(arr, result):
total = 0.0
for i in range(arr.shape[0]):
total += arr[i]
result[0] = total / arr.shape[0]
# Aplicar sobre una matriz (cada fila es un vector)
data = np.random.rand(100, 10)
means = moving_mean(data) # → array de 100 medias (broadcasting automático)
# Ejemplo más completo: suma con peso
@guvectorize(
[(float64[:], float64[:], float64[:])],
'(n),(n)->(n)' # dos arrays entrada → un array salida, misma forma
)
def weighted_add(a, weights, result):
for i in range(a.shape[0]):
result[i] = a[i] * weights[i]
@guvectorize los resultados se escriben en el argumento de salida (result), no se retornan. NumPy aloca el array de salida automáticamente.
| Layout | Significado |
|---|---|
(n),()->(n) | Array + escalar → array |
(n)->() | Array → escalar (reducción) |
(m,n),(n)->(m) | Matriz × vector → vector |
(n),(n)->(n) | Dos arrays → array elemento a elemento |
Con parallel=True, Numba identifica operaciones con semántica paralela y las fusiona en kernels que se ejecutan en múltiples threads.
from numba import njit
import numpy as np
@njit(parallel=True)
def ident_parallel(x):
return np.cos(x) ** 2 + np.sin(x) ** 2
# ~5x más rápido que NumPy puro
# ~6x más rápido que @njit sin parallel
+ - * / ** %)== != < <= > >=)sum, prod, min, max, argmin, argmax, mean, var, stdzeros, ones, arange, linspacenumpy.dot (matrix × vector, vector × vector)rand, randn, normal, uniform, etc.)from numba import njit, prange
import numpy as np
# Reducción paralela simple
@njit(parallel=True)
def parallel_sum(A):
s = 0
for i in prange(A.shape[0]): # cada iteración en un thread distinto
s += A[i]
return s
# Reducción sobre array 2D
@njit(parallel=True)
def parallel_product_2d(n):
shp = (13, 17)
result = 2 * np.ones(shp, np.int_)
tmp = 2 * np.ones_like(result)
for i in prange(n):
result *= tmp
return result
+=, +, -=, -, *=, *, /=, /, max(), min(). El operador //= NO está soportado (depende del orden de aplicación).
@njit(parallel=True)
def logistic_regression(Y, X, w, iterations):
for i in range(iterations):
w -= np.dot(
((1.0 / (1.0 + np.exp(-Y * np.dot(X, w))) - 1.0) * Y), X
)
return w
# Numba fusiona automáticamente las operaciones element-wise
# en un solo kernel paralelo. ¡Sin cambiar el código!
# ❌ INCORRECTO: race condition en slices
@njit(parallel=True)
def prange_wrong(x):
y = np.zeros(4)
for i in prange(x.shape[0]):
y[:] += x[i] # ¡Múltiples threads escriben en y al mismo tiempo!
return y
# ✅ CORRECTO: reducción sobre array completo
@njit(parallel=True)
def prange_correct(x):
y = np.zeros(4)
for i in prange(x.shape[0]):
y += x[i] # Numba detecta la reducción correctamente
return y
prange NO es thread-safe. Nunca hagas z.append(i) dentro de un loop paralelo — causará corrupción de memoria.
@njit(parallel=True)
def test(x):
n = x.shape[0]
a = np.sin(x)
b = np.cos(a * a)
acc = 0
for i in prange(n - 2):
for j in prange(n - 1):
acc += b[i] + b[j + 1]
return acc
test(np.arange(10))
test.parallel_diagnostics(level=4) # Imprime qué se paralelizó y qué no
Permite definir clases cuyos métodos se compilan a código nativo y cuya data se almacena como estructura C (sin overhead del intérprete).
import numpy as np
from numba import int32, float32
from numba.experimental import jitclass
spec = [
('value', int32),
('array', float32[:]),
]
@jitclass(spec)
class Bag:
def __init__(self, value):
self.value = value
self.array = np.zeros(value, dtype=np.float32)
@property
def size(self):
return self.array.size
def increment(self, val):
for i in range(self.size):
self.array[i] += val
return self.array
@staticmethod
def add(x, y):
return x + y
mybag = Bag(21)
mybag.increment(5.0)
from typing import List
from numba.experimental import jitclass
from numba.typed import List as NumbaList
@jitclass
class Counter:
value: int
def __init__(self):
self.value = 0
def get(self) -> int:
ret = self.value
self.value += 1
return ret
@jitclass
class ListLoopIterator:
counter: Counter
items: List[float]
def __init__(self, items: List[float]):
self.items = items
self.counter = Counter()
from numba import types, typed
from numba.experimental import jitclass
kv_ty = (types.int64, types.unicode_type)
@jitclass([
('d', types.DictType(*kv_ty)),
('l', types.ListType(types.float64))
])
class ContainerHolder:
def __init__(self):
self.d = typed.Dict.empty(*kv_ty)
self.l = typed.List.empty_list(types.float64)
c = ContainerHolder()
c.d[1] = "apple"
c.l.append(123.)
Dict, List) deben inicializarse explícitamente en __init__. Escribir en un contenedor no inicializado causa segfault.
jitclass soporta una amplia gama de métodos especiales: __abs__, __bool__, __getitem__, __setitem__, __len__, __hash__, __eq__, __ne__, __lt__, __le__, __gt__, __ge__, __add__, __mul__, __sub__, __truediv__, __floordiv__, __mod__, __pow__, __neg__, __pos__, y todos sus variantes __i*__ (in-place) y __r*__ (reflected).
Los stencils son patrones donde cada elemento del array resultado depende de un "vecindario" de elementos del array de entrada. Piensa en filtros de imagen, simulaciones de calor, autómatas celulares, etc.
from numba import stencil
# Promedio de 4 vecinos (tipo Laplaciano discreto)
@stencil
def kernel_laplacian(a):
return 0.25 * (a[0, 1] + a[1, 0] + a[0, -1] + a[-1, 0])
# Los índices son RELATIVOS al punto actual
# a[0, 0] = elemento actual, a[-1, 0] = arriba, etc.
input_arr = np.arange(100).reshape((10, 10)).astype(np.float64)
output = kernel_laplacian(input_arr)
# Los bordes se ponen a 0 por defecto
# Media móvil de 30 días (sin escribir 30 términos)
@stencil(neighborhood=((-29, 0),))
def moving_average_30d(a):
cumul = 0
for i in range(-29, 1):
cumul += a[i]
return cumul / 30
| Opción | Descripción |
|---|---|
cval=X | Valor para los bordes (default: 0) |
neighborhood=((min,max),...) | Define el rango de vecinos explícitamente |
standard_indexing=("b",) | Arrays auxiliares con indexación normal (no relativa) |
@njit(parallel=True)
def apply_blur(image):
# El stencil se paraleliza automáticamente con parallel=True
return stencil(
lambda a: 0.2 * (a[-1,0] + a[1,0] + a[0,-1] + a[0,1] + a[0,0])
)(image)
| Configuración | SVML | Tiempo | Speedup vs NumPy |
|---|---|---|---|
| NumPy puro | — | 5.84s | 1x |
@njit | No | 5.95s | ~1x |
@njit | Sí | 2.26s | 2.6x |
@njit(fastmath=True) | Sí | 1.8s | 3.2x |
@njit(parallel=True) | Sí | 0.624s | 9.4x |
@njit(parallel=True, fastmath=True) | Sí | 0.576s | 10x |
$ conda install intel-cmplr-lib-rt
# Numba lo detecta automáticamente
# Acelera funciones trascendentales (sin, cos, exp, log...)
# Ambos son IGUAL de rápidos con @njit
@njit
def vectorized_style(x):
return np.cos(x) ** 2 + np.sin(x) ** 2
@njit
def loop_style(x):
r = np.empty_like(x)
for i in range(len(x)):
r[i] = np.cos(x[i]) ** 2 + np.sin(x[i]) ** 2
return r
# ¡Escribe loops sin miedo! LLVM optimiza igual que C
@njit(parallel=True, fastmath=True, cache=True)
def max_performance_sum(A):
n = len(A)
acc = 0.
for i in prange(n): # paralelo + fastmath = máxima velocidad
acc += np.sqrt(A[i])
return acc
# Resultado: ~5.37ms vs 35.2ms sin parallel/fastmath (6.5x)
# Asegúrate de tener SciPy (para LAPACK/BLAS)
# Con Anaconda, SciPy usa Intel MKL automáticamente
$ pip install scipy
@njit
def solve_system(A, b):
return np.linalg.solve(A, b) # Usa MKL internamente
NUMBA_DEVELOPER_MODE=1.
# C-contiguous (row-major) vs Fortran-contiguous (column-major)
# Acceder a memoria de forma contigua es MUCHO más rápido
@njit
def row_sum_fast(matrix): # ✅ Acceso por filas en C-order
total = 0.0
for i in range(matrix.shape[0]):
for j in range(matrix.shape[1]):
total += matrix[i, j]
return total
@njit
def col_sum_slow(matrix): # ❌ Acceso por columnas en C-order (cache misses)
total = 0.0
for j in range(matrix.shape[1]):
for i in range(matrix.shape[0]):
total += matrix[i, j]
return total
El sistema de extensiones de Numba permite añadir soporte para funciones, métodos y tipos personalizados en nopython mode.
from numba import types
from numba.extending import overload
# Supongamos que quieres que len() funcione con tuplas en Numba
@overload(len)
def tuple_len(seq):
if isinstance(seq, types.BaseTuple):
n = len(seq) # esto se ejecuta en compile time
def len_impl(seq):
return n # esto se ejecuta en runtime
return len_impl
# Si no retorna nada, Numba prueba otras implementaciones
from numba.extending import overload_method
@overload_method(types.Array, 'take')
def array_take(arr, indices):
if isinstance(indices, types.Array):
def take_impl(arr, indices):
n = indices.shape[0]
res = np.empty(n, arr.dtype)
for i in range(n):
res[i] = arr[indices[i]]
return res
return take_impl
from numba.extending import overload_attribute
@overload_attribute(types.Array, 'nbytes')
def array_nbytes(arr):
def get(arr):
return arr.size * arr.itemsize
return get
Para expertos: genera código LLVM IR directamente. Se inlinea en el caller.
from numba.extending import intrinsic
@intrinsic
def cast_int_to_byte_ptr(typingctx, src):
if isinstance(src, types.Integer):
result_type = types.CPointer(types.uint8)
sig = result_type(types.uintp)
def codegen(context, builder, signature, args):
[src] = args
rtype = signature.return_type
llrtype = context.get_value_type(rtype)
return builder.inttoptr(src, llrtype)
return sig, codegen
import ctypes
from numba.extending import get_cython_function_address
# Obtener dirección de función C definida en un módulo Cython
addr = get_cython_function_address("foo", "myexp")
functype = ctypes.CFUNCTYPE(ctypes.c_double, ctypes.c_double)
myexp = functype(addr)
@njit
def double_myexp(x):
return 2 * myexp(x) # ¡Llama a la función C dentro de Numba!
from numba.core import types
from numba.experimental import structref
@structref.register
class MyStructType(types.StructRef):
def preprocess_fields(self, fields):
return tuple(
(name, types.unliteral(typ)) for name, typ in fields
)
# Proxy Python para interactuar desde el intérprete
class MyStruct(structref.StructRefProxy):
def __new__(cls, name, vector):
return structref.StructRefProxy.__new__(cls, name, vector)
# Se pueden definir propiedades que delegan a funciones JIT
Esto es lo que nadie te enseña pero determina el 80% del rendimiento de tu código numérico. Tu CPU puede hacer miles de millones de operaciones por segundo, pero si está esperando datos de RAM, está literalmente sentada sin hacer nada. Entender esto transforma cómo escribes código para siempre.
Tu CPU NO accede a RAM directamente. Existe una cadena de cachés intermedias, cada una más rápida pero más pequeña:
| Nivel | Tamaño típico | Latencia | Analogía |
|---|---|---|---|
| Registros | ~1 KB | ~0.3 ns (1 ciclo) | Tu mano: instantáneo |
| Cache L1 | 32-64 KB por core | ~1 ns (3-4 ciclos) | Tu escritorio: 1 segundo |
| Cache L2 | 256 KB - 1 MB por core | ~4 ns (10-12 ciclos) | Tu estantería: 4 segundos |
| Cache L3 | 8-64 MB compartido | ~12 ns (30-40 ciclos) | La biblioteca del edificio: 12 segundos |
| RAM | 8-128 GB | ~60-100 ns (200+ ciclos) | Amazon: un minuto esperando |
| Disco SSD | TB | ~10,000-100,000 ns | Pedir algo de China: horas |
La CPU nunca trae "un solo número" de RAM. Siempre trae un bloque llamado cache line, típicamente de 64 bytes. Si trabajas con float64 (8 bytes cada uno), una cache line trae 8 valores consecutivos de golpe.
Esto tiene una consecuencia brutal:
import numpy as np
from numba import njit
# Imagina una matriz 1000x1000 de float64 en C-order (row-major)
# En memoria, se almacena así:
# [fila0_col0, fila0_col1, fila0_col2, ..., fila1_col0, fila1_col1, ...]
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Estos 8 valores están en la MISMA cache line
matrix = np.random.rand(1000, 1000)
# ✅ ACCESO POR FILAS: aprovecha la cache line completa
# Cuando lees matrix[0, 0], la CPU trae matrix[0, 0:8] gratis
# El siguiente acceso matrix[0, 1] YA ESTÁ EN CACHE → 0 espera
@njit
def sum_row_major(matrix):
total = 0.0
for i in range(matrix.shape[0]): # filas (dimensión externa)
for j in range(matrix.shape[1]): # columnas (dimensión interna)
total += matrix[i, j] # ✅ Acceso contiguo en memoria
return total
# ❌ ACCESO POR COLUMNAS: destruye la cache
# Cuando lees matrix[0, 0], la CPU trae fila0_col0..col7
# Pero el siguiente acceso es matrix[1, 0] → OTRA cache line → cache miss
# Cada acceso salta 1000 float64 (8000 bytes) → 125 cache lines de distancia
@njit
def sum_col_major(matrix):
total = 0.0
for j in range(matrix.shape[1]): # columnas primero
for i in range(matrix.shape[0]): # filas después
total += matrix[i, j] # ❌ Saltos de 8KB entre accesos
return total
# Benchmark real en una matriz 4000x4000:
# sum_row_major: ~12ms (cache hits: ~98%)
# sum_col_major: ~85ms (cache hits: ~12%)
# ¡~7x más lento con EXACTAMENTE las mismas operaciones matemáticas!
import numpy as np
# C-ORDER (row-major): filas son contiguas
# Memoria: [f0c0, f0c1, f0c2, f1c0, f1c1, f1c2, ...]
c_matrix = np.zeros((1000, 1000), order='C') # default de NumPy
# FORTRAN-ORDER (column-major): columnas son contiguas
# Memoria: [f0c0, f1c0, f2c0, f0c1, f1c1, f2c1, ...]
f_matrix = np.zeros((1000, 1000), order='F')
# Equivalente en Numba: np.zeros((1000, 1000)[::-1]).T
# ¿CUÁNDO USAR CADA UNO?
# Caso 1: Procesar datos tabulares (filas = registros, columnas = features)
# Si accedes fila por fila → C-order ✅
@njit
def procesar_registros(data): # data es (n_registros, n_features) C-order
n = data.shape[0]
result = np.empty(n)
for i in range(n):
# Acceder a data[i, :] es contiguo en C-order ✅
result[i] = np.sum(data[i, :])
return result
# Caso 2: Series temporales (filas = tiempo, columnas = sensores)
# Si accedes columna por columna → Fortran-order ✅
@njit
def procesar_sensores(data): # data es (n_tiempos, n_sensores) F-order
n_sensors = data.shape[1]
result = np.empty(n_sensors)
for j in range(n_sensors):
# Acceder a data[:, j] es contiguo en F-order ✅
result[j] = np.mean(data[:, j])
return result
arr = np.zeros((100, 100))
print(arr.flags)
# C_CONTIGUOUS : True ← C-order
# F_CONTIGUOUS : False
# Strides te dicen cuántos bytes hay entre elementos consecutivos
print(arr.strides)
# (800, 8) → saltar una fila = 800 bytes, saltar una columna = 8 bytes
# El stride más pequeño (8) es la dimensión contigua → última dim (columnas)
# ⚠️ CUIDADO: operaciones que destruyen la contigüidad
sliced = arr[::2, :] # Slice con step
print(sliced.flags.c_contiguous) # False — ya no es contiguo
print(sliced.strides) # (1600, 8) — stride duplicado
transposed = arr.T
print(transposed.flags.c_contiguous) # False
print(transposed.flags.f_contiguous) # True — ahora es F-order
@njit
def procesar_seguro(data):
# Si data puede llegar no-contiguo (ej: un slice), haz copia contigua
if not data.flags.c_contiguous:
data = np.ascontiguousarray(data) # Copia, pero ahora es C-contiguo
# NOTA: np.ascontiguousarray no está en Numba nopython mode
# Alternativa dentro de @njit:
data_copy = data.copy() # .copy() devuelve C-contiguous por defecto
return data_copy
types.Array(types.float64, 2, 'C') → solo acepta C-contiguoustypes.Array(types.float64, 2, 'A') → acepta cualquier layout'C' y Numba sabe que el array es contiguo, puede generar código más agresivamente optimizado.
Cada np.zeros(), np.empty(), o np.array() dentro de un loop pide memoria al sistema operativo. Esto implica: syscall al kernel, posible page fault, invalidación de cache, y trabajo para el garbage collector. En un loop que se ejecuta millones de veces, esto es devastador.
# ❌ TERRIBLE: aloca memoria en CADA iteración
@njit
def procesar_terrible(dataset, n_iter):
for i in range(n_iter):
temp = np.zeros(1000) # ❌ Alocación + inicialización × n_iter veces
result = np.empty(1000) # ❌ Otra alocación × n_iter veces
for j in range(1000):
temp[j] = dataset[i, j] ** 2
result[j] = np.sqrt(temp[j])
return result
# ✅ CORRECTO: pre-aloca FUERA del loop, reutiliza buffers
@njit
def procesar_optimo(dataset, n_iter):
temp = np.empty(1000) # ✅ Se aloca UNA sola vez
result = np.empty(1000) # ✅ Se aloca UNA sola vez
for i in range(n_iter):
for j in range(1000):
temp[j] = dataset[i, j] ** 2 # Reutiliza el buffer
result[j] = np.sqrt(temp[j])
return result
# Con n_iter=100000: terrible ~8.2s vs óptimo ~1.1s → 7.5x más rápido
# Numba con parallel=True INTENTA hoistear alocaciones automáticamente:
@njit(parallel=True)
def auto_hoist(n):
for i in prange(n):
temp = np.zeros((50, 50)) # Numba intenta sacar esto fuera del loop
for j in range(50):
temp[j, j] = i
# Internamente, Numba transforma esto a:
# temp = np.empty((50, 50)) ← alocación hoisted fuera del loop
# for i in prange(n):
# temp[:] = 0 ← solo la inicialización queda dentro
# for j in range(50):
# temp[j, j] = i
# PERO: solo funciona con np.empty y np.zeros
# Alocaciones más complejas o condicionales NO se hoistean
# REGLA: no confíes en el hoisting automático. Pre-aloca tú mismo.
Una de las ventajas más grandes de Numba sobre NumPy puro es que puedes eliminar arrays temporales. En NumPy vectorizado, cada operación intermedia crea un array temporal:
# Con NumPy puro — CADA operación crea un array temporal en RAM:
def numpy_style(x):
temp1 = np.sin(x) # temp1: 80MB si x tiene 10M float64
temp2 = temp1 ** 2 # temp2: otros 80MB
temp3 = np.cos(x) # temp3: otros 80MB
temp4 = temp3 ** 2 # temp4: otros 80MB
result = temp1 + temp3 # result: otros 80MB
return result
# Total: ~400MB de temporales para un input de 80MB
# Cada temporal destruye la cache y causa page faults
# Con Numba — CERO temporales, todo en registros/cache:
@njit
def numba_fused(x):
n = len(x)
result = np.empty(n)
for i in range(n):
# Cada valor se calcula y se guarda INMEDIATAMENTE
# sin crear arrays intermedios
s = np.sin(x[i])
c = np.cos(x[i])
result[i] = s ** 2 + c ** 2
# s y c viven en registros de CPU, no en RAM
return result
# Total: 0 bytes de temporales. Solo input + output.
# Con 10M elementos:
# numpy_style: ~650ms (memory Bandwidth (Ancho de banda) limited)
# numba_fused: ~180ms (compute limited — que es lo que queremos)
Cuando tu dataset es enorme (cientos de MB o GB), ni siquiera un loop fila-por-fila es óptimo. El truco es procesar en bloques que quepan en cache L2/L3.
@njit
def sum_naive(matrix):
"""Suma de todos los elementos — naive."""
total = 0.0
for i in range(matrix.shape[0]):
for j in range(matrix.shape[1]):
total += matrix[i, j]
return total
# Para matrices pequeñas, esto está bien.
# Para matrices de 10000x10000+, la cache L2 (256KB) no puede
# contener una fila entera si es muy ancha.
@njit
def sum_blocked(matrix, block_size=64):
"""Suma con blocking: procesa sub-matrices que caben en cache L2."""
total = 0.0
rows, cols = matrix.shape
# Recorre por bloques de block_size × block_size
for bi in range(0, rows, block_size):
for bj in range(0, cols, block_size):
# Este bloque (64×64 float64 = 32KB) cabe en L1 cache
bi_end = min(bi + block_size, rows)
bj_end = min(bj + block_size, cols)
for i in range(bi, bi_end):
for j in range(bj, bj_end):
total += matrix[i, j]
return total
# Ejemplo clásico donde blocking es CRÍTICO: multiplicación de matrices
@njit
def matmul_blocked(A, B, block_size=64):
"""Multiplicación de matrices con cache blocking.
Para matrices grandes, puede ser 3-5x más rápido que el naive."""
M, K = A.shape
K2, N = B.shape
C = np.zeros((M, N))
for bi in range(0, M, block_size):
for bj in range(0, N, block_size):
for bk in range(0, K, block_size):
# Multiplicar sub-bloques que caben en cache
bi_end = min(bi + block_size, M)
bj_end = min(bj + block_size, N)
bk_end = min(bk + block_size, K)
for i in range(bi, bi_end):
for j in range(bj, bj_end):
temp = 0.0
for k in range(bk, bk_end):
temp += A[i, k] * B[k, j]
C[i, j] += temp
return C
| Cache objetivo | Tamaño típico | block_size para float64 | Cálculo |
|---|---|---|---|
| L1 (32KB) | 32,768 bytes | 64×64 | 64×64×8 = 32,768 bytes |
| L2 (256KB) | 262,144 bytes | 128×128 | 128×128×8 = 131,072 (dejas espacio para otros datos) |
| L3 (8MB) | 8,388,608 bytes | 512×512 | Para datasets gigantes con poca reutilización |
3 × block_size² × 8 bytes < 32KB, así que block_size ≈ 36. En la práctica, potencias de 2 entre 32 y 128 suelen funcionar bien. Experimenta con benchmarking.
# Escenario: Transponer y procesar una matriz grande
# Datos: 5000×5000 de float64 (200MB)
# ❌ NIVEL 1 — TERRIBLE: acceso aleatorio
@njit
def terrible(matrix, indices):
total = 0.0
for idx in indices: # indices aleatorios → cache miss garantizado
total += matrix.flat[idx]
return total
# ❌ NIVEL 2 — MALO: stride grande (acceso por columnas en C-order)
@njit
def malo(matrix):
total = 0.0
for j in range(matrix.shape[1]):
for i in range(matrix.shape[0]):
total += matrix[i, j] # stride = n_cols × 8 bytes
return total
# 🆗 NIVEL 3 — DECENTE: acceso secuencial contiguo
@njit
def decente(matrix):
total = 0.0
for i in range(matrix.shape[0]):
for j in range(matrix.shape[1]):
total += matrix[i, j] # stride = 8 bytes (contiguo)
return total
# ✅ NIVEL 4 — BUENO: contiguo + sin temporales + parallel
@njit(parallel=True)
def bueno(matrix):
total = 0.0
for i in prange(matrix.shape[0]):
row_sum = 0.0 # acumulador local → vive en registro
for j in range(matrix.shape[1]):
row_sum += matrix[i, j]
total += row_sum # reducción paralela segura
return total
# ⚡ NIVEL 5 — ÓPTIMO: blocking + parallel + fastmath + SVML
@njit(parallel=True, fastmath=True)
def optimo(matrix, block_size=64):
total = 0.0
rows, cols = matrix.shape
for bi in prange(0, rows, block_size): # bloques en paralelo
block_sum = 0.0
bi_end = min(bi + block_size, rows)
for i in range(bi, bi_end):
for j in range(cols):
block_sum += matrix[i, j]
total += block_sum
return total
# Rendimiento relativo (5000×5000 float64):
# terrible: ~850ms (baseline)
# malo: ~340ms (2.5x más rápido)
# decente: ~48ms (17x más rápido)
# bueno: ~12ms (70x más rápido)
# óptimo: ~6ms (140x más rápido)
Este es un patrón de diseño de datos que puede cambiar el rendimiento dramáticamente. Supongamos que tienes 1 millón de partículas con posición (x, y, z) y velocidad (vx, vy, vz):
# ❌ ARRAY OF STRUCTURES (AoS) — Natural pero lento para procesamiento masivo
# Cada partícula es una "fila" con todos sus atributos juntos
# Memoria: [x0,y0,z0,vx0,vy0,vz0, x1,y1,z1,vx1,vy1,vz1, ...]
particles_aos = np.zeros((1_000_000, 6)) # columnas: x,y,z,vx,vy,vz
@njit
def update_positions_aos(particles, dt):
for i in range(particles.shape[0]):
# Para actualizar x, la CPU trae una cache line que contiene
# [x0, y0, z0, vx0, vy0, vz0, x1, y1] — solo usa x0 y vx0
# 6 de cada 8 valores en la cache line son BASURA para esta operación
particles[i, 0] += particles[i, 3] * dt # x += vx * dt
particles[i, 1] += particles[i, 4] * dt # y += vy * dt
particles[i, 2] += particles[i, 5] * dt # z += vz * dt
# ✅ STRUCTURE OF ARRAYS (SoA) — Cada atributo en su propio array
# Memoria de x: [x0, x1, x2, x3, x4, x5, x6, x7, ...]
# Memoria de vx: [vx0, vx1, vx2, vx3, vx4, vx5, vx6, vx7, ...]
n = 1_000_000
x = np.zeros(n)
y = np.zeros(n)
z = np.zeros(n)
vx = np.random.randn(n)
vy = np.random.randn(n)
vz = np.random.randn(n)
@njit(parallel=True, fastmath=True)
def update_positions_soa(x, y, z, vx, vy, vz, dt):
for i in prange(len(x)):
# La CPU trae [x0, x1, x2, x3, x4, x5, x6, x7]
# TODOS los 8 valores en la cache line son útiles
# Además, LLVM puede usar instrucciones SIMD (procesar 4 a la vez)
x[i] += vx[i] * dt
y[i] += vy[i] * dt
z[i] += vz[i] * dt
# Benchmarks con 1M partículas:
# AoS: ~4.8ms
# SoA: ~0.6ms → 8x más rápido con los mismos cálculos
| # | Verificar | Cómo |
|---|---|---|
| 1 | ¿Loop interno recorre dimensión contigua? | Última dim en C-order, primera en F-order |
| 2 | ¿Aloco arrays dentro de loops calientes? | Mover np.empty/zeros fuera del loop |
| 3 | ¿Creo arrays temporales innecesarios? | Fusionar operaciones en un solo loop |
| 4 | ¿Mis arrays son contiguos? | Verificar arr.flags.c_contiguous |
| 5 | ¿Mi dataset cabe en cache? | Si no → usar blocking con tamaño < L2 |
| 6 | ¿Estructura de datos apropiada? | SoA si proceso un atributo a la vez sobre muchas entidades |
| 7 | ¿Acceso aleatorio a arrays grandes? | Reordenar datos o usar índices sorted |
| 8 | ¿Uso parallel=True en ops de arrays? | Divide el trabajo en cache-friendly chunks entre cores |
Sin benchmarking correcto, todas las optimizaciones anteriores son adivinanzas. La mayoría de la gente benchmarkea Numba mal, obtiene números incorrectos, y toma decisiones equivocadas. Esta sección te enseña a medir de verdad.
El error más común y más grave. La primera llamada a una función Numba incluye el tiempo de compilación (0.1s - 5s). Si mides eso, tus benchmarks no significan nada.
import numpy as np
from numba import njit
import time
@njit
def mi_funcion(x):
return np.sum(np.sqrt(x))
data = np.random.rand(1_000_000)
# ❌ MAL: incluye compilación
start = time.time()
mi_funcion(data)
print(f"Tiempo: {time.time() - start:.4f}s")
# Resultado: "Tiempo: 0.8523s" ← ¡MENTIRA! 0.85s son de compilación
# ✅ BIEN: calentar primero, luego medir
mi_funcion(data) # ← WARMUP: primera llamada compila (descartar)
start = time.time()
for _ in range(100):
mi_funcion(data)
elapsed = (time.time() - start) / 100
print(f"Tiempo real: {elapsed*1000:.2f}ms")
# Resultado: "Tiempo real: 1.23ms" ← La verdad
Este es el patrón que debes usar siempre. Cópialo y adáptalo:
import numpy as np
from numba import njit, prange
import time
import sys
def benchmark(func, *args, n_warmup=3, n_runs=50, label=""):
"""Benchmark profesional: warmup + múltiples runs + estadísticas."""
# 1. WARMUP: compilar + estabilizar cache
for _ in range(n_warmup):
func(*args)
# 2. MEDIR: múltiples ejecuciones
times = []
for _ in range(n_runs):
start = time.perf_counter() # ← perf_counter, NO time.time()
result = func(*args)
elapsed = time.perf_counter() - start
times.append(elapsed)
# 3. ANALIZAR: no solo la media
times = np.array(times)
print(f"{'─'*50}")
print(f" {label or func.__name__}")
print(f"{'─'*50}")
print(f" Mediana: {np.median(times)*1000:10.3f} ms") # ← MEDIANA, no media
print(f" Media: {np.mean(times)*1000:10.3f} ms")
print(f" Min: {np.min(times)*1000:10.3f} ms") # ← mejor caso real
print(f" Max: {np.max(times)*1000:10.3f} ms")
print(f" Std: {np.std(times)*1000:10.3f} ms")
print(f" Runs: {n_runs}")
print()
return np.median(times)
# USO:
data = np.random.rand(10_000_000)
@njit
def version_a(x):
return np.sum(np.sqrt(x))
@njit(parallel=True, fastmath=True)
def version_b(x):
total = 0.0
for i in prange(len(x)):
total += np.sqrt(x[i])
return total
t_a = benchmark(version_a, data, label="@njit básico")
t_b = benchmark(version_b, data, label="@njit parallel+fastmath")
t_np = benchmark(np.sum, np.sqrt(data), label="NumPy puro")
print(f"Speedup B vs A: {t_a/t_b:.1f}x")
print(f"Speedup B vs NumPy: {t_np/t_b:.1f}x")
Porque los outliers de tiempo alto (causados por garbage collection, interrupciones del OS, context switches) distorsionan la media. La mediana te dice "en la mitad de los casos tardó menos que X" — mucho más representativo del rendimiento real.
time.perf_counter() y no time.time()?perf_counter usa el reloj monotónico de mayor resolución disponible en el sistema (nanosegundos). time.time() puede tener resolución de milisegundos y es afectado por ajustes del reloj del sistema.
A veces necesitas saber cuánto tarda Numba en compilar (para decidir si usar cache=True):
def benchmark_compilation(func, *args):
"""Mide el tiempo de compilación vs ejecución por separado."""
# Forzar recompilación
func.recompile() # Limpia todas las especializaciones
# Medir primera llamada (compilación + ejecución)
start = time.perf_counter()
func(*args)
first_call = time.perf_counter() - start
# Medir segunda llamada (solo ejecución)
start = time.perf_counter()
func(*args)
second_call = time.perf_counter() - start
compilation = first_call - second_call
print(f"Compilación: {compilation*1000:.1f} ms")
print(f"Ejecución: {second_call*1000:.3f} ms")
print(f"Ratio: compilación es {compilation/second_call:.0f}x la ejecución")
# Si la ratio es > 1000, cache=True es casi obligatorio
if compilation / max(second_call, 1e-9) > 1000:
print(" → RECOMENDACIÓN: usa cache=True")
Un benchmark con un solo tamaño de datos es incompleto. Las optimizaciones se comportan diferente según la escala:
def scaling_test(funcs, sizes, labels=None, gen_data=None):
"""Compara funciones en múltiples tamaños de datos.
Revela si una optimización escala o tiene overhead fijo."""
if gen_data is None:
gen_data = lambda n: np.random.rand(n)
if labels is None:
labels = [f.__name__ for f in funcs]
# Warmup todas las funciones con el tamaño más pequeño
small_data = gen_data(sizes[0])
for func in funcs:
func(small_data)
print(f"\n{'Tamaño':>12}", end="")
for label in labels:
print(f"{label:>15}", end="")
print(f"{'Speedup':>12}")
print("─" * (12 + 15 * len(funcs) + 12))
for size in sizes:
data = gen_data(size)
times = []
for func in funcs:
t = []
for _ in range(20):
start = time.perf_counter()
func(data)
t.append(time.perf_counter() - start)
times.append(np.median(t))
print(f"{size:>12,}", end="")
for t in times:
print(f"{t*1000:>14.3f}ms", end="")
speedup = times[0] / times[-1] if times[-1] > 0 else float('inf')
print(f"{speedup:>11.1f}x")
# Uso:
scaling_test(
funcs=[version_a, version_b],
sizes=[1_000, 10_000, 100_000, 1_000_000, 10_000_000],
labels=["njit", "parallel+fast"]
)
# Output típico:
# Tamaño njit parallel+fast Speedup
# ─────────────────────────────────────────────────────
# 1,000 0.002ms 0.045ms 0.0x ← parallel PIERDE en datos chicos
# 10,000 0.012ms 0.048ms 0.3x ← todavía pierde
# 100,000 0.108ms 0.065ms 1.7x ← empieza a ganar
# 1,000,000 1.052ms 0.342ms 3.1x ← gana claramente
# 10,000,000 10.891ms 2.187ms 5.0x ← domina
parallel=True tiene un overhead fijo de ~0.04ms por crear el thread pool. Con datos pequeños (<10K), ese overhead domina y la versión paralela es MÁS LENTA. Con datos grandes, el overhead es insignificante y la paralelización domina. Sin un scaling test, nunca verías esto.
El tiempo absoluto no te dice si tu código está cerca del límite teórico. El Throughput (Rendimiento real) (GB/s procesados) sí:
def benchmark_Throughput (Rendimiento real)(func, data, n_runs=50, label=""):
"""Mide Throughput (Rendimiento real) en GB/s para saber si estás cerca del límite de memoria."""
# Warmup
func(data)
func(data)
# Medir
times = []
for _ in range(n_runs):
start = time.perf_counter()
func(data)
times.append(time.perf_counter() - start)
median_time = np.median(times)
data_bytes = data.nbytes
Throughput (Rendimiento real)_gbs = (data_bytes / median_time) / 1e9
print(f"{label:30s} | {median_time*1000:8.3f} ms | {Throughput (Rendimiento real)_gbs:6.1f} GB/s")
# Referencia: Bandwidth (Ancho de banda) de RAM típica
# DDR4-3200: ~25 GB/s por canal, ~50 GB/s dual channel
# DDR5-5600: ~45 GB/s por canal, ~90 GB/s dual channel
# Si tu Throughput (Rendimiento real) está cerca de estos números, estás
# MEMORY BOUND y optimizar la CPU no ayudará más.
# La única forma de ir más rápido es reducir los datos
# que necesitas leer (mejor layout, menos temporales, etc.)
# Uso:
data = np.random.rand(50_000_000) # ~400MB
@njit
def just_sum(x):
return np.sum(x)
@njit(parallel=True)
def par_sum(x):
return np.sum(x)
benchmark_Throughput (Rendimiento real)(just_sum, data, label="np.sum @njit")
benchmark_Throughput (Rendimiento real)(par_sum, data, label="np.sum @njit parallel")
benchmark_Throughput (Rendimiento real)(np.sum, data, label="np.sum NumPy puro")
# Output típico:
# np.sum @njit | 17.234 ms | 23.2 GB/s
# np.sum @njit parallel | 4.891 ms | 81.8 GB/s ← cerca del límite de RAM
# np.sum NumPy puro | 18.103 ms | 22.1 GB/s
#
# La versión parallel a 81.8 GB/s está CERCA del límite de DDR5 (~90 GB/s)
# No va a ir más rápido sin importar qué hagas en código.
# ¡Estás memory-bound! La CPU está esperando datos de RAM.
No asumas que Numba hizo lo que esperabas. Verifica.
@njit(parallel=True)
def mi_funcion(x):
n = x.shape[0]
a = np.sin(x)
b = np.cos(a * a)
acc = 0
for i in prange(n - 2):
for j in prange(n - 1):
acc += b[i] + b[j + 1]
return acc
# Forzar compilación
mi_funcion(np.arange(10, dtype=np.float64))
# ─── HERRAMIENTA 1: inspect_types() ───
# Muestra qué tipo infirió Numba para CADA variable
mi_funcion.inspect_types()
# Busca sorpresas: ¿una variable es float64 cuando esperabas int?
# ¿Un array es 'A' (any layout) cuando debería ser 'C'?
# ─── HERRAMIENTA 2: parallel_diagnostics() ───
# Muestra qué se paralelizó, qué se fusionó, y qué NO
mi_funcion.parallel_diagnostics(level=4)
# Nivel 1: resumen básico
# Nivel 2: + info de fusión de loops
# Nivel 3: + estructura antes/después de optimización
# Nivel 4: + hoisting de código + instrucciones
# ─── HERRAMIENTA 3: inspect_llvm() ───
# Para expertos: el código LLVM IR generado
llvm_ir = mi_funcion.inspect_llvm(mi_funcion.signatures[0])
# Buscar: "vector.body" → LLVM vectorizó el loop (SIMD)
# Buscar: "llvm.sqrt.v4f64" → está usando SIMD de 4 doubles
# Buscar: "svml" → está usando Intel SVML
if 'svml' in llvm_ir.lower():
print("✅ SVML activo")
else:
print("⚠️ SVML NO detectado — instala intel-cmplr-lib-rt")
# ─── HERRAMIENTA 4: inspect_asm() ───
# Para ultra-expertos: el assembly nativo generado
asm = mi_funcion.inspect_asm(mi_funcion.signatures[0])
# Buscar: "vmulpd" / "vaddpd" → instrucciones AVX (256-bit SIMD)
# Buscar: "zmm" → instrucciones AVX-512 (512-bit SIMD)
# ─── HERRAMIENTA 5: debug de vectorización LLVM ───
import llvmlite.binding as llvm
llvm.set_option('', '--debug-only=loop-vectorize')
# NOTA: requiere LLVM con assertions (build de desarrollo)
# Mensajes comunes:
# "LV: Vectorization is possible but not beneficial" → el loop es muy corto
# "LV: Can't vectorize due to memory conflicts" → acceso no contiguo
# "LV: Not vectorizing: loop did not meet vectorization requirements" → dependencia
# ERROR MUY COMÚN: comparar operaciones diferentes
data = np.random.rand(1_000_000)
# Esto NO es una comparación justa:
# NumPy:
result_np = np.sum(np.sqrt(data)) # crea array temporal de sqrt
# Numba:
@njit
def fused(x):
total = 0.0
for i in range(len(x)):
total += np.sqrt(x[i]) # sin array temporal
return total
# La versión Numba es más rápida EN PARTE porque evita el temporal,
# no solo por la compilación JIT.
# COMPARACIÓN JUSTA: misma operación, mismo patrón de memoria
@njit
def numba_same_as_numpy(x):
return np.sum(np.sqrt(x)) # misma operación que NumPy
@njit
def numba_fused(x):
total = 0.0
for i in range(len(x)):
total += np.sqrt(x[i])
return total
# Ahora puedes decir:
# "Numba sin fusión (misma operación que NumPy): 1.2x más rápido"
# "Numba CON fusión de loops: 3.5x más rápido"
# "De ese 3.5x, 1.2x viene de JIT y 2.9x viene de eliminar temporales"
# Cambiar de float64 a float32 tiene TRES beneficios simultáneos:
# 1. La mitad de bytes → el doble de datos caben en cache
# 2. La mitad de bytes → el doble de Throughput (Rendimiento real) de memoria
# 3. SIMD procesa el doble de elementos por instrucción
# (8 float32 vs 4 float64 en AVX-256)
data64 = np.random.rand(10_000_000).astype(np.float64) # 80 MB
data32 = data64.astype(np.float32) # 40 MB
@njit(parallel=True, fastmath=True)
def process(x):
result = np.empty_like(x)
for i in prange(len(x)):
result[i] = np.sin(x[i]) ** 2 + np.cos(x[i]) ** 2
return result
benchmark(process, data64, label="float64 (80MB)")
benchmark(process, data32, label="float32 (40MB)")
# Resultado típico:
# float64: 12.3ms
# float32: 4.1ms → 3x más rápido con ~7 dígitos de precisión (vs ~15)
# ¿Cuándo es aceptable float32?
# ✅ Machine Learning (pesos, activaciones, gradientes)
# ✅ Procesamiento de señales e imágenes
# ✅ Simulaciones donde 7 dígitos bastan
# ✅ Visualización y gráficos
# ❌ Cálculos financieros precisos
# ❌ Álgebra lineal con matrices mal condicionadas
# ❌ Acumulación de sumas muy largas (error se acumula)
| # | Verificar | Si no lo haces... |
|---|---|---|
| 1 | ¿Hice warmup antes de medir? | Mides compilación, no ejecución |
| 2 | ¿Uso time.perf_counter()? | Resolución insuficiente para funciones rápidas |
| 3 | ¿Múltiples ejecuciones + mediana? | GC y OS noise distorsionan un solo run |
| 4 | ¿Probé con múltiples tamaños de datos? | Tu "optimización" puede ser más lenta para datos reales |
| 5 | ¿Verifiqué Throughput (Rendimiento real) en GB/s? | Podrías estar optimizando CPU cuando estás memory-bound |
| 6 | ¿Comparé operaciones equivalentes? | Speedup falso por comparar cosas diferentes |
| 7 | ¿Revisé parallel_diagnostics()? | parallel=True podría no estar haciendo nada |
| 8 | ¿Verifiqué que SVML está activo? | Podrías estar a 3x del rendimiento óptimo sin saberlo |
| 9 | ¿Probé float32 vs float64? | Posible 2-3x gratis si la precisión no es crítica |
Ya sabes cómo hacer que un solo core trabaje al máximo (memoria, cache, layout). Ahora multiplicamos eso por todos los cores. Pero el paralelismo mal aplicado no solo no ayuda — puede ser más lento que secuencial. Esta sección te enseña a paralelizar sin errores y a saber exactamente cuándo conviene y cuándo no.
| Mecanismo | Cómo funciona | Cuándo usar | Overhead |
|---|---|---|---|
parallel=True(auto-parallelización) |
Numba detecta operaciones con semántica paralela y las fusiona en kernels multi-thread | Operaciones sobre arrays completos: np.sin(x) + np.cos(y), reducciones |
Bajo (~0.05ms) |
prange(loops paralelos explícitos) |
Tú indicas qué loop paralelizar. Numba reparte iteraciones entre threads | Loops donde cada iteración es independiente y hace trabajo significativo | Bajo (~0.05ms) |
nogil=True +threading manual |
Tú manejas los threads de Python. Numba libera el GIL para que ejecuten en paralelo | Pipelines complejos, control total de scheduling, combinar Numba con I/O | Mínimo (tú controlas) |
parallel=True y prange usan el mismo thread pool interno de Numba. No compiten entre sí — si tienes un prange dentro de una función parallel=True, el prange se ejecuta en paralelo y las operaciones de array de esa función también se paralelizan (pero los loops prange internos se serializan para evitar over-subscription).
Numba no implementa su propio sistema de threads. Delega a una librería externa, y la elección de cuál usar importa:
| Layer | Backend | Fork-safe | Thread-safe | Dynamic scheduling | Instalar |
|---|---|---|---|---|---|
| tbb | Intel TBB | ✅ | ✅ | ✅ | conda install tbb |
| omp | OpenMP | ❌ (Linux) | ✅ | ❌ | Ya incluido en la mayoría de sistemas |
| workqueue | Built-in Numba | ✅ | ❌ | ❌ | Siempre disponible |
from numba import config, threading_layer
# Ver qué layer se seleccionó
# (necesitas ejecutar algo parallel primero)
@njit(parallel=True)
def _trigger(x):
return x + 1
_trigger(np.array([1.0]))
print(f"Threading layer: {threading_layer()}")
# Forzar un layer específico (ANTES de cualquier compilación parallel)
# Opción 1: variable de entorno
# $ NUMBA_THREADING_LAYER=tbb python mi_script.py
# Opción 2: programáticamente (antes de cualquier @njit parallel)
config.THREADING_LAYER = 'tbb'
# Opción 3: elegir por seguridad
config.THREADING_LAYER = 'safe' # fork + thread safe (requiere TBB)
config.THREADING_LAYER = 'threadsafe' # solo thread safe
config.THREADING_LAYER = 'forksafe' # solo fork safe
conda install tbb). Es el único backend que soporta dynamic scheduling (repartir trabajo dinámicamente cuando hay iteraciones con costo variable). También es el único que es fork-safe Y thread-safe, lo que importa si usas multiprocessing además de Numba.
from numba import (njit, prange, config,
set_num_threads, get_num_threads, get_thread_id)
# Ver cuántos threads tiene Numba
print(f"Threads disponibles: {config.NUMBA_NUM_THREADS}")
print(f"Threads activos: {get_num_threads()}")
# ─── ESCENARIO: 4 procesos Python, 8 cores físicos ───
# Sin ajustar: cada proceso usa 8 threads = 32 threads totales = oversubscription
# Con ajuste: cada proceso usa 2 threads = 8 threads totales = perfecto
# Opción A: Variable de entorno (antes de lanzar Python)
# $ NUMBA_NUM_THREADS=2 python worker.py
# Opción B: En runtime (más flexible)
set_num_threads(2) # Ahora parallel usa solo 2 threads
# Se puede cambiar dinámicamente:
set_num_threads(8) # Volver a usar todos para una tarea pesada
set_num_threads(2) # Reducir de nuevo
# ¡Funciona DENTRO de funciones @njit también!
@njit(parallel=True)
def adaptive_parallel(data, use_all_cores):
if use_all_cores:
set_num_threads(config.NUMBA_NUM_THREADS)
else:
set_num_threads(2)
result = np.empty(len(data))
for i in prange(len(data)):
result[i] = np.sqrt(data[i])
return result
# Identificar threads individuales (útil para debugging)
@njit(parallel=True)
def show_threads(n):
for i in prange(n):
tid = get_thread_id() # 0 a get_num_threads()-1
# No uses print aquí en producción (I/O serializa los threads)
return get_num_threads()
set_num_threads(n) solo puede bajar el número, nunca subir más allá de NUMBA_NUM_THREADS (que se fija al importar Numba). Si necesitas un máximo alto, no restrinjas NUMBA_NUM_THREADS con variable de entorno — usa set_num_threads() dinámicamente.
Por defecto, prange usa static scheduling: divide las N iteraciones en partes iguales entre los threads. Si cada iteración tarda lo mismo, es perfecto. Si no, algunos threads terminan antes y esperan ociosos.
# ─── PROBLEMA: iteraciones con costo variable ───
@njit(parallel=True)
def costo_variable(limits):
"""Cada iteración tiene un costo muy diferente."""
results = np.empty(len(limits))
for i in prange(len(limits)):
# Collatz: el costo varía enormemente según el número
n = limits[i]
count = 0
while n > 1:
if n % 2 == 0:
n //= 2
else:
n = n * 3 + 1
count += 1
results[i] = count
return results
# Con static scheduling: thread 0 puede terminar en 1ms
# mientras thread 3 tarda 50ms → los otros 3 threads esperan 49ms
# ─── SOLUCIÓN: dynamic scheduling con chunksize ───
# Requiere TBB como threading layer
from numba import parallel_chunksize
# Chunk size pequeño = mejor balanceo de carga, más overhead de scheduling
# Chunk size grande = peor balanceo, menos overhead
# Regla práctica: chunk_size ≈ n_iteraciones / (n_threads × 10)
limits = np.random.randint(1, 1_000_000, size=100_000)
# Static (default): desbalanceado
result1 = costo_variable(limits)
# Dynamic con chunk size optimizado
with parallel_chunksize(100): # 100K iteraciones / (8 threads × 10) ≈ 1250
result2 = costo_variable(limits) # pero chunks más pequeños → mejor balance
# También se puede hacer programáticamente
from numba import set_parallel_chunksize, get_parallel_chunksize
old = set_parallel_chunksize(100)
result3 = costo_variable(limits)
set_parallel_chunksize(old) # restaurar
from numba import threading_layer; print(threading_layer()) debe mostrar tbb.
# REGLA CRÍTICA: Numba NO soporta prange anidados en paralelo.
# El loop externo se ejecuta en paralelo, el interno se SERIALIZA.
@njit(parallel=True)
def nested_prange(matrix):
total = 0.0
for i in prange(matrix.shape[0]): # ✅ PARALELO
for j in prange(matrix.shape[1]): # ❌ SERIALIZADO (tratado como range)
total += matrix[i, j]
return total
# ¿Cómo saber cuál se paralelizó? → parallel_diagnostics
# Mostrará algo como:
# +--3 (parallel) ← loop externo
# +--2 (serial) ← loop interno serializado
# ─── CONSECUENCIA PRÁCTICA ───
# Si tienes un loop externo de 4 iteraciones y uno interno de 1000,
# solo el de 4 se paraleliza → ¡solo 4 threads trabajan!
# ❌ MAL: pocas iteraciones en el loop paralelo
@njit(parallel=True)
def mal_anidado(data_3d): # shape: (4, 1000, 1000)
result = np.zeros(data_3d.shape[0])
for c in prange(4): # Solo 4 iteraciones paralelas
for i in range(1000): # 1000 iteraciones seriales
for j in range(1000): # 1000 iteraciones seriales
result[c] += data_3d[c, i, j]
return result
# ✅ BIEN: aplanar para maximizar iteraciones paralelas
@njit(parallel=True)
def bien_aplanado(data_3d): # shape: (4, 1000, 1000)
C, H, W = data_3d.shape
flat_size = C * H # = 4000 iteraciones paralelas
result = np.zeros(C)
for idx in prange(flat_size):
c = idx // H
i = idx % H
row_sum = 0.0
for j in range(W):
row_sum += data_3d[c, i, j]
result[c] += row_sum # ¡Cuidado! Esto es una reducción en result[c]
return result
prange es conveniente pero limitado: un solo patrón de loop paralelo, sin control de scheduling, sin poder mezclar con I/O. Para pipelines complejos, necesitas nogil=True con threading de Python.
import threading
import numpy as np
from numba import njit
# ─── PATRÓN 1: Divide & Conquer manual ───
# Útil cuando necesitas control total de qué chunk va a qué thread
@njit(nogil=True)
def process_chunk(data, result, start, end):
"""Procesa un rango [start, end) del array.
nogil=True permite que múltiples threads ejecuten esto simultáneamente."""
for i in range(start, end):
# Operación costosa por elemento
val = data[i]
for _ in range(100): # simular trabajo pesado
val = np.sin(val) * np.cos(val) + 0.001
result[i] = val
def parallel_process(data, n_threads=None):
"""Procesa data en paralelo con n_threads threads."""
import os
if n_threads is None:
n_threads = os.cpu_count()
n = len(data)
result = np.empty_like(data)
chunk_size = (n + n_threads - 1) // n_threads # ceil division
threads = []
for t in range(n_threads):
start = t * chunk_size
end = min(start + chunk_size, n)
if start >= n:
break
thread = threading.Thread(
target=process_chunk,
args=(data, result, start, end)
)
threads.append(thread)
thread.start()
for t in threads:
t.join()
return result
data = np.random.rand(1_000_000)
result = parallel_process(data, n_threads=8)
import threading
from queue import Queue
@njit(nogil=True)
def heavy_transform(chunk, output):
"""Transformación pesada sobre un chunk de datos."""
for i in range(len(chunk)):
val = chunk[i]
for _ in range(50):
val = np.sqrt(np.abs(val) + 1.0)
output[i] = val
def pipeline_process(data, chunk_size=100_000, n_workers=4):
"""Pipeline: un thread carga chunks, varios workers los procesan.
Ideal cuando combinas I/O con cómputo pesado."""
n = len(data)
result = np.empty_like(data)
task_queue = Queue(maxsize=n_workers * 2) # buffer limitado
done_event = threading.Event()
def worker():
while not done_event.is_set() or not task_queue.empty():
try:
start, end = task_queue.get(timeout=0.1)
except:
continue
# heavy_transform tiene nogil=True
# → se ejecuta sin GIL → verdadero paralelismo
heavy_transform(data[start:end], result[start:end])
task_queue.task_done()
# Lanzar workers
workers = []
for _ in range(n_workers):
t = threading.Thread(target=worker, daemon=True)
t.start()
workers.append(t)
# Producir tareas (podría ser lectura de disco aquí)
for start in range(0, n, chunk_size):
end = min(start + chunk_size, n)
task_queue.put((start, end))
task_queue.join() # Esperar que todos los chunks se procesen
done_event.set() # Señalar a workers que terminen
for t in workers:
t.join()
return result
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from numba import njit
@njit(nogil=True)
def compute_stats(chunk):
"""Calcula media y varianza de un chunk (nogil para paralelismo real)."""
n = len(chunk)
mean = 0.0
for i in range(n):
mean += chunk[i]
mean /= n
var = 0.0
for i in range(n):
diff = chunk[i] - mean
var += diff * diff
var /= (n - 1)
return mean, var
def parallel_stats_pipeline(data, n_chunks=16, n_workers=8):
"""Calcula estadísticas globales procesando chunks en paralelo."""
chunk_size = (len(data) + n_chunks - 1) // n_chunks
chunks = [data[i*chunk_size : (i+1)*chunk_size] for i in range(n_chunks)]
# ThreadPoolExecutor + nogil = paralelismo real en Python
with ThreadPoolExecutor(max_workers=n_workers) as pool:
futures = [pool.submit(compute_stats, chunk) for chunk in chunks]
results = [f.result() for f in futures]
# Combinar resultados parciales
total_mean = np.mean([r[0] for r in results])
return total_mean
data = np.random.randn(100_000_000) # 800MB
result = parallel_stats_pipeline(data)
# ─── ESCENARIO 1: Operaciones vectoriales sobre arrays grandes ───
# → parallel=True (auto-detección + fusión de kernels)
@njit(parallel=True, fastmath=True)
def vector_ops(x, y):
# Numba fusiona todo esto en UN SOLO kernel paralelo
a = np.sin(x) ** 2
b = np.cos(y) ** 2
return a + b
# ✅ parallel=True es ideal: detecta las operaciones, las fusiona,
# y las reparte entre threads automáticamente
# ❌ prange sería innecesariamente verboso para esto
# ─── ESCENARIO 2: Loop con lógica compleja por iteración ───
# → prange (tú controlas el loop explícitamente)
@njit(parallel=True)
def complex_per_row(matrix):
n = matrix.shape[0]
result = np.empty(n)
for i in prange(n):
# Lógica compleja que parallel=True no puede auto-detectar
row = matrix[i, :]
sorted_indices = np.argsort(row) # no es paralelizable automáticamente
median_idx = sorted_indices[len(row) // 2]
result[i] = row[median_idx]
return result
# ✅ prange es ideal: cada fila es independiente pero la lógica es compleja
# ❌ parallel=True no podría auto-detectar el paralelismo aquí
# ─── ESCENARIO 3: Pipeline I/O + cómputo ───
# → nogil + threading (control total, mezclar I/O con cómputo)
@njit(nogil=True)
def transform_chunk(chunk, output):
for i in range(len(chunk)):
output[i] = np.sqrt(chunk[i]) * np.log(chunk[i] + 1)
def io_pipeline(filenames):
"""Lee archivos mientras procesa los anteriores.
prange NO puede hacer esto: necesitas solapar I/O con cómputo."""
with ThreadPoolExecutor(max_workers=6) as pool:
futures = []
for fname in filenames:
data = np.load(fname) # I/O (con GIL, pero no bloquea Numba)
output = np.empty_like(data)
fut = pool.submit(transform_chunk, data, output) # Cómputo sin GIL
futures.append((fname, fut, output))
# Mientras un thread procesa, otro puede estar leyendo el siguiente archivo
return [(fn, out) for fn, fut, out in futures if fut.result() is None or True]
# ✅ nogil+threading es ideal: solapas I/O con cómputo
# ❌ prange no puede mezclar I/O con cómputo
# ─── ESCENARIO 4: Múltiples funciones independientes ───
# → nogil + threading (ejecutar funciones DIFERENTES en paralelo)
@njit(nogil=True)
def task_a(data, output):
for i in range(len(data)):
output[i] = np.sin(data[i])
@njit(nogil=True)
def task_b(data, output):
for i in range(len(data)):
output[i] = np.cos(data[i])
def run_tasks_parallel(data_a, data_b):
out_a = np.empty_like(data_a)
out_b = np.empty_like(data_b)
t1 = threading.Thread(target=task_a, args=(data_a, out_a))
t2 = threading.Thread(target=task_b, args=(data_b, out_b))
t1.start(); t2.start()
t1.join(); t2.join()
return out_a, out_b
# ✅ Dos funciones DIFERENTES ejecutándose simultáneamente
# ❌ prange solo paraleliza iteraciones del MISMO loop
| Situación | Mecanismo | Por qué |
|---|---|---|
| Operaciones NumPy sobre arrays grandes | parallel=True | Auto-detecta y fusiona |
| Loop con iteraciones independientes | prange | Explícito, simple, eficiente |
| Iteraciones con costo muy variable | prange + parallel_chunksize + TBB | Dynamic scheduling evita desbalance |
| Pipeline I/O + cómputo | nogil + ThreadPoolExecutor | Solapa I/O con cómputo |
| Funciones diferentes en paralelo | nogil + threading | prange solo paraleliza un loop |
| Múltiples procesos Python + Numba | set_num_threads | Evitar oversubscription |
| Dataset > RAM | nogil + pipeline de chunks | Procesar en streaming sin cargar todo |
Oversubscription ocurre cuando lanzas más threads que cores físicos. Los threads compiten por CPU, el OS hace context switches constantemente, y la cache se invalida en cada switch. El resultado: más threads = más lento.
import os
from numba import config, set_num_threads
# ─── DETECTAR oversubscription ───
physical_cores = os.cpu_count() # ojo: incluye hyperthreading
# En un i7-12700 con 12 cores / 20 threads:
# os.cpu_count() = 20 (incluye HT)
# Numba usará 20 threads por defecto → subóptimo para cómputo puro
# Para cómputo numérico puro, usa cores FÍSICOS, no lógicos
# Hyperthreading NO ayuda con código numérico intensivo
# (dos threads en el mismo core comparten las unidades de cómputo)
# ─── PREVENIR oversubscription ───
# Caso 1: Script single-process
# Regla: threads = cores físicos (no lógicos)
set_num_threads(physical_cores // 2) # Aprox cores físicos si HT está activo
# Caso 2: Múltiples procesos (ej: 4 workers)
n_workers = 4
threads_per_worker = max(1, physical_cores // (2 * n_workers))
set_num_threads(threads_per_worker)
# Caso 3: Numba + otra librería paralela (NumPy con MKL, scikit-learn, etc.)
# ¡MKL y Numba AMBOS lanzan threads! Si no coordinan → oversubscription
# Solución: limitar threads de ambos
os.environ['MKL_NUM_THREADS'] = '4' # Limitar MKL
os.environ['OMP_NUM_THREADS'] = '4' # Limitar OpenMP
os.environ['NUMBA_NUM_THREADS'] = '4' # Limitar Numba
# IMPORTANTE: estas env vars deben setearse ANTES de importar las librerías
El patrón definitivo para procesar datos que no caben en cache (o incluso en RAM). Es la base de cómo funcionan internamente Spark, Dask, y Polars.
import numpy as np
from numba import njit, prange, set_num_threads, config
import time
# ─── PASO 1: Función de procesamiento por chunk (cache-friendly) ───
@njit(fastmath=True)
def process_single_chunk(chunk):
"""Procesa un chunk que cabe en cache L2/L3.
Retorna estadísticas parciales: (sum, sum_sq, min, max, count)"""
n = len(chunk)
total = 0.0
total_sq = 0.0
vmin = chunk[0]
vmax = chunk[0]
for i in range(n):
val = chunk[i]
# Todas las operaciones en un solo pass → datos en cache
total += val
total_sq += val * val
if val < vmin:
vmin = val
if val > vmax:
vmax = val
return total, total_sq, vmin, vmax, n
# ─── PASO 2: Paralelizar sobre chunks ───
@njit(parallel=True, fastmath=True)
def process_all_chunks(data, chunk_size):
"""Divide data en chunks y los procesa en paralelo."""
n = len(data)
n_chunks = (n + chunk_size - 1) // chunk_size
# Arrays para resultados parciales (un resultado por chunk)
sums = np.empty(n_chunks)
sums_sq = np.empty(n_chunks)
mins = np.empty(n_chunks)
maxs = np.empty(n_chunks)
counts = np.empty(n_chunks, dtype=np.int64)
for c in prange(n_chunks):
start = c * chunk_size
end = min(start + chunk_size, n)
chunk = data[start:end]
s, sq, mn, mx, cnt = process_single_chunk(chunk)
sums[c] = s
sums_sq[c] = sq
mins[c] = mn
maxs[c] = mx
counts[c] = cnt
return sums, sums_sq, mins, maxs, counts
# ─── PASO 3: Combinar resultados parciales ───
@njit
def combine_results(sums, sums_sq, mins, maxs, counts):
"""Combina estadísticas parciales en resultado global."""
total_count = np.sum(counts)
total_sum = np.sum(sums)
total_sum_sq = np.sum(sums_sq)
mean = total_sum / total_count
variance = (total_sum_sq / total_count) - mean * mean
global_min = mins[0]
global_max = maxs[0]
for i in range(1, len(mins)):
if mins[i] < global_min:
global_min = mins[i]
if maxs[i] > global_max:
global_max = maxs[i]
return mean, np.sqrt(variance), global_min, global_max, total_count
# ─── ORQUESTADOR ───
def massive_stats(data, chunk_size=131072):
"""chunk_size=131072 float64 = 1MB → cabe cómodamente en L2/L3"""
sums, sums_sq, mins, maxs, counts = process_all_chunks(data, chunk_size)
return combine_results(sums, sums_sq, mins, maxs, counts)
# ─── BENCHMARK ───
data = np.random.randn(100_000_000) # 800MB de datos
# Warmup
massive_stats(data)
start = time.perf_counter()
mean, std, vmin, vmax, count = massive_stats(data)
elapsed = time.perf_counter() - start
print(f"100M elementos en {elapsed*1000:.1f}ms")
print(f"Throughput (Rendimiento real): {data.nbytes / elapsed / 1e9:.1f} GB/s")
print(f"Mean={mean:.6f}, Std={std:.6f}, Min={vmin:.4f}, Max={vmax:.4f}")
| # | Verificar | Si no lo haces... |
|---|---|---|
| 1 | ¿Suficientes iteraciones en el prange externo? | Pocos threads trabajan, los demás esperan |
| 2 | ¿Cada iteración es independiente (sin escrituras compartidas)? | Race conditions, resultados corruptos |
| 3 | ¿El threading layer es apropiado? | fork+thread safety, dynamic scheduling no disponible |
| 4 | ¿Coordiné threads con otras librerías (MKL, OpenMP)? | Oversubscription, peor rendimiento |
| 5 | ¿Probé con scaling test (1K → 10M)? | Overhead de threading domina con datos pequeños |
| 6 | ¿Uso parallel_diagnostics() para verificar? | Numba puede haber serializado tu loop sin avisar |
| 7 | ¿Chunks de prange caben en cache L2/L3? | Cache thrashing entre threads, peor que secuencial |
| 8 | ¿Iteraciones de costo variable? → chunksize dinámico | Threads rápidos esperan a los lentos |
| 9 | ¿Necesito mezclar I/O + cómputo? | prange no puede, usa nogil + threading |
Esta sección es diferente. No es teoría — son 4 problemas reales resueltos paso a paso, desde la versión naive hasta una versión que exprime hasta el último nanosegundo. Cada caso sigue el mismo proceso disciplinado: medir → identificar bottleneck → optimizar → medir de nuevo → iterar. Al final, entenderás visceralmente la diferencia entre "rápido" y "ridículamente rápido".
El problema: Tienes un dataset de 100M de transacciones financieras. Cada transacción tiene un customer_id (0 a 999,999) y un amount. Necesitas calcular la suma, media, mínimo, máximo y conteo por cliente.
import numpy as np
import time
# Generar datos: 100M transacciones
N = 100_000_000
N_CUSTOMERS = 1_000_000
customer_ids = np.random.randint(0, N_CUSTOMERS, size=N, dtype=np.int32)
amounts = np.random.exponential(100.0, size=N).astype(np.float64)
# V0: NumPy puro — simple pero lento
def aggregate_numpy(ids, amounts, n_customers):
sums = np.zeros(n_customers)
counts = np.zeros(n_customers, dtype=np.int64)
mins = np.full(n_customers, np.inf)
maxs = np.full(n_customers, -np.inf)
# np.add.at es la forma "correcta" en NumPy, pero es MUY lenta
np.add.at(sums, ids, amounts)
np.add.at(counts, ids, 1)
np.minimum.at(mins, ids, amounts)
np.maximum.at(maxs, ids, amounts)
means = np.where(counts > 0, sums / counts, 0.0)
return sums, means, mins, maxs, counts
# Benchmark V0:
# ~45 segundos para 100M filas
# Bottleneck: np.add.at hace scatter operations — cada una es un cache miss aleatorio
np.add.at accede a posiciones aleatorias del array resultado (scatter). Cada acceso salta a un customer_id diferente → cache misses masivos. Además, hace 4 passes completos sobre los 100M de datos.
from numba import njit
@njit
def aggregate_v1(ids, amounts, n_customers):
"""Un solo pass sobre los datos: 4 operaciones fusionadas."""
sums = np.zeros(n_customers)
counts = np.zeros(n_customers, dtype=np.int64)
mins = np.full(n_customers, np.inf)
maxs = np.full(n_customers, -np.inf)
for i in range(len(ids)):
cid = ids[i]
amt = amounts[i]
sums[cid] += amt
counts[cid] += 1
if amt < mins[cid]:
mins[cid] = amt
if amt > maxs[cid]:
maxs[cid] = amt
means = np.empty(n_customers)
for i in range(n_customers):
if counts[i] > 0:
means[i] = sums[i] / counts[i]
else:
means[i] = 0.0
return sums, means, mins, maxs, counts
# Benchmark V1:
# ~3.2 segundos — 14x más rápido que NumPy
# ¿Por qué? Un solo pass = lee los 100M de datos UNA vez en vez de 4
# Pero: el scatter (sums[cid] +=) sigue siendo cache-unfriendly
@njit
def aggregate_v2(ids, amounts, n_customers):
"""Sort por customer_id → acceso secuencial al resultado."""
# Paso 1: Ordenar por customer_id
order = np.argsort(ids) # O(N log N) pero mejora localidad
# Paso 2: Scan secuencial sobre datos ordenados
sums = np.zeros(n_customers)
counts = np.zeros(n_customers, dtype=np.int64)
mins = np.full(n_customers, np.inf)
maxs = np.full(n_customers, -np.inf)
for idx in range(len(order)):
i = order[idx]
cid = ids[i]
amt = amounts[i]
# Ahora todos los customer_id iguales están JUNTOS
# → sums[cid] está en cache porque es el mismo cid que antes
sums[cid] += amt
counts[cid] += 1
if amt < mins[cid]:
mins[cid] = amt
if amt > maxs[cid]:
maxs[cid] = amt
means = np.empty(n_customers)
for i in range(n_customers):
means[i] = sums[i] / counts[i] if counts[i] > 0 else 0.0
return sums, means, mins, maxs, counts
# Benchmark V2:
# ~5.8 segundos (sort domina: ~4s sort + ~1.8s scan)
# ¡Más lento! El sort es caro para 100M. Pero el scan es 2x más rápido.
# Lección: sort-then-scan solo gana cuando N_CUSTOMERS << N
# y la distribución es muy dispersa. Aquí hay 100 transacciones/cliente → V1 gana.
from numba import njit, prange, get_num_threads
@njit(parallel=True)
def aggregate_v3(ids, amounts, n_customers):
"""Histograma paralelo: cada thread tiene su propia copia,
luego se reducen. Elimina race conditions sin locks."""
n_threads = get_num_threads()
n = len(ids)
# Cada thread tiene sus propios acumuladores (sin competencia)
local_sums = np.zeros((n_threads, n_customers))
local_counts = np.zeros((n_threads, n_customers), dtype=np.int64)
local_mins = np.full((n_threads, n_customers), np.inf)
local_maxs = np.full((n_threads, n_customers), -np.inf)
# Fase 1: Cada thread procesa su chunk
chunk_size = (n + n_threads - 1) // n_threads
for t in prange(n_threads):
start = t * chunk_size
end = min(start + chunk_size, n)
for i in range(start, end):
cid = ids[i]
amt = amounts[i]
local_sums[t, cid] += amt
local_counts[t, cid] += 1
if amt < local_mins[t, cid]:
local_mins[t, cid] = amt
if amt > local_maxs[t, cid]:
local_maxs[t, cid] = amt
# Fase 2: Reducir resultados parciales (paralelo sobre clientes)
sums = np.zeros(n_customers)
counts = np.zeros(n_customers, dtype=np.int64)
mins = np.full(n_customers, np.inf)
maxs = np.full(n_customers, -np.inf)
means = np.zeros(n_customers)
for c in prange(n_customers):
for t in range(n_threads):
sums[c] += local_sums[t, c]
counts[c] += local_counts[t, c]
if local_mins[t, c] < mins[c]:
mins[c] = local_mins[t, c]
if local_maxs[t, c] > maxs[c]:
maxs[c] = local_maxs[t, c]
if counts[c] > 0:
means[c] = sums[c] / counts[c]
return sums, means, mins, maxs, counts
# Benchmark V3 (8 cores):
# ~0.9 segundos — 50x vs NumPy, 3.5x vs V1
# PERO: usa n_threads × n_customers × 4 arrays = mucha memoria
# Con 8 threads × 1M clientes × 4 arrays × 8 bytes = 256 MB de buffers
@njit(parallel=True, fastmath=True)
def aggregate_v4(ids, amounts, n_customers):
"""Versión final: parallel histograms con chunks que caben en L3.
Equilibrio perfecto entre velocidad y memoria."""
n_threads = get_num_threads()
n = len(ids)
# Si n_customers es pequeño (< 100K), cada thread puede tener copia completa
# Si es grande (> 100K), procesamos por bloques de clientes
CUSTOMER_BLOCK = min(n_customers, 65536) # 64K clientes × 8 bytes = 512KB → cabe en L2
sums = np.zeros(n_customers)
counts = np.zeros(n_customers, dtype=np.int64)
mins = np.full(n_customers, np.inf)
maxs = np.full(n_customers, -np.inf)
# Buffer local por thread (solo CUSTOMER_BLOCK de tamaño)
local_sums = np.zeros((n_threads, CUSTOMER_BLOCK))
local_counts = np.zeros((n_threads, CUSTOMER_BLOCK), dtype=np.int64)
local_mins = np.empty((n_threads, CUSTOMER_BLOCK))
local_maxs = np.empty((n_threads, CUSTOMER_BLOCK))
# Procesar por bloques de customer_ids
for cblock_start in range(0, n_customers, CUSTOMER_BLOCK):
cblock_end = min(cblock_start + CUSTOMER_BLOCK, n_customers)
cblock_size = cblock_end - cblock_start
# Reset buffers locales
for t in prange(n_threads):
for c in range(cblock_size):
local_sums[t, c] = 0.0
local_counts[t, c] = 0
local_mins[t, c] = np.inf
local_maxs[t, c] = -np.inf
# Scatter paralelo sobre datos (cada thread su chunk de datos)
chunk_size = (n + n_threads - 1) // n_threads
for t in prange(n_threads):
start = t * chunk_size
end = min(start + chunk_size, n)
for i in range(start, end):
cid = ids[i]
if cid >= cblock_start and cid < cblock_end:
local_cid = cid - cblock_start
amt = amounts[i]
local_sums[t, local_cid] += amt
local_counts[t, local_cid] += 1
if amt < local_mins[t, local_cid]:
local_mins[t, local_cid] = amt
if amt > local_maxs[t, local_cid]:
local_maxs[t, local_cid] = amt
# Reduce paralelo
for c in prange(cblock_size):
gc = cblock_start + c
for t in range(n_threads):
sums[gc] += local_sums[t, c]
counts[gc] += local_counts[t, c]
if local_mins[t, c] < mins[gc]:
mins[gc] = local_mins[t, c]
if local_maxs[t, c] > maxs[gc]:
maxs[gc] = local_maxs[t, c]
# Calcular medias
means = np.empty(n_customers)
for c in prange(n_customers):
means[c] = sums[c] / counts[c] if counts[c] > 0 else 0.0
return sums, means, mins, maxs, counts
# Benchmark V4 (8 cores):
# ~1.1 segundos (ligeramente más lento que V3 por la condición if)
# PERO: usa solo n_threads × 64K × 4 × 8 = 16MB de buffers (vs 256MB de V3)
# Para 10M+ clientes, V3 se queda sin memoria y V4 sigue funcionando
| Versión | Técnica | Tiempo (100M filas) | Speedup vs V0 | Memoria extra |
|---|---|---|---|---|
| V0 | NumPy np.add.at | ~45,000 ms | 1x | Mínima |
| V1 | @njit single pass | ~3,200 ms | 14x | Mínima |
| V2 | Sort + scan | ~5,800 ms | 8x | Sort buffer |
| V3 | Parallel histograms | ~900 ms | 50x | 256 MB |
| V4 | Chunked parallel hist | ~1,100 ms | 41x | 16 MB |
El problema: Tienes 5,000 series temporales de 500,000 puntos cada una (ej: cotizaciones de 2 años a resolución de segundo). Para cada serie necesitas calcular una media móvil exponencial (EMA), bandas de Bollinger, y el Z-score rolling — todo de una sola pasada.
def indicators_python(prices, window):
"""Calcula EMA, Bollinger Bands y Z-score. Python puro."""
n_series, n_points = prices.shape
ema = np.empty_like(prices)
upper = np.empty_like(prices)
lower = np.empty_like(prices)
zscore = np.empty_like(prices)
alpha = 2.0 / (window + 1)
for s in range(n_series):
# EMA inicial
ema[s, 0] = prices[s, 0]
for i in range(1, n_points):
ema[s, i] = alpha * prices[s, i] + (1 - alpha) * ema[s, i-1]
# Rolling mean y std para Bollinger
for i in range(n_points):
start = max(0, i - window + 1)
segment = prices[s, start:i+1]
mean = np.mean(segment)
std = np.std(segment)
upper[s, i] = mean + 2 * std
lower[s, i] = mean - 2 * std
zscore[s, i] = (prices[s, i] - mean) / std if std > 0 else 0
return ema, upper, lower, zscore
# Con 5000 series × 500K puntos: HORAS (demasiado lento para medir)
# Bottleneck: O(N×W) por serie para rolling mean/std (recalcula todo el window)
@njit
def indicators_v1(prices, window):
"""Una serie a la vez. Rolling stats en O(N) con sumas acumuladas."""
n_series, n_points = prices.shape
ema = np.empty_like(prices)
upper = np.empty_like(prices)
lower = np.empty_like(prices)
zscore = np.empty_like(prices)
alpha = 2.0 / (window + 1)
for s in range(n_series):
# EMA: O(N) — dependencia secuencial (NO paralelizable por punto)
ema[s, 0] = prices[s, 0]
for i in range(1, n_points):
ema[s, i] = alpha * prices[s, i] + (1 - alpha) * ema[s, i - 1]
# Rolling mean y variance con sumas deslizantes: O(N)
# En vez de recalcular la media de scratch, mantenemos sum y sum_sq
win_sum = 0.0
win_sum_sq = 0.0
for i in range(n_points):
# Añadir nuevo elemento
win_sum += prices[s, i]
win_sum_sq += prices[s, i] * prices[s, i]
# Remover elemento que sale de la ventana
if i >= window:
old = prices[s, i - window]
win_sum -= old
win_sum_sq -= old * old
# Calcular estadísticas
count = min(i + 1, window)
mean = win_sum / count
var = (win_sum_sq / count) - mean * mean
if var < 0.0: # protección contra error numérico
var = 0.0
std = np.sqrt(var)
upper[s, i] = mean + 2.0 * std
lower[s, i] = mean - 2.0 * std
zscore[s, i] = (prices[s, i] - mean) / std if std > 1e-15 else 0.0
return ema, upper, lower, zscore
# Benchmark V1 (5000 × 500K):
# ~18 segundos
# Mejora algorítmica: de O(N×W) a O(N) → enormemente más rápido
# Pero: secuencial sobre las 5000 series
@njit(parallel=True, fastmath=True)
def indicators_v2(prices, window):
"""Paralelo sobre series. Cada serie se procesa en un thread."""
n_series, n_points = prices.shape
ema = np.empty_like(prices)
upper = np.empty_like(prices)
lower = np.empty_like(prices)
zscore = np.empty_like(prices)
alpha = 2.0 / (window + 1)
one_minus_alpha = 1.0 - alpha
# Paralelizar sobre series (cada una es independiente)
for s in prange(n_series):
# ── EMA ──
ema[s, 0] = prices[s, 0]
for i in range(1, n_points):
ema[s, i] = alpha * prices[s, i] + one_minus_alpha * ema[s, i - 1]
# ── Rolling stats O(N) ──
win_sum = 0.0
win_sum_sq = 0.0
for i in range(n_points):
p = prices[s, i]
win_sum += p
win_sum_sq += p * p
if i >= window:
old = prices[s, i - window]
win_sum -= old
win_sum_sq -= old * old
count = min(i + 1, window)
inv_count = 1.0 / count # multiplicar es más rápido que dividir
mean = win_sum * inv_count
var = win_sum_sq * inv_count - mean * mean
if var < 0.0:
var = 0.0
std = np.sqrt(var)
upper[s, i] = mean + 2.0 * std
lower[s, i] = mean - 2.0 * std
zscore[s, i] = (p - mean) / std if std > 1e-15 else 0.0
return ema, upper, lower, zscore
# Benchmark V2 (8 cores):
# ~2.8 segundos — 6.4x vs V1
# 5000 series / 8 threads = 625 series/thread → excelente granularidad
@njit(parallel=True, fastmath=True)
def indicators_v3(prices_T, window):
"""Datos transpuestos: shape (n_points, n_series).
Ahora el loop interno recorre series contiguas en memoria → SIMD."""
n_points, n_series = prices_T.shape
ema_T = np.empty_like(prices_T)
upper_T = np.empty_like(prices_T)
lower_T = np.empty_like(prices_T)
zscore_T = np.empty_like(prices_T)
alpha = np.float32(2.0) / np.float32(window + 1)
one_minus_alpha = np.float32(1.0) - alpha
# EMA: loop por tiempo, dentro vectorizado sobre series
for s in prange(n_series):
ema_T[0, s] = prices_T[0, s]
for i in range(1, n_points):
for s in prange(n_series):
ema_T[i, s] = alpha * prices_T[i, s] + one_minus_alpha * ema_T[i-1, s]
# Rolling stats: paralelo sobre series, secuencial por tiempo
for s in prange(n_series):
win_sum = np.float32(0.0)
win_sum_sq = np.float32(0.0)
for i in range(n_points):
p = prices_T[i, s]
win_sum += p
win_sum_sq += p * p
if i >= window:
old = prices_T[i - window, s]
win_sum -= old
win_sum_sq -= old * old
count = min(i + 1, window)
inv_c = np.float32(1.0) / np.float32(count)
mean = win_sum * inv_c
var = win_sum_sq * inv_c - mean * mean
if var < np.float32(0.0):
var = np.float32(0.0)
std = np.sqrt(var)
upper_T[i, s] = mean + np.float32(2.0) * std
lower_T[i, s] = mean - np.float32(2.0) * std
zscore_T[i, s] = (p - mean) / std if std > np.float32(1e-7) else np.float32(0.0)
return ema_T, upper_T, lower_T, zscore_T
# Uso: transponer a float32 antes
# prices_T = prices.T.astype(np.float32).copy() # .copy() para C-contiguous
# results = indicators_v3(prices_T, 20)
# Benchmark V3 (8 cores, float32):
# ~0.9 segundos
# float32 = mitad de memoria + doble SIMD width (8 floats vs 4 doubles)
| Versión | Técnica | Tiempo | Speedup acumulado |
|---|---|---|---|
| V0 | Python puro O(N×W) | Horas+ | 1x |
| V1 | @njit + O(N) rolling | ~18,000 ms | ~200x+ |
| V2 | + parallel sobre series | ~2,800 ms | ~1,300x+ |
| V3 | + float32 + transponer | ~900 ms | ~4,000x+ |
El problema: Tienes 50M de eventos con un user_id y necesitas "enriquecer" cada evento con datos del perfil del usuario (lookup table de 2M usuarios). Es esencialmente un LEFT JOIN a velocidad de C.
# Datos
N_EVENTS = 50_000_000
N_USERS = 2_000_000
event_user_ids = np.random.randint(0, N_USERS, size=N_EVENTS, dtype=np.int64)
# Lookup table: user_id → (age, score, region)
user_ages = np.random.randint(18, 80, size=N_USERS, dtype=np.int32)
user_scores = np.random.rand(N_USERS).astype(np.float32)
user_regions = np.random.randint(0, 50, size=N_USERS, dtype=np.int8)
# V0: Direct indexing (los user_ids ya son índices 0..N_USERS-1)
def lookup_numpy(event_ids, ages, scores, regions):
# Si los IDs son contiguos 0..N, esto es un simple fancy indexing
e_ages = ages[event_ids]
e_scores = scores[event_ids]
e_regions = regions[event_ids]
return e_ages, e_scores, e_regions
# Benchmark V0:
# ~850 ms (3 passes, cada uno hace 50M random accesses)
# Bottleneck: 3 passes separados × acceso aleatorio = 3× cache misses
@njit
def lookup_v1(event_ids, ages, scores, regions):
"""Un solo pass: por cada evento, buscar todos los atributos a la vez."""
n = len(event_ids)
e_ages = np.empty(n, dtype=np.int32)
e_scores = np.empty(n, dtype=np.float32)
e_regions = np.empty(n, dtype=np.int8)
for i in range(n):
uid = event_ids[i]
e_ages[i] = ages[uid]
e_scores[i] = scores[uid]
e_regions[i] = regions[uid]
return e_ages, e_scores, e_regions
# Benchmark V1: ~420 ms — 2x vs V0
# ¿Por qué solo 2x con 3 passes → 1? Porque ages, scores, regions
# están en direcciones diferentes → el acceso a ages[uid] no trae
# scores[uid] al cache. Cada uno sigue siendo un random access.
# AQUÍ aplicamos el patrón INVERSO al caso de partículas
# En el Caso 1, SoA era mejor porque procesábamos UN atributo a la vez
# En lookup, procesamos TODOS los atributos de un usuario a la vez
# → AoS es mejor porque todos los datos del usuario están juntos en cache
# Empaquetar perfil del usuario en una sola estructura contigua
# Usar un array 2D donde cada fila es un usuario
# Para máxima localidad, almacenar todo como float32
user_profiles = np.empty((N_USERS, 3), dtype=np.float32)
user_profiles[:, 0] = user_ages.astype(np.float32)
user_profiles[:, 1] = user_scores
user_profiles[:, 2] = user_regions.astype(np.float32)
# Ahora: user_profiles[uid] = [age, score, region] contiguos en 12 bytes
# ¡Caben en UNA cache line! Un solo cache miss trae TODO el perfil.
@njit(parallel=True)
def lookup_v2(event_ids, profiles):
"""Lookup con datos empaquetados: un cache miss trae todo el perfil."""
n = len(event_ids)
results = np.empty((n, 3), dtype=np.float32)
for i in prange(n):
uid = event_ids[i]
results[i, 0] = profiles[uid, 0]
results[i, 1] = profiles[uid, 1]
results[i, 2] = profiles[uid, 2]
return results
# Benchmark V2: ~180 ms — 4.7x vs V0
# Los 3 atributos del mismo usuario están en la misma cache line
# → 1 cache miss en vez de 3 por evento
@njit(parallel=True, fastmath=True)
def lookup_v3(event_ids, profiles):
"""Ordenar eventos por user_id → accesos a profiles se vuelven
semi-secuenciales. Usuarios populares ya están en cache."""
n = len(event_ids)
# Paso 1: Sort indices por user_id
order = np.argsort(event_ids)
# Paso 2: Lookup en orden sorted → localidad temporal masiva
results = np.empty((n, 3), dtype=np.float32)
for i in prange(n):
orig_idx = order[i]
uid = event_ids[orig_idx]
results[orig_idx, 0] = profiles[uid, 0]
results[orig_idx, 1] = profiles[uid, 1]
results[orig_idx, 2] = profiles[uid, 2]
return results
# Benchmark V3: ~290 ms (sort ~200ms + lookup ~90ms)
# El lookup puro es 2x más rápido (90 vs 180ms)
# Pero el sort cuesta 200ms. Solo gana si haces MUCHOS lookups con los mismos IDs
# En pipelines donde el mismo dataset se procesa repetidamente → pre-sort una vez
| Versión | Técnica | Tiempo | Speedup |
|---|---|---|---|
| V0 | NumPy fancy indexing × 3 | ~850 ms | 1x |
| V1 | @njit single pass | ~420 ms | 2x |
| V2 | AoS packing + parallel | ~180 ms | 4.7x |
| V3 | Pre-sort + packed | ~90 ms (lookup only) | 9.4x (amortizado) |
El problema: Un pipeline de procesamiento de señales de sensores IoT. Flujo: datos crudos (20M muestras × 8 sensores) → filtrado de outliers → normalización → detección de anomalías → estadísticas por ventana temporal. Cada etapa alimenta la siguiente.
# Datos: 20M muestras × 8 sensores
N_SAMPLES = 20_000_000
N_SENSORS = 8
raw_data = np.random.randn(N_SAMPLES, N_SENSORS).astype(np.float32) * 100
# Inyectar outliers
outlier_mask = np.random.rand(N_SAMPLES, N_SENSORS) < 0.001
raw_data[outlier_mask] *= 100
# V0: Cada etapa lee y escribe un array completo
def pipeline_v0(data, window=1000):
# Etapa 1: Filtrar outliers (reemplazar con NaN → interpolar)
cleaned = data.copy() # 640MB copia
means = np.mean(data, axis=0)
stds = np.std(data, axis=0)
for j in range(N_SENSORS):
mask = np.abs(data[:, j] - means[j]) > 5 * stds[j]
cleaned[mask, j] = means[j]
# Etapa 2: Normalizar a Z-score
normalized = np.empty_like(cleaned) # 640MB más
for j in range(N_SENSORS):
m = np.mean(cleaned[:, j])
s = np.std(cleaned[:, j])
normalized[:, j] = (cleaned[:, j] - m) / s
# Etapa 3: Detección de anomalías (score combinado)
anomaly_scores = np.sqrt(np.sum(normalized ** 2, axis=1)) # 160MB
# Etapa 4: Estadísticas por ventana
n_windows = N_SAMPLES // window
window_stats = np.empty((n_windows, 4)) # mean, std, max_anomaly, anomaly_rate
for w in range(n_windows):
s = w * window
e = s + window
scores = anomaly_scores[s:e]
window_stats[w, 0] = np.mean(scores)
window_stats[w, 1] = np.std(scores)
window_stats[w, 2] = np.max(scores)
window_stats[w, 3] = np.sum(scores > 3.0) / window
return anomaly_scores, window_stats
# Benchmark V0:
# ~12 segundos, pico de memoria ~2.5 GB
# 4 passes completos sobre 640MB + arrays temporales gigantes
@njit(parallel=True, fastmath=True)
def pipeline_v1(data, window):
"""TODAS las etapas fusionadas en un solo pass por ventana.
Cada ventana se procesa completa: clean → normalize → score → stats.
Los datos de una ventana caben en cache: 1000 × 8 × 4 = 32KB (L1!)"""
n_samples, n_sensors = data.shape
n_windows = n_samples // window
# Pre-calcular estadísticas globales para cleaning
# (necesitamos un pre-pass, pero solo sobre 8 columnas)
global_means = np.zeros(n_sensors, dtype=np.float32)
global_stds = np.zeros(n_sensors, dtype=np.float32)
for j in prange(n_sensors):
s = np.float32(0.0)
sq = np.float32(0.0)
for i in range(n_samples):
v = data[i, j]
s += v
sq += v * v
inv_n = np.float32(1.0) / np.float32(n_samples)
global_means[j] = s * inv_n
var = sq * inv_n - (s * inv_n) ** 2
global_stds[j] = np.sqrt(var) if var > 0 else np.float32(1.0)
# Output arrays
anomaly_scores = np.empty(n_samples, dtype=np.float32)
window_stats = np.empty((n_windows, 4), dtype=np.float32)
# Precalcular means/stds de datos limpios para normalización
# (en producción esto vendría de datos históricos, no del batch actual)
clean_means = global_means.copy()
clean_stds = global_stds.copy()
threshold = np.float32(5.0)
anomaly_thresh = np.float32(3.0)
# ── LOOP PRINCIPAL: paralelo por ventanas ──
for w in prange(n_windows):
w_start = w * window
w_end = w_start + window
# Acumuladores de la ventana
score_sum = np.float32(0.0)
score_sq_sum = np.float32(0.0)
score_max = np.float32(0.0)
anomaly_count = 0
for i in range(w_start, w_end):
# ── Etapa 1+2+3 FUSIONADAS por muestra ──
score_sq_total = np.float32(0.0)
for j in range(n_sensors):
val = data[i, j]
# Clean: reemplazar outliers
diff = val - global_means[j]
if diff > threshold * global_stds[j] or diff < -threshold * global_stds[j]:
val = global_means[j]
# Normalize
z = (val - clean_means[j]) / clean_stds[j]
# Accumulate anomaly score
score_sq_total += z * z
# Score para esta muestra
score = np.sqrt(score_sq_total)
anomaly_scores[i] = score
# Etapa 4: acumular stats de ventana
score_sum += score
score_sq_sum += score * score
if score > score_max:
score_max = score
if score > anomaly_thresh:
anomaly_count += 1
# Escribir stats de la ventana
inv_w = np.float32(1.0) / np.float32(window)
w_mean = score_sum * inv_w
w_var = score_sq_sum * inv_w - w_mean * w_mean
window_stats[w, 0] = w_mean
window_stats[w, 1] = np.sqrt(w_var) if w_var > 0 else np.float32(0.0)
window_stats[w, 2] = score_max
window_stats[w, 3] = np.float32(anomaly_count) * inv_w
return anomaly_scores, window_stats
# Benchmark V1:
# ~0.45 segundos — 27x vs V0
# Pico de memoria: ~200MB (vs 2.5GB de V0)
# ¿POR QUÉ tan rápido?
# 1. UN solo pass sobre los datos (no 4)
# 2. Cada ventana (1000×8×4 = 32KB) cabe ENTERA en L1 cache
# 3. CERO arrays temporales intermedios
# 4. 20K ventanas en prange → excelente granularidad paralela
# 5. fastmath + float32 → SIMD procesa 8 valores por instrucción
| Versión | Tiempo | Memoria pico | Speedup | Passes sobre datos |
|---|---|---|---|---|
| V0 (NumPy etapas separadas) | ~12,000 ms | ~2.5 GB | 1x | 4+ passes |
| V1 (Fusionado + parallel) | ~450 ms | ~200 MB | 27x | 1 pass + 1 pre-pass |
Si analizas los 4 casos, emergen principios universales que aplican a CUALQUIER problema de datos masivos:
| # | Principio | Efecto típico | Dónde se vio |
|---|---|---|---|
| 1 | Reducir passes sobre los datos (fusionar operaciones) | Nx (N = passes eliminados) | Todos los casos |
| 2 | Eliminar arrays temporales (fusionar en un loop) | 2-5x + reducción de memoria 3-10x | Caso 1 (V1), Caso 4 (V1) |
| 3 | Mejorar localidad de cache (AoS/SoA según patrón) | 2-8x | Caso 3 (V2) |
| 4 | Paralelizar sobre la dimensión con más iteraciones | ~Nx (N = cores) | Caso 2 (series), Caso 4 (ventanas) |
| 5 | Chunk size = L1/L2 cache | 2-5x vs chunks grandes | Caso 1 (V4), Caso 4 (V1) |
| 6 | Cambio algorítmico primero (O(N²) → O(N)) | 10-1000x | Caso 2 (V1): rolling O(N×W) → O(N) |
| 7 | float32 cuando la precisión lo permite | 2-3x (memoria + SIMD) | Caso 2 (V3), Caso 4 (V1) |
| 8 | Histogramas thread-local → reduce (evitar locks) | Nx sin race conditions | Caso 1 (V3) |
| 9 | Pre-sort para convertir acceso aleatorio en secuencial | 2-3x en lookups (amortizado) | Caso 3 (V3) |
| 10 | Medir Throughput (Rendimiento real) (GB/s) para saber si estás memory-bound | Evita optimizar CPU cuando el límite es RAM | Todos |
Todo lo anterior te enseñó a hacer las cosas bien. Esta sección es diferente — te enseña a no destruirte a ti mismo. Cada anti-pattern aquí parece razonable, pasa los tests, y funciona con datos pequeños. Pero a escala masiva, cada uno es una bomba de tiempo que convierte minutos en horas y horas en días. Los he ordenado del más común al más sutil.
Severidad: CATASTRÓFICA | Frecuencia: MUY ALTA
Este es el anti-pattern más devastador y más común. Debido al overhead de dispatch (~1μs × millones de llamadas). Parece completamente inocente:
# ☠️ TRAMPA: parallel=True DENTRO de un loop Python que itera millones de veces
@njit(parallel=True)
def process_single_row(row):
"""Procesa UNA fila con parallel=True."""
total = 0.0
for i in prange(len(row)):
total += np.sqrt(row[i]) * np.sin(row[i])
return total
# La llamada "inocente":
data = np.random.rand(1_000_000, 50) # 1M filas, 50 columnas
results = np.empty(data.shape[0])
for i in range(data.shape[0]): # 1 MILLÓN de iteraciones
results[i] = process_single_row(data[i]) # ☠️ cada llamada: dispatch + thread sync
# Tiempo: ~180 segundos
# POR QUÉ: Cada llamada a process_single_row():
# 1. Python → Numba dispatch: ~1μs (boxing/unboxing de argumentos)
# 2. Thread pool wake up: ~5-20μs (despertar N threads dormidos)
# 3. Work distribution: ~2μs (repartir 50 elementos entre 8 threads)
# 4. Thread sync barrier: ~5-10μs (esperar que todos terminen)
# 5. Cómputo real: ~0.1μs (50 sqrt+sin es NADA)
# Total overhead por llamada: ~30μs × 1M llamadas = 30 SEGUNDOS de puro overhead
# El cómputo real total es ~0.1 segundos
@njit(parallel=True, fastmath=True)
def process_all_rows(data):
"""El loop sobre filas está DENTRO — UN solo dispatch, UN solo thread sync."""
n_rows, n_cols = data.shape
results = np.empty(n_rows)
for i in prange(n_rows): # 1M iteraciones paralelas
total = 0.0
for j in range(n_cols):
total += np.sqrt(data[i, j]) * np.sin(data[i, j])
results[i] = total
return results
results = process_all_rows(data)
# Tiempo: ~0.3 segundos — 600x más rápido
# UN dispatch + 1M iteraciones de prange = overhead despreciable
parallel=True o @njit en un loop Python que itera más de ~1000 veces. El overhead del dispatch Python→Numba (~1μs) × N llamadas te destruye. Siempre mueve el loop DENTRO de la función Numba.
Severidad: ALTA | Frecuencia: MUY ALTA
# ☠️ TRAMPA: np.empty/np.zeros dentro de un loop que itera mucho
@njit
def process_with_temp_bad(data, n_iter):
n = data.shape[0]
result = np.zeros(n)
for iteration in range(n_iter):
temp = np.empty(n) # ☠️ ALOCACIÓN: syscall al OS
temp2 = np.zeros(n) # ☠️ ALOCACIÓN + memset a 0
for i in range(n):
temp[i] = np.sin(data[i]) * iteration
temp2[i] = temp[i] ** 2
for i in range(n):
result[i] += temp2[i]
return result
# Con n=100_000, n_iter=1000:
# Tiempo: ~4.8 segundos
# 1000 iteraciones × 2 alocaciones × 100K × 8 bytes = 1.6 GB total de alocación
# Cada np.empty llama a NRT_MemInfo_alloc → malloc → puede causar page fault
# Cada np.zeros además hace memset (llena de ceros 800KB)
# ✅ CORRECTO: pre-alocar fuera del loop, reutilizar
@njit
def process_with_temp_good(data, n_iter):
n = data.shape[0]
result = np.zeros(n)
temp = np.empty(n) # ✅ UNA vez
temp2 = np.empty(n) # ✅ UNA vez (ni siquiera necesita zeros)
for iteration in range(n_iter):
for i in range(n):
temp[i] = np.sin(data[i]) * iteration
temp2[i] = temp[i] ** 2
for i in range(n):
result[i] += temp2[i]
return result
# Tiempo: ~1.2 segundos — 4x más rápido
# ⚡ AÚN MEJOR: eliminar temporales completamente fusionando loops
@njit
def process_fused(data, n_iter):
n = data.shape[0]
result = np.zeros(n)
for iteration in range(n_iter):
for i in range(n):
val = np.sin(data[i]) * iteration # en registro, no en array
result[i] += val * val # directo, sin temporal
return result
# Tiempo: ~0.8 segundos — 6x vs original
Severidad: ALTA | Frecuencia: ALTA
Este es el más traicionero: tu código compila, ejecuta, produce resultados correctos... pero parallel=True no paralelizó absolutamente nada. Usas 1 core de 8 sin saberlo. ¿Y sabes por qué? porque Numba no avisa cuando falla la paralelización...
# ☠️ TRAMPA 3A: usar range() en vez de prange()
@njit(parallel=True)
def trap_range(data):
total = 0.0
for i in range(len(data)): # ☠️ range, NO prange → secuencial
total += data[i] ** 2
return total
# parallel=True está activado pero NO se usa
# Numba NO da error, NO da warning (en la mayoría de los casos)
# ☠️ TRAMPA 3B: dependencia entre iteraciones que impide paralelizar
@njit(parallel=True)
def trap_dependency(data):
result = np.empty(len(data))
result[0] = data[0]
for i in prange(1, len(data)):
result[i] = result[i-1] + data[i] # ☠️ depende del valor anterior
return result
# Numba PUEDE serializar el prange sin avisarte
# El resultado será CORRECTO (ejecuta secuencialmente) pero sin speedup
# ☠️ TRAMPA 3C: operaciones sobre arrays sin parallel semántico
@njit(parallel=True)
def trap_no_parallel_ops(data):
# np.sort NO tiene semántica paralela en Numba
sorted_data = np.sort(data)
# np.searchsorted tampoco
idx = np.searchsorted(sorted_data, 0.5)
return sorted_data, idx
# parallel=True no hace nada aquí — estas ops no se paralelizan
# ☠️ TRAMPA 3D: prange en el loop INTERNO (se serializa)
@njit(parallel=True)
def trap_inner_prange(matrix):
total = 0.0
for i in prange(matrix.shape[0]): # ✅ paralelo
for j in prange(matrix.shape[1]): # ☠️ SERIALIZADO silenciosamente
total += matrix[i, j]
return total
# Si tienes 4 filas y 10,000 columnas → solo 4 threads trabajan
# Después de CADA función parallel=True que escribas, haz esto:
trap_range(np.random.rand(100))
trap_range.parallel_diagnostics(level=1)
# Si NO ves loops marcados como "parallel" → no se paralelizó NADA
# Si ves "(serial)" donde esperabas "(parallel)" → Numba lo serializó
# Automatizar la verificación:
def verify_parallel(func, *args):
"""Verifica que una función parallel=True realmente paraleliza."""
import io, sys
func(*args) # compilar
# Capturar output de parallel_diagnostics
old_stdout = sys.stdout
sys.stdout = buf = io.StringIO()
try:
func.parallel_diagnostics(level=1)
except:
print("NO PARALLEL", file=sys.stderr)
sys.stdout = old_stdout
output = buf.getvalue()
if 'parallel' not in output.lower():
print(f"⚠️ {func.py_func.__name__}: parallel=True NO tiene efecto")
elif 'serial' in output.lower():
print(f"⚠️ {func.py_func.__name__}: algunos loops están serializados")
else:
print(f"✅ {func.py_func.__name__}: paralización confirmada")
Severidad: CATASTRÓFICA | Frecuencia: MEDIA
Numba NO detecta race conditions en arrays en la mayoría de los casos. Tu código da resultados diferentes cada vez que lo ejecutas, y puedes no notarlo si la diferencia es pequeña. Esto en sistemas donde la precisión y exactitud son críticas, sería un desastre.
# ☠️ TRAMPA: múltiples threads escriben al MISMO elemento
@njit(parallel=True)
def race_histogram(data, n_bins):
"""Histograma con race condition."""
bins = np.zeros(n_bins, dtype=np.int64)
for i in prange(len(data)):
bin_idx = int(data[i] * n_bins)
if bin_idx >= n_bins:
bin_idx = n_bins - 1
bins[bin_idx] += 1 # ☠️ RACE: thread A y B incrementan el mismo bin
return bins
data = np.random.rand(10_000_000)
# Ejecutar 10 veces — resultados DIFERENTES cada vez:
for _ in range(10):
result = race_histogram(data, 100)
print(np.sum(result)) # Debería ser 10_000_000 SIEMPRE
# Output: 9_998_374, 9_999_102, 9_997_850, ... ← ¡DATOS CORRUPTOS!
# ☠️ TRAMPA SUTIL: race en slice de array
@njit(parallel=True)
def race_slice(data):
n = data.shape[0]
result = np.zeros(4)
for i in prange(n):
result[:] += data[i] # ☠️ RACE: todos los threads escriben a result[:]
return result
@njit(parallel=True)
def safe_histogram(data, n_bins):
"""Histograma sin race condition: cada thread tiene su propio buffer."""
n_threads = get_num_threads()
n = len(data)
# Cada thread tiene su propia copia
local_bins = np.zeros((n_threads, n_bins), dtype=np.int64)
chunk = (n + n_threads - 1) // n_threads
for t in prange(n_threads):
start = t * chunk
end = min(start + chunk, n)
for i in range(start, end):
bin_idx = int(data[i] * n_bins)
if bin_idx >= n_bins:
bin_idx = n_bins - 1
local_bins[t, bin_idx] += 1 # ✅ cada thread su propio array
# Reducir
bins = np.zeros(n_bins, dtype=np.int64)
for b in prange(n_bins):
for t in range(n_threads):
bins[b] += local_bins[t, b]
return bins
# np.sum(safe_histogram(data, 100)) == 10_000_000 SIEMPRE
+= sobre una variable simple SÍ son seguras en prange (Numba las detecta y maneja automáticamente). Las reducciones sobre ARRAYS o slices de arrays NO son seguras — Numba no las detecta.
Severidad: ALTA | Frecuencia: MEDIA
# ☠️ TRAMPA: llamar la misma función con TIPOS DIFERENTES
@njit
def flexible_func(x):
return np.sum(x ** 2)
# Cada tipo nuevo = NUEVA COMPILACIÓN (0.1-2 segundos cada una)
flexible_func(np.array([1, 2, 3])) # int64 → compila
flexible_func(np.array([1.0, 2.0])) # float64 → compila OTRA VEZ
flexible_func(np.array([1.0, 2.0], dtype=np.float32)) # float32 → compila OTRA VEZ
flexible_func(np.array([[1.0, 2.0]])) # 2D float64 → compila OTRA VEZ
# En un pipeline real donde los datos llegan como int a veces y float otras,
# puedes estar recompilando sin saberlo
# ☠️ CASO PEOR: generar tipos distintos en un loop
for dtype in [np.int32, np.int64, np.float32, np.float64]:
data = np.random.rand(1000).astype(dtype)
flexible_func(data) # ☠️ 4 compilaciones = ~4 segundos de overhead
# ✅ SOLUCIÓN: estandarizar tipos ANTES de llamar a Numba
data_clean = data.astype(np.float64) # SIEMPRE float64
flexible_func(data_clean) # Usa la versión ya compilada
# ✅ SOLUCIÓN alternativa: usar signature explícita
@njit('float64(float64[:])')
def fixed_func(x):
return np.sum(x ** 2)
# Ahora si pasas int32, Numba da ERROR en vez de recompilar silenciosamente
# Verificar cuántas compilaciones tiene tu función:
print(f"Especializaciones: {len(flexible_func.signatures)}")
# Si este número crece con el tiempo → tienes un problema
Severidad: MEDIA | Frecuencia: MUY ALTA
# ☠️ TRAMPA: reescribir algo que NumPy ya hace en C/BLAS optimizado
# Multiplicación de matrices: NumPy llama a BLAS (MKL/OpenBLAS)
# que está optimizado por ingenieros de Intel con décadas de trabajo
A = np.random.rand(1000, 1000)
B = np.random.rand(1000, 1000)
# ❌ Intentar hacer matmul en Numba
@njit(parallel=True)
def matmul_numba(A, B):
M, K = A.shape
K2, N = B.shape
C = np.zeros((M, N))
for i in prange(M):
for j in range(N):
for k in range(K):
C[i, j] += A[i, k] * B[k, j]
return C
# Benchmark:
# np.dot(A, B): ~15 ms (BLAS, AVX-512, cache blocking perfecto)
# matmul_numba: ~450 ms (30x MÁS LENTO que NumPy)
# OTROS CASOS donde NumPy gana:
# np.linalg.solve() → LAPACK optimizado
# np.fft.fft() → FFTW/MKL optimizado
# np.linalg.svd() → LAPACK optimizado
# Estas operaciones tienen DÉCADAS de optimización. No las reescribas.
# Numba gana cuando:
# ✅ Fusionar múltiples operaciones NumPy en un solo loop
# ✅ Lógica condicional (if/else) dentro de loops
# ✅ Acceso a elementos individuales en loops
# ✅ Algoritmos con estado (EMA, filtro Kalman, etc.)
# ✅ Operaciones no vectorizables nativamente
# NumPy gana cuando:
# ❌ Álgebra lineal densa (BLAS/LAPACK)
# ❌ FFT
# ❌ Una sola operación vectorizada sobre un array grande
# ❌ Operaciones que NumPy ya ejecuta en C compilado
Severidad: MEDIA | Frecuencia: MEDIA
# ☠️ TRAMPA: cache=True no detecta cambios en funciones importadas
# archivo: utils.py
from numba import njit
@njit
def helper(x):
return x ** 2 # Versión 1
# archivo: main.py
from utils import helper
@njit(cache=True)
def main_func(data):
result = np.empty_like(data)
for i in range(len(data)):
result[i] = helper(data[i])
return result
# Ahora cambias utils.py:
# def helper(x): return x ** 3 # Versión 2
#
# main_func sigue usando el CACHE VIEJO con x**2
# ¡Tu código da resultados incorrectos sin error!
# ✅ SOLUCIÓN: limpiar cache después de cambios
# $ find . -name "__pycache__" -type d -exec rm -rf {} +
# $ find . -name "*.nbi" -delete && find . -name "*.nbc" -delete
# ☠️ OTRA TRAMPA: globals tratados como constantes
THRESHOLD = 0.5
@njit
def uses_global(data):
count = 0
for i in range(len(data)):
if data[i] > THRESHOLD: # Numba usa el valor de compilación
count += 1
return count
uses_global(np.random.rand(100)) # compila con THRESHOLD=0.5
THRESHOLD = 0.9 # ¡Cambiar esto NO tiene efecto!
uses_global(np.random.rand(100)) # sigue usando 0.5
# ✅ SOLUCIÓN: pasar como argumento
@njit
def uses_argument(data, threshold): # ✅ argumento, no global
count = 0
for i in range(len(data)):
if data[i] > threshold:
count += 1
return count
Severidad: CATASTRÓFICA | Frecuencia: MEDIA
# En Numba, boundscheck está DESACTIVADO por defecto (por rendimiento)
# Esto significa que acceder fuera de rango NO da error
# — lee o escribe basura en memoria adyacente
@njit
def silent_corruption(data, indices):
result = np.zeros(10)
for i in range(len(indices)):
idx = indices[i]
result[idx] += data[i] # Si idx >= 10 → escribe en memoria AJENA
return result # result puede tener valores corruptos
# Con datos reales esto puede:
# - Corromper otros arrays en memoria
# - Causar segfault HORAS después (cuando accedes al array corrupto)
# - Dar resultados sutilmente incorrectos que pasan validación
# ✅ DURANTE DESARROLLO: activar boundscheck
@njit(boundscheck=True) # o NUMBA_BOUNDSCHECK=1 como env var
def safe_during_dev(data, indices):
result = np.zeros(10)
for i in range(len(indices)):
idx = indices[i]
result[idx] += data[i] # Ahora lanza IndexError si idx >= 10
return result
# ✅ EN PRODUCCIÓN: validar indices manualmente (sin boundscheck overhead)
@njit
def safe_in_production(data, indices, n_bins):
result = np.zeros(n_bins)
for i in range(len(indices)):
idx = indices[i]
if 0 <= idx < n_bins: # ✅ Guard manual, mínimo overhead
result[idx] += data[i]
return result
Severidad: ALTA | Frecuencia: ALTA
# ☠️ TRAMPA: Numba parallel + NumPy con MKL + scikit-learn paralelo
# Cada uno lanza SUS PROPIOS threads sin coordinarse
# Escenario real:
# - Tu CPU tiene 8 cores físicos (16 con HyperThreading)
# - Numba lanza 16 threads (ve 16 "cores" lógicos)
# - NumPy con MKL lanza 16 threads para np.dot()
# - scikit-learn lanza 16 threads para cross_val_score()
# Total: 48 threads compitiendo por 8 cores físicos
# → Context switches masivos, cache invalidation, 3x MÁS LENTO
# ☠️ Ejemplo concreto:
@njit(parallel=True)
def numba_step(data):
return np.sum(data ** 2) # Numba usa 16 threads
def pipeline(data):
step1 = numba_step(data) # 16 threads Numba
step2 = np.dot(data.T, data) # 16 threads MKL (SIMULTÁNEOS si async)
return step1, step2
# ✅ SOLUCIÓN: coordinar threads entre librerías
import os
os.environ['MKL_NUM_THREADS'] = '4' # ANTES de importar numpy
os.environ['OMP_NUM_THREADS'] = '4' # ANTES de importar numpy
os.environ['NUMBA_NUM_THREADS'] = '4' # ANTES de importar numba
# O dinámicamente:
from numba import set_num_threads
set_num_threads(4) # Cuando Numba está activo
set_num_threads(1) # Cuando dejas que MKL use todos los cores
Severidad: MEDIA | Frecuencia: MEDIA
# ☠️ TRAMPA: usar Dict para algo que debería ser un array
from numba import types
from numba.typed import Dict
@njit
def count_with_dict(data, n_bins):
"""Contar ocurrencias usando un dict — LENTO."""
counts = Dict.empty(key_type=types.int64, value_type=types.int64)
for i in range(len(data)):
key = int(data[i] * n_bins)
if key in counts:
counts[key] += 1 # ☠️ Hash lookup + hash insert cada vez
else:
counts[key] = 1
return counts
# ✅ CORRECTO: usar array directo (si las keys son índices enteros)
@njit
def count_with_array(data, n_bins):
counts = np.zeros(n_bins, dtype=np.int64)
for i in range(len(data)):
idx = int(data[i] * n_bins)
if idx >= n_bins:
idx = n_bins - 1
counts[idx] += 1 # ✅ Acceso directo O(1), sin hash
return counts
# Benchmark con 10M elementos:
# Dict version: ~2.1 segundos
# Array version: ~0.05 segundos → 42x más rápido
# REGLA: Si tus keys son enteros consecutivos 0..N → usa un array
# Usa Dict SOLO cuando las keys son no-consecutivas o strings
Severidad: MEDIA | Frecuencia: ALTA
# ☠️ TRAMPA: paralelizar operaciones sobre arrays pequeños
@njit(parallel=True)
def overkill(small_array):
return np.sum(np.sqrt(small_array))
data = np.random.rand(100) # 100 elementos = 800 bytes
# Benchmark:
# @njit (sin parallel): 0.3 μs
# @njit(parallel=True): 45 μs → 150x MÁS LENTO
# ¡El overhead de coordinar 8 threads es mayor que todo el cómputo!
# ✅ REGLA PRÁCTICA: parallel=True solo rinde con:
# - Arrays de > 10,000 elementos para operaciones simples
# - Arrays de > 1,000 elementos para operaciones costosas (sin, cos, etc.)
# - Siempre verificar con un scaling test (módulo de benchmarking)
# ✅ Si no sabes el tamaño de antemano, decide en runtime:
@njit
def smart_serial(x):
return np.sum(np.sqrt(x))
@njit(parallel=True)
def smart_parallel(x):
return np.sum(np.sqrt(x))
def smart_dispatch(x, threshold=10_000):
if len(x) < threshold:
return smart_serial(x)
return smart_parallel(x)
Severidad: ALTA | Frecuencia: MEDIA
# ☠️ TRAMPAS que destruyen la contigüidad sin que lo notes:
# 1. Slices con step
data = np.random.rand(1000000)
every_other = data[::2] # stride = 16 bytes, NO contiguo
# 2. Transponer sin copiar
matrix = np.random.rand(1000, 1000)
transposed = matrix.T # NO es C-contiguous, es F-contiguous
# Si tu loop interno recorre filas del transpuesto → cache disaster
# 3. Fancy indexing devuelve copia contigua... pero el loop puede no serlo
indices = np.array([0, 500, 1, 501, 2, 502])
selected = matrix[indices] # selected ES contiguo, pero los accesos no
# 4. Reshape que cambia strides
flat = np.random.rand(1000000)
matrix_view = flat.reshape(1000, 1000) # OK, C-contiguous
col_major = matrix_view.T # ☠️ F-contiguous
# ✅ SIEMPRE verificar antes de procesar:
print(f"C-contiguo: {matrix_view.flags.c_contiguous}")
print(f"F-contiguo: {matrix_view.flags.f_contiguous}")
print(f"Strides: {matrix_view.strides}")
# ✅ Forzar contigüidad cuando sea necesario:
safe_data = np.ascontiguousarray(col_major) # copia a C-contiguous
# ✅ Identifica que slices y transposes destruyen performance
| Anti-Pattern | Síntoma | Costo a escala | Detección |
|---|---|---|---|
| #1 parallel en loop externo | Lentitud inexplicable | 100-1000x más lento | Profiler muestra overhead en dispatch |
| #2 Alocaciones en loop | Uso de memoria creciente | 2-10x más lento | NUMBA_DEBUG_NRT=1 |
| #3 parallel=True inactivo | Usa 1 core en vez de 8 | 8x más lento (en 8 cores) | parallel_diagnostics() |
| #4 Race conditions | Resultados variables | Datos corruptos | Ejecutar 10x y comparar sumas |
| #5 Recompilación | Pausas periódicas | Segundos por tipo nuevo | len(func.signatures) |
| #6 Numba donde NumPy gana | Más lento que sin Numba | Hasta 30x más lento | Benchmark vs NumPy puro |
| #7 Cache invalidado | Resultados incorrectos | Bugs silenciosos | Limpiar cache, re-test |
| #8 Sin boundscheck | Segfaults aleatorios | Corrupción de memoria | NUMBA_BOUNDSCHECK=1 |
| #9 Oversubscription | Más threads = más lento | 2-5x más lento | htop, top |
| #10 Dict como array | Lentitud en lookups | 10-50x más lento | Benchmark vs array |
| #11 parallel con datos chicos | Overhead domina | 10-150x más lento | Scaling test |
| #12 Acceso no contiguo | Throughput (Rendimiento real) bajo | 3-10x más lento | arr.flags.c_contiguous |
@njit(parallel=True) → parallel_diagnostics(level=1)NUMBA_BOUNDSCHECK=1len(func.signatures) no debe crecer con el tiempoEsta sección resuelve la pregunta más importante al usar Numba: ¿qué decorador necesito para MI caso? Cada decorador existe para resolver un problema distinto. Usar el incorrecto no solo no ayuda — puede hacer tu código más lento, incorrecto, o directamente roto.
Antes de leer todo, usa esta guía rápida:
| Tu situación | Decorador correcto |
|---|---|
| Tengo una función con loops numéricos y quiero que vaya rápido | @njit |
| Tengo una operación escalar que quiero aplicar a todo un array | @vectorize |
| Tengo una operación que recibe/devuelve sub-arrays (no escalares) | @guvectorize |
| Tengo un cálculo donde cada elemento depende de sus vecinos | @stencil |
| Necesito una clase con estado y métodos rápidos | @jitclass |
| Necesito que una función Python sea llamable desde C/Fortran | @cfunc |
| Quiero que una función de terceros funcione dentro de @njit | @overload |
@njit / @jit — La navaja suizaToma tu función Python, analiza el bytecode, infiere los tipos de todas las variables, y genera código máquina nativo vía LLVM. Es como si un compilador de C tomara tu Python y lo convirtiera en un ejecutable. Desde Numba 0.59, @jit y @njit son idénticos.
Ejemplo 1: Filtro de partículas en una simulación física
import numpy as np
from numba import njit
@njit
def actualizar_particulas(posiciones, velocidades, masas, dt, gravedad):
"""Simulación N-body simplificada: actualiza posiciones por un paso dt."""
n = posiciones.shape[0]
fuerzas = np.zeros_like(posiciones)
# Calcular fuerzas gravitacionales entre pares
for i in range(n):
for j in range(i + 1, n):
dx = posiciones[j, 0] - posiciones[i, 0]
dy = posiciones[j, 1] - posiciones[i, 1]
dist_sq = dx * dx + dy * dy + 1e-10 # evitar div/0
dist = np.sqrt(dist_sq)
fuerza = gravedad * masas[i] * masas[j] / dist_sq
fx = fuerza * dx / dist
fy = fuerza * dy / dist
fuerzas[i, 0] += fx
fuerzas[i, 1] += fy
fuerzas[j, 0] -= fx
fuerzas[j, 1] -= fy
# Integrar: actualizar velocidades y posiciones
for i in range(n):
velocidades[i, 0] += (fuerzas[i, 0] / masas[i]) * dt
velocidades[i, 1] += (fuerzas[i, 1] / masas[i]) * dt
posiciones[i, 0] += velocidades[i, 0] * dt
posiciones[i, 1] += velocidades[i, 1] * dt
# Uso
n_particulas = 1000
pos = np.random.rand(n_particulas, 2) * 100
vel = np.zeros((n_particulas, 2))
masas = np.random.rand(n_particulas) * 10 + 1
actualizar_particulas(pos, vel, masas, 0.01, 9.8)
# Sin Numba: ~15 segundos | Con @njit: ~0.02 segundos
Ejemplo 2: Búsqueda en historial de precios (datos financieros)
@njit
def max_drawdown(precios):
"""Calcula la máxima caída desde un pico en una serie de precios.
Métrica fundamental en finanzas para medir riesgo."""
n = len(precios)
pico = precios[0]
max_dd = 0.0
for i in range(1, n):
if precios[i] > pico:
pico = precios[i]
dd = (pico - precios[i]) / pico
if dd > max_dd:
max_dd = dd
return max_dd
# 10 años de datos diarios
precios = np.cumsum(np.random.randn(2520)) + 100
resultado = max_drawdown(precios) # instantáneo
Ejemplo 3: Procesamiento de señal (correlación cruzada manual)
@njit
def cross_correlation(signal, template):
"""Encuentra dónde un template aparece en una señal larga."""
n_signal = len(signal)
n_template = len(template)
n_output = n_signal - n_template + 1
result = np.empty(n_output)
for i in range(n_output):
total = 0.0
for j in range(n_template):
total += signal[i + j] * template[j]
result[i] = total
return result
# ERROR 1: Usar listas Python normales
@njit
def malo_lista(n):
resultado = [] # ❌ Lista Python, NO funciona
for i in range(n):
resultado.append(i * 2)
return resultado
# SOLUCIÓN: Usa array NumPy o numba.typed.List
@njit
def bueno_array(n):
resultado = np.empty(n, dtype=np.int64) # ✅ Pre-aloca array
for i in range(n):
resultado[i] = i * 2
return resultado
# ERROR 2: Llamar funciones no soportadas
@njit
def malo_pandas(df):
return df.groupby('col').mean() # ❌ Pandas NO funciona en Numba
# ERROR 3: Usar @njit en funciones que YA son rápidas con NumPy
@njit # ❌ Innecesario: NumPy ya hace esto en C internamente
def innecesario(a, b):
return np.dot(a, b) # np.dot ya llama a BLAS optimizado
# ERROR 4: Olvidar decorar funciones auxiliares
def helper(x): # ❌ Sin @njit, Numba sale al intérprete
return x ** 2
@njit
def principal(arr):
for i in range(len(arr)):
arr[i] = helper(arr[i]) # Lentísimo: sale de nativo a Python en cada iteración
@vectorize — Fábrica de ufuncs escalaresTú escribes una función que opera sobre un solo par de valores escalares. Numba genera automáticamente el loop que aplica esa operación a arrays completos, con soporte automático de broadcasting, reduce, accumulate — como un ufunc nativo de NumPy escrito en C, pero sin escribir C.
target='parallel'Ejemplo 1: Cálculo de impuestos con tramos progresivos
from numba import vectorize, float64
@vectorize([float64(float64)])
def calcular_impuesto(ingreso):
"""Impuesto progresivo: aplica a CADA empleado de un array."""
if ingreso <= 12000:
return ingreso * 0.0
elif ingreso <= 30000:
return (ingreso - 12000) * 0.15
elif ingreso <= 60000:
return 2700 + (ingreso - 30000) * 0.25
else:
return 10200 + (ingreso - 60000) * 0.35
# Funciona sobre arrays completos automáticamente
salarios = np.array([8000, 25000, 45000, 120000])
impuestos = calcular_impuesto(salarios)
# → array([0., 1950., 6450., 31200.])
# ¡Broadcasting gratis! Funciona con matrices también
salarios_departamentos = np.random.rand(50, 200) * 80000
impuestos_todos = calcular_impuesto(salarios_departamentos) # shape (50, 200)
Ejemplo 2: Función de activación personalizada para ML
import math
@vectorize([float64(float64)], target='parallel')
def swish(x):
"""Swish activation: x * sigmoid(x). Mejor que ReLU en muchos casos."""
return x / (1.0 + math.exp(-x))
# Aplicar a toda una capa de red neuronal (millones de valores)
activaciones = np.random.randn(512, 1024)
resultado = swish(activaciones) # Paralelo en todos los cores
Ejemplo 3: Distancia con reduce (¡imposible con @njit!)
@vectorize([float64(float64, float64)])
def max_custom(a, b):
"""Un max que funciona como ufunc con reduce."""
if a >= b:
return a
return b
data = np.array([3.0, 7.0, 2.0, 9.0, 1.0])
max_custom.reduce(data) # → 9.0 (reduce funciona automáticamente)
# Con una matriz: máximo por filas o columnas
matrix = np.random.rand(100, 50)
max_custom.reduce(matrix, axis=1) # máximo de cada fila
max_custom.accumulate(matrix, axis=0) # máximo acumulado por columnas
# ERROR 1: Intentar acceder a índices del array
@vectorize([float64(float64[:])]) # ❌ No puedes pasar arrays
def malo(arr):
return arr[0] + arr[1]
# vectorize solo recibe ESCALARES. Para arrays usa @guvectorize
# ERROR 2: Orden incorrecto de signatures (tipos más específicos primero)
@vectorize([
float64(float64, float64), # ❌ float64 antes que int → int nunca se usa
int64(int64, int64),
])
def malo_orden(a, b):
return a + b
# CORRECTO: tipos más específicos/pequeños primero
@vectorize([
int32(int32, int32), # ✅ Más específico primero
int64(int64, int64),
float32(float32, float32),
float64(float64, float64),
])
def bien_orden(a, b):
return a + b
# ERROR 3: Usar target='parallel' con datos muy pequeños
@vectorize([float64(float64)], target='parallel')
def doble(x):
return x * 2
doble(np.array([1.0, 2.0, 3.0])) # ❌ 3 elementos: el overhead de threading
# es mayor que el cálculo en sí
@guvectorize — Ufuncs sobre sub-arraysComo @vectorize, pero tu función recibe porciones de arrays (filas, vectores, submatrices) en lugar de escalares. Tú defines un "layout" simbólico que dice qué dimensiones tiene cada argumento. NumPy se encarga del broadcasting y de iterar sobre las dimensiones externas.
Ejemplo 1: Normalización por filas (muy común en ML)
from numba import guvectorize, float64
@guvectorize(
[(float64[:], float64[:])],
'(n)->(n)' # recibe vector de n elementos, devuelve vector de n
)
def normalize_row(row, result):
"""Normaliza un vector a norma unitaria (L2)."""
total = 0.0
for i in range(row.shape[0]):
total += row[i] ** 2
norm = np.sqrt(total)
if norm > 0:
for i in range(row.shape[0]):
result[i] = row[i] / norm
else:
for i in range(row.shape[0]):
result[i] = 0.0
# Aplica automáticamente a CADA FILA de una matriz
embeddings = np.random.rand(10000, 768) # 10K vectores de 768 dimensiones
normalized = normalize_row(embeddings) # shape (10000, 768), cada fila normalizada
Ejemplo 2: Media móvil por serie temporal (datos financieros)
@guvectorize(
[(float64[:], float64[:], float64[:])],
'(n),(m)->(n)' # señal de n puntos, ventana de m pesos → n resultados
)
def weighted_moving_avg(signal, weights, result):
"""Media móvil ponderada. weights define la ventana."""
m = weights.shape[0]
w_sum = 0.0
for k in range(m):
w_sum += weights[k]
for i in range(signal.shape[0]):
if i < m - 1:
result[i] = np.nan # no hay suficientes datos
else:
total = 0.0
for j in range(m):
total += signal[i - m + 1 + j] * weights[j]
result[i] = total / w_sum
# 500 acciones, 252 días de cotización cada una
cotizaciones = np.random.rand(500, 252) * 100 + 50
pesos = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) # ventana de 5 días, peso creciente
medias = weighted_moving_avg(cotizaciones, pesos) # shape (500, 252)
# ¡Broadcasting automático! La misma ventana se aplica a las 500 series
Ejemplo 3: Distancia euclidiana entre dos conjuntos de vectores
@guvectorize(
[(float64[:], float64[:], float64[:])],
'(d),(d)->()' # dos vectores de d dimensiones → un escalar
)
def euclidean_dist(a, b, result):
total = 0.0
for i in range(a.shape[0]):
diff = a[i] - b[i]
total += diff * diff
result[0] = np.sqrt(total)
# Distancia entre 1000 pares de vectores 3D
puntos_a = np.random.rand(1000, 3)
puntos_b = np.random.rand(1000, 3)
distancias = euclidean_dist(puntos_a, puntos_b) # shape (1000,)
# Broadcasting: distancia de UN punto a TODOS los demás
query = np.array([0.5, 0.5, 0.5])
todas_dist = euclidean_dist(puntos_a, query) # shape (1000,) — automático
# ERROR 1: Intentar retornar un valor (como en @vectorize)
@guvectorize([(float64[:], float64[:])], '(n)->()')
def malo(arr, result):
return np.sum(arr) # ❌ El return se IGNORA
# ✅ Debes escribir en result[0] = np.sum(arr)
# ERROR 2: Layout incorrecto — olvidar los paréntesis del escalar
# Escalar en el layout se escribe como () no como (1)
'(n)->(1)' # ❌ Esto crea un array de 1 elemento
'(n)->()' # ✅ Esto crea un escalar
# ERROR 3: Modificar un array de entrada pensando que se guarda
@guvectorize([(float64[:], float64[:])], '()->()')
def malo_overwrite(inval, outval):
inval[0] = 99.0 # ❌ Puede no guardarse si NumPy hizo un cast
outval[0] = 42.0
# SOLUCIÓN: usa writable_args=('inval',) si necesitas modificar inputs
@stencil — Patrones de vecindarioDefines cómo calcular UN elemento del resultado en función de sus vecinos en el array de entrada (usando índices relativos). Numba genera todo el loop y maneja los bordes automáticamente. Cuando se combina con parallel=True, se paraleliza.
standard_indexing)Ejemplo 1: Detección de bordes (Sobel) en una imagen
from numba import stencil, njit, prange
import numpy as np
@stencil
def sobel_x(img):
"""Filtro Sobel horizontal: detecta bordes verticales."""
return (-1 * img[-1, -1] + 0 * img[-1, 0] + 1 * img[-1, 1] +
-2 * img[ 0, -1] + 0 * img[ 0, 0] + 2 * img[ 0, 1] +
-1 * img[ 1, -1] + 0 * img[ 1, 0] + 1 * img[ 1, 1])
@stencil
def sobel_y(img):
"""Filtro Sobel vertical: detecta bordes horizontales."""
return (-1 * img[-1, -1] - 2 * img[-1, 0] - 1 * img[-1, 1] +
1 * img[ 1, -1] + 2 * img[ 1, 0] + 1 * img[ 1, 1])
@njit(parallel=True)
def edge_detection(imagen):
"""Magnitud del gradiente = sqrt(Gx² + Gy²)"""
gx = sobel_x(imagen)
gy = sobel_y(imagen)
# np.sqrt se paraleliza automáticamente
return np.sqrt(gx ** 2 + gy ** 2)
imagen = np.random.rand(1920, 1080) # imagen Full HD
bordes = edge_detection(imagen)
Ejemplo 2: Simulación de difusión de calor
@stencil
def heat_step(T):
"""Un paso de la ecuación de calor 2D discretizada."""
return 0.25 * (T[-1, 0] + T[1, 0] + T[0, -1] + T[0, 1])
@njit
def simulate_heat(grid, n_steps):
"""Simula difusión de calor por n pasos."""
for _ in range(n_steps):
grid = heat_step(grid)
return grid
# Placa de metal 200x200, caliente en el centro
placa = np.zeros((200, 200))
placa[90:110, 90:110] = 100.0 # fuente de calor
resultado = simulate_heat(placa, 500)
Ejemplo 3: Suavizado de datos de sensor (1D)
@stencil(neighborhood=((-2, 2),))
def smooth_sensor(data):
"""Suavizado con ventana de 5 puntos (media simple)."""
return (data[-2] + data[-1] + data[0] + data[1] + data[2]) / 5.0
sensor_ruidoso = np.sin(np.linspace(0, 10, 10000)) + np.random.randn(10000) * 0.3
sensor_limpio = smooth_sensor(sensor_ruidoso)
@jitclass — Clases con estado compiladoCompila una clase entera: los datos se almacenan como una estructura C en memoria (sin overhead de objetos Python) y todos los métodos se compilan a nopython. Ideal cuando necesitas encapsular estado mutable y operarlo eficientemente.
Ejemplo 1: Filtro Kalman 1D para tracking de sensor
from numba import float64
from numba.experimental import jitclass
@jitclass([
('x', float64), # estimación actual
('P', float64), # incertidumbre
('Q', float64), # ruido del proceso
('R', float64), # ruido de medición
])
class KalmanFilter1D:
def __init__(self, x0, P0, Q, R):
self.x = x0
self.P = P0
self.Q = Q
self.R = R
def predict(self):
# En modelo simple: predicción = estado anterior
self.P += self.Q
def update(self, measurement):
K = self.P / (self.P + self.R) # Kalman gain
self.x += K * (measurement - self.x)
self.P *= (1.0 - K)
def filter_signal(self, measurements, output):
for i in range(len(measurements)):
self.predict()
self.update(measurements[i])
output[i] = self.x
# Uso
kf = KalmanFilter1D(x0=0.0, P0=1.0, Q=0.01, R=0.5)
mediciones = np.sin(np.linspace(0, 10, 1000)) + np.random.randn(1000) * 0.5
salida = np.empty(1000)
kf.filter_signal(mediciones, salida)
Ejemplo 2: Acumulador de estadísticas online (Welford)
@jitclass([
('n', float64),
('mean', float64),
('M2', float64),
])
class OnlineStats:
"""Calcula media y varianza incrementalmente (Welford's algorithm).
Útil cuando los datos llegan en streaming y no caben en memoria."""
def __init__(self):
self.n = 0.0
self.mean = 0.0
self.M2 = 0.0
def add(self, x):
self.n += 1
delta = x - self.mean
self.mean += delta / self.n
delta2 = x - self.mean
self.M2 += delta * delta2
def variance(self):
if self.n < 2:
return 0.0
return self.M2 / (self.n - 1)
# Procesar un stream de datos enorme
@njit
def procesar_stream(data):
stats = OnlineStats()
for i in range(len(data)):
stats.add(data[i])
return stats.mean, stats.variance()
data = np.random.randn(10_000_000)
media, var = procesar_stream(data)
# ERROR 1: No inicializar contenedores en __init__
@jitclass([('d', types.DictType(types.int64, types.float64))])
class Malo:
def __init__(self):
self.d[1] = 10.0 # ❌ SEGFAULT: d no fue inicializado
# CORRECTO:
def __init__(self):
self.d = typed.Dict.empty(types.int64, types.float64) # ✅ primero inicializar
self.d[1] = 10.0
# ERROR 2: NumPy arrays necesitan spec explícito (no se infieren de annotations)
@jitclass
class Malo2:
data: np.ndarray # ❌ Numba no sabe el dtype ni dimensiones
# CORRECTO: spec explícito para arrays
@jitclass([('data', float64[:])])
class Bueno:
def __init__(self, n):
self.data = np.zeros(n)
@cfunc — Callbacks para C/FortranCompila una función Python en un callback con convención de llamada C. Es la forma de pasar funciones Numba a librerías C o a scipy.integrate, scipy.optimize, etc.
scipy.integrate.quadfrom numba import cfunc, float64
from scipy import integrate
# Integración numérica con SciPy, pero el integrando es Numba-compilado
@cfunc(float64(float64))
def integrando(x):
return np.exp(-x ** 2) * np.cos(2 * x)
# .ctypes pasa el puntero C a SciPy
resultado, error = integrate.quad(integrando.ctypes, 0, 10)
# Mucho más rápido que pasar una función Python normal a quad()
@overload — Extender el ecosistemaTe permite enseñarle a Numba cómo ejecutar funciones que no soporta nativamente. Defines una implementación alternativa que Numba usará en nopython mode. Se ejecuta en compile time (no runtime) — Numba llama a tu overload con los TIPOS de los argumentos, y tú devuelves una función que se compilará.
from numba import types
from numba.extending import overload
# Supón que tienes esta función Python que usas en tu proyecto
def clip(value, low, high):
return min(max(value, low), high)
# La haces funcionar dentro de @njit
@overload(clip)
def clip_overload(value, low, high):
# Este código se ejecuta en COMPILE TIME
# value, low, high son TIPOS, no valores
if isinstance(value, types.Float):
def impl(value, low, high):
# Este código se ejecuta en RUNTIME
if value < low:
return low
elif value > high:
return high
return value
return impl
# Si retorna None, Numba prueba otros overloads
# Ahora clip() funciona dentro de código @njit
@njit
def procesar(arr):
for i in range(len(arr)):
arr[i] = clip(arr[i], 0.0, 1.0) # ✅ Funciona gracias al overload
return arr
Para que quede claro cuándo elegir cada uno, veamos cómo resolverías "calcular la norma L2 de vectores" con cada decorador y por qué elegirías uno u otro.
import numpy as np
from numba import njit, vectorize, guvectorize, stencil, float64, prange
# ─── OPCIÓN A: @njit ───
# Cuando tienes UNA matriz y quieres las normas de cada fila
# Más control, escribes el loop tú mismo
@njit(parallel=True)
def normas_njit(matrix):
n_rows = matrix.shape[0]
result = np.empty(n_rows)
for i in prange(n_rows):
total = 0.0
for j in range(matrix.shape[1]):
total += matrix[i, j] ** 2
result[i] = np.sqrt(total)
return result
# ✅ Mejor cuando: necesitas control total del loop, o la lógica es compleja
# ❌ Evitar cuando: necesitas broadcasting automático
# ─── OPCIÓN B: @guvectorize ───
# Cuando quieres que funcione con arrays de CUALQUIER forma via broadcasting
@guvectorize([(float64[:], float64[:])], '(n)->()')
def norma_guv(vec, result):
total = 0.0
for i in range(vec.shape[0]):
total += vec[i] ** 2
result[0] = np.sqrt(total)
# ✅ Mejor cuando: quieres broadcasting automático
# norma_guv(vector_1d) → escalar
# norma_guv(matrix_2d) → vector (norma por fila)
# norma_guv(tensor_3d) → matrix (norma por última dimensión)
# ─── OPCIÓN C: @vectorize ───
# NO es adecuado aquí porque vectorize es para operaciones escalar→escalar
# La norma necesita ver el vector completo, no elemento por elemento
# ─── OPCIÓN D: @stencil ───
# NO es adecuado aquí porque la norma no es un patrón de vecindario
# Stencil es para cuando el resultado en posición [i] depende de [i-1], [i+1], etc.
¿Qué tipo de operación tienes?
│
├─ Escalar → Escalar (sin ver vecinos ni el array completo)
│ └─ ¿Necesitas broadcasting/reduce/accumulate?
│ ├─ SÍ → @vectorize
│ └─ NO → @njit (más simple)
│
├─ Sub-array → Escalar o Sub-array (operas sobre filas, vectores, etc.)
│ └─ ¿Necesitas broadcasting entre inputs de distintas dimensiones?
│ ├─ SÍ → @guvectorize
│ └─ NO → @njit con loop manual
│
├─ Cada resultado depende de VECINOS en un patrón fijo
│ └─ @stencil (+ parallel=True en la función que lo llama)
│
├─ Necesitas una clase con estado y métodos rápidos
│ └─ @jitclass
│
├─ Necesitas pasar una función como callback a C/SciPy
│ └─ @cfunc
│
└─ Necesitas que una función externa funcione dentro de @njit
└─ @overload
@njit. Solo cambia a otro decorador cuando necesites una capacidad específica que @njit no te da (broadcasting → @vectorize/@guvectorize, vecinos → @stencil, estado → @jitclass).
| Decorador | Para qué | Ejemplo clave |
|---|---|---|
@jit / @njit | Compilar funciones generales | @njit(parallel=True, fastmath=True, cache=True) |
@vectorize | Crear ufuncs (escalar→escalar) | @vectorize([float64(float64, float64)]) |
@guvectorize | Ufuncs generalizados (array→array) | @guvectorize([(f64[:], f64[:])], '(n)->()') |
@stencil | Operaciones sobre vecindarios | @stencil(neighborhood=((-1,1),(-1,1))) |
@jitclass | Clases compiladas con estado | @jitclass([('x', float64)]) |
@cfunc | Crear callbacks C | @cfunc("float64(float64)") |
@overload | Extender funciones para nopython | @overload(mi_funcion) |
@intrinsic | Generar LLVM IR directamente | Para expertos en LLVM |
# El "full power" Numba decorator
@njit(
parallel=True, # Auto-paralelizar operaciones
fastmath=True, # Relajar IEEE 754
cache=True, # Guardar compilación en disco
nogil=True, # Liberar GIL para multithreading
boundscheck=False, # No verificar límites de array (default)
error_model='numpy', # Seguir semántica de errores de NumPy
)
def ultimate_function(data):
...
import threading
from numba import njit
@njit(nogil=True)
def process_chunk(data, result, start, end):
for i in range(start, end):
result[i] = np.sqrt(data[i]) * np.log(data[i] + 1)
# Dividir trabajo en threads
data = np.random.rand(10_000_000)
result = np.empty_like(data)
n_threads = 4
chunk = len(data) // n_threads
threads = []
for t in range(n_threads):
start = t * chunk
end = start + chunk if t < n_threads - 1 else len(data)
thread = threading.Thread(target=process_chunk, args=(data, result, start, end))
threads.append(thread)
thread.start()
for t in threads:
t.join()
from numba.pycc import CC
cc = CC('my_module')
@cc.export('multf', 'f8(f8, f8)')
def mult(a, b):
return a * b
@cc.export('square', 'f8(f8)')
def square(a):
return a ** 2
cc.compile() # Genera my_module.so / my_module.pyd
# Luego: import my_module; my_module.multf(3.0, 4.0)
| Variable | Efecto |
|---|---|
NUMBA_NUM_THREADS=N | Número de threads para paralelismo |
NUMBA_PARALLEL_DIAGNOSTICS=4 | Diagnósticos de auto-paralelización |
NUMBA_DEVELOPER_MODE=1 | Mensajes de error detallados |
NUMBA_DISABLE_JIT=1 | Desactiva JIT (para debugging) |
NUMBA_CACHE_DIR=path | Directorio para el cache de compilación |
NUMBA_THREADING_LAYER=tbb | Seleccionar backend de threading (tbb/omp/workqueue) |
numba.typed.List)numba.typed.Dict)@jitclass)try/except limitado (solo excepciones básicas)yield (soporte limitado)**kwargs (solo *args limitado)Este árbol de decisión interactivo te guía paso a paso para elegir el decorador y las opciones correctas de Numba según tu caso de uso. Responde las preguntas y obtén una recomendación fundamentada con código listo para usar.
Si prefieres ver todo de un vistazo sin navegar el árbol interactivo:
¿Qué tipo de problema tienes?
│
├─ LOOP NUMÉRICO LENTO
│ ├─ ¿Iteraciones independientes?
│ │ ├─ SÍ + >10K iteraciones
│ │ │ ├─ Precisión IEEE no crítica → @njit(parallel=True, fastmath=True) + prange
│ │ │ └─ Precisión IEEE crítica → @njit(parallel=True) + prange
│ │ ├─ SÍ + <10K iteraciones → @njit (paralelo no vale la pena)
│ │ └─ NO (dependencia secuencial)
│ │ ├─ Llamada >1K veces desde Python → @njit con batch (loop DENTRO)
│ │ └─ Pocas llamadas
│ │ ├─ Script repetitivo → @njit(cache=True)
│ │ └─ Desarrollo activo → @njit
│ └─ Loop externo independiente + interno dependiente
│ └─ @njit(parallel=True): prange externo + range interno
│
├─ OPERACIÓN ESCALAR → ARRAY
│ ├─ ¿Función puramente escalar→escalar?
│ │ ├─ SÍ + necesita broadcasting/.reduce()/.accumulate()
│ │ │ ├─ Array >1M elementos → @vectorize(target='parallel')
│ │ │ └─ Array normal → @vectorize
│ │ ├─ SÍ + solo array 1D
│ │ │ ├─ Es para librería/API → @vectorize (interfaz NumPy estándar)
│ │ │ └─ Es código interno → @njit con loop manual (más simple)
│ │ └─ NO (necesita ver varios elementos) → Ir a "Sub-array ops"
│ │
├─ OPERACIÓN SOBRE SUB-ARRAYS (filas, ventanas, secciones)
│ ├─ ¿Necesita broadcasting automático?
│ │ ├─ SÍ + millones de llamadas con sub-arrays pequeños
│ │ │ └─ @njit loop manual + output pre-allocado (evita overhead ufunc)
│ │ ├─ SÍ + pocas llamadas o sub-arrays grandes
│ │ │ └─ @guvectorize con firma dimensional: "(n)->()"
│ │ └─ NO (itero filas con loop)
│ │ ├─ Filas independientes → @njit(parallel=True) + prange
│ │ └─ Filas dependientes → @njit
│ │
├─ CADA RESULTADO DEPENDE DE VECINOS (grilla/imagen)
│ ├─ Patrón de vecindario fijo
│ │ ├─ Quiero paralelismo → @stencil llamado desde @njit(parallel=True)
│ │ └─ Serial está bien → @stencil
│ └─ Patrón variable → @njit con loop manual
│
├─ OBJETO CON ESTADO MUTABLE
│ ├─ ¿Se usa dentro de @njit?
│ │ ├─ SÍ → @jitclass
│ │ └─ NO + métodos pesados → @jitclass
│ │ └─ NO + métodos ligeros → Clase Python normal + @njit para lo pesado
│ └─ ¿Solo proceso datos sin estado residual?
│ └─ @njit con funciones puras (pasar estado como argumentos)
│
├─ INTEROPERABILIDAD
│ ├─ Callback para SciPy/C → @cfunc("float64(float64)")
│ ├─ Función externa en @njit → @overload(mi_funcion)
│ ├─ Jittear módulo entero → jit_module(nopython=True) al final del módulo
│ └─ Código Python dentro @njit → with objmode(var='tipo') # ⚠️ escape hatch, no es rápido
│
└─ OPTIMIZACIONES TRANSVERSALES (aplican a cualquier decorador)
├─ cache=True → Evitar recompilación al reiniciar script
├─ fastmath=True → ~2x más rápido, error ~4 ULP (no finanzas)
├─ nogil=True → Liberar GIL para threading.Thread manual
├─ Intel SVML → conda install intel-cmplr-lib-rt (~2-4x en sin/cos/exp)
├─ Warm-up → Primera llamada con datos dummy para pre-compilar
├─ Batch → Mover loops Python DENTRO de @njit (>1K llamadas)
├─ Pre-allocar → Pasar output como argumento, reutilizar buffers
├─ float32 → Mitad de memoria, doble SIMD, doble Throughput (Rendimiento real) RAM
├─ C-order → Acceder última dimensión en loop interno (contiguo)
└─ Verificar → inspect_types(), parallel_diagnostics(4), inspect_llvm()
@njit. Solo cambia de decorador cuando necesites una capacidad específica que @njit no ofrece. Optimiza memoria antes que CPU, porque la CPU casi nunca es el bottleneck real. Y mide siempre — sin benchmarks con tus datos reales, estás adivinando.