Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
},
"homepage": "https://github.com/Eppo-exp/node-server-sdk#readme",
"dependencies": {
"@eppo/js-client-sdk-common": "4.3.0",
"@eppo/js-client-sdk-common": "^4.5.0",
"lru-cache": "^10.0.1"
},
"devDependencies": {
Expand All @@ -46,7 +46,7 @@
"eslint-plugin-import": "^2.25.4",
"eslint-plugin-prettier": "^4.0.0",
"eslint-plugin-promise": "^6.0.0",
"express": "^4.18.0",
"express": "^4.21.1",
"husky": "^6.0.0",
"jest": "^29.7.0",
"lint-staged": "^12.3.5",
Expand All @@ -58,5 +58,6 @@
"engines": {
"node": ">=18.x",
"yarn": "1.x"
}
},
"packageManager": "[email protected]+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}
204 changes: 204 additions & 0 deletions src/events/file-backed-named-event-queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import * as fs from 'fs';
import * as path from 'path';

import FileBackedNamedEventQueue from './file-backed-named-event-queue';

describe('FileBackedNamedEventQueue', () => {
describe('string events', () => {
const queueName = 'testQueue';
let queue: FileBackedNamedEventQueue<string>;
const queueDirectory = path.resolve(process.cwd(), `.queues/${queueName}`);

beforeEach(() => {
// Clean up the queue directory
if (fs.existsSync(queueDirectory)) {
fs.rmSync(queueDirectory, { recursive: true, force: true });
}
queue = new FileBackedNamedEventQueue(queueName);
});

afterAll(() => {
if (fs.existsSync(queueDirectory)) {
fs.rmSync(queueDirectory, { recursive: true, force: true });
}
});

it('should initialize with an empty queue', () => {
expect(queue.length).toBe(0);
});

it('should persist and retrieve events correctly via push and iterator', () => {
queue.push('event1');
queue.push('event2');

expect(queue.length).toBe(2);

const events = Array.from(queue);
expect(events).toEqual(['event1', 'event2']);
});

it('should persist and retrieve events correctly via push and shift', () => {
queue.push('event1');
queue.push('event2');

const firstEvent = queue.shift();
expect(firstEvent).toBe('event1');
expect(queue.length).toBe(1);

const secondEvent = queue.shift();
expect(secondEvent).toBe('event2');
expect(queue.length).toBe(0);
});

it('should remove events from file system after shift', () => {
queue.push('event1');
const eventFiles = fs.readdirSync(queueDirectory);
expect(eventFiles.length).toBe(2); // One for metadata.json, one for the event file

queue.shift();
const updatedEventFiles = fs.readdirSync(queueDirectory);
expect(updatedEventFiles.length).toBe(1); // Only metadata.json should remain
});

it('should reconstruct the queue from metadata file', () => {
queue.push('event1');
queue.push('event2');

const newQueueInstance = new FileBackedNamedEventQueue<string>(queueName);
expect(newQueueInstance.length).toBe(2);

const events = Array.from(newQueueInstance);
expect(events).toEqual(['event1', 'event2']);
});

it('should handle empty shift gracefully', () => {
expect(queue.shift()).toBeUndefined();
});

it('should not fail if metadata file is corrupted', () => {
const corruptedMetadataFile = path.join(queueDirectory, 'metadata.json');
fs.writeFileSync(corruptedMetadataFile, '{ corrupted state }');

const newQueueInstance = new FileBackedNamedEventQueue<string>(queueName);
expect(newQueueInstance.length).toBe(0);
});

it('should handle events with the same content correctly using consistent hashing', () => {
queue.push('event1');
queue.push('event1'); // Push the same event content twice

expect(queue.length).toBe(2);

const events = Array.from(queue);
expect(events).toEqual(['event1', 'event1']);
});

it('should store each event as a separate file', () => {
queue.push('event1');
queue.push('event2');

const eventFiles = fs.readdirSync(queueDirectory).filter((file) => file !== 'metadata.json');
expect(eventFiles.length).toBe(2);

const eventData1 = fs.readFileSync(path.join(queueDirectory, eventFiles[0]), 'utf8');
const eventData2 = fs.readFileSync(path.join(queueDirectory, eventFiles[1]), 'utf8');

expect([JSON.parse(eventData1), JSON.parse(eventData2)]).toEqual(['event1', 'event2']);
});
});

describe('arbitrary object shapes', () => {
const queueName = 'objectQueue';
let queue: FileBackedNamedEventQueue<{ id: number; name: string }>;
const queueDirectory = path.resolve(process.cwd(), `.queues/${queueName}`);

beforeEach(() => {
// Clean up the queue directory
if (fs.existsSync(queueDirectory)) {
fs.rmdirSync(queueDirectory, { recursive: true });
}
queue = new FileBackedNamedEventQueue(queueName);
});

afterAll(() => {
if (fs.existsSync(queueDirectory)) {
fs.rmdirSync(queueDirectory, { recursive: true });
}
});

it('should handle objects with arbitrary shapes via push and shift', () => {
queue.push({ id: 1, name: 'event1' });
queue.push({ id: 2, name: 'event2' });

expect(queue.length).toBe(2);

const firstEvent = queue.shift();
expect(firstEvent).toEqual({ id: 1, name: 'event1' });

const secondEvent = queue.shift();
expect(secondEvent).toEqual({ id: 2, name: 'event2' });

expect(queue.length).toBe(0);
});

it('should persist and reconstruct queue with objects from metadata file', () => {
queue.push({ id: 1, name: 'event1' });
queue.push({ id: 2, name: 'event2' });

const newQueueInstance = new FileBackedNamedEventQueue<{ id: number; name: string }>(
queueName,
);
expect(newQueueInstance.length).toBe(2);

const events = Array.from(newQueueInstance);
expect(events).toEqual([
{ id: 1, name: 'event1' },
{ id: 2, name: 'event2' },
]);
});
});

describe('splice', () => {
const queueName = 'spliceQueue';
let queue: FileBackedNamedEventQueue<string>;
const queueDirectory = path.resolve(process.cwd(), `.queues/${queueName}`);

beforeEach(() => {
// Clean up the queue directory
if (fs.existsSync(queueDirectory)) {
fs.rmdirSync(queueDirectory, { recursive: true });
}
queue = new FileBackedNamedEventQueue(queueName);
});

afterAll(() => {
if (fs.existsSync(queueDirectory)) {
fs.rmdirSync(queueDirectory, { recursive: true });
}
});

it('should return the correct number of events and remove them from the queue', () => {
queue.push('event1');
queue.push('event2');
queue.push('event3');

const splicedEvents = queue.splice(2);
expect(splicedEvents).toEqual(['event1', 'event2']);
expect(queue.length).toBe(1);
});

it('should return all events if the count is greater than the queue length', () => {
queue.push('event1');
queue.push('event2');

const splicedEvents = queue.splice(3);
expect(splicedEvents).toEqual(['event1', 'event2']);
expect(queue.length).toBe(0);
});

it('should return an empty array if the queue is empty', () => {
const splicedEvents = queue.splice(1);
expect(splicedEvents).toEqual([]);
});
})

Check warning on line 203 in src/events/file-backed-named-event-queue.spec.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk

Insert `;`
});
105 changes: 105 additions & 0 deletions src/events/file-backed-named-event-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import * as fs from 'fs';
import * as path from 'path';

