¿Cómo escuchar los cambios en una colección MongoDB?


Estoy creando una especie de sistema de cola de trabajos en segundo plano con MongoDB como almacén de datos. ¿Cómo puedo "escuchar" inserciones en una colección MongoDB antes de generar trabajadores para procesar el trabajo? ¿Necesito sondear cada pocos segundos para ver si hay algún cambio con respecto a la última vez, o hay alguna forma en que mi script pueda esperar a que ocurran inserciones? Este es un proyecto PHP en el que estoy trabajando, pero no dude en responder en Ruby o lenguaje agnóstico.

 161
Author: Andrew, 2012-03-14

9 answers

Lo que estás pensando suena mucho a desencadenantes. MongoDB no tiene ningún soporte para disparadores, sin embargo algunas personas han "lanzado sus propios" usando algunos trucos. La clave aquí es el oplog.

Cuando ejecuta MongoDB en un Conjunto de réplicas, todas las acciones de MongoDB se registran en un registro de operaciones (conocido como oplog). El oplog es básicamente una lista en ejecución de las modificaciones realizadas a los datos. Réplicas Establece la función de escuchar los cambios en este oplog y luego aplicar el cambios locales.

¿Te suena familiar?

No puedo detallar todo el proceso aquí, son varias páginas de documentación, pero las herramientas que necesita están disponibles.

Primero algunas reseñas sobre el oplog - Breve descripción - Diseño de la colección local (que contiene el oplog)

También querrá aprovechar cursores adaptables. Esto le proporcionará una manera de escuchar los cambios en lugar de sondear para ellos. Tenga en cuenta que la replicación utiliza cursores adaptables, por lo que esta es una característica compatible.

 103
Author: Gates VP,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-19 05:33:05

MongoDB tiene lo que se llama capped collections y tailable cursors que permite MongoDB para enviar datos a los oyentes.

A capped collection es esencialmente una colección que tiene un tamaño fijo y solo permite inserciones. Así es como se vería crear uno:

db.createCollection("messages", { capped: true, size: 100000000 })

Cursores adaptables MongoDB (post original de Jonathan H. Wage)

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (por Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (by Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Recursos adicionales:

Ruby / Node.tutorial de js que lo guía a través de la creación de una aplicación que escucha inserciones en una colección limitada de MongoDB.

Un artículo que habla de cursores adaptables con más detalle.

Ejemplos de PHP, Ruby, Python y Perl del uso de cursores adaptables.

 89
Author: Andrew,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-05 23:33:21

Desde MongoDB 3.6 habrá una nueva API de notificaciones llamada Flujos de cambio que puede usar para esto. Ver esta entrada del blog para un ejemplo. Ejemplo de ello:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
 28
Author: Mitar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-01-12 16:04:23

Mira esto: Cambiar flujos

10 de enero de 2018 - Release 3.6

* EDITAR: Escribí un artículo sobre cómo hacer esto https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

Https://docs.mongodb.com/v3.6/changeStreams /


Es nuevo en mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Para usar changeStreams la base de datos debe ser un Conjunto de Replicación

Más información sobre los Conjuntos de Replicación: https://docs.mongodb.com/manual/replication/

Su base de datos será " Independiente" por defecto.

Cómo convertir un Standalone en un Conjunto de Réplicas: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set /


El siguiente ejemplo es una práctica aplicación para saber cómo usar esto.
* Específicamente para Nodo.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Útil links:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

Https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

 16
Author: Rio Weber,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-28 19:56:59

MongoDB versión 3.6 ahora incluye flujos de cambio que es esencialmente una API en la parte superior de la OpLog que permite trigger/notificación-como casos de uso.

Aquí hay un enlace a un ejemplo de Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams /

Un ejemplo de NodeJS podría verse como:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });
 13
Author: Robert Walters,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-12-17 12:35:04

Alternativamente, puede usar el método estándar Mongo FindAndUpdate, y dentro de la devolución de llamada, disparar un evento EventEmitter (en el nodo) cuando se ejecuta la devolución de llamada.

Cualquier otra parte de la aplicación o arquitectura que escuche este evento será notificada de la actualización, y cualquier dato relevante enviado allí también. Esta es una manera muy simple de lograr notificaciones de Mongo.

 3
Author: Alex,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-08-03 12:12:28

Hay un ejemplo de java que funciona que se puede encontrar aquí.

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

La clave es LAS OPCIONES DE CONSULTA dadas aquí.

También puede cambiar la consulta de búsqueda, si no necesita cargar todos los datos cada vez.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
 1
Author: Maleen Abewardana,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-08-24 13:08:47

En realidad, en lugar de ver la salida, ¿por qué no recibe aviso cuando se inserta algo nuevo utilizando middle-ware que fue proporcionado por mongoose schema

Puede capturar el evento de insertar un nuevo documento y hacer algo después de esta inserción hecho

 1
Author: Duong Nguyen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-11-08 04:09:57

Muchas de estas respuestas solo le darán nuevos registros y no actualizaciones y / o son extremadamente ineficaces

La única forma fiable y eficaz de hacer esto es crear un cursor personalizable en la base de datos local: oplog.rs colección para obtener TODOS los cambios en MongoDB y hacer con él lo que quieras. (MongoDB incluso hace esto internamente más o menos para soportar la replicación!)

Explicación de lo que el oplog contener: https://www.compose.com/articles/the-mongodb-oplog-and-node-js /

Ejemplo de nodo.biblioteca js que proporciona una API en torno a lo que está disponible para hacer con el oplog: https://github.com/cayasso/mongo-oplog

 1
Author: John Culviner,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-25 19:26:01