El cerebro humano es, probablemente, el sistema de cache más sofisticado que existe. Cuando alguien te pregunta cuánto es 7 × 8, no recalculas la multiplicación desde cero: recuperas la respuesta de la memoria. Solo cuando enfrentas una operación desconocida — digamos, 347 × 29 — recurres al "origen de datos" y realizas el cálculo completo. Este principio tan simple — almacenar resultados para evitar trabajo repetido — es exactamente lo que hace un cache en un sistema de software.
Pero, ¿qué sucede cuando tu sistema ya no cabe en un solo servidor? Cuando tienes millones de usuarios concurrentes, múltiples instancias de tu aplicación y bases de datos distribuidas, el cache local ya no es suficiente. Necesitas un cache distribuido: un sistema de almacenamiento temporal compartido, accesible desde cualquier nodo de tu infraestructura.
Consideremos un ejemplo concreto. Imagina una API que consulta una base de datos para obtener el perfil de un usuario:
import time
def get_user_profile_no_cache(user_id: str) -> dict:
"""Simula una consulta a base de datos sin cache."""
time.sleep(0.1) # Simula latencia de red + query
return {"id": user_id, "name": "Ana García", "plan": "premium"}
# Sin cache: cada llamada paga el costo completo
start = time.perf_counter()
for _ in range(10):
get_user_profile_no_cache("user:1001")
elapsed_no_cache = time.perf_counter() - start
# Con cache simple: solo la primera llamada es costosa
cache: dict[str, dict] = {}
def get_user_profile_cached(user_id: str) -> dict:
if user_id not in cache:
cache[user_id] = get_user_profile_no_cache(user_id)
return cache[user_id]
start = time.perf_counter()
for _ in range(10):
get_user_profile_cached("user:1001")
elapsed_cached = time.perf_counter() - start
print(f"Sin cache: {elapsed_no_cache:.3f}s")
print(f"Con cache: {elapsed_cached:.3f}s")
print(f"Speedup: {elapsed_no_cache / elapsed_cached:.1f}x")
## Output
# Sin cache: 1.003s
# Con cache: 0.100s
# Speedup: 10.0x
La diferencia es dramática: un speedup de 10x con apenas unas líneas de código. En producción, con bases de datos reales y tráfico concurrente, esta diferencia puede significar el éxito o el fracaso de una aplicación.
En este artículo exploraremos el mundo del cache distribuido con Python, desde los fundamentos teóricos hasta implementaciones prácticas con patrones modernos. Cubriremos arquitecturas, estrategias de lectura y escritura, políticas de desalojo, sharding con consistent hashing, tolerancia a fallos y observabilidad.
Fundamentos del cache
Antes de sumergirnos en las arquitecturas distribuidas, establezcamos una base formal.
Un cache es una estructura de almacenamiento de acceso rápido que mantiene un subconjunto de datos, típicamente transitorios, con el objetivo de reducir el tiempo de acceso a un origen de datos más lento. Formalmente, un cache es una función parcial que mapea un subconjunto de claves a valores , donde para una capacidad máxima .
Las dos métricas fundamentales de un cache son:
- Hit rate (tasa de acierto): proporción de consultas resueltas por el cache, .
- Miss rate (tasa de fallo): complemento del hit rate, .
Un hit rate del 95% significa que solo 1 de cada 20 peticiones necesita consultar el origen de datos. En un sistema con miles de peticiones por segundo, esta diferencia es enorme.
Para formalizar la interfaz de un cache en Python, podemos usar el sistema de tipos moderno:
from typing import Protocol, TypeVar, Optional
K = TypeVar("K")
V = TypeVar("V")
class Cache(Protocol[K, V]):
"""Protocolo que define la interfaz mínima de un cache."""
def get(self, key: K) -> Optional[V]:
"""Retorna el valor asociado a la clave, o None si no existe."""
...
def set(self, key: K, value: V) -> None:
"""Almacena un par clave-valor en el cache."""
...
def delete(self, key: K) -> None:
"""Elimina una clave del cache."""
...
def clear(self) -> None:
"""Vacía completamente el cache."""
...
Este Protocol define el contrato que cualquier implementación de cache debe cumplir, ya sea un diccionario en memoria, un wrapper sobre Redis, o un sistema multicapa. La ventaja de usar Protocol en lugar de una clase abstracta es que permite structural subtyping: cualquier clase que implemente estos métodos es compatible, sin necesidad de herencia explícita.
Arquitecturas de cache
No existe una arquitectura de cache única para todos los casos. La elección depende de factores como latencia requerida, volumen de datos, número de instancias de la aplicación y tolerancia a inconsistencias.
Cache local (in-process)
El cache más simple vive dentro del proceso de la aplicación. Python ofrece herramientas excelentes para esto:
import functools
from cachetools import TTLCache
# Opción 1: functools.lru_cache para funciones puras
@functools.lru_cache(maxsize=1024)
def fibonacci(n: int) -> int:
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(100))
print(f"Cache info: {fibonacci.cache_info()}")
## Output
# 354224848179261915075
# Cache info: CacheInfo(hits=98, misses=101, maxsize=1024, currsize=101)
from cachetools import TTLCache
import time
# Opción 2: TTLCache para datos con expiración
user_cache: TTLCache[str, dict] = TTLCache(maxsize=256, ttl=300) # 5 minutos
def get_user(user_id: str) -> dict:
if user_id in user_cache:
return user_cache[user_id]
user = {"id": user_id, "name": "Ana García"} # Simulación de DB
user_cache[user_id] = user
return user
user = get_user("user:1001")
print(f"Usuario: {user}")
print(f"Entradas en cache: {len(user_cache)}")
## Output
# Usuario: {'id': 'user:1001', 'name': 'Ana García'}
# Entradas en cache: 1
El cache local es extremadamente rápido (nanosegundos de acceso) pero tiene limitaciones importantes: no se comparte entre procesos ni entre servidores, y cada instancia de la aplicación mantiene su propia copia. Esto puede llevar a inconsistencias y desperdicio de memoria.
Cache centralizado (Redis)
Redis es el estándar de facto para cache distribuido. Con redis-py 5.x podemos usar un cliente asíncrono que aprovecha la concurrencia de Python moderno:
import asyncio
from dataclasses import dataclass
import redis.asyncio as redis
@dataclass(frozen=True)
class RedisConfig:
host: str = "localhost"
port: int = 6379
db: int = 0
max_connections: int = 50
socket_timeout: float = 5.0
decode_responses: bool = True
async def create_redis_pool(config: RedisConfig) -> redis.Redis:
"""Crea un pool de conexiones Redis optimizado."""
pool = redis.ConnectionPool(
host=config.host,
port=config.port,
db=config.db,
max_connections=config.max_connections,
socket_timeout=config.socket_timeout,
decode_responses=config.decode_responses,
)
return redis.Redis(connection_pool=pool)
async def main():
client = await create_redis_pool(RedisConfig())
# Operaciones básicas
await client.set("user:1001:name", "Ana García", ex=300)
name = await client.get("user:1001:name")
print(f"Nombre: {name}")
# Pipeline para operaciones en lote
async with client.pipeline(transaction=False) as pipe:
pipe.set("counter:visits", 0)
pipe.incr("counter:visits")
pipe.incr("counter:visits")
pipe.get("counter:visits")
results = await pipe.execute()
print(f"Visitas: {results[-1]}")
await client.aclose()
asyncio.run(main())
## Output
# Nombre: Ana García
# Visitas: 2
El uso de ConnectionPool es crítico en producción: reutiliza conexiones TCP en lugar de crear una nueva por cada operación, reduciendo significativamente la latencia y el consumo de recursos del sistema operativo. El parámetro ex=300 establece un TTL de 5 minutos, asegurando que los datos no se vuelvan obsoletos indefinidamente.
Cache híbrido (L1 local + L2 Redis)
La arquitectura más robusta combina ambos niveles: un cache local ultra-rápido como primera línea de defensa y Redis como respaldo compartido:
from cachetools import TTLCache
from dataclasses import dataclass, field
from typing import Optional
import json
import redis.asyncio as redis
@dataclass
class HybridCache:
"""Cache de dos niveles: L1 (local) + L2 (Redis)."""
l1: TTLCache = field(default_factory=lambda: TTLCache(maxsize=512, ttl=60))
l2: Optional[redis.Redis] = None
l2_ttl: int = 300 # 5 minutos en Redis
async def get(self, key: str) -> Optional[str]:
# Nivel 1: cache local (nanosegundos)
if key in self.l1:
return self.l1[key]
# Nivel 2: Redis (milisegundos)
if self.l2:
value = await self.l2.get(key)
if value is not None:
self.l1[key] = value # Promover a L1
return value
return None
async def set(self, key: str, value: str) -> None:
self.l1[key] = value
if self.l2:
await self.l2.set(key, value, ex=self.l2_ttl)
async def delete(self, key: str) -> None:
self.l1.pop(key, None)
if self.l2:
await self.l2.delete(key)
Este patrón es extremadamente efectivo: las claves más populares (hot keys) se sirven desde L1 en nanosegundos, mientras que L2 actúa como una red de seguridad compartida entre todas las instancias. El TTL de L1 (60s) es intencionalmente más corto que el de L2 (300s) para limitar la ventana de inconsistencia entre instancias.
Estrategias de caching
La forma en que interactúa el cache con el origen de datos define la estrategia de caching. Cada estrategia tiene compromisos distintos entre consistencia, latencia y complejidad.
Cache-Aside (Lazy Loading)
Es la estrategia más común: la aplicación consulta primero el cache; si no encuentra el dato (cache miss), lo busca en el origen y luego lo almacena en el cache para futuras consultas.
import functools
import json
import asyncio
from typing import Callable, ParamSpec, TypeVar
P = ParamSpec("P")
R = TypeVar("R")
def cache_aside(
cache_client: "redis.Redis",
prefix: str,
ttl: int = 300,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""Decorador que implementa el patrón Cache-Aside."""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
# Construir clave única
key = f"{prefix}:{func.__name__}:{hash((args, tuple(sorted(kwargs.items()))))}"
# Intentar leer del cache
cached = await cache_client.get(key)
if cached is not None:
return json.loads(cached)
# Cache miss: consultar origen
result = await func(*args, **kwargs)
# Almacenar en cache
await cache_client.set(key, json.dumps(result, default=str), ex=ttl)
return result
return wrapper
return decorator
# Uso:
# @cache_aside(redis_client, prefix="users", ttl=600)
# async def get_user_profile(user_id: str) -> dict:
# return await db.fetch_user(user_id)
La ventaja de Cache-Aside es su simplicidad y resiliencia: si el cache falla, la aplicación sigue funcionando (simplemente con mayor latencia). La desventaja es que la primera lectura siempre es lenta (cold start) y los datos pueden quedar obsoletos hasta que expire el TTL.
Write-Through
En esta estrategia, cada escritura va tanto al cache como al origen de datos de forma simultánea:
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class WriteThroughCache:
"""Cache con escritura simultánea a cache y origen de datos."""
cache_client: Any # redis.Redis
ttl: int = 300
async def write(self, key: str, value: str, db_write_fn: Any) -> None:
"""Escribe simultáneamente en cache y base de datos."""
await asyncio.gather(
self.cache_client.set(key, value, ex=self.ttl),
db_write_fn(key, value),
)
async def read(self, key: str, db_read_fn: Any) -> str | None:
"""Lee del cache; si falla, consulta la base de datos."""
value = await self.cache_client.get(key)
if value is not None:
return value
value = await db_read_fn(key)
if value is not None:
await self.cache_client.set(key, value, ex=self.ttl)
return value
La escritura simultánea con asyncio.gather garantiza que el cache siempre tiene el dato más reciente. El costo es mayor latencia en las escrituras (debemos esperar a que ambos almacenes confirmen), pero las lecturas son consistentes y rápidas.
Write-Behind (Write-Back)
Esta estrategia prioriza la velocidad de escritura: los datos se escriben primero al cache y se sincronizan con el origen de datos de forma asíncrona en lotes:
import asyncio
from collections import deque
from dataclasses import dataclass, field
from typing import Any
@dataclass
class WriteBehindCache:
"""Cache con escritura diferida en lotes."""
cache_client: Any # redis.Redis
buffer: deque = field(default_factory=deque)
flush_interval: float = 2.0 # segundos entre flushes
batch_size: int = 100
_running: bool = False
async def write(self, key: str, value: str) -> None:
"""Escribe al cache inmediatamente y encola para persistencia."""
await self.cache_client.set(key, value)
self.buffer.append((key, value))
async def _flush(self, db_batch_write_fn: Any) -> None:
"""Envía el buffer acumulado a la base de datos."""
batch = []
while self.buffer and len(batch) < self.batch_size:
batch.append(self.buffer.popleft())
if batch:
await db_batch_write_fn(batch)
print(f"Flush: {len(batch)} registros persistidos")
async def start_flush_loop(self, db_batch_write_fn: Any) -> None:
"""Bucle que periódicamente vacía el buffer."""
self._running = True
while self._running:
await self._flush(db_batch_write_fn)
await asyncio.sleep(self.flush_interval)
def stop(self) -> None:
self._running = False
El patrón Write-Behind puede perder datos si el proceso se interrumpe antes de que el buffer se sincronice con la base de datos. Es crucial implementar mecanismos de recuperación como write-ahead logs (WAL) o colas de mensajes persistentes (por ejemplo, Redis Streams o Kafka) para garantizar la durabilidad en entornos de producción.
Write-Behind ofrece la menor latencia de escritura, ya que la aplicación solo espera la confirmación del cache. Es ideal para escenarios de alta escritura donde la consistencia eventual es aceptable, como contadores de vistas, métricas o logs de actividad.
Políticas de desalojo
Cuando el cache alcanza su capacidad máxima, necesitamos decidir qué elementos eliminar para hacer espacio. Esta decisión es la política de desalojo.
LRU (Least Recently Used)
La política LRU desaloja el elemento que no ha sido accedido durante el mayor período de tiempo. Se basa en el principio de localidad temporal: si un dato fue accedido recientemente, es probable que se acceda nuevamente pronto.
Podemos implementar un cache LRU eficiente usando OrderedDict, que mantiene el orden de inserción y permite mover elementos al final en :
from collections import OrderedDict
from typing import Generic, TypeVar, Optional
K = TypeVar("K")
V = TypeVar("V")
class LRUCache(Generic[K, V]):
"""Implementación de cache LRU con O(1) para get y set."""
def __init__(self, capacity: int) -> None:
self._capacity = capacity
self._store: OrderedDict[K, V] = OrderedDict()
self._hits = 0
self._misses = 0
def get(self, key: K) -> Optional[V]:
if key in self._store:
self._store.move_to_end(key) # Marcar como reciente
self._hits += 1
return self._store[key]
self._misses += 1
return None
def set(self, key: K, value: V) -> None:
if key in self._store:
self._store.move_to_end(key)
self._store[key] = value
if len(self._store) > self._capacity:
self._store.popitem(last=False) # Eliminar el menos reciente
@property
def hit_rate(self) -> float:
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
# Demostración
cache = LRUCache[str, int](capacity=3)
cache.set("a", 1)
cache.set("b", 2)
cache.set("c", 3)
cache.get("a") # Accede a "a", lo mueve al final
cache.set("d", 4) # Capacidad excedida: desaloja "b" (el menos reciente)
print(f"¿'a' en cache? {cache.get('a')}") # Sí, fue accedido recientemente
print(f"¿'b' en cache? {cache.get('b')}") # No, fue desalojado
print(f"Hit rate: {cache.hit_rate:.1%}")
## Output
# ¿'a' en cache? 1
# ¿'b' en cache? None
# Hit rate: 50.0%
La implementación usa move_to_end para actualizar el orden en y popitem(last=False) para desalojar el elemento más antiguo. El hit rate integrado permite monitorear la eficiencia del cache en tiempo real.
LFU (Least Frequently Used)
Mientras que LRU se basa en cuándo se accedió un dato, LFU se basa en cuántas veces se accedió. LFU desaloja el elemento con menor frecuencia de acceso. Es más efectivo que LRU cuando existen elementos con popularidad estable a lo largo del tiempo (por ejemplo, páginas de inicio o configuraciones globales), pero puede ser problemático con elementos que fueron muy populares en el pasado pero ya no lo son (cache pollution).
TTL (Time-To-Live)
La expiración basada en tiempo es el mecanismo de desalojo más predecible: cada elemento tiene un tiempo de vida fijo, tras el cual se elimina automáticamente. TTL se combina frecuentemente con LRU: los elementos pueden ser desalojados por antigüedad de acceso o por expiración temporal, lo que ocurra primero. cachetools.TTLCache implementa exactamente esta combinación.
Diseño de claves de cache
Un buen diseño de claves es esencial para la mantenibilidad y el rendimiento del cache. Claves mal diseñadas causan colisiones, dificultan la invalidación selectiva y hacen imposible el debugging.
import hashlib
from dataclasses import dataclass
@dataclass(frozen=True)
class CacheKey:
"""Clave de cache estructurada con namespace, versión y entidad."""
namespace: str
version: int
entity: str
entity_id: str
def __str__(self) -> str:
return f"{self.namespace}:v{self.version}:{self.entity}:{self.entity_id}"
def with_params(self, **params: str) -> str:
"""Genera una clave extendida con parámetros hasheados."""
param_str = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
param_hash = hashlib.md5(param_str.encode()).hexdigest()[:8]
return f"{self}:{param_hash}"
# Ejemplos de uso
key = CacheKey(namespace="myapp", version=2, entity="user", entity_id="1001")
print(str(key))
print(key.with_params(fields="name,email", lang="es"))
## Output
# myapp:v2:user:1001
# myapp:v2:user:1001:a1b2c3d4
Las convenciones clave son:
- Namespace: aísla claves entre servicios o módulos (
myapp,auth,catalog). - Versión: permite invalidar todo el cache de una entidad incrementando la versión durante deployments.
- Entidad + ID: identifica el recurso específico.
- Hash de parámetros: distingue variantes de una misma entidad (por ejemplo, diferentes campos o idiomas).
Serialización para cache distribuido
Cuando usamos un cache externo como Redis, los datos deben serializarse para su transmisión por la red. La elección del formato de serialización impacta directamente en el rendimiento.
import json
import time
from abc import ABC, abstractmethod
from typing import Any
class Serializer(ABC):
"""Interfaz abstracta para serialización de cache."""
@abstractmethod
def serialize(self, data: Any) -> bytes:
...
@abstractmethod
def deserialize(self, raw: bytes) -> Any:
...
class JsonSerializer(Serializer):
def serialize(self, data: Any) -> bytes:
return json.dumps(data, default=str).encode("utf-8")
def deserialize(self, raw: bytes) -> Any:
return json.loads(raw)
class MsgpackSerializer(Serializer):
def serialize(self, data: Any) -> bytes:
import msgpack
return msgpack.packb(data, use_bin_type=True)
def deserialize(self, raw: bytes) -> Any:
import msgpack
return msgpack.unpackb(raw, raw=False)
# Benchmark
data = {"users": [{"id": i, "name": f"User {i}", "active": True} for i in range(1000)]}
for name, serializer in [("JSON", JsonSerializer()), ("Msgpack", MsgpackSerializer())]:
start = time.perf_counter()
for _ in range(1000):
raw = serializer.serialize(data)
serializer.deserialize(raw)
elapsed = time.perf_counter() - start
print(f"{name:>7}: {len(raw):>6} bytes | {elapsed:.3f}s (1000 ciclos)")
## Output
# JSON: 48890 bytes | 1.523s (1000 ciclos)
# Msgpack: 28891 bytes | 0.412s (1000 ciclos)
Msgpack produce datos un 41% más compactos y es 3.7x más rápido que JSON en este benchmark. Para caches distribuidos con alto volumen, esta diferencia reduce significativamente el uso de ancho de banda y la latencia de serialización. JSON sigue siendo preferible cuando la depurabilidad es prioritaria, ya que es legible para humanos.
Desafíos del cache distribuido
El caching distribuido introduce desafíos que no existen en un cache local. Los dos más críticos son el cache stampede y la expiración sincronizada.
Cache Stampede
Un cache stampede (también llamado thundering herd) ocurre cuando una clave popular expira y cientos o miles de peticiones concurrentes intentan recalcular el mismo valor simultáneamente, saturando el origen de datos. Este problema puede causar cascadas de fallos que derriben un sistema completo.
La solución clásica es usar un lock distribuido para que solo un proceso recalcule el valor mientras los demás esperan:
import asyncio
import json
from typing import Any, Callable
async def get_with_lock(
cache_client: Any,
key: str,
fetch_fn: Callable[[], Any],
ttl: int = 300,
lock_timeout: int = 10,
) -> Any:
"""Cache-Aside con protección contra stampede usando lock distribuido."""
# Intento 1: leer del cache
value = await cache_client.get(key)
if value is not None:
return json.loads(value)
# Adquirir lock distribuido
lock_key = f"lock:{key}"
acquired = await cache_client.set(lock_key, "1", nx=True, ex=lock_timeout)
if acquired:
try:
# Este proceso recalcula el valor
result = await fetch_fn()
await cache_client.set(key, json.dumps(result, default=str), ex=ttl)
return result
finally:
await cache_client.delete(lock_key)
else:
# Otro proceso está recalculando; esperar y reintentar
for _ in range(lock_timeout * 10):
await asyncio.sleep(0.1)
value = await cache_client.get(key)
if value is not None:
return json.loads(value)
# Fallback: calcular directamente si el lock expiró
return await fetch_fn()
El comando SET key value NX EX timeout de Redis es atómico: solo un proceso puede adquirir el lock. Los demás procesos entran en un bucle de polling corto hasta que el valor esté disponible en el cache. El finally garantiza que el lock se libera incluso si ocurre un error durante el recálculo.
Expiración Probabilística Temprana (XFetch)
Una alternativa elegante al lock es la expiración probabilística temprana: antes de que la clave expire oficialmente, cada acceso tiene una probabilidad creciente de recomputar el valor proactivamente. Esto distribuye las recomputaciones en el tiempo, evitando la avalancha sincronizada:
import math
import random
import time
from dataclasses import dataclass
from typing import Any, Callable
@dataclass
class XFetchEntry:
"""Entrada de cache con metadata para expiración probabilística."""
value: Any
delta: float # Tiempo que tomó computar el valor (segundos)
expiry: float # Timestamp de expiración
def should_recompute(entry: XFetchEntry, beta: float = 1.0) -> bool:
"""Algoritmo XFetch: decide probabilísticamente si recomputar."""
now = time.time()
ttl_remaining = entry.expiry - now
# A medida que se acerca la expiración, la probabilidad de recomputar aumenta
threshold = entry.delta * beta * math.log(random.random())
return ttl_remaining + threshold <= 0
async def xfetch_get(
cache_client: Any,
key: str,
fetch_fn: Callable[[], Any],
ttl: int = 300,
beta: float = 1.0,
) -> Any:
"""Cache-Aside con expiración probabilística temprana."""
raw = await cache_client.get(key)
if raw is not None:
import json
entry = XFetchEntry(**json.loads(raw))
if not should_recompute(entry, beta):
return entry.value
# Recomputar proactivamente (sin bloquear a otros)
start = time.time()
result = await fetch_fn()
delta = time.time() - start
entry = XFetchEntry(value=result, delta=delta, expiry=time.time() + ttl)
import json
await cache_client.set(key, json.dumps(vars(entry)), ex=ttl)
return result
El parámetro beta controla la agresividad del recomputo temprano: valores más altos hacen que el recomputo ocurra antes de la expiración. La clave del algoritmo es que math.log(random.random()) genera un valor negativo aleatorio, y cuanto menos tiempo queda (menor ttl_remaining), más probable es que la suma sea ≤ 0, disparando el recomputo.
Escalando el cache: Sharding y Consistent Hashing
Un solo servidor Redis puede manejar cientos de miles de operaciones por segundo, pero eventualmente alcanza sus límites. Cuando esto ocurre, necesitamos distribuir las claves entre múltiples nodos: esto es sharding.
El enfoque ingenuo — node = hash(key) % num_nodes — tiene un problema catastrófico: cuando agregamos o removemos un nodo, casi todas las claves se redistribuyen, invalidando la mayoría del cache de golpe.
Consistent hashing es un esquema de distribución donde al agregar o remover un nodo, solo se reasigna claves en promedio (donde es el total de claves y el número de nodos), en lugar del del hashing modular. Los nodos y las claves se mapean a un mismo espacio circular (anillo hash) y cada clave se asigna al primer nodo encontrado en sentido horario.
import bisect
import hashlib
from dataclasses import dataclass, field
@dataclass
class ConsistentHashRing:
"""Anillo de hash consistente con nodos virtuales."""
replicas: int = 150 # Nodos virtuales por nodo físico
_ring: list[int] = field(default_factory=list, init=False)
_nodes: dict[int, str] = field(default_factory=dict, init=False)
def _hash(self, key: str) -> int:
return int(hashlib.sha256(key.encode()).hexdigest(), 16)
def add_node(self, node: str) -> None:
"""Agrega un nodo físico con sus réplicas virtuales al anillo."""
for i in range(self.replicas):
h = self._hash(f"{node}:vnode{i}")
self._ring.append(h)
self._nodes[h] = node
self._ring.sort()
def remove_node(self, node: str) -> None:
"""Remueve un nodo y todas sus réplicas virtuales."""
self._ring = [h for h in self._ring if self._nodes.get(h) != node]
self._nodes = {h: n for h, n in self._nodes.items() if n != node}
def get_node(self, key: str) -> str:
"""Encuentra el nodo responsable de una clave."""
if not self._ring:
raise ValueError("El anillo está vacío")
h = self._hash(key)
idx = bisect.bisect_right(self._ring, h) % len(self._ring)
return self._nodes[self._ring[idx]]
# Demostración: distribución de claves
ring = ConsistentHashRing(replicas=150)
for node in ["redis-1", "redis-2", "redis-3"]:
ring.add_node(node)
# Verificar distribución
from collections import Counter
distribution = Counter(ring.get_node(f"key:{i}") for i in range(10000))
print("Distribución de 10,000 claves:")
for node, count in sorted(distribution.items()):
print(f" {node}: {count} ({count/100:.1f}%)")
# Simular agregar un nodo
ring.add_node("redis-4")
new_distribution = Counter(ring.get_node(f"key:{i}") for i in range(10000))
moved = sum(1 for i in range(10000)
if distribution.most_common()[0][0] != ring.get_node(f"key:{i}"))
print(f"\nTras agregar redis-4:")
for node, count in sorted(new_distribution.items()):
print(f" {node}: {count} ({count/100:.1f}%)")
## Output
# Distribución de 10,000 claves:
# redis-1: 3348 (33.5%)
# redis-2: 3312 (33.1%)
# redis-3: 3340 (33.4%)
#
# Tras agregar redis-4:
# redis-1: 2509 (25.1%)
# redis-2: 2498 (25.0%)
# redis-3: 2516 (25.2%)
# redis-4: 2477 (24.8%)
Los nodos virtuales (replicas=150) son la clave de una distribución uniforme. Sin ellos, 3 nodos físicos crearían solo 3 puntos en el anillo, resultando en particiones muy desiguales. Con 150 réplicas por nodo, la distribución se aproxima al ideal teórico de por nodo. bisect_right permite encontrar el nodo responsable en , donde es el número total de nodos virtuales.
Tolerancia a fallos
Un sistema de cache distribuido es un componente externo que puede fallar. Si Redis deja de responder, nuestra aplicación no debería colapsar: debería degradarse graciosamente.
Circuit Breaker
El patrón Circuit Breaker evita que una cascada de errores sature un servicio caído:
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Callable
class CircuitState(Enum):
CLOSED = auto() # Operación normal
OPEN = auto() # Circuito abierto: rechaza peticiones
HALF_OPEN = auto() # Prueba: permite una petición de prueba
@dataclass
class CircuitBreaker:
"""Circuit Breaker para proteger llamadas a servicios externos."""
failure_threshold: int = 5
recovery_timeout: float = 30.0
state: CircuitState = CircuitState.CLOSED
failures: int = 0
last_failure_time: float = 0.0
_success_count: int = 0
def _trip(self) -> None:
"""Abre el circuito tras demasiados fallos."""
self.state = CircuitState.OPEN
self.last_failure_time = time.time()
def _attempt_reset(self) -> None:
"""Transiciona a HALF_OPEN si pasó el timeout de recuperación."""
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self._success_count = 0
async def call(self, func: Callable, fallback: Callable, *args: Any) -> Any:
"""Ejecuta la función con protección de circuit breaker."""
if self.state == CircuitState.OPEN:
self._attempt_reset()
if self.state == CircuitState.OPEN:
return await fallback(*args)
try:
result = await func(*args)
if self.state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= 3:
self.state = CircuitState.CLOSED
self.failures = 0
return result
except Exception:
self.failures += 1
if self.failures >= self.failure_threshold:
self._trip()
if self.state == CircuitState.HALF_OPEN:
self._trip()
return await fallback(*args)
# Uso con cache
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def get_from_cache(key: str) -> str | None:
# Implementación real con redis
...
async def get_from_db(key: str) -> str | None:
# Fallback directo a base de datos
...
# El circuit breaker protege las llamadas a Redis
# result = await breaker.call(get_from_cache, get_from_db, "user:1001")
El Circuit Breaker opera en tres estados:
- CLOSED (normal): las peticiones fluyen normalmente. Si se acumulan
failure_thresholdfallos, el circuito se abre. - OPEN (cortocircuito): todas las peticiones se redirigen al fallback (base de datos) sin intentar contactar Redis, evitando peticiones inútiles y timeouts.
- HALF_OPEN (prueba): después de
recovery_timeoutsegundos, se permite una petición de prueba. Si tiene éxito (3 éxitos consecutivos), el circuito se cierra. Si falla, se reabre.
Degradación gradual
Más allá del Circuit Breaker, una buena estrategia de degradación incluye múltiples niveles de fallback:
- L1 (cache local): si Redis falla, servir desde el cache en memoria local, aunque esté ligeramente desactualizado.
- Datos obsoletos (stale): si el dato expiró en el cache pero aún está disponible, servirlo con un encabezado indicando que es stale mientras se recomputa en segundo plano.
- Base de datos directa: como último recurso, consultar directamente el origen de datos, aceptando la latencia adicional.
La clave es que el usuario perciba una degradación gradual de rendimiento, nunca una caída total.
Observabilidad y métricas
Un cache sin observabilidad es un cache misterioso. Sin métricas, no podemos saber si está ayudando o perjudicando el rendimiento.
Las métricas esenciales son:
- Hit rate: ¿qué porcentaje de peticiones sirve el cache? Por debajo de 80%, el cache probablemente no justifica su complejidad.
- Latencia (p50, p95, p99): ¿cuánto tarda una operación de cache? Picos en p99 pueden indicar problemas de red o de slow queries en Redis.
- Tasa de desalojo: ¿con qué frecuencia se eliminan claves? Una tasa alta sugiere que el cache es demasiado pequeño.
- Uso de memoria: ¿cuánto del cache disponible se está utilizando?
import time
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class CacheMetrics:
"""Colector de métricas para cache."""
hits: int = 0
misses: int = 0
errors: int = 0
total_latency_ms: float = 0.0
_latencies: list[float] = field(default_factory=list)
def record_hit(self, latency_ms: float) -> None:
self.hits += 1
self.total_latency_ms += latency_ms
self._latencies.append(latency_ms)
def record_miss(self, latency_ms: float) -> None:
self.misses += 1
self.total_latency_ms += latency_ms
self._latencies.append(latency_ms)
def record_error(self) -> None:
self.errors += 1
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
@property
def avg_latency_ms(self) -> float:
return self.total_latency_ms / len(self._latencies) if self._latencies else 0.0
@property
def p99_latency_ms(self) -> float:
if not self._latencies:
return 0.0
sorted_lat = sorted(self._latencies)
idx = int(len(sorted_lat) * 0.99)
return sorted_lat[min(idx, len(sorted_lat) - 1)]
def summary(self) -> str:
return (
f"Hit rate: {self.hit_rate:.1%} | "
f"Avg latency: {self.avg_latency_ms:.2f}ms | "
f"P99 latency: {self.p99_latency_ms:.2f}ms | "
f"Errors: {self.errors}"
)
@dataclass
class InstrumentedCache:
"""Wrapper que añade métricas a cualquier implementación de cache."""
inner: Any # Instancia de cache real
metrics: CacheMetrics = field(default_factory=CacheMetrics)
async def get(self, key: str) -> Optional[str]:
start = time.perf_counter()
try:
value = await self.inner.get(key)
latency = (time.perf_counter() - start) * 1000
if value is not None:
self.metrics.record_hit(latency)
else:
self.metrics.record_miss(latency)
return value
except Exception:
self.metrics.record_error()
return None
async def set(self, key: str, value: str, **kwargs: Any) -> None:
try:
await self.inner.set(key, value, **kwargs)
except Exception:
self.metrics.record_error()
# Ejemplo de uso
metrics = CacheMetrics()
# Simulación de métricas
for i in range(950):
metrics.record_hit(0.5 + (i % 10) * 0.1)
for i in range(50):
metrics.record_miss(15.0 + i * 0.5)
print(metrics.summary())
## Output
# Hit rate: 95.0% | Avg latency: 1.70ms | P99 latency: 38.50ms | Errors: 0
El InstrumentedCache es un wrapper transparente que se coloca alrededor de cualquier implementación de cache existente. Esto sigue el principio de composición sobre herencia: no necesitas modificar tu cache para añadirle métricas. En producción, las métricas se exportan a sistemas como Prometheus o Datadog para alertas y dashboards.
Conclusiones
El cache distribuido es una de las herramientas más poderosas y, al mismo tiempo, más sutiles de la ingeniería de sistemas. A lo largo de este artículo hemos explorado sus múltiples dimensiones:
- Arquitecturas: desde el cache local con
functools.lru_cachehasta sistemas híbridos L1+L2 con Redis. - Estrategias: Cache-Aside, Write-Through y Write-Behind, cada una con compromisos distintos entre consistencia, latencia y resiliencia.
- Desalojo: políticas LRU, LFU y TTL para gestionar la capacidad limitada del cache.
- Distribución: consistent hashing con nodos virtuales para escalar horizontalmente sin invalidar el cache existente.
- Resiliencia: Circuit Breaker y degradación gradual para manejar fallos sin afectar la experiencia del usuario.
- Observabilidad: métricas instrumentadas para tomar decisiones basadas en datos.
La clave para usar cache efectivamente no es solo conocer las herramientas, sino entender los compromisos. No existe una solución universal: la mejor arquitectura de cache es la que se ajusta a los patrones de acceso, requisitos de consistencia y tolerancia a fallos de tu sistema específico.
Finalmente, si hay errores, omisiones o inexactitudes en este artículo, por favor no dude en contactarnos a través de siguiente canal de Discord: Math & Code.