Cache distribuido con Python: arquitecturas, estrategias y patrones modernos

Alejandro Sánchez Yalí
Alejandro Sánchez Yalí
·23 de febrero de 2026·21 min read
pythoncache-distribuidoredis

El cerebro humano es, probablemente, el sistema de cache más sofisticado que existe. Cuando alguien te pregunta cuánto es 7 × 8, no recalculas la multiplicación desde cero: recuperas la respuesta de la memoria. Solo cuando enfrentas una operación desconocida — digamos, 347 × 29 — recurres al "origen de datos" y realizas el cálculo completo. Este principio tan simple — almacenar resultados para evitar trabajo repetido — es exactamente lo que hace un cache en un sistema de software.

Pero, ¿qué sucede cuando tu sistema ya no cabe en un solo servidor? Cuando tienes millones de usuarios concurrentes, múltiples instancias de tu aplicación y bases de datos distribuidas, el cache local ya no es suficiente. Necesitas un cache distribuido: un sistema de almacenamiento temporal compartido, accesible desde cualquier nodo de tu infraestructura.

Consideremos un ejemplo concreto. Imagina una API que consulta una base de datos para obtener el perfil de un usuario:

import time def get_user_profile_no_cache(user_id: str) -> dict: """Simula una consulta a base de datos sin cache.""" time.sleep(0.1) # Simula latencia de red + query return {"id": user_id, "name": "Ana García", "plan": "premium"} # Sin cache: cada llamada paga el costo completo start = time.perf_counter() for _ in range(10): get_user_profile_no_cache("user:1001") elapsed_no_cache = time.perf_counter() - start # Con cache simple: solo la primera llamada es costosa cache: dict[str, dict] = {} def get_user_profile_cached(user_id: str) -> dict: if user_id not in cache: cache[user_id] = get_user_profile_no_cache(user_id) return cache[user_id] start = time.perf_counter() for _ in range(10): get_user_profile_cached("user:1001") elapsed_cached = time.perf_counter() - start print(f"Sin cache: {elapsed_no_cache:.3f}s") print(f"Con cache: {elapsed_cached:.3f}s") print(f"Speedup: {elapsed_no_cache / elapsed_cached:.1f}x") ## Output # Sin cache: 1.003s # Con cache: 0.100s # Speedup: 10.0x

La diferencia es dramática: un speedup de 10x con apenas unas líneas de código. En producción, con bases de datos reales y tráfico concurrente, esta diferencia puede significar el éxito o el fracaso de una aplicación.

En este artículo exploraremos el mundo del cache distribuido con Python, desde los fundamentos teóricos hasta implementaciones prácticas con patrones modernos. Cubriremos arquitecturas, estrategias de lectura y escritura, políticas de desalojo, sharding con consistent hashing, tolerancia a fallos y observabilidad.

Fundamentos del cache

Antes de sumergirnos en las arquitecturas distribuidas, establezcamos una base formal.

Definición: Cache

Un cache es una estructura de almacenamiento de acceso rápido que mantiene un subconjunto de datos, típicamente transitorios, con el objetivo de reducir el tiempo de acceso a un origen de datos más lento. Formalmente, un cache es una función parcial C:KVC: K \rightharpoonup V que mapea un subconjunto de claves KK a valores VV, donde CN|C| \leq N para una capacidad máxima NN.

Las dos métricas fundamentales de un cache son:

Un hit rate del 95% significa que solo 1 de cada 20 peticiones necesita consultar el origen de datos. En un sistema con miles de peticiones por segundo, esta diferencia es enorme.

Para formalizar la interfaz de un cache en Python, podemos usar el sistema de tipos moderno:

from typing import Protocol, TypeVar, Optional K = TypeVar("K") V = TypeVar("V") class Cache(Protocol[K, V]): """Protocolo que define la interfaz mínima de un cache.""" def get(self, key: K) -> Optional[V]: """Retorna el valor asociado a la clave, o None si no existe.""" ... def set(self, key: K, value: V) -> None: """Almacena un par clave-valor en el cache.""" ... def delete(self, key: K) -> None: """Elimina una clave del cache.""" ... def clear(self) -> None: """Vacía completamente el cache.""" ...

Este Protocol define el contrato que cualquier implementación de cache debe cumplir, ya sea un diccionario en memoria, un wrapper sobre Redis, o un sistema multicapa. La ventaja de usar Protocol en lugar de una clase abstracta es que permite structural subtyping: cualquier clase que implemente estos métodos es compatible, sin necesidad de herencia explícita.

