Volver a Redis Básico
Pub/Sub y Streams en Redis
Pub/Sub — Disparar y Olvidar
# Suscriptor (bloquea esperando mensajes)
SUBSCRIBE pedidos
# Publicador (conexión separada)
PUBLISH pedidos '{"id":"123","estado":"creado","total":59.99}'
# Suscripción por patrón
PSUBSCRIBE pedidos.* # coincide con pedidos.creado, pedidos.enviado, ...
# Cancelar suscripción
UNSUBSCRIBE pedidos
const suscriptor = new Redis();
const publicador = new Redis();
await suscriptor.subscribe('notificaciones');
suscriptor.on('message', (canal, mensaje) => {
const evento = JSON.parse(mensaje);
console.log(`[${canal}]`, evento);
});
await publicador.publish('notificaciones', JSON.stringify({
tipo: 'PEDIDO_ENVIADO',
userId: 'u123',
pedidoId: 'o456'
}));
Limitaciones del Pub/Sub
- Mensajes publicados sin suscriptores activos se pierden
- Sin persistencia de mensajes
- Sin grupos de consumidores
- Sin confirmación de entrega
Para mensajería durable → usar Streams
Redis Streams — Log de Mensajes Persistente
# Agregar mensajes (ID auto-generado)
XADD pedidos * userId u1 productoId p1 cantidad 2 total 59.98
XADD pedidos * userId u2 productoId p2 cantidad 1 total 29.99
# Leer todos los mensajes
XRANGE pedidos - +
# Leer N mensajes desde el inicio
XRANGE pedidos - + COUNT 10
# Leer nuevos mensajes
XREAD COUNT 10 STREAMS pedidos 0 # desde el principio
XREAD COUNT 10 STREAMS pedidos $ # solo nuevos ($ = más reciente)
# Longitud del stream
XLEN pedidos
Grupos de Consumidores
# Crear grupo de consumidores
XGROUP CREATE pedidos migrupo $ MKSTREAM
# Leer como consumidor
XREADGROUP GROUP migrupo trabajador1 COUNT 5 STREAMS pedidos >
# Confirmar mensaje procesado
XACK pedidos migrupo <id-mensaje>
# Ver mensajes pendientes (no confirmados)
XPENDING pedidos migrupo - + 10
# Reclamar mensajes atascados (consumidor caído)
XCLAIM pedidos migrupo trabajador2 60000 <id-atascado>
async function procesarPedidos() {
while (true) {
const mensajes = await redis.xreadgroup(
'GROUP', 'fulfillment', 'worker-1',
'COUNT', 10,
'BLOCK', 2000,
'STREAMS', 'pedidos', '>'
);
if (!mensajes) continue;
for (const [stream, entradas] of mensajes) {
for (const [id, campos] of entradas) {
const pedido = Object.fromEntries(
campos.reduce((acc: any[], _: any, i: number, arr: any[]) =>
i % 2 === 0 ? [...acc, [arr[i], arr[i+1]]] : acc, []
)
);
await procesarPedido(pedido);
await redis.xack('pedidos', 'fulfillment', id);
}
}
}
}
Recorte del Stream
# Conservar solo las últimas 1000 entradas
XTRIM pedidos MAXLEN 1000
# Recorte aproximado (más eficiente)
XTRIM pedidos MAXLEN ~ 1000
# Agregar con recorte automático
XADD pedidos MAXLEN ~ 1000 * clave valor