Volver a MongoDB Intermedio

Transacciones y Change Streams en MongoDB

Transacciones Multi-Documento (Requiere Replica Set)

const session = client.startSession(); try { session.startTransaction({ readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } }); const pedidos = client.db("tienda").collection("pedidos"); const inventario = client.db("tienda").collection("inventario"); await pedidos.insertOne( { usuarioId: "u1", items: [{ productoId: "p1", cantidad: 2 }], total: 59.98 }, { session } ); await inventario.updateOne( { _id: "p1" }, { $inc: { cantidad: -2 } }, { session } ); await session.commitTransaction(); } catch (err) { await session.abortTransaction(); throw err; } finally { session.endSession(); }

Reintentos con Errores Transitorios

async function ejecutarConReintento(fn, session) { while (true) { try { await fn(session); break; } catch (err) { if (err.hasErrorLabel("TransientTransactionError")) { continue; // reintentar } throw err; } } } async function confirmarConReintento(session) { while (true) { try { await session.commitTransaction(); break; } catch (err) { if (err.hasErrorLabel("UnknownTransactionCommitResult")) { continue; // reintentar commit } throw err; } } }

Change Streams

// Escuchar cambios en una colección const changeStream = db.collection("pedidos").watch([ { $match: { "operationType": { $in: ["insert", "update"] } } } ]); changeStream.on("change", (evento) => { console.log("Operación:", evento.operationType); console.log("Documento:", evento.fullDocument); console.log("Campos cambiados:", evento.updateDescription?.updatedFields); }); // Reanudar desde token (sobrevive reinicios) const streamReanudado = db.collection("pedidos").watch([], { resumeAfter: ultimoToken });

Patrón Outbox con Change Streams

// Escritura atómica de datos de negocio + evento en outbox session.startTransaction(); await pedidos.insertOne({ ...datosPedido }, { session }); await outbox.insertOne({ evento: "PEDIDO_CREADO", payload: datosPedido, estado: "pendiente" }, { session }); await session.commitTransaction(); // Publicador separado lee el outbox vía change stream const streamOutbox = db.collection("outbox").watch([ { $match: { "fullDocument.estado": "pendiente" } } ]); streamOutbox.on("change", async (event) => { const doc = event.fullDocument; await publicarEnBroker(doc.evento, doc.payload); await outbox.updateOne({ _id: doc._id }, { $set: { estado: "publicado" } }); });

Consistencia Causal

// Asegurar que las lecturas reflejen los propios escritos const session = client.startSession({ causalConsistency: true }); await collection.insertOne({ clave: "valor" }, { session }); // Lecturas posteriores en esta sesión siempre verán el doc anterior const doc = await collection.findOne({ clave: "valor" }, { session });