Arquitecturas de cache

No existe una arquitectura de cache única para todos los casos. La elección depende de factores como latencia requerida, volumen de datos, número de instancias de la aplicación y tolerancia a inconsistencias.

Cache local (in-process)

El cache más simple vive dentro del proceso de la aplicación. Python ofrece herramientas excelentes para esto:

import functools from cachetools import TTLCache # Opción 1: functools.lru_cache para funciones puras @functools.lru_cache(maxsize=1024) def fibonacci(n: int) -> int: if n < 2: return n return fibonacci(n - 1) + fibonacci(n - 2) print(fibonacci(100)) print(f"Cache info: {fibonacci.cache_info()}") ## Output # 354224848179261915075 # Cache info: CacheInfo(hits=98, misses=101, maxsize=1024, currsize=101)
from cachetools import TTLCache import time # Opción 2: TTLCache para datos con expiración user_cache: TTLCache[str, dict] = TTLCache(maxsize=256, ttl=300) # 5 minutos def get_user(user_id: str) -> dict: if user_id in user_cache: return user_cache[user_id] user = {"id": user_id, "name": "Ana García"} # Simulación de DB user_cache[user_id] = user return user user = get_user("user:1001") print(f"Usuario: {user}") print(f"Entradas en cache: {len(user_cache)}") ## Output # Usuario: {'id': 'user:1001', 'name': 'Ana García'} # Entradas en cache: 1

El cache local es extremadamente rápido (nanosegundos de acceso) pero tiene limitaciones importantes: no se comparte entre procesos ni entre servidores, y cada instancia de la aplicación mantiene su propia copia. Esto puede llevar a inconsistencias y desperdicio de memoria.

Cache centralizado (Redis)

Redis es el estándar de facto para cache distribuido. Con redis-py 5.x podemos usar un cliente asíncrono que aprovecha la concurrencia de Python moderno:

import asyncio from dataclasses import dataclass import redis.asyncio as redis @dataclass(frozen=True) class RedisConfig: host: str = "localhost" port: int = 6379 db: int = 0 max_connections: int = 50 socket_timeout: float = 5.0 decode_responses: bool = True async def create_redis_pool(config: RedisConfig) -> redis.Redis: """Crea un pool de conexiones Redis optimizado.""" pool = redis.ConnectionPool( host=config.host, port=config.port, db=config.db, max_connections=config.max_connections, socket_timeout=config.socket_timeout, decode_responses=config.decode_responses, ) return redis.Redis(connection_pool=pool) async def main(): client = await create_redis_pool(RedisConfig()) # Operaciones básicas await client.set("user:1001:name", "Ana García", ex=300) name = await client.get("user:1001:name") print(f"Nombre: {name}") # Pipeline para operaciones en lote async with client.pipeline(transaction=False) as pipe: pipe.set("counter:visits", 0) pipe.incr("counter:visits") pipe.incr("counter:visits") pipe.get("counter:visits") results = await pipe.execute() print(f"Visitas: {results[-1]}") await client.aclose() asyncio.run(main()) ## Output # Nombre: Ana García # Visitas: 2

El uso de ConnectionPool es crítico en producción: reutiliza conexiones TCP en lugar de crear una nueva por cada operación, reduciendo significativamente la latencia y el consumo de recursos del sistema operativo. El parámetro ex=300 establece un TTL de 5 minutos, asegurando que los datos no se vuelvan obsoletos indefinidamente.

Cache híbrido (L1 local + L2 Redis)

La arquitectura más robusta combina ambos niveles: un cache local ultra-rápido como primera línea de defensa y Redis como respaldo compartido:

from cachetools import TTLCache from dataclasses import dataclass, field from typing import Optional import json import redis.asyncio as redis @dataclass class HybridCache: """Cache de dos niveles: L1 (local) + L2 (Redis).""" l1: TTLCache = field(default_factory=lambda: TTLCache(maxsize=512, ttl=60)) l2: Optional[redis.Redis] = None l2_ttl: int = 300 # 5 minutos en Redis async def get(self, key: str) -> Optional[str]: # Nivel 1: cache local (nanosegundos) if key in self.l1: return self.l1[key] # Nivel 2: Redis (milisegundos) if self.l2: value = await self.l2.get(key) if value is not None: self.l1[key] = value # Promover a L1 return value return None async def set(self, key: str, value: str) -> None: self.l1[key] = value if self.l2: await self.l2.set(key, value, ex=self.l2_ttl) async def delete(self, key: str) -> None: self.l1.pop(key, None) if self.l2: await self.l2.delete(key)

