Volver a MongoDB Intermedio
Transacciones y Change Streams en MongoDB
Transacciones Multi-Documento (Requiere Replica Set)
const session = client.startSession();
try {
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" }
});
const pedidos = client.db("tienda").collection("pedidos");
const inventario = client.db("tienda").collection("inventario");
await pedidos.insertOne(
{ usuarioId: "u1", items: [{ productoId: "p1", cantidad: 2 }], total: 59.98 },
{ session }
);
await inventario.updateOne(
{ _id: "p1" },
{ $inc: { cantidad: -2 } },
{ session }
);
await session.commitTransaction();
} catch (err) {
await session.abortTransaction();
throw err;
} finally {
session.endSession();
}
Reintentos con Errores Transitorios
async function ejecutarConReintento(fn, session) {
while (true) {
try {
await fn(session);
break;
} catch (err) {
if (err.hasErrorLabel("TransientTransactionError")) {
continue; // reintentar
}
throw err;
}
}
}
async function confirmarConReintento(session) {
while (true) {
try {
await session.commitTransaction();
break;
} catch (err) {
if (err.hasErrorLabel("UnknownTransactionCommitResult")) {
continue; // reintentar commit
}
throw err;
}
}
}
Change Streams
// Escuchar cambios en una colección
const changeStream = db.collection("pedidos").watch([
{ $match: { "operationType": { $in: ["insert", "update"] } } }
]);
changeStream.on("change", (evento) => {
console.log("Operación:", evento.operationType);
console.log("Documento:", evento.fullDocument);
console.log("Campos cambiados:", evento.updateDescription?.updatedFields);
});
// Reanudar desde token (sobrevive reinicios)
const streamReanudado = db.collection("pedidos").watch([], {
resumeAfter: ultimoToken
});
Patrón Outbox con Change Streams
// Escritura atómica de datos de negocio + evento en outbox
session.startTransaction();
await pedidos.insertOne({ ...datosPedido }, { session });
await outbox.insertOne({
evento: "PEDIDO_CREADO",
payload: datosPedido,
estado: "pendiente"
}, { session });
await session.commitTransaction();
// Publicador separado lee el outbox vía change stream
const streamOutbox = db.collection("outbox").watch([
{ $match: { "fullDocument.estado": "pendiente" } }
]);
streamOutbox.on("change", async (event) => {
const doc = event.fullDocument;
await publicarEnBroker(doc.evento, doc.payload);
await outbox.updateOne({ _id: doc._id }, { $set: { estado: "publicado" } });
});
Consistencia Causal
// Asegurar que las lecturas reflejen los propios escritos
const session = client.startSession({ causalConsistency: true });
await collection.insertOne({ clave: "valor" }, { session });
// Lecturas posteriores en esta sesión siempre verán el doc anterior
const doc = await collection.findOne({ clave: "valor" }, { session });