Volver a Spring Boot Intermedio
Spring Boot
Spring Boot Intermedio
Mensajería con RabbitMQ y Kafka
La mensajería asíncrona desacopla los microservicios, mejorando la resiliencia y escalabilidad.
RabbitMQ con Spring AMQP
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configuración
@Configuration
public class ConfiguracionRabbit {
public static final String COLA_PEDIDOS = "pedido.creado";
public static final String EXCHANGE_PEDIDOS = "pedidos";
public static final String CLAVE_ENRUTAMIENTO = "pedido.creado";
@Bean
public Queue colaPedidos() {
return QueueBuilder.durable(COLA_PEDIDOS)
.withArgument("x-dead-letter-exchange", "pedidos.dlx")
.build();
}
@Bean
public TopicExchange exchangePedidos() {
return new TopicExchange(EXCHANGE_PEDIDOS);
}
@Bean
public Binding enlacePedidos(Queue colaPedidos, TopicExchange exchangePedidos) {
return BindingBuilder.bind(colaPedidos).to(exchangePedidos).with(CLAVE_ENRUTAMIENTO);
}
@Bean
public MessageConverter convertidorMensaje() {
return new Jackson2JsonMessageConverter();
}
}
Productor
@Service
public class ProductorPedidos {
private final AmqpTemplate amqpTemplate;
public void publicarPedidoCreado(EventoPedidoCreado evento) {
amqpTemplate.convertAndSend(
ConfiguracionRabbit.EXCHANGE_PEDIDOS,
ConfiguracionRabbit.CLAVE_ENRUTAMIENTO,
evento
);
log.info("Publicado EventoPedidoCreado para pedido {}", evento.getPedidoId());
}
}
// DTO del evento
public record EventoPedidoCreado(
Long pedidoId,
Long clienteId,
BigDecimal total,
LocalDateTime creadoEn
) {}
Consumidor
@Component
public class ConsumidorPedidos {
private final ServicioInventario servicioInventario;
@RabbitListener(queues = ConfiguracionRabbit.COLA_PEDIDOS)
public void manejarPedidoCreado(EventoPedidoCreado evento) {
log.info("Recibido EventoPedidoCreado: {}", evento.getPedidoId());
servicioInventario.reservarItems(evento.getPedidoId());
}
@RabbitListener(queues = "pedidos.dlx.cola")
public void manejarMensajeMuerto(Message mensaje) {
log.error("Mensaje muerto recibido: {}", new String(mensaje.getBody()));
}
}
Apache Kafka con Spring Kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configuración
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: servicio-pedidos
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.ejemplo.eventos"
auto-offset-reset: earliest
Productor Kafka
@Service
public class ProductorKafkaPedidos {
private final KafkaTemplate<String, EventoPedidoCreado> kafkaTemplate;
public void publicarPedidoCreado(EventoPedidoCreado evento) {
CompletableFuture<SendResult<String, EventoPedidoCreado>> futuro =
kafkaTemplate.send("pedidos.creados", evento.getPedidoId().toString(), evento);
futuro.thenAccept(resultado ->
log.info("Enviado a partición {} offset {}",
resultado.getRecordMetadata().partition(),
resultado.getRecordMetadata().offset()))
.exceptionally(ex -> {
log.error("Error al enviar mensaje: {}", ex.getMessage());
return null;
});
}
}
Consumidor Kafka
@Component
public class ConsumidorKafkaPedidos {
@KafkaListener(topics = "pedidos.creados", groupId = "servicio-inventario")
public void manejarPedidoCreado(
@Payload EventoPedidoCreado evento,
@Header(KafkaHeaders.RECEIVED_PARTITION) int particion,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Consumiendo partición {} offset {}: {}", particion, offset, evento);
servicioInventario.reservarItems(evento.getPedidoId());
}
}
Configuración de Topics Kafka
@Configuration
public class ConfiguracionTopicsKafka {
@Bean
public NewTopic topicPedidos() {
return TopicBuilder.name("pedidos.creados")
.partitions(3)
.replicas(1)
.build();
}
}
Patrón Outbox (Mensajería Transaccional)
Evita problemas de doble escritura almacenando eventos en la misma transacción:
@Service
@Transactional
public class ServicioPedidos {
public Pedido crearPedido(CrearPedidoRequest request) {
Pedido pedido = construirPedido(request);
repositorioPedidos.save(pedido);
// Escribir evento en outbox en la misma TX
EventoOutbox outbox = new EventoOutbox(
"pedidos.creados",
objectMapper.writeValueAsString(new EventoPedidoCreado(pedido))
);
repositorioOutbox.save(outbox);
return pedido;
}
}
// Planificador publica eventos de outbox
@Scheduled(fixedDelay = 1000)
@Transactional
public void publicarEventosOutbox() {
List<EventoOutbox> pendientes = repositorioOutbox.findByPublicadoFalse();
for (EventoOutbox evento : pendientes) {
kafkaTemplate.send(evento.getTopic(), evento.getPayload());
evento.setPublicado(true);
}
repositorioOutbox.saveAll(pendientes);
}