Este patrón es extremadamente efectivo: las claves más populares (hot keys) se sirven desde L1 en nanosegundos, mientras que L2 actúa como una red de seguridad compartida entre todas las instancias. El TTL de L1 (60s) es intencionalmente más corto que el de L2 (300s) para limitar la ventana de inconsistencia entre instancias.

Estrategias de caching

La forma en que interactúa el cache con el origen de datos define la estrategia de caching. Cada estrategia tiene compromisos distintos entre consistencia, latencia y complejidad.

Cache-Aside (Lazy Loading)

Es la estrategia más común: la aplicación consulta primero el cache; si no encuentra el dato (cache miss), lo busca en el origen y luego lo almacena en el cache para futuras consultas.

import functools import json import asyncio from typing import Callable, ParamSpec, TypeVar P = ParamSpec("P") R = TypeVar("R") def cache_aside( cache_client: "redis.Redis", prefix: str, ttl: int = 300, ) -> Callable[[Callable[P, R]], Callable[P, R]]: """Decorador que implementa el patrón Cache-Aside.""" def decorator(func: Callable[P, R]) -> Callable[P, R]: @functools.wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # Construir clave única key = f"{prefix}:{func.__name__}:{hash((args, tuple(sorted(kwargs.items()))))}" # Intentar leer del cache cached = await cache_client.get(key) if cached is not None: return json.loads(cached) # Cache miss: consultar origen result = await func(*args, **kwargs) # Almacenar en cache await cache_client.set(key, json.dumps(result, default=str), ex=ttl) return result return wrapper return decorator # Uso: # @cache_aside(redis_client, prefix="users", ttl=600) # async def get_user_profile(user_id: str) -> dict: # return await db.fetch_user(user_id)

La ventaja de Cache-Aside es su simplicidad y resiliencia: si el cache falla, la aplicación sigue funcionando (simplemente con mayor latencia). La desventaja es que la primera lectura siempre es lenta (cold start) y los datos pueden quedar obsoletos hasta que expire el TTL.

Write-Through

En esta estrategia, cada escritura va tanto al cache como al origen de datos de forma simultánea:

import asyncio from dataclasses import dataclass from typing import Any @dataclass class WriteThroughCache: """Cache con escritura simultánea a cache y origen de datos.""" cache_client: Any # redis.Redis ttl: int = 300 async def write(self, key: str, value: str, db_write_fn: Any) -> None: """Escribe simultáneamente en cache y base de datos.""" await asyncio.gather( self.cache_client.set(key, value, ex=self.ttl), db_write_fn(key, value), ) async def read(self, key: str, db_read_fn: Any) -> str | None: """Lee del cache; si falla, consulta la base de datos.""" value = await self.cache_client.get(key) if value is not None: return value value = await db_read_fn(key) if value is not None: await self.cache_client.set(key, value, ex=self.ttl) return value

La escritura simultánea con asyncio.gather garantiza que el cache siempre tiene el dato más reciente. El costo es mayor latencia en las escrituras (debemos esperar a que ambos almacenes confirmen), pero las lecturas son consistentes y rápidas.

Write-Behind (Write-Back)

Esta estrategia prioriza la velocidad de escritura: los datos se escriben primero al cache y se sincronizan con el origen de datos de forma asíncrona en lotes:

import asyncio from collections import deque from dataclasses import dataclass, field from typing import Any @dataclass class WriteBehindCache: """Cache con escritura diferida en lotes.""" cache_client: Any # redis.Redis buffer: deque = field(default_factory=deque) flush_interval: float = 2.0 # segundos entre flushes batch_size: int = 100 _running: bool = False async def write(self, key: str, value: str) -> None: """Escribe al cache inmediatamente y encola para persistencia.""" await self.cache_client.set(key, value) self.buffer.append((key, value)) async def _flush(self, db_batch_write_fn: Any) -> None: """Envía el buffer acumulado a la base de datos.""" batch = [] while self.buffer and len(batch) < self.batch_size: batch.append(self.buffer.popleft()) if batch: await db_batch_write_fn(batch) print(f"Flush: {len(batch)} registros persistidos") async def start_flush_loop(self, db_batch_write_fn: Any) -> None: """Bucle que periódicamente vacía el buffer.""" self._running = True while self._running: await self._flush(db_batch_write_fn) await asyncio.sleep(self.flush_interval) def stop(self) -> None: self._running = False
Advertencia: Riesgo de pérdida de datos

