Volver a Spring Boot Intermedio

Mensajería con RabbitMQ y Kafka

La mensajería asíncrona desacopla los microservicios, mejorando la resiliencia y escalabilidad.

RabbitMQ con Spring AMQP

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

Configuración

@Configuration public class ConfiguracionRabbit { public static final String COLA_PEDIDOS = "pedido.creado"; public static final String EXCHANGE_PEDIDOS = "pedidos"; public static final String CLAVE_ENRUTAMIENTO = "pedido.creado"; @Bean public Queue colaPedidos() { return QueueBuilder.durable(COLA_PEDIDOS) .withArgument("x-dead-letter-exchange", "pedidos.dlx") .build(); } @Bean public TopicExchange exchangePedidos() { return new TopicExchange(EXCHANGE_PEDIDOS); } @Bean public Binding enlacePedidos(Queue colaPedidos, TopicExchange exchangePedidos) { return BindingBuilder.bind(colaPedidos).to(exchangePedidos).with(CLAVE_ENRUTAMIENTO); } @Bean public MessageConverter convertidorMensaje() { return new Jackson2JsonMessageConverter(); } }

Productor

@Service public class ProductorPedidos { private final AmqpTemplate amqpTemplate; public void publicarPedidoCreado(EventoPedidoCreado evento) { amqpTemplate.convertAndSend( ConfiguracionRabbit.EXCHANGE_PEDIDOS, ConfiguracionRabbit.CLAVE_ENRUTAMIENTO, evento ); log.info("Publicado EventoPedidoCreado para pedido {}", evento.getPedidoId()); } } // DTO del evento public record EventoPedidoCreado( Long pedidoId, Long clienteId, BigDecimal total, LocalDateTime creadoEn ) {}

Consumidor

@Component public class ConsumidorPedidos { private final ServicioInventario servicioInventario; @RabbitListener(queues = ConfiguracionRabbit.COLA_PEDIDOS) public void manejarPedidoCreado(EventoPedidoCreado evento) { log.info("Recibido EventoPedidoCreado: {}", evento.getPedidoId()); servicioInventario.reservarItems(evento.getPedidoId()); } @RabbitListener(queues = "pedidos.dlx.cola") public void manejarMensajeMuerto(Message mensaje) { log.error("Mensaje muerto recibido: {}", new String(mensaje.getBody())); } }

Apache Kafka con Spring Kafka

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

Configuración

spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: group-id: servicio-pedidos key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.ejemplo.eventos" auto-offset-reset: earliest

Productor Kafka

@Service public class ProductorKafkaPedidos { private final KafkaTemplate<String, EventoPedidoCreado> kafkaTemplate; public void publicarPedidoCreado(EventoPedidoCreado evento) { CompletableFuture<SendResult<String, EventoPedidoCreado>> futuro = kafkaTemplate.send("pedidos.creados", evento.getPedidoId().toString(), evento); futuro.thenAccept(resultado -> log.info("Enviado a partición {} offset {}", resultado.getRecordMetadata().partition(), resultado.getRecordMetadata().offset())) .exceptionally(ex -> { log.error("Error al enviar mensaje: {}", ex.getMessage()); return null; }); } }

Consumidor Kafka

@Component public class ConsumidorKafkaPedidos { @KafkaListener(topics = "pedidos.creados", groupId = "servicio-inventario") public void manejarPedidoCreado( @Payload EventoPedidoCreado evento, @Header(KafkaHeaders.RECEIVED_PARTITION) int particion, @Header(KafkaHeaders.OFFSET) long offset) { log.info("Consumiendo partición {} offset {}: {}", particion, offset, evento); servicioInventario.reservarItems(evento.getPedidoId()); } }

Configuración de Topics Kafka

@Configuration public class ConfiguracionTopicsKafka { @Bean public NewTopic topicPedidos() { return TopicBuilder.name("pedidos.creados") .partitions(3) .replicas(1) .build(); } }

Patrón Outbox (Mensajería Transaccional)

Evita problemas de doble escritura almacenando eventos en la misma transacción:

@Service @Transactional public class ServicioPedidos { public Pedido crearPedido(CrearPedidoRequest request) { Pedido pedido = construirPedido(request); repositorioPedidos.save(pedido); // Escribir evento en outbox en la misma TX EventoOutbox outbox = new EventoOutbox( "pedidos.creados", objectMapper.writeValueAsString(new EventoPedidoCreado(pedido)) ); repositorioOutbox.save(outbox); return pedido; } } // Planificador publica eventos de outbox @Scheduled(fixedDelay = 1000) @Transactional public void publicarEventosOutbox() { List<EventoOutbox> pendientes = repositorioOutbox.findByPublicadoFalse(); for (EventoOutbox evento : pendientes) { kafkaTemplate.send(evento.getTopic(), evento.getPayload()); evento.setPublicado(true); } repositorioOutbox.saveAll(pendientes); }