Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 7 additions & 123 deletions api/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ const log = require('./utils/log.js')('aggregator-core:api');
const common = require('./utils/common.js');
const {WriteBatcher} = require('./parts/data/batcher.js');
const {Cacher} = require('./parts/data/cacher.js');
const {changeStreamReader} = require('./parts/data/changeStreamReader.js');
const usage = require('./aggregator/usage.js');
const QueryRunner = require('./parts/data/QueryRunner.js');
//Core aggregators
require('./aggregator/processing.js');
var t = ["countly:", "aggregator"];
t.push("node");

Expand All @@ -32,9 +33,10 @@ plugins.connectToAllDatabases(true).then(function() {
// common.writeBatcher = new WriteBatcher(common.db);

common.writeBatcher = new WriteBatcher(common.db);
common.secondaryWriteBatcher = new WriteBatcher(common.db);
common.secondaryWriteBatcher = new WriteBatcher(common.db);//Remove once all plugins are updated
common.manualWriteBatcher = new WriteBatcher(common.db, true); //Manually trigerable batcher
common.readBatcher = new Cacher(common.db); //Used for Apps info

common.queryRunner = new QueryRunner();
common.readBatcher.transformationFunctions = {
"event_object": function(data) {
if (data && data.list) {
Expand Down Expand Up @@ -80,123 +82,6 @@ plugins.connectToAllDatabases(true).then(function() {
}
};


//Events processing
// Registers an "/aggregator" handler that creates a changeStreamReader named
// "event-ingestion" on common.drillDb and hands every received "[CLY]_custom"
// document to usage.processEventFromStream.
plugins.register("/aggregator", function() {
var changeStream = new changeStreamReader(common.drillDb, {
pipeline: [
// Primary pipeline: insert operations of custom events only, with the needed fields lifted out of fullDocument.
{"$match": {"operationType": "insert", "fullDocument.e": "[CLY]_custom"}},
// NOTE(review): the id is projected as "__iid" here but as "__id" in the fallback pipeline below - confirm which one consumers expect.
{"$project": {"__iid": "$fullDocument._id", "cd": "$fullDocument.cd", "a": "$fullDocument.a", "e": "$fullDocument.e", "n": "$fullDocument.n", "ts": "$fullDocument.ts", "sg": "$fullDocument.sg", "c": "$fullDocument.c", "s": "$fullDocument.s", "dur": "$fullDocument.dur"}}
],
// Fallback pipeline matches and projects plain documents (no fullDocument wrapper).
// Presumably used when the reader cannot stream changes - confirm in changeStreamReader.
fallback: {
pipeline: [{
"$match": {"e": {"$in": ["[CLY]_custom"]}}
}, {"$project": {"__id": "$_id", "cd": "$cd", "a": "$a", "e": "$e", "n": "$n", "ts": "$ts", "sg": "$sg", "c": "$c", "s": "$s", "dur": "$dur"}}],
},
"name": "event-ingestion"
}, (token, currEvent) => {
// Documents without an app id (a) or event key (e) are ignored.
if (currEvent && currEvent.a && currEvent.e) {
usage.processEventFromStream(token, currEvent);
}
// process next document
});
// When the shared write batcher reports a flush for "events_data", pass the flushed token back to the reader.
common.writeBatcher.addFlushCallback("events_data", function(token) {
changeStream.acknowledgeToken(token);
});
});


//Sessions processing
// Registers an "/aggregator" handler that creates a changeStreamReader named
// "session-ingestion" on common.drillDb and records each received "[CLY]_session"
// document through usage.processSessionFromStream.
plugins.register("/aggregator", function() {
var changeStream = new changeStreamReader(common.drillDb, {
pipeline: [
// Primary pipeline: insert operations of session events; __id and cd are copied up from fullDocument.
{"$match": {"operationType": "insert", "fullDocument.e": "[CLY]_session"}},
{"$addFields": {"__id": "$fullDocument._id", "cd": "$fullDocument.cd"}},
],
// Fallback pipeline matches plain session documents.
// Presumably used when the reader cannot stream changes - confirm in changeStreamReader.
fallback: {
pipeline: [{
"$match": {"e": {"$in": ["[CLY]_session"]}}
}]
},
"name": "session-ingestion"
}, (token, next) => {
// Unwrap the stored document when the payload carries a fullDocument wrapper.
// NOTE(review): next is dereferenced here before the null check on currEvent below.
if (next.fullDocument) {
next = next.fullDocument;
}
var currEvent = next;
if (currEvent && currEvent.a) {
//Record in session data
// The app document is fetched through the read batcher, keyed by the event's app id.
common.readBatcher.getOne("apps", common.db.ObjectID(currEvent.a), function(err, app) {
//record event totals in aggregated data
if (err) {
log.e("Error getting app data for session", err);
return;
}
// Events whose app cannot be found are skipped without logging.
if (app) {
usage.processSessionFromStream(token, currEvent, {"app_id": currEvent.a, "app": app, "time": common.initTimeObj(app.timezone, currEvent.ts), "appTimezone": (app.timezone || "UTC")});
}
});
}
});
// When the shared write batcher reports a flush for "users", pass the flushed token back to the reader.
common.writeBatcher.addFlushCallback("users", function(token) {
changeStream.acknowledgeToken(token);
});
});

// Session duration processing.
// Registers an "/aggregator" handler that creates a changeStreamReader named
// "session-updates" over "drill_events" and feeds the duration of each received
// "[CLY]_session" document to usage.processSessionDurationRange, writing through
// a WriteBatcher that is local to this handler.
plugins.register("/aggregator", function() {
var writeBatcher = new WriteBatcher(common.db);
var changeStream = new changeStreamReader(common.drillDb, {
pipeline: [
// Primary pipeline takes every update operation; the event type is checked in the callback below.
{"$match": {"operationType": "update"}},
{"$addFields": {"__id": "$fullDocument._id", "cd": "$fullDocument.cd"}}
],
// Fallback pipeline matches plain session documents and names "lu" as its time field.
// Presumably used when the reader cannot stream changes - confirm in changeStreamReader.
fallback: {
pipeline: [{"$match": {"e": {"$in": ["[CLY]_session"]}}}],
"timefield": "lu"
},
"options": {fullDocument: "updateLookup"},
"name": "session-updates",
"collection": "drill_events",
// NOTE(review): this flushes the shared common.writeBatcher, while the durations below are
// written through the local writeBatcher - confirm that this is intended.
"onClose": async function(callback) {
await common.writeBatcher.flush("countly", "users");
if (callback) {
callback();
}
}
}, (token, fullDoc) => {
// A payload with a fullDocument wrapper is treated as a change event;
// anything else is treated as a plain document from the fallback pipeline.
var fallback_processing = true;
var next = fullDoc;
if (next.fullDocument) {
fallback_processing = false;
next = fullDoc.fullDocument;
}
// Only session documents carrying an app id, a name (n) and a timestamp are processed.
if (next && next.a && next.e && next.e === "[CLY]_session" && next.n && next.ts) {
common.readBatcher.getOne("apps", common.db.ObjectID(next.a), function(err, app) {
//record event totals in aggregated data
if (err) {
log.e("Error getting app data for session", err);
return;
}
if (app) {
var dur = 0;
if (fallback_processing) {
// Plain document: take the dur stored on the document.
dur = next.dur || 0;
}
else {
// Change event: take dur from the fields reported as updated; 0 when dur was not among them.
dur = (fullDoc && fullDoc.updateDescription && fullDoc.updateDescription.updatedFields && fullDoc.updateDescription.updatedFields.dur) || 0;
}//if(dur){
usage.processSessionDurationRange(writeBatcher, token, dur, next.did, {"app_id": next.a, "app": app, "time": common.initTimeObj(app.timezone, next.ts), "appTimezone": (app.timezone || "UTC")});
//}
}
});
}
});
// When the local write batcher reports a flush for "users", pass the flushed token back to the reader.
writeBatcher.addFlushCallback("users", function(token) {
changeStream.acknowledgeToken(token);
});
});


/**
* Set Plugins APIs Config
*/
Expand Down Expand Up @@ -301,7 +186,6 @@ plugins.connectToAllDatabases(true).then(function() {
async function storeBatchedData(code) {
try {
await common.writeBatcher.flushAll();
await common.secondaryWriteBatcher.flushAll();
// await common.insertBatcher.flushAll();
console.log("Successfully stored batch state");
}
Expand Down Expand Up @@ -360,7 +244,7 @@ plugins.connectToAllDatabases(true).then(function() {


plugins.init({"skipDependencies": true, "filename": "aggregator"});
plugins.loadConfigs(common.db, function() {
plugins.loadConfigs(common.db, async function() {
plugins.dispatch("/aggregator", {common: common});
});
});
Expand Down
222 changes: 222 additions & 0 deletions api/aggregator/processing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/**
* File contains core data aggregators
*/


var common = require('../utils/common.js');
const { dataBatchReader } = require('../parts/data/dataBatchReader');
const plugins = require('../../plugins/pluginManager.js');
var usage = require('./usage.js');
const log = require('../utils/log.js')('aggregator-core:api');
const {WriteBatcher} = require('../parts/data/batcher.js');
const {Cacher} = require('../parts/data/cacher.js');

//dataviews = require('./parts/data/dataviews.js');

var crypto = require('crypto');

(function() {
/**
 * Classify a segment value for storage in drill meta.
 * @param {*} value - value to classify
 * @returns {string} "a" for arrays; for values accepted by common.isNumber whose
 * string form is shorter than 16 characters, "d" when that form is exactly 10 or
 * 13 characters long (timestamp) and "n" otherwise; "l" for everything else
 */
function determineType(value) {
    if (Array.isArray(value)) {
        return "a";
    }
    if (!common.isNumber(value)) {
        return "l";
    }
    var textLength = (value + "").length;
    if (textLength >= 16) {
        // Too long to be treated as a plain number
        return "l";
    }
    var looksLikeTimestamp = textLength === 10 || textLength === 13;
    return looksLikeTimestamp ? "d" : "n";
}
//Core events data aggregator
/**
 * Registers an "/aggregator" handler that creates a dataBatchReader named
 * "event-aggregation" over "drill_events". The pipeline sums c, s and dur of
 * "[CLY]_custom" documents per app (a), event name (n) and UTC hour (h), and
 * each batch of totals is handed to usage.processEventTotalsFromStream and
 * then flushed to "events_data" through common.manualWriteBatcher.
 */
plugins.register("/aggregator", function() {
    new dataBatchReader(common.drillDb, {
        pipeline: [
            {"$match": {"e": "[CLY]_custom"}},
            {
                "$group": {
                    "_id": {
                        "a": "$a",
                        "e": "$n",
                        "h": {"$dateToString": {"date": {"$toDate": "$ts"}, "format": "%Y:%m:%d:%H", "timezone": "UTC"}},
                    },
                    "c": {"$sum": "$c"},
                    "s": {"$sum": "$s"},
                    "dur": {"$sum": "$dur"}
                }
            },
            // After $group the app id only exists as _id.a. Sorting on a bare "a"
            // referenced a field that is not present, so it did not order by app.
            {"$sort": {"_id.a": 1}},
            {
                "$project": {
                    "_id": 0,
                    "a": "$_id.a",
                    "e": "$_id.e",
                    "h": "$_id.h",
                    "c": 1,
                    "s": 1,
                    "dur": 1
                }
            }
        ],
        "interval": 10000, //10 seconds
        "name": "event-aggregation",
        "collection": "drill_events",
    }, async function(token, results) {
        if (results && results.length > 0) {
            await usage.processEventTotalsFromStream(token, results, common.manualWriteBatcher);
            //Flush collected changes
            await common.manualWriteBatcher.flush("countly", "events_data", token.cd);
        }
        // process next batch of documents
    });
});

//processes session data and updates in aggregated data
/**
 * Registers an "/aggregator" handler that creates a dataBatchReader named
 * "session-ingestion" selecting "[CLY]_session" documents. Every document of a
 * batch that carries an app id is passed, together with its app document, to
 * usage.processSessionFromStream; afterwards the "users" collection is flushed
 * through common.manualWriteBatcher.
 */
plugins.register("/aggregator", function() {
    new dataBatchReader(common.drillDb, {
        pipeline: [
            {"$match": {"e": "[CLY]_session"}},
        ],
        "name": "session-ingestion"
    }, async function(token, data) {
        // Same guard as the event aggregator: a missing batch is treated like an empty one
        // instead of throwing on data.length.
        if (!data || data.length === 0) {
            return;
        }
        for (var k = 0; k < data.length; k++) {
            var currEvent = data[k];
            if (!currEvent || !currEvent.a) {
                continue;
            }
            //Record in session data
            try {
                var app = await common.readBatcher.getOne("apps", common.db.ObjectID(currEvent.a));
                //record event totals in aggregated data
                if (app) {
                    await usage.processSessionFromStream(token, currEvent, {"app_id": currEvent.a, "app": app, "time": common.initTimeObj(app.timezone, currEvent.ts), "appTimezone": (app.timezone || "UTC")});
                }
            }
            catch (ex) {
                log.e("Error processing session event", ex);
                // NOTE(review): a failure on one event abandons the rest of the batch and
                // skips the flush below - confirm this is preferred over skipping only the failed event.
                return;
            }
        }
        await common.manualWriteBatcher.flush("countly", "users", token.cd);
    });
});

/**
 * Registers an "/aggregator" handler that creates a dataBatchReader named
 * "session-updates" over "drill_events", selecting "[CLY]_session_update"
 * documents. The dur of every such document that has an app id and a timestamp
 * is passed to usage.processSessionDurationRange using a manually flushed
 * WriteBatcher local to this handler, which is flushed to "users" after the batch.
 */
plugins.register("/aggregator", function() {
    var writeBatcher = new WriteBatcher(common.db, true);
    new dataBatchReader(common.drillDb, {
        pipeline: [{"$match": {"e": {"$in": ["[CLY]_session_update"]}}}],
        "name": "session-updates",
        "collection": "drill_events",
    }, async function(token, docs) {
        // Guard first: the log line below reads docs.length and would throw on a missing batch.
        if (!docs) {
            return;
        }
        console.log("Processing session updates " + docs.length);
        if (docs.length === 0) {
            return;
        }
        for (var z = 0; z < docs.length; z++) {
            var next = docs[z];
            if (!next || !next.a || next.e !== "[CLY]_session_update" || !next.ts) {
                continue;
            }
            try {
                var app = await common.readBatcher.getOne("apps", common.db.ObjectID(next.a));
                if (app) {
                    // A document without dur counts as zero duration
                    var dur = next.dur || 0;
                    await usage.processSessionDurationRange(writeBatcher, token, dur, next.did, {"app_id": next.a, "app": app, "time": common.initTimeObj(app.timezone, next.ts), "appTimezone": (app.timezone || "UTC")});
                }
            }
            catch (e) {
                log.e(e);
                // NOTE(review): a failure on one document abandons the rest of the batch and
                // skips the flush below - confirm this is preferred over skipping only the failed document.
                return;
            }
        }
        await writeBatcher.flush("countly", "users", token.cd);
    });
});


//Drill meta aggregator
/**
 * Registers an "/aggregator" handler that creates a dataBatchReader named
 * "drill-meta" over "drill_events". The pipeline yields one row per app (a),
 * event (e), name (n) and segment key (sgk) together with one sample value
 * (sgv). For every row the handler makes sure a "drill_meta" document exists
 * for the event and records the detected type of any segment key not yet
 * present in it. All changes of a batch are written with a single bulkWrite of
 * upserts.
 */
plugins.register("/aggregator", function() {
    var drillMetaCache = new Cacher(common.drillDb); //Used for drill_meta documents
    new dataBatchReader(common.drillDb, {
        pipeline: [
            {
                "$project": {
                    "a": "$a",
                    "e": "$e",
                    "n": "$n",
                    // Documents without segments still produce one row, with a null key
                    "sg": {"$ifNull": [{"$objectToArray": "$sg"}, [{"k": null, "v": null}]]}
                }
            },
            {"$unwind": "$sg"},
            {"$group": {"_id": {"a": "$a", "e": "$e", "n": "$n", "sgk": "$sg.k"}, "sgv": {"$first": "$sg.v"}}}
        ],
        "interval": 10000, //default update interval
        "name": "drill-meta",
        "collection": "drill_events",
        "onClose": async function(callback) {
            if (callback) {
                callback();
            }
        },
    }, async function(token, results) {
        if (!results || results.length === 0) {
            return;
        }
        var updates = {}; //$set payloads keyed by drill_meta _id
        var pendingMeta = {}; //meta documents first seen in this batch and not stored yet
        for (var z = 0; z < results.length; z++) {
            var group = results[z]._id;
            if (!group || !group.a || !group.e) {
                continue;
            }
            if (group.e === "[CLY]_custom") {
                // Custom events are keyed by their name
                group.e = group.n;
            }
            var app_id = group.a;
            let event_hash = crypto.createHash("sha1").update(group.e + app_id).digest("hex");
            var metaId = app_id + "_meta_" + event_hash;
            var meta = await drillMetaCache.getOne("drill_meta", {_id: metaId});
            if (!meta || !meta._id) {
                if (!updates[metaId]) {
                    updates[metaId] = {
                        _id: metaId,
                        app_id: app_id,
                        e: group.e,
                        type: "e"
                    };
                }
                // An event with several segment keys produces several rows in one batch. Nothing is
                // written until the loop ends, so the lookup can come back empty again for later rows.
                // Reuse the in-batch copy instead of leaving meta empty, which used to throw on meta.sg.
                if (!pendingMeta[metaId]) {
                    pendingMeta[metaId] = {
                        _id: metaId,
                        app_id: app_id,
                        e: group.e,
                        type: "e"
                    };
                }
                meta = pendingMeta[metaId];
            }
            if (group.sgk && (!meta.sg || !meta.sg[group.sgk])) {
                var type = determineType(results[z].sgv);
                meta.sg = meta.sg || {};
                meta.sg[group.sgk] = {
                    type: type
                };
                updates[metaId] = updates[metaId] || {};
                updates[metaId]["sg." + group.sgk + ".type"] = type;
            }
        }
        //trigger all updates.
        var metaIds = Object.keys(updates);
        if (metaIds.length > 0) {
            //bulk operation
            const bulkOps = metaIds.map(u => {
                return {
                    updateOne: {
                        filter: {"_id": u},
                        update: {$set: updates[u]},
                        upsert: true
                    }
                };
            });
            await common.drillDb.collection("drill_meta").bulkWrite(bulkOps);
        }
        // process next document
    });
});
}());
Loading
Loading