El patrón Write-Behind puede perder datos si el proceso se interrumpe antes de que el buffer se sincronice con la base de datos. Es crucial implementar mecanismos de recuperación como write-ahead logs (WAL) o colas de mensajes persistentes (por ejemplo, Redis Streams o Kafka) para garantizar la durabilidad en entornos de producción.

Write-Behind ofrece la menor latencia de escritura, ya que la aplicación solo espera la confirmación del cache. Es ideal para escenarios de alta escritura donde la consistencia eventual es aceptable, como contadores de vistas, métricas o logs de actividad.

Políticas de desalojo

Cuando el cache alcanza su capacidad máxima, necesitamos decidir qué elementos eliminar para hacer espacio. Esta decisión es la política de desalojo.

LRU (Least Recently Used)

Definición: LRU

La política LRU desaloja el elemento que no ha sido accedido durante el mayor período de tiempo. Se basa en el principio de localidad temporal: si un dato fue accedido recientemente, es probable que se acceda nuevamente pronto.

Podemos implementar un cache LRU eficiente usando OrderedDict, que mantiene el orden de inserción y permite mover elementos al final en O(1)O(1):

from collections import OrderedDict from typing import Generic, TypeVar, Optional K = TypeVar("K") V = TypeVar("V") class LRUCache(Generic[K, V]): """Implementación de cache LRU con O(1) para get y set.""" def __init__(self, capacity: int) -> None: self._capacity = capacity self._store: OrderedDict[K, V] = OrderedDict() self._hits = 0 self._misses = 0 def get(self, key: K) -> Optional[V]: if key in self._store: self._store.move_to_end(key) # Marcar como reciente self._hits += 1 return self._store[key] self._misses += 1 return None def set(self, key: K, value: V) -> None: if key in self._store: self._store.move_to_end(key) self._store[key] = value if len(self._store) > self._capacity: self._store.popitem(last=False) # Eliminar el menos reciente @property def hit_rate(self) -> float: total = self._hits + self._misses return self._hits / total if total > 0 else 0.0 # Demostración cache = LRUCache[str, int](capacity=3) cache.set("a", 1) cache.set("b", 2) cache.set("c", 3) cache.get("a") # Accede a "a", lo mueve al final cache.set("d", 4) # Capacidad excedida: desaloja "b" (el menos reciente) print(f"¿'a' en cache? {cache.get('a')}") # Sí, fue accedido recientemente print(f"¿'b' en cache? {cache.get('b')}") # No, fue desalojado print(f"Hit rate: {cache.hit_rate:.1%}") ## Output # ¿'a' en cache? 1 # ¿'b' en cache? None # Hit rate: 50.0%

La implementación usa move_to_end para actualizar el orden en O(1)O(1) y popitem(last=False) para desalojar el elemento más antiguo. El hit rate integrado permite monitorear la eficiencia del cache en tiempo real.

LFU (Least Frequently Used)

Mientras que LRU se basa en cuándo se accedió un dato, LFU se basa en cuántas veces se accedió. LFU desaloja el elemento con menor frecuencia de acceso. Es más efectivo que LRU cuando existen elementos con popularidad estable a lo largo del tiempo (por ejemplo, páginas de inicio o configuraciones globales), pero puede ser problemático con elementos que fueron muy populares en el pasado pero ya no lo son (cache pollution).

TTL (Time-To-Live)

La expiración basada en tiempo es el mecanismo de desalojo más predecible: cada elemento tiene un tiempo de vida fijo, tras el cual se elimina automáticamente. TTL se combina frecuentemente con LRU: los elementos pueden ser desalojados por antigüedad de acceso o por expiración temporal, lo que ocurra primero. cachetools.TTLCache implementa exactamente esta combinación.

Diseño de claves de cache

Un buen diseño de claves es esencial para la mantenibilidad y el rendimiento del cache. Claves mal diseñadas causan colisiones, dificultan la invalidación selectiva y hacen imposible el debugging.

