Skip to content

Commit caee18f

Browse files
Kafka Adapter (#8)
* Add Kafka support to event system Extended the event module to support Kafka as an alternative to socket-based event handling. Added initialization, publish, and subscribe logic for Kafka, allowing events to be sent and received via Kafka topics. Refactored type definitions and updated method signatures to support both socket and Kafka backends. * Add Kafka support to event system Refactored event client to support both in-memory (socket.io) and Kafka backends. Added type-safe initialization options, async publish/subscribe methods, and error handling for Kafka. Introduced docker-compose for local Kafka setup and provided a Kafka usage example. Removed Jest config and legacy tests to focus on integration-based workflows. * Refactor event system for improved Kafka resource management Refactored the event system to maintain persistent Kafka producer and per-topic consumers, improving connection reuse and error handling. Added an async cleanup method to gracefully disconnect all resources, and process signal handlers for shutdown. Updated API to make init, publish, subscribe, and cleanup methods asynchronous for better reliability. * Refactor event system to use adapter pattern Reorganized event handling logic by introducing EventAdapter interface and splitting in-memory and Kafka implementations into separate adapter classes. This improves modularity and maintainability, allowing easier extension for new event backends. The Event.ts file now delegates to the appropriate adapter based on initialization options. * Refactor event system to use adapter pattern Moved event adapter interfaces and implementations into a new 'adapters' directory. The event system now delegates to either InMemoryAdapter or KafkaAdapter via a unified EventAdapter interface, improving modularity and separation of concerns. Updated type imports and removed socket and kafka logic from Event.js, delegating all transport-specific logic to adapters. * Fix build script formatting in package.json * Delete package-lock.json * Create package-lock.json * Add test script using Jest Introduced a 'test' script to package.json that runs Jest on the src directory for easier testing. * Update package.json --------- Co-authored-by: Can Mingir <[email protected]>
1 parent dada2f8 commit caee18f

22 files changed

+1048
-4224
lines changed

client/Event.d.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
1-
interface InitOptions {
2-
host: string;
3-
port?: number;
4-
protocol: string;
5-
}
6-
type Callback<T = any> = (payload: T) => void;
1+
import { Callback, InitOptions } from "./adapters/types";
72
declare const event: {
8-
init({ host, port, protocol }: InitOptions): void;
9-
subscribe<T = any>(type: string, callback: Callback<T>): () => void;
10-
publish<T = any>(...args: [...string[], T]): void;
3+
init(options: InitOptions): Promise<void>;
4+
publish<T = any>(...args: [...string[], T]): Promise<void>;
5+
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
6+
cleanup(): Promise<void>;
117
};
128
export { event };

client/Event.js

Lines changed: 120 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,135 @@
11
"use strict";
2+
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
3+
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
4+
return new (P || (P = Promise))(function (resolve, reject) {
5+
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
6+
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
7+
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
8+
step((generator = generator.apply(thisArg, _arguments || [])).next());
9+
});
10+
};
11+
var __generator = (this && this.__generator) || function (thisArg, body) {
12+
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g = Object.create((typeof Iterator === "function" ? Iterator : Object).prototype);
13+
return g.next = verb(0), g["throw"] = verb(1), g["return"] = verb(2), typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
14+
function verb(n) { return function (v) { return step([n, v]); }; }
15+
function step(op) {
16+
if (f) throw new TypeError("Generator is already executing.");
17+
while (g && (g = 0, op[0] && (_ = 0)), _) try {
18+
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
19+
if (y = 0, t) op = [op[0] & 2, t.value];
20+
switch (op[0]) {
21+
case 0: case 1: t = op; break;
22+
case 4: _.label++; return { value: op[1], done: false };
23+
case 5: _.label++; y = op[1]; op = [0]; continue;
24+
case 7: op = _.ops.pop(); _.trys.pop(); continue;
25+
default:
26+
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
27+
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
28+
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
29+
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
30+
if (t[2]) _.ops.pop();
31+
_.trys.pop(); continue;
32+
}
33+
op = body.call(thisArg, _);
34+
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
35+
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
36+
}
37+
};
238
Object.defineProperty(exports, "__esModule", { value: true });
339
exports.event = void 0;
4-
var socket_io_client_1 = require("socket.io-client");
5-
var socket = null;
6-
var callbacks = {};
40+
var InMemoryAdapter_1 = require("./adapters/InMemoryAdapter");
41+
var KafkaAdapter_1 = require("./adapters/KafkaAdapter");
742
var event = {
8-
init: function (_a) {
9-
var host = _a.host, port = _a.port, protocol = _a.protocol;
10-
if (socket)
11-
return;
12-
var socketPath = port ? "".concat(protocol, "://").concat(host, ":").concat(port) : "".concat(protocol, "://").concat(host);
13-
socket = (0, socket_io_client_1.io)(socketPath);
14-
socket.on("event", function (_a) {
15-
var type = _a.type, payload = _a.payload;
16-
if (callbacks[type]) {
17-
callbacks[type].forEach(function (cb) { return cb(payload); });
18-
}
43+
init: function (options) {
44+
return __awaiter(this, void 0, void 0, function () {
45+
var adapter;
46+
return __generator(this, function (_a) {
47+
switch (_a.label) {
48+
case 0:
49+
adapter = options.type === "inMemory" ? new InMemoryAdapter_1.InMemoryAdapter() : new KafkaAdapter_1.KafkaAdapter();
50+
this._adapter = adapter;
51+
return [4 /*yield*/, adapter.init(options)];
52+
case 1:
53+
_a.sent();
54+
return [2 /*return*/];
55+
}
56+
});
1957
});
2058
},
21-
subscribe: function (type, callback) {
22-
if (!socket)
23-
throw new Error("Event not initialized. Call event.init first.");
24-
if (!callbacks[type])
25-
callbacks[type] = new Set();
26-
callbacks[type].add(callback);
27-
socket.emit("subscribe", type);
28-
return function () {
29-
callbacks[type].delete(callback);
30-
if (callbacks[type].size === 0) {
31-
delete callbacks[type];
32-
socket.emit("unsubscribe", type);
33-
}
34-
};
35-
},
3659
publish: function () {
3760
var args = [];
3861
for (var _i = 0; _i < arguments.length; _i++) {
3962
args[_i] = arguments[_i];
4063
}
41-
if (!socket)
42-
throw new Error("Event not initialized. Call event.init first.");
43-
if (args.length < 2) {
44-
throw new Error("publish requires at least one event type and a payload");
45-
}
46-
var payload = args[args.length - 1];
47-
var types = args.slice(0, -1);
48-
// Publish to all specified event types
49-
types.forEach(function (type) {
50-
socket.emit("publish", { type: type, payload: payload });
64+
return __awaiter(this, void 0, void 0, function () {
65+
var adapter;
66+
return __generator(this, function (_a) {
67+
switch (_a.label) {
68+
case 0:
69+
adapter = this._adapter;
70+
if (!adapter)
71+
throw new Error("Event not initialized");
72+
return [4 /*yield*/, adapter.publish.apply(adapter, args)];
73+
case 1:
74+
_a.sent();
75+
return [2 /*return*/];
76+
}
77+
});
78+
});
79+
},
80+
subscribe: function (type, callback) {
81+
return __awaiter(this, void 0, void 0, function () {
82+
var adapter;
83+
return __generator(this, function (_a) {
84+
adapter = this._adapter;
85+
if (!adapter)
86+
throw new Error("Event not initialized");
87+
return [2 /*return*/, adapter.subscribe(type, callback)];
88+
});
89+
});
90+
},
91+
cleanup: function () {
92+
return __awaiter(this, void 0, void 0, function () {
93+
var adapter;
94+
return __generator(this, function (_a) {
95+
switch (_a.label) {
96+
case 0:
97+
adapter = this._adapter;
98+
if (!adapter)
99+
return [2 /*return*/];
100+
return [4 /*yield*/, adapter.cleanup()];
101+
case 1:
102+
_a.sent();
103+
return [2 /*return*/];
104+
}
105+
});
51106
});
52107
},
53108
};
54109
exports.event = event;
110+
process.on("SIGINT", function () { return __awaiter(void 0, void 0, void 0, function () {
111+
return __generator(this, function (_a) {
112+
switch (_a.label) {
113+
case 0:
114+
console.log("Shutting down gracefully...");
115+
return [4 /*yield*/, event.cleanup()];
116+
case 1:
117+
_a.sent();
118+
process.exit(0);
119+
return [2 /*return*/];
120+
}
121+
});
122+
}); });
123+
process.on("SIGTERM", function () { return __awaiter(void 0, void 0, void 0, function () {
124+
return __generator(this, function (_a) {
125+
switch (_a.label) {
126+
case 0:
127+
console.log("Shutting down gracefully...");
128+
return [4 /*yield*/, event.cleanup()];
129+
case 1:
130+
_a.sent();
131+
process.exit(0);
132+
return [2 /*return*/];
133+
}
134+
});
135+
}); });

client/Event.ts

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,49 @@
1-
import { Socket, io } from "socket.io-client";
1+
import { Callback, InitOptions } from "./adapters/types";
22

3-
let socket: Socket | null = null;
4-
const callbacks: Record<string, Set<Callback>> = {};
5-
6-
interface InitOptions {
7-
host: string;
8-
port?: number;
9-
protocol: string;
10-
}
11-
type Callback<T = any> = (payload: T) => void;
3+
import { EventAdapter } from "./adapters/Adapter";
4+
import { InMemoryAdapter } from "./adapters/InMemoryAdapter";
5+
import { KafkaAdapter } from "./adapters/KafkaAdapter";
126

137
const event = {
14-
init({ host, port, protocol }: InitOptions) {
15-
if (socket) return;
16-
const socketPath = port ? `${protocol}://${host}:${port}` : `${protocol}://${host}`;
17-
socket = io(socketPath);
18-
socket.on("event", ({ type, payload }: { type: string; payload: any }) => {
19-
if (callbacks[type]) {
20-
callbacks[type].forEach((cb) => cb(payload));
21-
}
22-
});
8+
async init(options: InitOptions) {
9+
const adapter: EventAdapter =
10+
options.type === "inMemory" ? new InMemoryAdapter() : new KafkaAdapter();
11+
(this as any)._adapter = adapter;
12+
await adapter.init(options);
13+
},
14+
15+
async publish<T = any>(...args: [...string[], T]): Promise<void> {
16+
const adapter: EventAdapter | undefined = (this as any)._adapter;
17+
if (!adapter) throw new Error("Event not initialized");
18+
await adapter.publish(...args);
2319
},
2420

25-
subscribe<T = any>(type: string, callback: Callback<T>): () => void {
26-
if (!socket)
27-
throw new Error("Event not initialized. Call event.init first.");
28-
if (!callbacks[type]) callbacks[type] = new Set();
29-
callbacks[type].add(callback as Callback);
30-
socket!.emit("subscribe", type);
31-
return () => {
32-
callbacks[type].delete(callback as Callback);
33-
if (callbacks[type].size === 0) {
34-
delete callbacks[type];
35-
socket!.emit("unsubscribe", type);
36-
}
37-
};
21+
async subscribe<T = any>(
22+
type: string,
23+
callback: Callback<T>
24+
): Promise<() => void> {
25+
const adapter: EventAdapter | undefined = (this as any)._adapter;
26+
if (!adapter) throw new Error("Event not initialized");
27+
return adapter.subscribe(type, callback as any);
3828
},
3929

40-
publish<T = any>(...args: [...string[], T]): void {
41-
if (!socket)
42-
throw new Error("Event not initialized. Call event.init first.");
43-
44-
if (args.length < 2) {
45-
throw new Error("publish requires at least one event type and a payload");
46-
}
47-
48-
const payload = args[args.length - 1];
49-
const types = args.slice(0, -1) as string[];
50-
51-
// Publish to all specified event types
52-
types.forEach(type => {
53-
socket!.emit("publish", { type, payload });
54-
});
30+
async cleanup() {
31+
const adapter: EventAdapter | undefined = (this as any)._adapter;
32+
if (!adapter) return;
33+
await adapter.cleanup();
5534
},
5635
};
5736

37+
process.on("SIGINT", async () => {
38+
console.log("Shutting down gracefully...");
39+
await event.cleanup();
40+
process.exit(0);
41+
});
42+
43+
process.on("SIGTERM", async () => {
44+
console.log("Shutting down gracefully...");
45+
await event.cleanup();
46+
process.exit(0);
47+
});
48+
5849
export { event };

client/adapters/Adapter.d.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { Callback, InitOptions } from "./types";
2+
export interface EventAdapter {
3+
init(options: InitOptions): Promise<void>;
4+
publish<T = any>(...args: [...string[], T]): Promise<void>;
5+
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
6+
cleanup(): Promise<void>;
7+
}

client/adapters/Adapter.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"use strict";
2+
Object.defineProperty(exports, "__esModule", { value: true });

client/adapters/Adapter.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { Callback, InitOptions } from "./types";
2+
3+
export interface EventAdapter {
4+
init(options: InitOptions): Promise<void>;
5+
publish<T = any>(...args: [...string[], T]): Promise<void>;
6+
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
7+
cleanup(): Promise<void>;
8+
}
9+
10+
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { Callback, InitOptions } from "./types";
2+
import { EventAdapter } from "./Adapter";
3+
export declare class InMemoryAdapter implements EventAdapter {
4+
private socket;
5+
init(options: InitOptions): Promise<void>;
6+
publish<T = any>(...args: [...string[], T]): Promise<void>;
7+
subscribe<T = any>(type: string, callback: Callback<T>): Promise<() => void>;
8+
cleanup(): Promise<void>;
9+
}

0 commit comments

Comments
 (0)