Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/EventProcessor.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const EventSender = require('./EventSender');
const EventSummarizer = require('./EventSummarizer');
const MultiEventSummarizer = require('./MultiEventSummarizer');
const ContextFilter = require('./ContextFilter');
const errors = require('./errors');
const messages = require('./messages');
Expand All @@ -17,8 +17,8 @@ function EventProcessor(
const processor = {};
const eventSender = sender || EventSender(platform, environmentId, options);
const mainEventsUrl = utils.appendUrlPath(options.eventsUrl, '/events/bulk/' + environmentId);
const summarizer = EventSummarizer();
const contextFilter = ContextFilter(options);
const summarizer = MultiEventSummarizer(contextFilter);
const samplingInterval = options.samplingInterval;
const eventCapacity = options.eventCapacity;
const flushInterval = options.flushInterval;
Expand Down Expand Up @@ -117,17 +117,19 @@ function EventProcessor(
}
};

processor.flush = function() {
processor.flush = async function() {
if (disabled) {
return Promise.resolve();
}
const eventsToSend = queue;
const summary = summarizer.getSummary();
summarizer.clearSummary();
if (summary) {
summary.kind = 'summary';
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This moved inside the summarizer.

eventsToSend.push(summary);
}
const summaries = summarizer.getSummaries();

summaries.forEach(summary => {
if (Object.keys(summary.features).length) {
eventsToSend.push(summary);
}
});

if (diagnosticsAccumulator) {
// For diagnostic events, we record how many events were in the queue at the last flush (since "how
// many events happened to be in the queue at the moment we decided to send a diagnostic event" would
Expand Down
1 change: 1 addition & 0 deletions src/EventSummarizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ function EventSummarizer() {
startDate,
endDate,
features: flagsOut,
kind: 'summary',
};
};

Expand Down
61 changes: 61 additions & 0 deletions src/MultiEventSummarizer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const canonicalize = require('./canonicalize');
const EventSummarizer = require('./EventSummarizer');

/**
*
* @param {{filter: (context: any) => any}} contextFilter
* @param {() => {update: (value: string) => void, digest: (format: string) => Promise<string>}} hasherFactory
*/
function MultiEventSummarizer(contextFilter) {
let summarizers = {};
let contexts = {};

/**
* Summarize the given event.
* @param {{
* kind: string,
* context?: any,
* }} event
*/
function summarizeEvent(event) {
if (event.kind === 'feature') {
const key = canonicalize(event.context);
if (!key) {
return;
}

let summarizer = summarizers[key];
if (!summarizer) {
summarizers[key] = EventSummarizer();
summarizer = summarizers[key];
contexts[key] = event.context;
}

summarizer.summarizeEvent(event);
}
}

/**
* Get the summaries of the events that have been summarized.
* @returns {any[]}
*/
function getSummaries() {
const summarizersToFlush = summarizers;
const contextsForSummaries = contexts;

summarizers = {};
contexts = {};
return Object.entries(summarizersToFlush).map(([key, summarizer]) => {
const summary = summarizer.getSummary();
summary.context = contextFilter.filter(contextsForSummaries[key]);
return summary;
});
}

return {
summarizeEvent,
getSummaries,
};
}

module.exports = MultiEventSummarizer;
112 changes: 112 additions & 0 deletions src/__tests__/EventProcessor-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,73 @@ describe.each([
});
});

it('generates separate summary events for different contexts', async () => {
await withProcessorAndSender(defaultConfig, async (ep, mockEventSender) => {
const context1 = { key: 'user1', kind: 'user' };
const context2 = { key: 'user2', kind: 'user' };

// Create feature events for two different contexts
const event1 = {
kind: 'feature',
creationDate: 1000,
context: context1,
key: 'flag1',
version: 11,
variation: 1,
value: 'value1',
default: 'default1',
trackEvents: false,
};

const event2 = {
kind: 'feature',
creationDate: 1000,
context: context2,
key: 'flag2',
version: 22,
variation: 2,
value: 'value2',
default: 'default2',
trackEvents: false,
};

ep.enqueue(event1);
ep.enqueue(event2);
await ep.flush();

expect(mockEventSender.calls.length()).toEqual(1);
const output = (await mockEventSender.calls.take()).events;

// Should have two summary events, one for each context
expect(output.length).toEqual(2);

// Find the summary event for each context
const summary1 = output.find(e => e.context.key === 'user1');
const summary2 = output.find(e => e.context.key === 'user2');

// Verify each summary event has the correct context and flag data
expect(summary1).toBeDefined();
expect(summary1.context).toEqual(context1);
expect(summary1.features).toEqual({
flag1: {
contextKinds: ['user'],
default: 'default1',
counters: [{ version: 11, variation: 1, value: 'value1', count: 1 }],
},
});

expect(summary2).toBeDefined();
expect(summary2.context).toEqual(context2);
expect(summary2.features).toEqual({
flag2: {
contextKinds: ['user'],
default: 'default2',
counters: [{ version: 22, variation: 2, value: 'value2', count: 1 }],
},
});
});
});

describe('interaction with diagnostic events', () => {
it('sets eventsInLastBatch on flush', async () => {
const e0 = { kind: 'custom', creationDate: 1000, context: eventContext, key: 'key0' };
Expand Down Expand Up @@ -525,5 +592,50 @@ describe.each([
expect(diagnosticAccumulator.getProps().droppedEvents).toEqual(2);
});
});

it('filters context in summary events', async () => {
const event = {
kind: 'feature',
creationDate: 1000,
context: eventContext,
key: 'flagkey',
version: 11,
variation: 1,
value: 'value',
default: 'default',
trackEvents: true,
};

// Configure with allAttributesPrivate set to true
const config = { ...defaultConfig, allAttributesPrivate: true };

const sender = MockEventSender();
const ep = EventProcessor(platform, config, envId, null, null, sender);
try {
ep.enqueue(event);
await ep.flush();

expect(sender.calls.length()).toEqual(1);
const output = (await sender.calls.take()).events;
expect(output.length).toEqual(2);

// Verify the feature event has filtered context
checkFeatureEvent(output[0], event, false, filteredContext);

// Verify the summary event has filtered context
const summaryEvent = output[1];
checkSummaryEvent(summaryEvent);
expect(summaryEvent.context).toEqual(filteredContext);
expect(summaryEvent.features).toEqual({
flagkey: {
contextKinds: ['user'],
default: 'default',
counters: [{ version: 11, variation: 1, value: 'value', count: 1 }],
},
});
} finally {
ep.stop();
}
});
});
});
4 changes: 3 additions & 1 deletion src/__tests__/LDClient-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,9 @@ describe('LDClient', () => {
await client.waitForInitialization(5);
});

expect(eventsServer.requests.length()).toEqual(1);
// Flushing is an async operation, so we cannot ensure that the requests are made by
// the time we reach this point. If we await the nextRequest(), then it will catch
// whatever was flushed.
const req = await eventsServer.nextRequest();
const data = JSON.parse(req.body);
expect(data.length).toEqual(1);
Expand Down
Loading