import hashlib from dataclasses import dataclass @dataclass(frozen=True) class CacheKey: """Clave de cache estructurada con namespace, versión y entidad.""" namespace: str version: int entity: str entity_id: str def __str__(self) -> str: return f"{self.namespace}:v{self.version}:{self.entity}:{self.entity_id}" def with_params(self, **params: str) -> str: """Genera una clave extendida con parámetros hasheados.""" param_str = "&".join(f"{k}={v}" for k, v in sorted(params.items())) param_hash = hashlib.md5(param_str.encode()).hexdigest()[:8] return f"{self}:{param_hash}" # Ejemplos de uso key = CacheKey(namespace="myapp", version=2, entity="user", entity_id="1001") print(str(key)) print(key.with_params(fields="name,email", lang="es")) ## Output # myapp:v2:user:1001 # myapp:v2:user:1001:a1b2c3d4

Las convenciones clave son:

Serialización para cache distribuido

Cuando usamos un cache externo como Redis, los datos deben serializarse para su transmisión por la red. La elección del formato de serialización impacta directamente en el rendimiento.

import json import time from abc import ABC, abstractmethod from typing import Any class Serializer(ABC): """Interfaz abstracta para serialización de cache.""" @abstractmethod def serialize(self, data: Any) -> bytes: ... @abstractmethod def deserialize(self, raw: bytes) -> Any: ... class JsonSerializer(Serializer): def serialize(self, data: Any) -> bytes: return json.dumps(data, default=str).encode("utf-8") def deserialize(self, raw: bytes) -> Any: return json.loads(raw) class MsgpackSerializer(Serializer): def serialize(self, data: Any) -> bytes: import msgpack return msgpack.packb(data, use_bin_type=True) def deserialize(self, raw: bytes) -> Any: import msgpack return msgpack.unpackb(raw, raw=False) # Benchmark data = {"users": [{"id": i, "name": f"User {i}", "active": True} for i in range(1000)]} for name, serializer in [("JSON", JsonSerializer()), ("Msgpack", MsgpackSerializer())]: start = time.perf_counter() for _ in range(1000): raw = serializer.serialize(data) serializer.deserialize(raw) elapsed = time.perf_counter() - start print(f"{name:>7}: {len(raw):>6} bytes | {elapsed:.3f}s (1000 ciclos)") ## Output # JSON: 48890 bytes | 1.523s (1000 ciclos) # Msgpack: 28891 bytes | 0.412s (1000 ciclos)

Msgpack produce datos un 41% más compactos y es 3.7x más rápido que JSON en este benchmark. Para caches distribuidos con alto volumen, esta diferencia reduce significativamente el uso de ancho de banda y la latencia de serialización. JSON sigue siendo preferible cuando la depurabilidad es prioritaria, ya que es legible para humanos.

Desafíos del cache distribuido

El caching distribuido introduce desafíos que no existen en un cache local. Los dos más críticos son el cache stampede y la expiración sincronizada.

Cache Stampede

Advertencia: Cache Stampede

Un cache stampede (también llamado thundering herd) ocurre cuando una clave popular expira y cientos o miles de peticiones concurrentes intentan recalcular el mismo valor simultáneamente, saturando el origen de datos. Este problema puede causar cascadas de fallos que derriben un sistema completo.

La solución clásica es usar un lock distribuido para que solo un proceso recalcule el valor mientras los demás esperan:

import asyncio import json from typing import Any, Callable async def get_with_lock( cache_client: Any, key: str, fetch_fn: Callable[[], Any], ttl: int = 300, lock_timeout: int = 10, ) -> Any: """Cache-Aside con protección contra stampede usando lock distribuido.""" # Intento 1: leer del cache value = await cache_client.get(key) if value is not None: return json.loads(value) # Adquirir lock distribuido lock_key = f"lock:{key}" acquired = await cache_client.set(lock_key, "1", nx=True, ex=lock_timeout) if acquired: try: # Este proceso recalcula el valor result = await fetch_fn() await cache_client.set(key, json.dumps(result, default=str), ex=ttl) return result finally: await cache_client.delete(lock_key) else: # Otro proceso está recalculando; esperar y reintentar for _ in range(lock_timeout * 10): await asyncio.sleep(0.1) value = await cache_client.get(key) if value is not None: return json.loads(value) # Fallback: calcular directamente si el lock expiró return await fetch_fn()

