Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions client/Event.d.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
interface InitOptions {
host: string;
port?: number;
protocol: string;
}
type Callback<T = any> = (payload: T) => void;
import { Callback, InitOptions } from "./adapters/types";
declare const event: {
init({ host, port, protocol }: InitOptions): void;
subscribe<T = any>(type: string, callback: Callback<T>): () => void;
publish<T = any>(...args: [...string[], T]): void;
init(options: InitOptions): Promise<void>;
publish<T = any>(...args: [...string[], T]): Promise<void>;
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
cleanup(): Promise<void>;
};
export { event };
159 changes: 120 additions & 39 deletions client/Event.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,135 @@
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g = Object.create((typeof Iterator === "function" ? Iterator : Object).prototype);
return g.next = verb(0), g["throw"] = verb(1), g["return"] = verb(2), typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (g && (g = 0, op[0] && (_ = 0)), _) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
default:
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
}
op = body.call(thisArg, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
}
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.event = void 0;
var socket_io_client_1 = require("socket.io-client");
var socket = null;
var callbacks = {};
var InMemoryAdapter_1 = require("./adapters/InMemoryAdapter");
var KafkaAdapter_1 = require("./adapters/KafkaAdapter");
var event = {
init: function (_a) {
var host = _a.host, port = _a.port, protocol = _a.protocol;
if (socket)
return;
var socketPath = port ? "".concat(protocol, "://").concat(host, ":").concat(port) : "".concat(protocol, "://").concat(host);
socket = (0, socket_io_client_1.io)(socketPath);
socket.on("event", function (_a) {
var type = _a.type, payload = _a.payload;
if (callbacks[type]) {
callbacks[type].forEach(function (cb) { return cb(payload); });
}
init: function (options) {
return __awaiter(this, void 0, void 0, function () {
var adapter;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
adapter = options.type === "inMemory" ? new InMemoryAdapter_1.InMemoryAdapter() : new KafkaAdapter_1.KafkaAdapter();
this._adapter = adapter;
return [4 /*yield*/, adapter.init(options)];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
},
subscribe: function (type, callback) {
if (!socket)
throw new Error("Event not initialized. Call event.init first.");
if (!callbacks[type])
callbacks[type] = new Set();
callbacks[type].add(callback);
socket.emit("subscribe", type);
return function () {
callbacks[type].delete(callback);
if (callbacks[type].size === 0) {
delete callbacks[type];
socket.emit("unsubscribe", type);
}
};
},
publish: function () {
var args = [];
for (var _i = 0; _i < arguments.length; _i++) {
args[_i] = arguments[_i];
}
if (!socket)
throw new Error("Event not initialized. Call event.init first.");
if (args.length < 2) {
throw new Error("publish requires at least one event type and a payload");
}
var payload = args[args.length - 1];
var types = args.slice(0, -1);
// Publish to all specified event types
types.forEach(function (type) {
socket.emit("publish", { type: type, payload: payload });
return __awaiter(this, void 0, void 0, function () {
var adapter;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
adapter = this._adapter;
if (!adapter)
throw new Error("Event not initialized");
return [4 /*yield*/, adapter.publish.apply(adapter, args)];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
},
subscribe: function (type, callback) {
return __awaiter(this, void 0, void 0, function () {
var adapter;
return __generator(this, function (_a) {
adapter = this._adapter;
if (!adapter)
throw new Error("Event not initialized");
return [2 /*return*/, adapter.subscribe(type, callback)];
});
});
},
cleanup: function () {
return __awaiter(this, void 0, void 0, function () {
var adapter;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
adapter = this._adapter;
if (!adapter)
return [2 /*return*/];
return [4 /*yield*/, adapter.cleanup()];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
},
};
exports.event = event;
process.on("SIGINT", function () { return __awaiter(void 0, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
console.log("Shutting down gracefully...");
return [4 /*yield*/, event.cleanup()];
case 1:
_a.sent();
process.exit(0);
return [2 /*return*/];
}
});
}); });
process.on("SIGTERM", function () { return __awaiter(void 0, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
console.log("Shutting down gracefully...");
return [4 /*yield*/, event.cleanup()];
case 1:
_a.sent();
process.exit(0);
return [2 /*return*/];
}
});
}); });
85 changes: 38 additions & 47 deletions client/Event.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,49 @@
import { Socket, io } from "socket.io-client";
import { Callback, InitOptions } from "./adapters/types";

let socket: Socket | null = null;
const callbacks: Record<string, Set<Callback>> = {};

interface InitOptions {
host: string;
port?: number;
protocol: string;
}
type Callback<T = any> = (payload: T) => void;
import { EventAdapter } from "./adapters/Adapter";
import { InMemoryAdapter } from "./adapters/InMemoryAdapter";
import { KafkaAdapter } from "./adapters/KafkaAdapter";

const event = {
init({ host, port, protocol }: InitOptions) {
if (socket) return;
const socketPath = port ? `${protocol}://${host}:${port}` : `${protocol}://${host}`;
socket = io(socketPath);
socket.on("event", ({ type, payload }: { type: string; payload: any }) => {
if (callbacks[type]) {
callbacks[type].forEach((cb) => cb(payload));
}
});
async init(options: InitOptions) {
const adapter: EventAdapter =
options.type === "inMemory" ? new InMemoryAdapter() : new KafkaAdapter();
(this as any)._adapter = adapter;
await adapter.init(options);
},

async publish<T = any>(...args: [...string[], T]): Promise<void> {
const adapter: EventAdapter | undefined = (this as any)._adapter;
if (!adapter) throw new Error("Event not initialized");
await adapter.publish(...args);
},

subscribe<T = any>(type: string, callback: Callback<T>): () => void {
if (!socket)
throw new Error("Event not initialized. Call event.init first.");
if (!callbacks[type]) callbacks[type] = new Set();
callbacks[type].add(callback as Callback);
socket!.emit("subscribe", type);
return () => {
callbacks[type].delete(callback as Callback);
if (callbacks[type].size === 0) {
delete callbacks[type];
socket!.emit("unsubscribe", type);
}
};
async subscribe<T = any>(
type: string,
callback: Callback<T>
): Promise<() => void> {
const adapter: EventAdapter | undefined = (this as any)._adapter;
if (!adapter) throw new Error("Event not initialized");
return adapter.subscribe(type, callback as any);
},

publish<T = any>(...args: [...string[], T]): void {
if (!socket)
throw new Error("Event not initialized. Call event.init first.");

if (args.length < 2) {
throw new Error("publish requires at least one event type and a payload");
}

const payload = args[args.length - 1];
const types = args.slice(0, -1) as string[];

// Publish to all specified event types
types.forEach(type => {
socket!.emit("publish", { type, payload });
});
async cleanup() {
const adapter: EventAdapter | undefined = (this as any)._adapter;
if (!adapter) return;
await adapter.cleanup();
},
};

process.on("SIGINT", async () => {
console.log("Shutting down gracefully...");
await event.cleanup();
process.exit(0);
});

process.on("SIGTERM", async () => {
console.log("Shutting down gracefully...");
await event.cleanup();
process.exit(0);
});

export { event };
7 changes: 7 additions & 0 deletions client/adapters/Adapter.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Callback, InitOptions } from "./types";
export interface EventAdapter {
init(options: InitOptions): Promise<void>;
publish<T = any>(...args: [...string[], T]): Promise<void>;
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
cleanup(): Promise<void>;
}
2 changes: 2 additions & 0 deletions client/adapters/Adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
10 changes: 10 additions & 0 deletions client/adapters/Adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Callback, InitOptions } from "./types";

export interface EventAdapter {
init(options: InitOptions): Promise<void>;
publish<T = any>(...args: [...string[], T]): Promise<void>;
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
cleanup(): Promise<void>;
}


9 changes: 9 additions & 0 deletions client/adapters/InMemoryAdapter.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Callback, InitOptions } from "./types";
import { EventAdapter } from "./Adapter";
export declare class InMemoryAdapter implements EventAdapter {
private socket;
init(options: InitOptions): Promise<void>;
publish<T = any>(...args: [...string[], T]): Promise<void>;
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
cleanup(): Promise<void>;
}
Loading