"use strict";
var TransactionManager = require("./transactions/transaction-manager"),
Replica = require("./replica"),
Context = require("./utils/context"),
Err = require("./utils/error"),
_ = require("lodash"),
async = require("async"),
path = require("path");
function mergeConnectons(man, source) {
if (source) {
_.merge(man.connections, source);
}
}
function mergeSchemas(man, source) {
if (source) {
_.merge(man.schema, source);
}
}
function mergeInterceptors(src1, src2) {
var res = src1 ? src1.concat() : [], item, i;
if (src2) {
for (i = 0; i < src2.length; i++) {
item = src2[i];
_.remove(res, { interceptor: item.interceptor });
res.push(item);
}
}
return res;
}
function mergeCollections(man, colls, done) {
function addColl(el, cb) {
var conn = man.connections[el.connection] || man.defaultConnection;
man.addCollection(el.name, el.connection, mergeInterceptors(conn.interceptors, el.interceptors), el.schema, cb);
}
if (colls) {
async.each(colls, function (el, cb) {
if (man[el.name]) {
man.removeCollection(el.name, function (err) {
if (err) {
return cb(err);
}
addColl(el, cb);
});
} else {
addColl(el, cb);
}
}, done);
} else {
done();
}
}
function mergeReplicas(man, source) {
// TODO: Implement merge replica set configuration. [2014-01-12]
}
function mergeShards(man, source) {
// TODO: Implement merge shard configurations. [2014-01-12]
}
function parseModel(man, model, done) {
var envModel;
if (model.environments && model.environments[man.env]) {
envModel = model.environments[man.env];
} else {
envModel = {};
}
mergeConnectons(man, model.connections);
mergeConnectons(man, envModel.connections);
man.defaultConnection = envModel.defaultConnection || model.defaultConnection || man.defaultConnection;
mergeSchemas(man, model.schema);
mergeSchemas(man, envModel.schema);
async.series([
function (cb) {
mergeCollections(man, model.collections, cb);
},
function (cb) {
mergeCollections(man, envModel.collections, cb);
},
function (cb) {
mergeReplicas(man, model.replicas);
mergeReplicas(man, envModel.replicas);
mergeShards(man, model.shards);
mergeShards(man, envModel.shards);
cb();
}
], done);
}
function setProvider(man, name, connection, interceptors, schema, done) {
var conn, schm, provider;
if (!done) {
done = function (err) {
if (err) {
throw err;
}
};
}
conn = man.connections[connection || man.defaultConnection];
if (!conn) {
return done(new Err("NO_SUCH_CONN", [connection]));
}
if (schema) {
if (typeof schema === "string") {
schm = man.schema[schema];
if (!schm) {
return done(new Err("NO_SUCH_SCHEMA", [schema]));
}
} else {
if (!schema.__name__) {
return done(new Err("REQUIRED_SCHEMA_NAME"));
}
if (man.schema[schema.__name__]) {
return done(new Err("SCHEMA_EXISTS", [schema.__name__]));
}
man.schema[schema.__name__] = schm = schema;
}
} else {
schm = {};
}
if (!schm.__collName) {
schm.__collName = name;
}
provider = new (man.resolveProvider(conn.provider))(conn.options, schm);
if (interceptors) {
interceptors.forEach(function (el) {
if (typeof el !== "function") {
if (!el.disable) {
var interceptor = man.resolveInterceptor(el.interceptor);
el = interceptor.interception.apply(interceptor.interception, el.options);
}
}
provider.use(el);
});
}
provider.name = name;
provider.manager = man;
man[name] = provider;
man.collections.push(provider);
provider.init(done);
}
function parseArgs() {
var args = {}, i;
function setArg(arg, idx) {
if (arg) {
switch (typeof arg) {
case "string":
if (idx === 0) {
args.connection = arg;
} else {
args.schema = arg;
}
break;
case "object":
if (Array.isArray(arg)) {
args.interceptors = arg;
} else {
args.schema = arg;
}
break;
case "function":
args.done = arg;
break;
}
}
}
for (i = 0; i < 4; i++) {
setArg(arguments[i], i);
}
return args;
}
function FluentContext(setters, manager, name) {
this.setters = setters;
this.manager = manager;
this.name = name;
}
FluentContext.prototype = {
setConnection: function (connName) {
this.connection = connName;
return this;
},
use: function (interceptor) {
if (!this.interceptors) {
this.interceptors = [];
}
this.interceptors.push(interceptor);
return this;
},
setSchema: function (schema) {
this.schema = schema;
return this;
},
addCollection: function (name) {
var that = this;
this.setters.push(function (cb) {
setProvider(that.manager, that.name, that.connection, that.interceptors, that.schema, cb);
});
return new FluentContext(this.setters, this.manager, name);
},
done: function (callback) {
var that = this;
this.setters.push(function (cb) {
setProvider(that.manager, that.name, that.connection, that.interceptors, that.schema, cb);
});
async.parallel(this.setters, callback);
return this.manager;
}
};
// ****************************************************************************************************************
// Manager Members
// ****************************************************************************************************************
var addConnection = function (name, provider, opts) {
if (!name) {
throw new Err("MISSING_ARG", ["name"]);
}
if (!provider) {
throw new Err("MISSING_ARG", ["provider"]);
}
if (!opts) {
throw new Err("MISSING_ARG", ["opts"]);
}
if (this.connections[name]) {
throw new Err("CONN_EXISTS", [name]);
}
if (typeof opts === "string") {
opts = {
connStr: opts
};
}
var conn = {
provider: provider,
options: opts
};
this.connections[name] = conn;
return this;
},
removeConnection = function (name) {
delete this.connections[name];
return this;
},
setCollections = function (name) {
return new FluentContext([], this, name);
},
addCollection = function (name, connection, interceptors, schema, done) {
if (!name) {
throw new Err("MISSING_ARG", ["name"]);
}
if (this[name]) {
var err = new Err("PROVIDER_EXISTS", [name]);
if (done) {
return done(err);
}
throw err;
}
var args = parseArgs(connection, interceptors, schema, done);
setProvider(this, name, args.connection, args.interceptors, args.schema, args.done);
return this;
},
addReplicaSet = function (name, collections, opts, done) {
if (!name || typeof name !== "string") {
throw new Err("MISSING_ARG", ["name"]);
}
if (typeof opts === "function") {
done = opts;
opts = null;
}
if (this[name]) {
var err = new Err("PROVIDER_EXISTS", [name]);
if (done) {
return done(err);
}
throw err;
}
var replica = new Replica(this, collections, opts);
this[name] = replica;
this.collections.push(replica);
if (done) {
done();
}
},
removeCollection = function (name, done) {
if (!this[name]) {
var err = new Err("NO_SUCH_PROVIDER", [name]);
if (done) {
return done(err);
}
throw err;
}
delete this[name];
var provs = _.remove(this.collections, function (item) {
return item.name === name;
});
provs[0].dispose(done || function (err) {
if (err) {
throw err;
}
});
return this;
},
addProvider = function (provider, name, done) {
var that = this;
if (this[name]) {
var err = new Err("PROVIDER_EXISTS", [name]);
if (done) {
return done(err);
}
throw err;
}
provider.init(function (err) {
if (!err) {
provider.name = name;
provider.manager = that;
that[name] = provider;
that.collections.push(provider);
} else if (!done) {
throw err;
}
if (done) {
done(err);
}
});
},
dispose = function (done) {
var that = this;
function iterator(prov, done) {
delete that[prov.name];
prov.dispose(done);
}
async.each(_.values(that.collections), iterator, function (err) {
if (!err) {
that.collections.length = 0;
} else if (!done) {
throw err;
}
if (done) {
done(err);
}
});
},
resolveProvider = function (name) {
switch (name) {
case "file-system":
case "mongodb":
case "everlive":
name = path.join(__dirname, "providers", name);
break;
}
return require(name);
},
resolveInterceptor = function (name) {
switch (name) {
case "cache":
case "log":
case "autofields":
name = path.join(__dirname, "interceptors", name);
break;
}
return require(name);
},
configure = function (opts, callback) {
if (typeof opts === "function") {
callback = opts;
opts = null;
}
if (!callback) {
callback = function (err) {
if (err) {
throw err;
}
};
}
// this._opts is deprecated, maintained for backward compatibility only.
if (!opts) {
opts = this._opts || {};
}
delete this._opts;
var that = this,
transStore = opts.transStore,
dataModelDoc,
confConn;
if (!this.transaction && !transStore) {
transStore = "_transactions";
}
this.env = opts.env || process.env.NODE_ENV || "development";
// FUTURE: Add tests for custom config collection. [2014-01-12]
if (opts.config) {
dataModelDoc = opts.config.modelDocument;
if (!dataModelDoc) {
return callback(new Err("MISSING_CONF_PARAMS"));
}
switch (typeof opts.config.connection) {
case "undefined":
confConn = this.defaultConnection;
break;
case "string":
confConn = opts.config.connection;
break;
case "object":
confConn = opts.config.connection;
this.addConnection(confConn.name, confConn.provider, confConn.options);
confConn = confConn.name;
break;
default:
return new Err("Invalid configuration.");
}
} else if (!this.config) {
dataModelDoc = "data-model";
confConn = this.defaultConnection;
}
async.series([
function (done) {
if (!dataModelDoc) {
return done();
}
if (that.config) {
that.removeCollection("config", function (err) {
if (err) {
return done(err);
}
that.addCollection("config", confConn, done);
});
} else {
that.addCollection("config", confConn, done);
}
},
function (done) {
var type = typeof transStore,
transMan;
if (type === "undefined") {
done();
} else if (type === "string") {
that.addCollection(transStore, function (err) {
if (err) {
done(err);
}
that.transaction = new TransactionManager(that[transStore], that);
done();
});
} else if (type === "object") {
if (transStore.insert && transStore.update && transStore.upsert && transStore.get && transStore.delete) {
transMan = new TransactionManager(transStore, that);
} else if (transStore.module) {
transMan = new TransactionManager(require(transStore.module)(transStore.options));
}
if (!transMan) {
return done(new Err("INVALID_TRANS_STORE"));
}
that.transaction = transMan;
done();
} else {
done(new Err("INVALID_TRANS_STORE"));
}
},
function (done) {
if (!dataModelDoc || opts.omitModelDoc) {
return done();
}
that.config.get(null, dataModelDoc, function (err, model) {
if (err) {
if (opts.config) {
if (err.code === "ITEM_DOESNOT_EXIST") {
err = new Err("NO_CONF_FILE");
}
} else {
if (err.code === "ITEM_DOESNOT_EXIST") {
err = null;
}
}
return done(err);
}
parseModel(that, model, done);
});
},
function (done) {
parseModel(that, opts, done);
}
], callback);
return this;
},
createContext = function (context) {
return new Context(context);
};
/**
* A manager instance acts as an entry point for a group of data collections.
*
* @class Represents an Entree instance.
* @param {object=} opts - Deprecated: Use [configure]{@link Manager#configure} method to setup the instance.
*/
function Manager(opts) {
/**
* Specifies the default connection for this instance. The default value is `"fs-default"`.
*/
this.defaultConnection = "fs-default";
/**
* A collection of predefined data store (database) connections.
*/
this.connections = {
"fs-default": {
provider: "file-system",
options: {
connStr: "./data"
}
}
};
/**
* A collection of data, index and mapping definitions. Definitions are optional in most cases.
*/
this.schema = {};
/**
* An array containing all established data collections. Individual collections may be connected to different data stores.
*/
this.collections = [];
this._opts = opts;
}
/**
* Adds new data sotre connection for this instance.
* Each connection represents a connection to a database or data storage service with specific configuration (options).
*
* Every manager instance has one pre-initialized default connection with the following parameters:
*
* - name: "fs-defalut"
* - provider: "file-system"
* - connection string: "./data" - the path is relative to the `process.cwd()`
*
* The default connection can be changed: `entree.defaultConnection = "mongo-default";`
*
* @example
* var entree = require("entree");
*
* entree.addConnection("fs-default", "file-system", "./data");
* entree.addConnection("mongo-default", "mongodb", "mongodb://localhost/entreeTest");
* entree.addConnection("mongo-2", "mongodb", { connStr: "mongodb://191.168.0.100/node2" });
* entree.addConnection("everlive", "everlive", {
* connStr: "http://api.everlive.com/v1/uZEGyZYKiSq5CTSq/",
* authorization: "MasterKey PqmmvlWWBF8svReW8p3mkYG9X61nus1w"
* });
*
* @func
* @param {string} name - The name of the connection. The name must be unique within a Manger instance.
* @param {string} provider - This could be the name of a built-in data provider,
* e.g. `file-system`, `mongodb` and etc., or `require` module implementing custom provider.
* @param {(string|object)} opts - The configuration information for initializing data provider instances.
* If this argument is string, it is assumed to be a connection string.
* This argument is required and at least connection string must be present.
* @returns {Manger} - This instance.
*/
Manager.prototype.addConnection = addConnection;
/**
* Removes the specified connection form this instance.
*
* @func
* @param {string} name - The name of the connecton to remove.
* @returns {Manger} - This instance.
*/
Manager.prototype.removeConnection = removeConnection;
/**
* Configures a set of data collections using fluent interface.
*
* @example
* manager
* .setCollections("users")
* .setConnection("mongo-default")
* .setSchema({ __name__: "user", name: String, age: Number, roles: Array })
* .use(logging)
* .use(caching)
* .addCollection("comments")
* .setConnection("mongo-hq")
* .use(caching)
* .done(function (err) {
* // collections are ready to be used
* });
*
* @func
* @param {stirng} name - The name of the first data collection in the set.
* @returns {FluentContext}
*/
Manager.prototype.setCollections = setCollections;
/**
* Creates new data colledtion and adds it to this instance.
*
* @func
* @param {string} name - The name of the data collection. The name must be unique within a Manger instance.
* @param {string=} connection - Specifies which connection should the collection use to store and retrieve data.
* If omitted the default connection will be used.
* @param {Array=} interceptors - An array of interceptor function to attach to this collection.
* @param {(string|object)=} schema - The name of a predefined schema for this colleciton or an object representing the data schema.
* @param {function=} done - Optional callback that will be called when the collection is initialized.
* @returns {Manger} - This instance.
*/
Manager.prototype.addCollection = addCollection;
// TODO: document this method.
/**
* Creates new replica set.
*/
Manager.prototype.addReplicaSet = addReplicaSet;
/**
* Removes the specified data collection from this instance.
*
* @func
* @param {string} name - The name of the collection to remove.
* @throws Will throw an error if the specified collection does not exist.
* @returns {Manger} - This instance.
*/
Manager.prototype.removeCollection = removeCollection;
/**
* Does the same as [addCollection]{@link Manager#addCollection} method, but it accepts data provider instance instead of configuration.
*
* @func
* @param {Provider} provider - An instance of {@link Provider} class.
* @param {string} name - The name of the data collection. The name must be unique within a Manger instance.
* @param {function=} done - Optional callback that will be called when the collection is initialized.
* @returns {Manger} - This instance.
*/
Manager.prototype.addProvider = addProvider;
/**
* Closes all opened database connections, clears current state and cached data if any.
* The manager instance can no longer be used after dispose.
*
* @func
* @param {function} done - An optional callback that will be called after the manager is disposed.
* @returns {null}
*/
Manager.prototype.dispose = dispose;
/**
* Resolves and loads the specified data provider type.
*
* @func
* @param {string} name - The name of a built-in data provider or path to a custom provider module.
* @returns {Provider} - An instance of the specified data provider.
*/
Manager.prototype.resolveProvider = resolveProvider;
/**
* Resolves and loads the specified interception module.
*
* @example
* var entree = require("entree"),
* cache = enree.resolveInterceptor("cache"),
* stores = [
* { store: "memory", max: 1000, ttl: 10 },
* { store: "redis", db: 0, ttl: 100 }
* ],
* caching = cache.interception(stores, ["get", "select"]);
*
* entree.addCollection("users", [caching]);
*
* // or add interceptor at later stage
* entree.users.use(caching);
*
* @func
* @param {string} name - The name of a built-in interceptor or path to a custom interception module.
* @returns {module} - An instance of interception module.
*/
Manager.prototype.resolveInterceptor = resolveInterceptor;
/**
* Sets up the entire infrastructure for the current Entree instance.
* That includes database connections, data collections, data schemas, interceptors, replica sets and shards.
* The configuration information can be provided either as an object argument for this method or a configuration document (file) in JSON format.
* For full description of the configuration format please see the {@tutorial configuration} tutorial.
*
* If the `opts` argument is omitted, Entree will try to find "data-model" document in the {@tutorial default-config-collection}.
* By convention "data-model" is the default configuration file for Entree.
* There may be additional configuration documents as described in the {@tutorial default-config-collection} tutorial.
*
* This method may be called multiple times; in which case all non-overlapping settings will be appended and overlapping ones will be merged.
*
* @func
* @param {object=} opts - Object containing configuration information and additional options.
* @param {function=} callback - An optional callback that will be called after the configuration is processed and all connections are initialized.
*/
Manager.prototype.configure = configure;
/**
* Alias to [configure]{@link Manager#configure} method.
*
* @func
*/
Manager.prototype.init = configure;
/**
* Creates a context object that is passed as the first argument in action methods
* such as [insert]{@link Provider#insert}, [update]{@link Provider#update}, [select]{@link Provider#select} and etc.
* This function just wraps the original context that is passed as an argument.
* The wrapper is needed to properly distinguish the context from the rest of the arguments.
*
* Execution context is used to pass arguments to interceptors and shards. For instance,
* authorization interceptor will require an `user` object in the context to determine the
* rights of the caller for the current operation.
*
* @example
* var entree = require("entree");
*
* app.post("/items", function (req, res) {
* var context = entree.createContext(req.user);
* entree.items.insert(context, req.body, function (err, result) {
* if (err) {
* res.statusCode = 500;
* return res.send(err.message);
* }
* res.send(JSON.stringify(result));
* });
* });
*
* @func
* @param {object} context - The actual context.
* @return {Context} - The context for the current execution;
*/
Manager.prototype.createContext = createContext;
module.exports = Manager;