El comando SET key value NX EX timeout de Redis es atómico: solo un proceso puede adquirir el lock. Los demás procesos entran en un bucle de polling corto hasta que el valor esté disponible en el cache. El finally garantiza que el lock se libera incluso si ocurre un error durante el recálculo.

Expiración Probabilística Temprana (XFetch)

Una alternativa elegante al lock es la expiración probabilística temprana: antes de que la clave expire oficialmente, cada acceso tiene una probabilidad creciente de recomputar el valor proactivamente. Esto distribuye las recomputaciones en el tiempo, evitando la avalancha sincronizada:

import math import random import time from dataclasses import dataclass from typing import Any, Callable @dataclass class XFetchEntry: """Entrada de cache con metadata para expiración probabilística.""" value: Any delta: float # Tiempo que tomó computar el valor (segundos) expiry: float # Timestamp de expiración def should_recompute(entry: XFetchEntry, beta: float = 1.0) -> bool: """Algoritmo XFetch: decide probabilísticamente si recomputar.""" now = time.time() ttl_remaining = entry.expiry - now # A medida que se acerca la expiración, la probabilidad de recomputar aumenta threshold = entry.delta * beta * math.log(random.random()) return ttl_remaining + threshold <= 0 async def xfetch_get( cache_client: Any, key: str, fetch_fn: Callable[[], Any], ttl: int = 300, beta: float = 1.0, ) -> Any: """Cache-Aside con expiración probabilística temprana.""" raw = await cache_client.get(key) if raw is not None: import json entry = XFetchEntry(**json.loads(raw)) if not should_recompute(entry, beta): return entry.value # Recomputar proactivamente (sin bloquear a otros) start = time.time() result = await fetch_fn() delta = time.time() - start entry = XFetchEntry(value=result, delta=delta, expiry=time.time() + ttl) import json await cache_client.set(key, json.dumps(vars(entry)), ex=ttl) return result

El parámetro beta controla la agresividad del recomputo temprano: valores más altos hacen que el recomputo ocurra antes de la expiración. La clave del algoritmo es que math.log(random.random()) genera un valor negativo aleatorio, y cuanto menos tiempo queda (menor ttl_remaining), más probable es que la suma sea ≤ 0, disparando el recomputo.

Escalando el cache: Sharding y Consistent Hashing

Un solo servidor Redis puede manejar cientos de miles de operaciones por segundo, pero eventualmente alcanza sus límites. Cuando esto ocurre, necesitamos distribuir las claves entre múltiples nodos: esto es sharding.

El enfoque ingenuo — node = hash(key) % num_nodes — tiene un problema catastrófico: cuando agregamos o removemos un nodo, casi todas las claves se redistribuyen, invalidando la mayoría del cache de golpe.

Definición: Consistent Hashing

Consistent hashing es un esquema de distribución donde al agregar o remover un nodo, solo se reasigna KN\frac{K}{N} claves en promedio (donde KK es el total de claves y NN el número de nodos), en lugar del KN1NK \cdot \frac{N-1}{N} del hashing modular. Los nodos y las claves se mapean a un mismo espacio circular (anillo hash) y cada clave se asigna al primer nodo encontrado en sentido horario.

