Volver a Redis Básico

Pub/Sub y Streams en Redis

Pub/Sub — Disparar y Olvidar

# Suscriptor (bloquea esperando mensajes) SUBSCRIBE pedidos # Publicador (conexión separada) PUBLISH pedidos '{"id":"123","estado":"creado","total":59.99}' # Suscripción por patrón PSUBSCRIBE pedidos.* # coincide con pedidos.creado, pedidos.enviado, ... # Cancelar suscripción UNSUBSCRIBE pedidos
const suscriptor = new Redis(); const publicador = new Redis(); await suscriptor.subscribe('notificaciones'); suscriptor.on('message', (canal, mensaje) => { const evento = JSON.parse(mensaje); console.log(`[${canal}]`, evento); }); await publicador.publish('notificaciones', JSON.stringify({ tipo: 'PEDIDO_ENVIADO', userId: 'u123', pedidoId: 'o456' }));

Limitaciones del Pub/Sub

  • Mensajes publicados sin suscriptores activos se pierden
  • Sin persistencia de mensajes
  • Sin grupos de consumidores
  • Sin confirmación de entrega

Para mensajería durable → usar Streams

Redis Streams — Log de Mensajes Persistente

# Agregar mensajes (ID auto-generado) XADD pedidos * userId u1 productoId p1 cantidad 2 total 59.98 XADD pedidos * userId u2 productoId p2 cantidad 1 total 29.99 # Leer todos los mensajes XRANGE pedidos - + # Leer N mensajes desde el inicio XRANGE pedidos - + COUNT 10 # Leer nuevos mensajes XREAD COUNT 10 STREAMS pedidos 0 # desde el principio XREAD COUNT 10 STREAMS pedidos $ # solo nuevos ($ = más reciente) # Longitud del stream XLEN pedidos

Grupos de Consumidores

# Crear grupo de consumidores XGROUP CREATE pedidos migrupo $ MKSTREAM # Leer como consumidor XREADGROUP GROUP migrupo trabajador1 COUNT 5 STREAMS pedidos > # Confirmar mensaje procesado XACK pedidos migrupo <id-mensaje> # Ver mensajes pendientes (no confirmados) XPENDING pedidos migrupo - + 10 # Reclamar mensajes atascados (consumidor caído) XCLAIM pedidos migrupo trabajador2 60000 <id-atascado>
async function procesarPedidos() { while (true) { const mensajes = await redis.xreadgroup( 'GROUP', 'fulfillment', 'worker-1', 'COUNT', 10, 'BLOCK', 2000, 'STREAMS', 'pedidos', '>' ); if (!mensajes) continue; for (const [stream, entradas] of mensajes) { for (const [id, campos] of entradas) { const pedido = Object.fromEntries( campos.reduce((acc: any[], _: any, i: number, arr: any[]) => i % 2 === 0 ? [...acc, [arr[i], arr[i+1]]] : acc, [] ) ); await procesarPedido(pedido); await redis.xack('pedidos', 'fulfillment', id); } } } }

Recorte del Stream

# Conservar solo las últimas 1000 entradas XTRIM pedidos MAXLEN 1000 # Recorte aproximado (más eficiente) XTRIM pedidos MAXLEN ~ 1000 # Agregar con recorte automático XADD pedidos MAXLEN ~ 1000 * clave valor