"use strict"; var Cursor = require("../cursor"), Provider = require("../provider"), Err = require("../utils/error"), MongoClient = require("mongodb").MongoClient, util = require("util"), _ = require("lodash"), _conns = {}; function getCollection(prov, callback) { var connStr = prov.options.connStr, collName = prov.schema.__collName, mdb = _conns[connStr]; if (mdb) { mdb.__prefs[prov._uuid] = true; callback(null, mdb.collection(collName)); } else { MongoClient.connect(connStr, function (err, db) { if (err) { return callback(err); } _conns[connStr] = db; if (!db.__prefs) { db.__prefs = {}; } db.__prefs[prov._uuid] = true; callback(null, db.collection(collName)); }); } } function removeReference(prov) { var db = _conns[prov.options.connStr]; delete db.__prefs[prov._uuid]; if (!_.keys(db.__prefs).length) { delete _conns[prov.options.connStr]; db.close(); } } function update(prov, item, opts, cb) { var id = prov._getId(item), idKey = prov._getIdKey(), select = { _id: id }, itm, set; if (item[idKey]) { itm = true; delete item[idKey]; } if (item.$set && item.$set[idKey]) { set = true; delete item.$set[idKey]; } prov.collection.update(select, item, opts, cb); if (itm) { item[idKey] = id; } if (set) { item.$set[idKey] = id; } } // *** Cursor Implementation *** function MongoCursor(provider, query, options) { Cursor.call(this, provider, query || {}, options || {}); } util.inherits(MongoCursor, Cursor); MongoCursor.prototype._getCursor = function (callback) { if (this._cursor) { return callback(this._cursor); } var opts = {}; if (this.mapping) { if (_.isFunction(this.mapping)) { this._mapFunc = true; } else { opts.fields = this.mapping; } } if (this.criteria) { opts.sort = this.criteria; } if (this.limitValue !== 0) { opts.limit = this.limitValue; } if (this.skipValue !== 0) { opts.skip = this.skipValue; } this._cursor = this.provider.collection.find(this.query, opts); callback(this._cursor); }; MongoCursor.prototype.reset = function () { this._getCursor(function (cursor) { cursor.rewind(); }); return this; }; MongoCursor.prototype.limit = function (limit, callback) { var that = this; this.limitValue = limit; this._getCursor(function (cursor) { if (callback) { cursor.limit(limit, function (err) { callback(err, that); }); } else { cursor.limit(limit); } }); return that; }; MongoCursor.prototype.skip = function (skip, callback) { var that = this; this.skipValue = skip; this._getCursor(function (cursor) { if (callback) { cursor.skip(skip, function (err) { callback(err, that); }); } else { cursor.skip(skip); } }); return that; }; MongoCursor.prototype.sort = function (criteria, callback) { var that = this; this.criteria = criteria; this._getCursor(function (cursor) { if (callback) { cursor.sort(criteria, function (err) { callback(err, that); }); } else { cursor.sort(criteria); } }); return that; }; MongoCursor.prototype.map = function (mapping, callback) { this.mapping = mapping; if (_.isFunction(mapping)) { this._mapFunc = true; if (callback) { callback(null, this); } return this; } var fields = {}, i, l; if (_.isArray(mapping)) { if (!mapping.length) { fields._id = 1; } else { for (i = 0, l = mapping.length; i < l; i++) { fields[mapping[i]] = 1; } } } this._getCursor(function (cursor) { cursor.fields = fields; if (callback) { callback(null, this); } }); return this; }; MongoCursor.prototype.toArray = function (callback) { var arr = []; this.each(function (err, item) { if (err) { return callback(err); } if (item) { arr.push(item); } else { callback(null, arr); } }); }; MongoCursor.prototype.each = function (callback) { var that = this; this._getCursor(function (cursor) { cursor.each(function (err, item) { if (err) { return callback(err); } that._nextObject(callback, item); }); }); }; MongoCursor.prototype.next = function (callback) { var that = this; this._getCursor(function (cursor) { cursor.nextObject(function (err, item) { if (err) { return callback(err); } that._nextObject(callback, item); }); }); }; MongoCursor.prototype.count = function (callback) { this._getCursor(function (cursor) { cursor.count(false, callback); }); }; MongoCursor.prototype.update = function (data, callback) { this.provider.collection.update(this.query, data, this.provider.options.updateMulti, callback); }; MongoCursor.prototype.delete = function (callback) { this.provider.collection.remove(this.query, this.provider.options.deleteMulti, callback); }; MongoCursor.prototype._nextObject = function (callback, item) { callback(null, this._map(item)); }; MongoCursor.prototype._map = function (item) { if (item && this._mapFunc) { return this.mapping(item); } return item; }; // *** Provider Implementation *** /** * Constructor for MongoDB provider. This class inherits from {@link Provider}. * * This provider is a thin wrapper around the native MongoDB [driver]( https://github.com/mongodb/node-mongodb-native) for NodeJS. * Each provider instance represents a MongoDB [collection]( http://docs.mongodb.org/manual/reference/glossary/#term-collection). * * For usage reference, please refer to [Provider documentation]{@link Provider}. * * Options * * - `connStr` {String} <required> The [connection string](http://docs.mongodb.org/manual/reference/connection-string) to MongoDB instance. * * @class Represents a provider for MongoDB storage. * @param {object} opts - Additional options. The only required option is `connStr`. * @param {object=} schema - Defines the fields of the collection represented by the provider instance. */ function MongoProvider(opts, schema) { if (!opts.insert) { opts.insert = { continueOnError: true }; } if (!opts.update) { opts.update = { w: 1 }; } opts.update.multi = false; if (!opts.updateMulti) { opts.updateMulti = { w: 1 }; } opts.updateMulti.multi = true; if (!opts.upsert) { opts.upsert = { w: 1 }; } opts.upsert.upsert = true; if (!opts.delete) { opts.delete = { w: 1 }; } opts.delete.single = true; if (!opts.deleteMulti) { opts.deleteMulti = { w: 1 }; } opts.deleteMulti.single = false; Provider.call(this, opts, schema); } util.inherits(MongoProvider, Provider); MongoProvider.prototype.init = function (callback) { var that = this; getCollection(this, function (err, coll) { if (!err) { that.collection = coll; } callback(err); }); }; MongoProvider.prototype.dispose = function (callback) { removeReference(this); callback(); }; MongoProvider.prototype._insert = function (items, callback) { this.collection.insert(items, this.options.insert, function (err, result) { var res; if (_.isArray(items)) { res = result; } else if (result && result.length === 1) { res = result[0]; } if (err && (err.message || err).indexOf("must not start with '$'") !== -1) { err = new Err("OPERS_NOT_ALLOWED"); } callback(err, res); }); }; MongoProvider.prototype._update = function (item, callback) { update(this, item, this.options.update, callback); }; MongoProvider.prototype._upsert = function (item, callback) { update(this, item, this.options.upsert, callback); }; MongoProvider.prototype._delete = function (item, callback) { var opts = _.clone(this.options.delete), select = {_id: this._getId(item)}; opts.single = true; this.collection.remove(select, opts, callback); }; MongoProvider.prototype._get = function (item, callback) { var select = {_id: this._getId(item)}, that = this; this.collection.findOne(select, function (err, result) { if (!err && !result) { return that.handleError("ITEM_DOESNOT_EXIST", callback); } callback(err, result); }); }; MongoProvider.prototype._select = function (args, callback) { var cursor = new MongoCursor(this, args.query, args.options); if (callback) { callback(null, cursor); } else { return cursor; } }; module.exports = MongoProvider;