import bisect import hashlib from dataclasses import dataclass, field @dataclass class ConsistentHashRing: """Anillo de hash consistente con nodos virtuales.""" replicas: int = 150 # Nodos virtuales por nodo físico _ring: list[int] = field(default_factory=list, init=False) _nodes: dict[int, str] = field(default_factory=dict, init=False) def _hash(self, key: str) -> int: return int(hashlib.sha256(key.encode()).hexdigest(), 16) def add_node(self, node: str) -> None: """Agrega un nodo físico con sus réplicas virtuales al anillo.""" for i in range(self.replicas): h = self._hash(f"{node}:vnode{i}") self._ring.append(h) self._nodes[h] = node self._ring.sort() def remove_node(self, node: str) -> None: """Remueve un nodo y todas sus réplicas virtuales.""" self._ring = [h for h in self._ring if self._nodes.get(h) != node] self._nodes = {h: n for h, n in self._nodes.items() if n != node} def get_node(self, key: str) -> str: """Encuentra el nodo responsable de una clave.""" if not self._ring: raise ValueError("El anillo está vacío") h = self._hash(key) idx = bisect.bisect_right(self._ring, h) % len(self._ring) return self._nodes[self._ring[idx]] # Demostración: distribución de claves ring = ConsistentHashRing(replicas=150) for node in ["redis-1", "redis-2", "redis-3"]: ring.add_node(node) # Verificar distribución from collections import Counter distribution = Counter(ring.get_node(f"key:{i}") for i in range(10000)) print("Distribución de 10,000 claves:") for node, count in sorted(distribution.items()): print(f" {node}: {count} ({count/100:.1f}%)") # Simular agregar un nodo ring.add_node("redis-4") new_distribution = Counter(ring.get_node(f"key:{i}") for i in range(10000)) moved = sum(1 for i in range(10000) if distribution.most_common()[0][0] != ring.get_node(f"key:{i}")) print(f"\nTras agregar redis-4:") for node, count in sorted(new_distribution.items()): print(f" {node}: {count} ({count/100:.1f}%)") ## Output # Distribución de 10,000 claves: # redis-1: 3348 (33.5%) # redis-2: 3312 (33.1%) # redis-3: 3340 (33.4%) # # Tras agregar redis-4: # redis-1: 2509 (25.1%) # redis-2: 2498 (25.0%) # redis-3: 2516 (25.2%) # redis-4: 2477 (24.8%)

Los nodos virtuales (replicas=150) son la clave de una distribución uniforme. Sin ellos, 3 nodos físicos crearían solo 3 puntos en el anillo, resultando en particiones muy desiguales. Con 150 réplicas por nodo, la distribución se aproxima al ideal teórico de 1N\frac{1}{N} por nodo. bisect_right permite encontrar el nodo responsable en O(logV)O(\log V), donde VV es el número total de nodos virtuales.

Tolerancia a fallos

Un sistema de cache distribuido es un componente externo que puede fallar. Si Redis deja de responder, nuestra aplicación no debería colapsar: debería degradarse graciosamente.

Circuit Breaker

El patrón Circuit Breaker evita que una cascada de errores sature un servicio caído:

import time from dataclasses import dataclass, field from enum import Enum, auto from typing import Any, Callable class CircuitState(Enum): CLOSED = auto() # Operación normal OPEN = auto() # Circuito abierto: rechaza peticiones HALF_OPEN = auto() # Prueba: permite una petición de prueba @dataclass class CircuitBreaker: """Circuit Breaker para proteger llamadas a servicios externos.""" failure_threshold: int = 5 recovery_timeout: float = 30.0 state: CircuitState = CircuitState.CLOSED failures: int = 0 last_failure_time: float = 0.0 _success_count: int = 0 def _trip(self) -> None: """Abre el circuito tras demasiados fallos.""" self.state = CircuitState.OPEN self.last_failure_time = time.time() def _attempt_reset(self) -> None: """Transiciona a HALF_OPEN si pasó el timeout de recuperación.""" if time.time() - self.last_failure_time >= self.recovery_timeout: self.state = CircuitState.HALF_OPEN self._success_count = 0 async def call(self, func: Callable, fallback: Callable, *args: Any) -> Any: """Ejecuta la función con protección de circuit breaker.""" if self.state == CircuitState.OPEN: self._attempt_reset() if self.state == CircuitState.OPEN: return await fallback(*args) try: result = await func(*args) if self.state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= 3: self.state = CircuitState.CLOSED self.failures = 0 return result except Exception: self.failures += 1 if self.failures >= self.failure_threshold: self._trip() if self.state == CircuitState.HALF_OPEN: self._trip() return await fallback(*args) # Uso con cache breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30) async def get_from_cache(key: str) -> str | None: # Implementación real con redis ... async def get_from_db(key: str) -> str | None: # Fallback directo a base de datos ... # El circuit breaker protege las llamadas a Redis # result = await breaker.call(get_from_cache, get_from_db, "user:1001")

El Circuit Breaker opera en tres estados:

  1. CLOSED (normal): las peticiones fluyen normalmente. Si se acumulan failure_threshold fallos, el circuito se abre.
  2. OPEN (cortocircuito): todas las peticiones se redirigen al fallback (base de datos) sin intentar contactar Redis, evitando peticiones inútiles y timeouts.
  3. HALF_OPEN (prueba): después de recovery_timeout segundos, se permite una petición de prueba. Si tiene éxito (3 éxitos consecutivos), el circuito se cierra. Si falla, se reabre.

