Upsert a granel en MongoDB usando mangosta


¿ Hay alguna opción para realizar upserts a granel con mangosta? Así que básicamente tener una matriz e insertar cada elemento si no existe o actualizarlo si existe? (Estoy usando customs _ids)

Cuando uso .insert MongoDB devuelve un error E11000 para las claves duplicadas (que deben actualizarse). Insertar varios documentos nuevos funciona bien:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });

Usando .save devuelve un error que indica que el parámetro debe ser un único documento:

Users.save(data, function(err){
   ...
}

Esto answer sugiere que no existe tal opción, sin embargo, es específica para C# y también ya tiene 3 años. Así que me preguntaba si hay alguna opción para hacer eso usando mangosta?

¡Gracias!

Author: Neil Lunn, 2014-08-13

5 answers

No en "mangosta" específicamente, o al menos no todavía en el momento de escribir. El shell de MongoDB a partir de la versión 2.6 en realidad utiliza la "API de operaciones masivas" "bajo el capó" como si fuera para todos los métodos auxiliares generales. En su implementación, intenta hacer esto primero, y si se detecta un servidor de versión anterior, entonces hay un "respaldo" a la implementación heredada.

Todos los métodos mongoose" currently "utilizan la implementación "legacy" o la respuesta write concern y los métodos básicos heredados. Pero hay un .collection accessor de cualquier modelo dado de mongoose que esencialmente accede al "objeto de colección"desde el "controlador nativo de nodo" subyacente en el que mongoose se implementa:

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

El principal problema es que los "métodos de mangosta" son realmente conscientes de que una conexión no se puede hacer todavía y "cola" hasta que esto se complete. El controlador nativo en el que está "excavando" no hace esta distinción.

Así que realmente tienes que tenga en cuenta que la conexión se establece de alguna manera o forma. Pero puede utilizar los métodos de controlador nativo, siempre y cuando tenga cuidado con lo que está haciendo.

 21
Author: Neil Lunn,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-08-13 12:51:56

No necesitas administrar limit (1000) como sugirió @neil-lunn. Mangosta ya hace esto. Usé su gran respuesta como base para esta completa implementación basada en Promesas y ejemplo:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

Esto tiene la ventaja de cerrar la conexión cuando haya terminado, mostrando cualquier error si le importa, pero ignorándolos si no (las devoluciones de llamada de error en las promesas son opcionales. También es muy rápido. Solo dejo esto aquí para compartir mis hallazgos. Puede descomentar las cosas eztv si desea guardar todo eztv muestra a una base de datos, como ejemplo.

 17
Author: konsumer,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-01-22 05:24:25

He lanzado un plugin para Mangosta que expone un método estático upsertMany para realizar operaciones upsert masivas con una interfaz promise.

Un beneficio adicional de usar este complemento sobre la inicialización de su propia operación masiva en la colección subyacente, es que este complemento convierte sus datos en el primer modelo de Mangosta, y luego vuelve a objetos simples antes del upsert. Esto garantiza que se aplique la validación del esquema de Mangosta, y que los datos se despoblen y se ajusten a raw inserción.

Https://github.com/meanie/mongoose-upsert-many https://www.npmjs.com/package/@meanie/mongoose-upsert-many

Espero que ayude!

 2
Author: Adam Reis,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-01-20 20:09:30

Si no está viendo los métodos masivos en su base de datos.es decir, está recibiendo un error al efecto de la variable xxx no tiene método: initializeOrderedBulkOp ()

Intenta actualizar tu versión de mangosta. Aparentemente las versiones más antiguas de mongoose no pasan a través de toda la base de datos de mongo subyacente.métodos de recogida.

Npm instalar mangosta

Se encargó de ello por mí.

 1
Author: zstew,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-01-27 22:09:43

Tuve que lograr esto recientemente mientras almacenaba productos en mi aplicación de comercio electrónico. Mi base de datos solía perder el tiempo, ya que tenía que publicar 10000 artículos cada 4 horas. Una opción para mí era establecer los socketTimeoutMS y connectTimeoutMS en mongoose mientras se conectaba a la base de datos, pero se sentía algo hackeado y no quería manipular los valores predeterminados de tiempo de espera de conexión de la base de datos. También veo que la solución de @ neil lunn toma un enfoque de sincronización simple de tomar un módulo dentro del bucle for. Aquí está un versión asincrónica de la mía que creo que hace el trabajo mucho mejor

let BATCH_SIZE = 500
Array.prototype.chunk = function (groupsize) {
    var sets = [];
    var chunks = this.length / groupsize;

    for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) {
        sets[i] = this.slice(j, j + groupsize);
    }

    return sets;
}

function upsertDiscountedProducts(products) {

    //Take the input array of products and divide it into chunks of BATCH_SIZE

    let chunks = products.chunk(BATCH_SIZE), current = 0

    console.log('Number of chunks ', chunks.length)

    let bulk = models.Product.collection.initializeUnorderedBulkOp();

    //Get the current time as timestamp
    let timestamp = new Date(),

        //Keep track of the number of items being looped
        pendingCount = 0,
        inserted = 0,
        upserted = 0,
        matched = 0,
        modified = 0,
        removed = 0,

        //If atleast one upsert was performed
        upsertHappened = false;

    //Call the load function to get started
    load()
    function load() {

        //If we have a chunk to process
        if (current < chunks.length) {
            console.log('Current value ', current)

            for (let i = 0; i < chunks[current].length; i++) {
                //For each item set the updated timestamp to the current time
                let item = chunks[current][i]

                //Set the updated timestamp on each item
                item.updatedAt = timestamp;

                bulk.find({ _id: item._id })
                    .upsert()
                    .updateOne({
                        "$set": item,

                        //If the item is being newly inserted, set a created timestamp on it
                        "$setOnInsert": {
                            "createdAt": timestamp
                        }
                    })
            }

            //Execute the bulk operation for the current chunk
            bulk.execute((error, result) => {
                if (error) {
                    console.error('Error while inserting products' + JSON.stringify(error))
                    next()
                }
                else {

                    //Atleast one upsert has happened
                    upsertHappened = true;
                    inserted += result.nInserted
                    upserted += result.nUpserted
                    matched += result.nMatched
                    modified += result.nModified
                    removed += result.nRemoved

                    //Move to the next chunk
                    next()
                }
            })



        }
        else {
            console.log("Calling finish")
            finish()
        }

    }

    function next() {
        current++;

        //Reassign bulk to a new object and call load once again on the new object after incrementing chunk
        bulk = models.Product.collection.initializeUnorderedBulkOp();
        setTimeout(load, 0)
    }

    function finish() {

        console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed)

        //If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert
        if (upsertHappened) {
            console.log("Calling remove")
            remove()
        }


    }

    /**
     * Remove all the items that were not updated in the recent upsert or those items with a discount of 0
     */
    function remove() {

        models.Product.remove(
            {
                "$or":
                [{
                    "updatedAt": { "$lt": timestamp }
                },
                {
                    "discount": { "$eq": 0 }
                }]
            }, (error, obj) => {
                if (error) {
                    console.log('Error while removing', JSON.stringify(error))
                }
                else {
                    if (obj.result.n === 0) {
                        console.log('Nothing was removed')
                    } else {
                        console.log('Removed ' + obj.result.n + ' documents')
                    }
                }
            }
        )
    }
}
 0
Author: PirateApp,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-28 07:11:05