import { applicationLogger, NamedEventQueue } from '@eppo/js-client-sdk-common';

import { takeWhile } from '../util';

export default class FileBackedNamedEventQueue<T> implements NamedEventQueue<T> {
private readonly queueDirectory: string;
private readonly metadataFile: string;
private eventKeys: string[] = [];

constructor(public readonly name: string) {
this.queueDirectory = path.resolve(process.cwd(), `.queues/${this.name}`);
this.metadataFile = path.join(this.queueDirectory, 'metadata.json');

if (!fs.existsSync(this.queueDirectory)) {
fs.mkdirSync(this.queueDirectory, { recursive: true });
}

this.loadStateFromFile();
}

splice(count: number): T[] {
const arr = Array.from({ length: count }, () => this.shift());
return takeWhile(arr, (item) => item !== undefined) as T[];
}

isEmpty(): boolean {
return this.length === 0;
}

get length(): number {
return this.eventKeys.length;
}

push(event: T): void {
const eventKey = this.generateEventKey(event);
const eventFilePath = this.getEventFilePath(eventKey);
fs.writeFileSync(eventFilePath, JSON.stringify(event), 'utf8');
this.eventKeys.push(eventKey);
this.saveStateToFile();
}

*[Symbol.iterator](): IterableIterator<T> {
for (const key of this.eventKeys) {
const eventFilePath = this.getEventFilePath(key);
if (fs.existsSync(eventFilePath)) {
const eventData = fs.readFileSync(eventFilePath, 'utf8');
yield JSON.parse(eventData) as T;
}
}
}

shift(): T | undefined {
if (this.isEmpty()) {
return;
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const eventKey = this.eventKeys.shift()!;
const eventFilePath = this.getEventFilePath(eventKey);

if (fs.existsSync(eventFilePath)) {
const eventData = fs.readFileSync(eventFilePath, 'utf8');
fs.unlinkSync(eventFilePath);
this.saveStateToFile();
return JSON.parse(eventData) as T;
}
}

private loadStateFromFile(): void {
if (fs.existsSync(this.metadataFile)) {
try {
const metadata = fs.readFileSync(this.metadataFile, 'utf8');
this.eventKeys = JSON.parse(metadata);
} catch {
applicationLogger.error('Failed to parse metadata file. Initializing empty queue.');
this.eventKeys = [];
}
}
}

private saveStateToFile(): void {
fs.writeFileSync(this.metadataFile, JSON.stringify(this.eventKeys), 'utf8');
}

private generateEventKey(event: T): string {
return this.hashEvent(event);
}

private getEventFilePath(eventKey: string): string {
return path.join(this.queueDirectory, `${eventKey}.json`);
}

private hashEvent(event: T): string {
const value = JSON.stringify(event);
let hash = 0;
for (let i = 0; i < value.length; i++) {
hash = (hash << 5) - hash + value.charCodeAt(i);
hash |= 0; // Convert to 32bit integer
}
return hash.toString(36);
}
}
48 changes: 48 additions & 0 deletions src/i-client-config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { IAssignmentLogger, IBanditLogger } from "@eppo/js-client-sdk-common";

Check warning on line 1 in src/i-client-config.ts

View workflow job for this annotation

GitHub Actions / lint-test-sdk

Replace `"@eppo/js-client-sdk-common"` with `'@eppo/js-client-sdk-common'`

/**
* Configuration used for initializing the Eppo client
* @public
*/
export interface IClientConfig {
/** Eppo SDK key */
apiKey: string;

/**
* Base URL of the Eppo API.
* Clients should use the default setting in most cases.
*/
baseUrl?: string;

/** Provide a logging implementation to send variation assignments to your data warehouse. */
assignmentLogger: IAssignmentLogger;

/** Logging implementation to send bandit actions to your data warehouse */
banditLogger?: IBanditLogger;

/** Timeout in milliseconds for the HTTPS request for the experiment configuration. (Default: 5000) */
requestTimeoutMs?: number;

/**
* Number of additional times the initial configuration request will be attempted if it fails.
* This is the request servers typically synchronously wait for completion. A small wait will be
* done between requests. (Default: 1)
*/
numInitialRequestRetries?: number;

/**
* Number of additional times polling for updated configurations will be attempted before giving up.
* Polling is done after a successful initial request. Subsequent attempts are done using an exponential
* backoff. (Default: 7)
*/
numPollRequestRetries?: number;

/** Throw error if unable to fetch an initial configuration during initialization. (default: true) */
throwOnFailedInitialization?: boolean;

/** Poll for new configurations even if the initial configuration request failed. (default: false) */
pollAfterFailedInitialization?: boolean;

/** Amount of time in milliseconds to wait between API calls to refresh configuration data. Default of 30_000 (30s). */
pollingIntervalMs?: number;
}
Loading
Loading