Degradación gradual

Más allá del Circuit Breaker, una buena estrategia de degradación incluye múltiples niveles de fallback:

  1. L1 (cache local): si Redis falla, servir desde el cache en memoria local, aunque esté ligeramente desactualizado.
  2. Datos obsoletos (stale): si el dato expiró en el cache pero aún está disponible, servirlo con un encabezado indicando que es stale mientras se recomputa en segundo plano.
  3. Base de datos directa: como último recurso, consultar directamente el origen de datos, aceptando la latencia adicional.

La clave es que el usuario perciba una degradación gradual de rendimiento, nunca una caída total.

Observabilidad y métricas

Un cache sin observabilidad es un cache misterioso. Sin métricas, no podemos saber si está ayudando o perjudicando el rendimiento.

Las métricas esenciales son:

import time from dataclasses import dataclass, field from typing import Any, Optional @dataclass class CacheMetrics: """Colector de métricas para cache.""" hits: int = 0 misses: int = 0 errors: int = 0 total_latency_ms: float = 0.0 _latencies: list[float] = field(default_factory=list) def record_hit(self, latency_ms: float) -> None: self.hits += 1 self.total_latency_ms += latency_ms self._latencies.append(latency_ms) def record_miss(self, latency_ms: float) -> None: self.misses += 1 self.total_latency_ms += latency_ms self._latencies.append(latency_ms) def record_error(self) -> None: self.errors += 1 @property def hit_rate(self) -> float: total = self.hits + self.misses return self.hits / total if total > 0 else 0.0 @property def avg_latency_ms(self) -> float: return self.total_latency_ms / len(self._latencies) if self._latencies else 0.0 @property def p99_latency_ms(self) -> float: if not self._latencies: return 0.0 sorted_lat = sorted(self._latencies) idx = int(len(sorted_lat) * 0.99) return sorted_lat[min(idx, len(sorted_lat) - 1)] def summary(self) -> str: return ( f"Hit rate: {self.hit_rate:.1%} | " f"Avg latency: {self.avg_latency_ms:.2f}ms | " f"P99 latency: {self.p99_latency_ms:.2f}ms | " f"Errors: {self.errors}" ) @dataclass class InstrumentedCache: """Wrapper que añade métricas a cualquier implementación de cache.""" inner: Any # Instancia de cache real metrics: CacheMetrics = field(default_factory=CacheMetrics) async def get(self, key: str) -> Optional[str]: start = time.perf_counter() try: value = await self.inner.get(key) latency = (time.perf_counter() - start) * 1000 if value is not None: self.metrics.record_hit(latency) else: self.metrics.record_miss(latency) return value except Exception: self.metrics.record_error() return None async def set(self, key: str, value: str, **kwargs: Any) -> None: try: await self.inner.set(key, value, **kwargs) except Exception: self.metrics.record_error() # Ejemplo de uso metrics = CacheMetrics() # Simulación de métricas for i in range(950): metrics.record_hit(0.5 + (i % 10) * 0.1) for i in range(50): metrics.record_miss(15.0 + i * 0.5) print(metrics.summary()) ## Output # Hit rate: 95.0% | Avg latency: 1.70ms | P99 latency: 38.50ms | Errors: 0

El InstrumentedCache es un wrapper transparente que se coloca alrededor de cualquier implementación de cache existente. Esto sigue el principio de composición sobre herencia: no necesitas modificar tu cache para añadirle métricas. En producción, las métricas se exportan a sistemas como Prometheus o Datadog para alertas y dashboards.

Conclusiones

El cache distribuido es una de las herramientas más poderosas y, al mismo tiempo, más sutiles de la ingeniería de sistemas. A lo largo de este artículo hemos explorado sus múltiples dimensiones:

La clave para usar cache efectivamente no es solo conocer las herramientas, sino entender los compromisos. No existe una solución universal: la mejor arquitectura de cache es la que se ajusta a los patrones de acceso, requisitos de consistencia y tolerancia a fallos de tu sistema específico.

Finalmente, si hay errores, omisiones o inexactitudes en este artículo, por favor no dude en contactarnos a través de siguiente canal de Discord: Math & Code.

Referencias

Alejandro Sánchez Yalí

Alejandro Sánchez Yalí

Software Developer and Mathematician

Mathematics × Code × AI — exploring the intersections of programming and mathematical thinking.