diff --git a/client/Event.d.ts b/client/Event.d.ts deleted file mode 100644 index cace05c..0000000 --- a/client/Event.d.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Callback, InitOptions } from "./adapters/types"; -declare const event: { - init(options: InitOptions): Promise; - publish(...args: [...string[], T]): Promise; - subscribe(type: string, callback: Callback): Promise<() => void>; - cleanup(): Promise; -}; -export { event }; diff --git a/client/Event.js b/client/Event.js deleted file mode 100644 index 286545b..0000000 --- a/client/Event.js +++ /dev/null @@ -1,135 +0,0 @@ -"use strict"; -var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { - function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } - return new (P || (P = Promise))(function (resolve, reject) { - function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } - function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } - function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } - step((generator = generator.apply(thisArg, _arguments || [])).next()); - }); -}; -var __generator = (this && this.__generator) || function (thisArg, body) { - var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g = Object.create((typeof Iterator === "function" ? Iterator : Object).prototype); - return g.next = verb(0), g["throw"] = verb(1), g["return"] = verb(2), typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; - function verb(n) { return function (v) { return step([n, v]); }; } - function step(op) { - if (f) throw new TypeError("Generator is already executing."); - while (g && (g = 0, op[0] && (_ = 0)), _) try { - if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; - if (y = 0, t) op = [op[0] & 2, t.value]; - switch (op[0]) { - case 0: case 1: t = op; break; - case 4: _.label++; return { value: op[1], done: false }; - case 5: _.label++; y = op[1]; op = [0]; continue; - case 7: op = _.ops.pop(); _.trys.pop(); continue; - default: - if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } - if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } - if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } - if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } - if (t[2]) _.ops.pop(); - _.trys.pop(); continue; - } - op = body.call(thisArg, _); - } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } - if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; - } -}; -Object.defineProperty(exports, "__esModule", { value: true }); -exports.event = void 0; -var InMemoryAdapter_1 = require("./adapters/InMemoryAdapter"); -var KafkaAdapter_1 = require("./adapters/KafkaAdapter"); -var event = { - init: function (options) { - return __awaiter(this, void 0, void 0, function () { - var adapter; - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - adapter = options.type === "inMemory" ? new InMemoryAdapter_1.InMemoryAdapter() : new KafkaAdapter_1.KafkaAdapter(); - this._adapter = adapter; - return [4 /*yield*/, adapter.init(options)]; - case 1: - _a.sent(); - return [2 /*return*/]; - } - }); - }); - }, - publish: function () { - var args = []; - for (var _i = 0; _i < arguments.length; _i++) { - args[_i] = arguments[_i]; - } - return __awaiter(this, void 0, void 0, function () { - var adapter; - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - adapter = this._adapter; - if (!adapter) - throw new Error("Event not initialized"); - return [4 /*yield*/, adapter.publish.apply(adapter, args)]; - case 1: - _a.sent(); - return [2 /*return*/]; - } - }); - }); - }, - subscribe: function (type, callback) { - return __awaiter(this, void 0, void 0, function () { - var adapter; - return __generator(this, function (_a) { - adapter = this._adapter; - if (!adapter) - throw new Error("Event not initialized"); - return [2 /*return*/, adapter.subscribe(type, callback)]; - }); - }); - }, - cleanup: function () { - return __awaiter(this, void 0, void 0, function () { - var adapter; - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - adapter = this._adapter; - if (!adapter) - return [2 /*return*/]; - return [4 /*yield*/, adapter.cleanup()]; - case 1: - _a.sent(); - return [2 /*return*/]; - } - }); - }); - }, -}; -exports.event = event; -process.on("SIGINT", function () { return __awaiter(void 0, void 0, void 0, function () { - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - console.log("Shutting down gracefully..."); - return [4 /*yield*/, event.cleanup()]; - case 1: - _a.sent(); - process.exit(0); - return [2 /*return*/]; - } - }); -}); }); -process.on("SIGTERM", function () { return __awaiter(void 0, void 0, void 0, function () { - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - console.log("Shutting down gracefully..."); - return [4 /*yield*/, event.cleanup()]; - case 1: - _a.sent(); - process.exit(0); - return [2 /*return*/]; - } - }); -}); }); diff --git a/client/Event.ts b/client/Event.ts deleted file mode 100644 index 569c82b..0000000 --- a/client/Event.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { Callback, InitOptions } from "./adapters/types"; - -import { EventAdapter } from "./adapters/Adapter"; -import { InMemoryAdapter } from "./adapters/InMemoryAdapter"; -import { KafkaAdapter } from "./adapters/KafkaAdapter"; - -const event = { - async init(options: InitOptions) { - const adapter: EventAdapter = - options.type === "inMemory" ? new InMemoryAdapter() : new KafkaAdapter(); - (this as any)._adapter = adapter; - await adapter.init(options); - }, - - async publish(...args: [...string[], T]): Promise { - const adapter: EventAdapter | undefined = (this as any)._adapter; - if (!adapter) throw new Error("Event not initialized"); - await adapter.publish(...args); - }, - - async subscribe( - type: string, - callback: Callback - ): Promise<() => void> { - const adapter: EventAdapter | undefined = (this as any)._adapter; - if (!adapter) throw new Error("Event not initialized"); - return adapter.subscribe(type, callback as any); - }, - - async cleanup() { - const adapter: EventAdapter | undefined = (this as any)._adapter; - if (!adapter) return; - await adapter.cleanup(); - }, -}; - -process.on("SIGINT", async () => { - console.log("Shutting down gracefully..."); - await event.cleanup(); - process.exit(0); -}); - -process.on("SIGTERM", async () => { - console.log("Shutting down gracefully..."); - await event.cleanup(); - process.exit(0); -}); - -export { event }; diff --git a/client/adapters/Adapter.d.ts b/client/adapters/Adapter.d.ts deleted file mode 100644 index 3abf3dd..0000000 --- a/client/adapters/Adapter.d.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { Callback, InitOptions } from "./types"; -export interface EventAdapter { - init(options: InitOptions): Promise; - publish(...args: [...string[], T]): Promise; - subscribe(type: string, callback: Callback): Promise<() => void>; - cleanup(): Promise; -} diff --git a/client/adapters/Adapter.js b/client/adapters/Adapter.js deleted file mode 100644 index c8ad2e5..0000000 --- a/client/adapters/Adapter.js +++ /dev/null @@ -1,2 +0,0 @@ -"use strict"; -Object.defineProperty(exports, "__esModule", { value: true }); diff --git a/client/adapters/Adapter.ts b/client/adapters/Adapter.ts deleted file mode 100644 index e4bf098..0000000 --- a/client/adapters/Adapter.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { Callback, InitOptions } from "./types"; - -export interface EventAdapter { - init(options: InitOptions): Promise; - publish(...args: [...string[], T]): Promise; - subscribe(type: string, callback: Callback): Promise<() => void>; - cleanup(): Promise; -} - - diff --git a/client/adapters/InMemoryAdapter.d.ts b/client/adapters/InMemoryAdapter.d.ts deleted file mode 100644 index 5cadd1d..0000000 --- a/client/adapters/InMemoryAdapter.d.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Callback, InitOptions } from "./types"; -import { EventAdapter } from "./Adapter"; -export declare class InMemoryAdapter implements EventAdapter { - private socket; - init(options: InitOptions): Promise; - publish(...args: [...string[], T]): Promise; - subscribe(type: string, callback: Callback): Promise<() => void>; - cleanup(): Promise; -} diff --git a/client/adapters/InMemoryAdapter.js b/client/adapters/InMemoryAdapter.js deleted file mode 100644 index b54f942..0000000 --- a/client/adapters/InMemoryAdapter.js +++ /dev/null @@ -1,128 +0,0 @@ -"use strict"; -var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { - function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } - return new (P || (P = Promise))(function (resolve, reject) { - function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } - function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } - function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } - step((generator = generator.apply(thisArg, _arguments || [])).next()); - }); -}; -var __generator = (this && this.__generator) || function (thisArg, body) { - var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g = Object.create((typeof Iterator === "function" ? Iterator : Object).prototype); - return g.next = verb(0), g["throw"] = verb(1), g["return"] = verb(2), typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; - function verb(n) { return function (v) { return step([n, v]); }; } - function step(op) { - if (f) throw new TypeError("Generator is already executing."); - while (g && (g = 0, op[0] && (_ = 0)), _) try { - if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; - if (y = 0, t) op = [op[0] & 2, t.value]; - switch (op[0]) { - case 0: case 1: t = op; break; - case 4: _.label++; return { value: op[1], done: false }; - case 5: _.label++; y = op[1]; op = [0]; continue; - case 7: op = _.ops.pop(); _.trys.pop(); continue; - default: - if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } - if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } - if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } - if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } - if (t[2]) _.ops.pop(); - _.trys.pop(); continue; - } - op = body.call(thisArg, _); - } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } - if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; - } -}; -Object.defineProperty(exports, "__esModule", { value: true }); -exports.InMemoryAdapter = void 0; -var socket_io_client_1 = require("socket.io-client"); -var callbacks = {}; -var InMemoryAdapter = /** @class */ (function () { - function InMemoryAdapter() { - this.socket = null; - } - InMemoryAdapter.prototype.init = function (options) { - return __awaiter(this, void 0, void 0, function () { - var opts, host, protocol, socketPath; - return __generator(this, function (_a) { - opts = options; - if (!opts.host) { - throw new Error("host is required for inMemory initialization"); - } - if (!opts.protocol) { - throw new Error("protocol is required for inMemory initialization"); - } - host = opts.host, protocol = opts.protocol; - socketPath = (opts === null || opts === void 0 ? void 0 : opts.port) ? "".concat(protocol, "://").concat(host, ":").concat(opts.port) : "".concat(protocol, "://").concat(host); - this.socket = (0, socket_io_client_1.io)(socketPath); - this.socket.on("event", function (_a) { - var type = _a.type, payload = _a.payload; - if (callbacks[type]) { - callbacks[type].forEach(function (cb) { return cb(payload); }); - } - }); - return [2 /*return*/]; - }); - }); - }; - InMemoryAdapter.prototype.publish = function () { - var args = []; - for (var _i = 0; _i < arguments.length; _i++) { - args[_i] = arguments[_i]; - } - return __awaiter(this, void 0, void 0, function () { - var payload, types; - var _this = this; - return __generator(this, function (_a) { - if (!this.socket) - return [2 /*return*/]; - payload = args[args.length - 1]; - types = args.slice(0, -1); - types.forEach(function (type) { - _this.socket.emit("publish", { type: type, payload: payload }); - }); - return [2 /*return*/]; - }); - }); - }; - InMemoryAdapter.prototype.subscribe = function (type, callback) { - return __awaiter(this, void 0, void 0, function () { - var _this = this; - return __generator(this, function (_a) { - if (!callbacks[type]) - callbacks[type] = new Set(); - callbacks[type].add(callback); - if (this.socket) { - this.socket.emit("subscribe", type); - } - return [2 /*return*/, function () { return __awaiter(_this, void 0, void 0, function () { - return __generator(this, function (_a) { - callbacks[type].delete(callback); - if (callbacks[type].size === 0) { - delete callbacks[type]; - if (this.socket) { - this.socket.emit("unsubscribe", type); - } - } - return [2 /*return*/]; - }); - }); }]; - }); - }); - }; - InMemoryAdapter.prototype.cleanup = function () { - return __awaiter(this, void 0, void 0, function () { - return __generator(this, function (_a) { - if (this.socket) { - this.socket.disconnect(); - this.socket = null; - } - return [2 /*return*/]; - }); - }); - }; - return InMemoryAdapter; -}()); -exports.InMemoryAdapter = InMemoryAdapter; diff --git a/client/adapters/InMemoryAdapter.ts b/client/adapters/InMemoryAdapter.ts deleted file mode 100644 index 01c2ccd..0000000 --- a/client/adapters/InMemoryAdapter.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { Callback, InMemoryOptions, InitOptions } from "./types"; -import { Socket, io } from "socket.io-client"; - -import { EventAdapter } from "./Adapter"; - -const callbacks: Record> = {}; - -export class InMemoryAdapter implements EventAdapter { - private socket: Socket | null = null; - - async init(options: InitOptions): Promise { - const opts = options as InMemoryOptions; - if (!opts.host) { - throw new Error("host is required for inMemory initialization"); - } - if (!opts.protocol) { - throw new Error("protocol is required for inMemory initialization"); - } - - const { host, protocol } = opts; - const socketPath = opts?.port ? `${protocol}://${host}:${opts.port}` : `${protocol}://${host}`; - - this.socket = io(socketPath); - this.socket.on("event", ({ type, payload }: { type: string; payload: any }) => { - if (callbacks[type]) { - callbacks[type].forEach((cb) => cb(payload)); - } - }); - } - - async publish(...args: [...string[], T]): Promise { - if (!this.socket) return; - const payload = args[args.length - 1]; - const types = args.slice(0, -1) as string[]; - types.forEach((type) => { - this.socket!.emit("publish", { type, payload }); - }); - } - - async subscribe(type: string, callback: Callback): Promise<() => void> { - if (!callbacks[type]) callbacks[type] = new Set(); - callbacks[type].add(callback as Callback); - if (this.socket) { - this.socket.emit("subscribe", type); - } - return async () => { - callbacks[type].delete(callback as Callback); - if (callbacks[type].size === 0) { - delete callbacks[type]; - if (this.socket) { - this.socket.emit("unsubscribe", type); - } - } - }; - } - - async cleanup(): Promise { - if (this.socket) { - this.socket.disconnect(); - this.socket = null; - } - } -} - - diff --git a/client/adapters/KafkaAdapter.d.ts b/client/adapters/KafkaAdapter.d.ts index 43b86b4..df0e582 100644 --- a/client/adapters/KafkaAdapter.d.ts +++ b/client/adapters/KafkaAdapter.d.ts @@ -1,12 +1,21 @@ -import { Callback, InitOptions } from "./types"; -import { EventAdapter } from "./Adapter"; +import { EventAdapter } from "../types/types"; export declare class KafkaAdapter implements EventAdapter { + private readonly options; private kafka; - private kafkaProducer; - private kafkaConsumers; - private kafkaGroupId; - init(options: InitOptions): Promise; - publish(...args: [...string[], T]): Promise; - subscribe(type: string, callback: Callback): Promise<() => void>; - cleanup(): Promise; + private consumer; + private producer; + private messageHandler?; + constructor(options: { + clientId: string; + brokers: string[]; + groupId: string; + topics: string[]; + }); + connect(): Promise; + disconnect(): Promise; + publish(type: string, payload: T): Promise; + subscribe(type: string): Promise; + unsubscribe(type: string): Promise; + onMessage(handler: (type: string, payload: object) => void): void; + getBacklog(topics: string[]): Promise>; } diff --git a/client/adapters/KafkaAdapter.js b/client/adapters/KafkaAdapter.js index e5ea3bd..ccf8077 100644 --- a/client/adapters/KafkaAdapter.js +++ b/client/adapters/KafkaAdapter.js @@ -1,285 +1,115 @@ "use strict"; -var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { - function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } - return new (P || (P = Promise))(function (resolve, reject) { - function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } - function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } - function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } - step((generator = generator.apply(thisArg, _arguments || [])).next()); - }); -}; -var __generator = (this && this.__generator) || function (thisArg, body) { - var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g = Object.create((typeof Iterator === "function" ? Iterator : Object).prototype); - return g.next = verb(0), g["throw"] = verb(1), g["return"] = verb(2), typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; - function verb(n) { return function (v) { return step([n, v]); }; } - function step(op) { - if (f) throw new TypeError("Generator is already executing."); - while (g && (g = 0, op[0] && (_ = 0)), _) try { - if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; - if (y = 0, t) op = [op[0] & 2, t.value]; - switch (op[0]) { - case 0: case 1: t = op; break; - case 4: _.label++; return { value: op[1], done: false }; - case 5: _.label++; y = op[1]; op = [0]; continue; - case 7: op = _.ops.pop(); _.trys.pop(); continue; - default: - if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } - if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } - if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } - if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } - if (t[2]) _.ops.pop(); - _.trys.pop(); continue; - } - op = body.call(thisArg, _); - } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } - if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; - } -}; Object.defineProperty(exports, "__esModule", { value: true }); exports.KafkaAdapter = void 0; -var kafkajs_1 = require("kafkajs"); -var callbacks = {}; -var KafkaAdapter = /** @class */ (function () { - function KafkaAdapter() { - this.kafka = null; - this.kafkaProducer = null; - this.kafkaConsumers = new Map(); - this.kafkaGroupId = null; +const kafkajs_1 = require("kafkajs"); +class KafkaAdapter { + options; + kafka; + consumer = null; + producer = null; + messageHandler; + constructor(options) { + this.options = options; + this.kafka = new kafkajs_1.Kafka({ + clientId: options.clientId, + brokers: options.brokers, + }); } - KafkaAdapter.prototype.init = function (options) { - return __awaiter(this, void 0, void 0, function () { - var opts; - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - opts = options; - if (!opts.clientId) { - throw new Error("clientId is required for Kafka initialization"); - } - if (!opts.brokers || !Array.isArray(opts.brokers) || opts.brokers.length === 0) { - throw new Error("brokers array is required for Kafka initialization"); - } - if (!opts.groupId) { - throw new Error("groupId is required for Kafka initialization"); - } - this.kafka = new kafkajs_1.Kafka({ - clientId: opts.clientId, - brokers: opts.brokers, - retry: { - initialRetryTime: 100, - retries: 8, - multiplier: 2, - maxRetryTime: 30000, - }, - connectionTimeout: 10000, - requestTimeout: 30000, - }); - this.kafkaGroupId = opts.groupId; - this.kafkaProducer = this.kafka.producer({ - allowAutoTopicCreation: true, - transactionTimeout: 30000, - }); - return [4 /*yield*/, this.kafkaProducer.connect()]; - case 1: - _a.sent(); - this.kafkaProducer.on("producer.disconnect", function () { - console.error("Producer disconnected"); - }); - return [2 /*return*/]; + async connect() { + this.producer = this.kafka.producer(); + await this.producer.connect(); + this.consumer = this.kafka.consumer({ groupId: this.options.groupId }); + await this.consumer.connect(); + await this.consumer.subscribe({ + topics: [/^(?!__).*$/], + fromBeginning: false, + }); + await this.consumer.run({ + partitionsConsumedConcurrently: 160, + eachMessage: async ({ topic, message }) => { + if (topic.startsWith("__")) { + return; } - }); + if (this.messageHandler) { + try { + const payload = JSON.parse(message.value?.toString() || "{}"); + this.messageHandler(topic, payload); + } + catch (error) { + console.error(`Error processing message for topic ${topic}:`, error); + } + } + }, }); - }; - KafkaAdapter.prototype.publish = function () { - var args = []; - for (var _i = 0; _i < arguments.length; _i++) { - args[_i] = arguments[_i]; + console.log(`Kafka consumer connected`); + } + async disconnect() { + if (this.consumer) { + await this.consumer.stop(); + await this.consumer.disconnect(); + this.consumer = null; } - return __awaiter(this, void 0, void 0, function () { - var payload, types, messages, error_1, reconnectError_1; - var _this = this; - var _a; - return __generator(this, function (_b) { - switch (_b.label) { - case 0: - if (!this.kafka || !this.kafkaProducer) - return [2 /*return*/]; - payload = args[args.length - 1]; - types = args.slice(0, -1); - _b.label = 1; - case 1: - _b.trys.push([1, 3, , 11]); - messages = types.map(function (type) { return ({ - topic: type, - messages: [ - { - value: JSON.stringify(payload), - timestamp: Date.now().toString(), - }, - ], - }); }); - return [4 /*yield*/, Promise.all(messages.map(function (msg) { return _this.kafkaProducer.send(msg); }))]; - case 2: - _b.sent(); - return [3 /*break*/, 11]; - case 3: - error_1 = _b.sent(); - console.error("Failed to publish to Kafka:", error_1); - if (!((_a = error_1.message) === null || _a === void 0 ? void 0 : _a.includes("disconnected"))) return [3 /*break*/, 9]; - _b.label = 4; - case 4: - _b.trys.push([4, 7, , 8]); - return [4 /*yield*/, this.kafkaProducer.connect()]; - case 5: - _b.sent(); - return [4 /*yield*/, this.publish.apply(this, args)]; - case 6: - _b.sent(); - return [3 /*break*/, 8]; - case 7: - reconnectError_1 = _b.sent(); - throw new Error("Failed to reconnect producer: ".concat(reconnectError_1.message)); - case 8: return [3 /*break*/, 10]; - case 9: throw error_1; - case 10: return [3 /*break*/, 11]; - case 11: return [2 /*return*/]; - } - }); + if (this.producer) { + await this.producer.disconnect(); + this.producer = null; + } + } + async publish(type, payload) { + if (!this.producer) { + throw new Error("Producer not connected"); + } + this.producer.send({ + topic: type, + messages: [{ value: JSON.stringify(payload) }], + }).then(() => { + console.log(`Message published to topic ${type}`); + }).catch((error) => { + console.error(`Error publishing message to topic ${type}:`, error); + return Promise.reject(error); }); - }; - KafkaAdapter.prototype.subscribe = function (type, callback) { - return __awaiter(this, void 0, void 0, function () { - var consumer; - var _this = this; - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - if (!callbacks[type]) - callbacks[type] = new Set(); - callbacks[type].add(callback); - if (!this.kafka) { - return [2 /*return*/, function () { return __awaiter(_this, void 0, void 0, function () { - return __generator(this, function (_a) { - callbacks[type].delete(callback); - if (callbacks[type].size === 0) - delete callbacks[type]; - return [2 /*return*/]; - }); - }); }]; + } + async subscribe(type) { + // No-op: EventManager handles callback registration in memory + } + async unsubscribe(type) { + // No-op: EventManager handles callback removal in memory + } + onMessage(handler) { + this.messageHandler = handler; + } + async getBacklog(topics) { + const backlogMap = new Map(); + if (topics.length === 0) { + return backlogMap; + } + const admin = this.kafka.admin(); + await admin.connect(); + try { + for (const topic of topics) { + const offsetsResponse = await admin.fetchOffsets({ + groupId: this.options.groupId, + topics: [topic], + }); + const topicOffsets = await admin.fetchTopicOffsets(topic); + let totalLag = 0; + const topicResponse = offsetsResponse.find((r) => r.topic === topic); + if (topicResponse) { + topicResponse.partitions.forEach((partitionOffset) => { + const latestOffset = topicOffsets.find((to) => to.partition === partitionOffset.partition); + if (latestOffset) { + const consumerOffset = parseInt(partitionOffset.offset); + const latestOffsetValue = parseInt(latestOffset.offset); + totalLag += Math.max(0, latestOffsetValue - consumerOffset); } - if (!!this.kafkaConsumers.has(type)) return [3 /*break*/, 4]; - consumer = this.kafka.consumer({ - groupId: "".concat(this.kafkaGroupId, "-").concat(type), - sessionTimeout: 30000, - heartbeatInterval: 3000, - }); - return [4 /*yield*/, consumer.connect()]; - case 1: - _a.sent(); - return [4 /*yield*/, consumer.subscribe({ topic: type, fromBeginning: false })]; - case 2: - _a.sent(); - return [4 /*yield*/, consumer.run({ - autoCommit: true, - eachMessage: function (_a) { return __awaiter(_this, [_a], void 0, function (_b) { - var payload_1; - var _c; - var topic = _b.topic, partition = _b.partition, message = _b.message; - return __generator(this, function (_d) { - if (callbacks[topic]) { - try { - payload_1 = JSON.parse(((_c = message.value) === null || _c === void 0 ? void 0 : _c.toString()) || "{}"); - callbacks[topic].forEach(function (cb) { return cb(payload_1); }); - } - catch (error) { - console.error("Failed to parse message from topic ".concat(topic, ":"), error); - } - } - return [2 /*return*/]; - }); - }); }, - })]; - case 3: - _a.sent(); - consumer.on("consumer.disconnect", function () { - console.error("Consumer for topic ".concat(type, " disconnected")); - _this.kafkaConsumers.delete(type); - }); - this.kafkaConsumers.set(type, consumer); - _a.label = 4; - case 4: return [2 /*return*/, function () { return __awaiter(_this, void 0, void 0, function () { - var consumer; - return __generator(this, function (_a) { - switch (_a.label) { - case 0: - callbacks[type].delete(callback); - if (!(callbacks[type].size === 0)) return [3 /*break*/, 2]; - delete callbacks[type]; - consumer = this.kafkaConsumers.get(type); - if (!consumer) return [3 /*break*/, 2]; - return [4 /*yield*/, consumer.disconnect()]; - case 1: - _a.sent(); - this.kafkaConsumers.delete(type); - _a.label = 2; - case 2: return [2 /*return*/]; - } - }); - }); }]; - } - }); - }); - }; - KafkaAdapter.prototype.cleanup = function () { - return __awaiter(this, void 0, void 0, function () { - var entries, _i, entries_1, _a, topic, consumer, error_2, error_3; - return __generator(this, function (_b) { - switch (_b.label) { - case 0: - entries = Array.from(this.kafkaConsumers.entries()); - _i = 0, entries_1 = entries; - _b.label = 1; - case 1: - if (!(_i < entries_1.length)) return [3 /*break*/, 6]; - _a = entries_1[_i], topic = _a[0], consumer = _a[1]; - _b.label = 2; - case 2: - _b.trys.push([2, 4, , 5]); - return [4 /*yield*/, consumer.disconnect()]; - case 3: - _b.sent(); - return [3 /*break*/, 5]; - case 4: - error_2 = _b.sent(); - console.error("Failed to disconnect consumer for ".concat(topic, ":"), error_2); - return [3 /*break*/, 5]; - case 5: - _i++; - return [3 /*break*/, 1]; - case 6: - this.kafkaConsumers.clear(); - if (!this.kafkaProducer) return [3 /*break*/, 11]; - _b.label = 7; - case 7: - _b.trys.push([7, 9, , 10]); - return [4 /*yield*/, this.kafkaProducer.disconnect()]; - case 8: - _b.sent(); - return [3 /*break*/, 10]; - case 9: - error_3 = _b.sent(); - console.error("Failed to disconnect producer:", error_3); - return [3 /*break*/, 10]; - case 10: - this.kafkaProducer = null; - _b.label = 11; - case 11: - this.kafka = null; - return [2 /*return*/]; + }); } - }); - }); - }; - return KafkaAdapter; -}()); + backlogMap.set(topic, totalLag); + } + } + finally { + await admin.disconnect(); + } + return backlogMap; + } +} exports.KafkaAdapter = KafkaAdapter; diff --git a/client/adapters/KafkaAdapter.ts b/client/adapters/KafkaAdapter.ts index 808d07c..a92a224 100644 --- a/client/adapters/KafkaAdapter.ts +++ b/client/adapters/KafkaAdapter.ts @@ -1,162 +1,140 @@ -import { Callback, InitOptions, KafkaOptions } from "./types"; import { Consumer, Kafka, Producer } from "kafkajs"; -import { EventAdapter } from "./Adapter"; - -const callbacks: Record> = {}; +import { EventAdapter } from "../types/types"; export class KafkaAdapter implements EventAdapter { - private kafka: Kafka | null = null; - private kafkaProducer: Producer | null = null; - private kafkaConsumers: Map = new Map(); - private kafkaGroupId: string | null = null; - - async init(options: InitOptions): Promise { - const opts = options as KafkaOptions; - if (!opts.clientId) { - throw new Error("clientId is required for Kafka initialization"); - } - if (!opts.brokers || !Array.isArray(opts.brokers) || opts.brokers.length === 0) { - throw new Error("brokers array is required for Kafka initialization"); - } - if (!opts.groupId) { - throw new Error("groupId is required for Kafka initialization"); + private kafka: Kafka; + private consumer: Consumer | null = null; + private producer: Producer | null = null; + private messageHandler?: (type: string, payload: object) => void; + + constructor( + private readonly options: { + clientId: string; + brokers: string[]; + groupId: string; + topics: string[]; } - + ) { this.kafka = new Kafka({ - clientId: opts.clientId, - brokers: opts.brokers, - retry: { - initialRetryTime: 100, - retries: 8, - multiplier: 2, - maxRetryTime: 30000, - }, - connectionTimeout: 10000, - requestTimeout: 30000, + clientId: options.clientId, + brokers: options.brokers, }); + } - this.kafkaGroupId = opts.groupId; + async connect(): Promise { + this.producer = this.kafka.producer(); + await this.producer.connect(); + this.consumer = this.kafka.consumer({ groupId: this.options.groupId }); - this.kafkaProducer = this.kafka.producer({ - allowAutoTopicCreation: true, - transactionTimeout: 30000, + await this.consumer.connect(); + await this.consumer.subscribe({ + topics: [/^(?!__).*$/], + fromBeginning: false, }); + await this.consumer.run({ + partitionsConsumedConcurrently: 160, + eachMessage: async ({ topic, message }) => { + if (topic.startsWith("__")) { + return; + } - await this.kafkaProducer.connect(); - this.kafkaProducer.on("producer.disconnect", () => { - console.error("Producer disconnected"); + if (this.messageHandler) { + try { + const payload = JSON.parse(message.value?.toString() || "{}"); + this.messageHandler(topic, payload); + } catch (error) { + console.error( + `Error processing message for topic ${topic}:`, + error + ); + } + } + }, }); + console.log(`Kafka consumer connected`); } - async publish(...args: [...string[], T]): Promise { - if (!this.kafka || !this.kafkaProducer) return; - const payload = args[args.length - 1]; - const types = args.slice(0, -1) as string[]; - try { - const messages = types.map((type) => ({ - topic: type, - messages: [ - { - value: JSON.stringify(payload), - timestamp: Date.now().toString(), - }, - ], - })); - await Promise.all(messages.map((msg) => this.kafkaProducer!.send(msg))); - } catch (error: any) { - console.error("Failed to publish to Kafka:", error); - if (error.message?.includes("disconnected")) { - try { - await this.kafkaProducer.connect(); - await this.publish(...args); - } catch (reconnectError: any) { - throw new Error(`Failed to reconnect producer: ${reconnectError.message}`); - } - } else { - throw error; - } + async disconnect(): Promise { + if (this.consumer) { + await this.consumer.stop(); + await this.consumer.disconnect(); + this.consumer = null; + } + if (this.producer) { + await this.producer.disconnect(); + this.producer = null; } } - async subscribe(type: string, callback: Callback): Promise<() => void> { - if (!callbacks[type]) callbacks[type] = new Set(); - callbacks[type].add(callback as Callback); - - if (!this.kafka) { - return async () => { - callbacks[type].delete(callback as Callback); - if (callbacks[type].size === 0) delete callbacks[type]; - }; + async publish(type: string, payload: T): Promise { + if (!this.producer) { + throw new Error("Producer not connected"); } + this.producer.send({ + topic: type, + messages: [{ value: JSON.stringify(payload) }], + }).then(() => { + console.log(`Message published to topic ${type}`); + }).catch((error) => { + console.error(`Error publishing message to topic ${type}:`, error); + return Promise.reject(error); + }); + } - if (!this.kafkaConsumers.has(type)) { - const consumer = this.kafka.consumer({ - groupId: `${this.kafkaGroupId}-${type}`, - sessionTimeout: 30000, - heartbeatInterval: 3000, - }); - - await consumer.connect(); - await consumer.subscribe({ topic: type, fromBeginning: false }); - await consumer.run({ - autoCommit: true, - eachMessage: async ({ topic, partition, message }) => { - if (callbacks[topic]) { - try { - const payload = JSON.parse(message.value?.toString() || "{}"); - callbacks[topic].forEach((cb) => cb(payload)); - } catch (error) { - console.error(`Failed to parse message from topic ${topic}:`, error); - } - } - }, - }); - - consumer.on("consumer.disconnect", () => { - console.error(`Consumer for topic ${type} disconnected`); - this.kafkaConsumers.delete(type); - }); + async subscribe(type: string): Promise { + // No-op: EventManager handles callback registration in memory + } - this.kafkaConsumers.set(type, consumer); - } + async unsubscribe(type: string): Promise { + // No-op: EventManager handles callback removal in memory + } - return async () => { - callbacks[type].delete(callback as Callback); - if (callbacks[type].size === 0) { - delete callbacks[type]; - const consumer = this.kafkaConsumers.get(type); - if (consumer) { - await consumer.disconnect(); - this.kafkaConsumers.delete(type); - } - } - }; + onMessage(handler: (type: string, payload: object) => void): void { + this.messageHandler = handler; } - async cleanup(): Promise { - const entries = Array.from(this.kafkaConsumers.entries()); - for (const [topic, consumer] of entries) { - try { - await consumer.disconnect(); - } catch (error) { - console.error(`Failed to disconnect consumer for ${topic}:`, error); - } + async getBacklog(topics: string[]): Promise> { + const backlogMap = new Map(); + + if (topics.length === 0) { + return backlogMap; } - this.kafkaConsumers.clear(); - if (this.kafkaProducer) { - try { - await this.kafkaProducer.disconnect(); - } catch (error) { - console.error("Failed to disconnect producer:", error); + const admin = this.kafka.admin(); + await admin.connect(); + + try { + for (const topic of topics) { + const offsetsResponse = await admin.fetchOffsets({ + groupId: this.options.groupId, + topics: [topic], + }); + + const topicOffsets = await admin.fetchTopicOffsets(topic); + let totalLag = 0; + + const topicResponse = offsetsResponse.find((r) => r.topic === topic); + if (topicResponse) { + topicResponse.partitions.forEach((partitionOffset) => { + const latestOffset = topicOffsets.find( + (to) => to.partition === partitionOffset.partition + ); + + if (latestOffset) { + const consumerOffset = parseInt(partitionOffset.offset); + const latestOffsetValue = parseInt(latestOffset.offset); + totalLag += Math.max(0, latestOffsetValue - consumerOffset); + } + }); + } + + backlogMap.set(topic, totalLag); } - this.kafkaProducer = null; + } finally { + await admin.disconnect(); } - this.kafka = null; + return backlogMap; } } - - - diff --git a/client/adapters/SocketAdapter.d.ts b/client/adapters/SocketAdapter.d.ts new file mode 100644 index 0000000..97d3317 --- /dev/null +++ b/client/adapters/SocketAdapter.d.ts @@ -0,0 +1,17 @@ +import { EventAdapter } from "../types/types"; +export declare class SocketAdapter implements EventAdapter { + private readonly options; + private socket; + private messageHandler?; + constructor(options: { + host: string; + port?: number; + protocol: string; + }); + connect(): Promise; + disconnect(): Promise; + publish(type: string, payload: object): Promise; + subscribe(type: string): Promise; + unsubscribe(type: string): Promise; + onMessage(handler: (type: string, payload: object) => void): void; +} diff --git a/client/adapters/SocketAdapter.js b/client/adapters/SocketAdapter.js new file mode 100644 index 0000000..99d18ec --- /dev/null +++ b/client/adapters/SocketAdapter.js @@ -0,0 +1,50 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.SocketAdapter = void 0; +const socket_io_client_1 = require("socket.io-client"); +class SocketAdapter { + options; + socket = null; + messageHandler; + constructor(options) { + this.options = options; + } + async connect() { + const { host, port, protocol } = this.options; + const socketPath = port ? `${protocol}://${host}:${port}` : `${protocol}://${host}`; + this.socket = (0, socket_io_client_1.io)(socketPath); + this.socket.on("event", ({ type, payload }) => { + if (this.messageHandler) { + this.messageHandler(type, payload); + } + }); + } + async disconnect() { + if (this.socket) { + this.socket.disconnect(); + this.socket = null; + } + } + async publish(type, payload) { + if (!this.socket) { + throw new Error("Socket not connected"); + } + this.socket.emit("publish", { type, payload }); + } + async subscribe(type) { + if (!this.socket) { + throw new Error("Socket not connected"); + } + this.socket.emit("subscribe", type); + } + async unsubscribe(type) { + if (!this.socket) { + throw new Error("Socket not connected"); + } + this.socket.emit("unsubscribe", type); + } + onMessage(handler) { + this.messageHandler = handler; + } +} +exports.SocketAdapter = SocketAdapter; diff --git a/client/adapters/SocketAdapter.ts b/client/adapters/SocketAdapter.ts new file mode 100644 index 0000000..0f940ae --- /dev/null +++ b/client/adapters/SocketAdapter.ts @@ -0,0 +1,59 @@ +import { Socket, io } from "socket.io-client"; + +import { EventAdapter } from "../types/types"; + +export class SocketAdapter implements EventAdapter { + private socket: Socket | null = null; + private messageHandler?: (type: string, payload: object) => void; + + constructor(private readonly options: { + host: string; + port?: number; + protocol: string; + }) {} + + async connect(): Promise { + const { host, port, protocol } = this.options; + const socketPath = port ? `${protocol}://${host}:${port}` : `${protocol}://${host}`; + + this.socket = io(socketPath); + + this.socket.on("event", ({ type, payload }: { type: string; payload: object }) => { + if (this.messageHandler) { + this.messageHandler(type, payload); + } + }); + } + + async disconnect(): Promise { + if (this.socket) { + this.socket.disconnect(); + this.socket = null; + } + } + + async publish(type: string, payload: object): Promise { + if (!this.socket) { + throw new Error("Socket not connected"); + } + this.socket.emit("publish", { type, payload }); + } + + async subscribe(type: string): Promise { + if (!this.socket) { + throw new Error("Socket not connected"); + } + this.socket.emit("subscribe", type); + } + + async unsubscribe(type: string): Promise { + if (!this.socket) { + throw new Error("Socket not connected"); + } + this.socket.emit("unsubscribe", type); + } + + onMessage(handler: (type: string, payload: object) => void): void { + this.messageHandler = handler; + } +} \ No newline at end of file diff --git a/client/adapters/TxEventQAdapter.d.ts b/client/adapters/TxEventQAdapter.d.ts new file mode 100644 index 0000000..2ae3eb9 --- /dev/null +++ b/client/adapters/TxEventQAdapter.d.ts @@ -0,0 +1,28 @@ +import { EventAdapter } from "../types/types"; +export declare class TxEventQAdapter implements EventAdapter { + private readonly options; + private connection; + private queue; + private queueCache; + private messageHandler?; + private isRunning; + constructor(options: { + connectString: string; + user: string; + password: string; + instantClientPath?: string; + walletPath?: string; + consumerName?: string; + batchSize?: number; + waitTime?: number; + autoCommit?: boolean; + }); + connect(): Promise; + disconnect(): Promise; + private getOrCreateQueue; + publish(type: string, payload: T): Promise; + subscribe(type: string): Promise; + unsubscribe(type: string): Promise; + onMessage(handler: (type: string, payload: object) => void): void; + getBacklog(topics: string[]): Promise>; +} diff --git a/client/adapters/TxEventQAdapter.js b/client/adapters/TxEventQAdapter.js new file mode 100644 index 0000000..cb65ba7 --- /dev/null +++ b/client/adapters/TxEventQAdapter.js @@ -0,0 +1,219 @@ +"use strict"; +var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + var desc = Object.getOwnPropertyDescriptor(m, k); + if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { + desc = { enumerable: true, get: function() { return m[k]; } }; + } + Object.defineProperty(o, k2, desc); +}) : (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + o[k2] = m[k]; +})); +var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { + Object.defineProperty(o, "default", { enumerable: true, value: v }); +}) : function(o, v) { + o["default"] = v; +}); +var __importStar = (this && this.__importStar) || (function () { + var ownKeys = function(o) { + ownKeys = Object.getOwnPropertyNames || function (o) { + var ar = []; + for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k; + return ar; + }; + return ownKeys(o); + }; + return function (mod) { + if (mod && mod.__esModule) return mod; + var result = {}; + if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]); + __setModuleDefault(result, mod); + return result; + }; +})(); +Object.defineProperty(exports, "__esModule", { value: true }); +exports.TxEventQAdapter = void 0; +const oracledb = __importStar(require("oracledb")); +class TxEventQAdapter { + options; + connection = null; + queue = null; + queueCache = new Map(); + messageHandler; + isRunning = false; + constructor(options) { + this.options = options; + } + async connect() { + try { + if (this.options.instantClientPath && oracledb.thin) { + try { + oracledb.initOracleClient({ + libDir: this.options.instantClientPath, + configDir: this.options.walletPath, + walletPath: this.options.walletPath, + }); + console.log("Oracle Thick client initialized"); + } + catch (initError) { + if (initError.code !== "NJS-509") { + throw initError; + } + console.log("Oracle Thick client already initialized"); + } + } + this.connection = await oracledb.getConnection({ + connectString: this.options.connectString, + user: this.options.user, + password: this.options.password, + configDir: this.options.walletPath, + walletPath: this.options.walletPath, + }); + this.isRunning = true; + console.log("TxEventQ adapter connected successfully"); + } + catch (error) { + console.error("Failed to connect to TxEventQ:", error.message); + throw error; + } + } + async disconnect() { + this.isRunning = false; + if (this.connection) { + try { + this.queueCache.clear(); + await this.connection.close(); + console.log("TxEventQ connection closed"); + } + catch (error) { + console.error("Error closing TxEventQ connection:", error); + } + this.connection = null; + this.queue = null; + } + } + async getOrCreateQueue(queueName, options) { + if (!this.connection) { + throw new Error("TxEventQAdapter not connected"); + } + if (this.queueCache.has(queueName)) { + return this.queueCache.get(queueName); + } + const queue = await this.connection.getQueue(queueName, options); + this.queueCache.set(queueName, queue); + console.log(`Queue ${queueName} cached`); + return queue; + } + async publish(type, payload) { + if (!this.connection) { + throw new Error("TxEventQAdapter not connected"); + } + const queueName = type; + this.queue = await this.getOrCreateQueue(queueName, { + payloadType: oracledb.DB_TYPE_JSON, + }); + const message = { + topic: type, + payload: payload, + }; + this.queue + .enqOne({ + payload: message, + correlation: type, + priority: 0, + delay: 0, + expiration: -1, + exceptionQueue: "", + }) + .then(() => { + this.connection.commit(); + }); + } + async subscribe(type) { + if (!this.connection) { + throw new Error("Subscriber not initialized"); + } + this.isRunning = true; + const queueName = `TXEVENTQ_USER.${type}`; + this.queue = await this.getOrCreateQueue(queueName, { + payloadType: oracledb.DB_TYPE_JSON, + }); + this.queue.deqOptions.wait = 5000; + this.queue.deqOptions.consumerName = + this.options.consumerName || `${type.toLowerCase()}_subscriber`; + try { + while (this.isRunning) { + let messages = []; + const message = await this.queue.deqOne(); + if (message) { + messages = [message]; + } + if (messages && messages.length > 0) { + if (this.messageHandler) { + try { + const payload = message.payload.payload || {}; + this.messageHandler(type, payload); + } + catch (error) { + console.error(`Error processing message for topic ${type}:`, error); + } + } + if (this.options.autoCommit) { + await this.connection.commit(); + console.log(`Transaction committed for ${messages.length} message(s)`); + } + } + } + } + catch (error) { + console.error("Fatal error during consumption:", error); + throw error; + } + } + async unsubscribe(type) { + if (!this.connection) { + throw new Error("Subscriber not initialized"); + } + this.isRunning = false; + this.queue = null; + } + onMessage(handler) { + this.messageHandler = handler; + } + async getBacklog(topics) { + const backlogMap = new Map(); + if (!this.connection || !topics?.length) + return backlogMap; + const sql = ` + SELECT NVL(SUM(s.ENQUEUED_MSGS - s.DEQUEUED_MSGS), 0) AS BACKLOG + FROM GV$AQ_SHARDED_SUBSCRIBER_STAT s + JOIN USER_QUEUES q + ON q.QID = s.QUEUE_ID + JOIN USER_QUEUE_SUBSCRIBERS sub + ON sub.SUBSCRIBER_ID = s.SUBSCRIBER_ID + AND sub.QUEUE_NAME = q.NAME + WHERE q.NAME IN (:queueName1, :queueName2) + AND (:consumerName IS NULL OR sub.CONSUMER_NAME = :consumerName) + `; + const consumerName = typeof this.options.consumerName === "string" + ? this.options.consumerName + : null; + for (const topic of topics) { + const queueName1 = `TXEVENTQ_USER.${topic}`; + const queueName2 = topic; + try { + const result = await this.connection.execute(sql, { queueName1, queueName2, consumerName }, { outFormat: oracledb.OUT_FORMAT_OBJECT }); + const rows = (result.rows || []); + const val = Number(rows?.[0]?.BACKLOG ?? 0); + backlogMap.set(topic, isNaN(val) ? 0 : val); + } + catch (err) { + console.error(`Backlog query failed for topic ${topic}:`, err); + backlogMap.set(topic, 0); + } + } + return backlogMap; + } +} +exports.TxEventQAdapter = TxEventQAdapter; diff --git a/client/adapters/TxEventQAdapter.ts b/client/adapters/TxEventQAdapter.ts new file mode 100644 index 0000000..3591122 --- /dev/null +++ b/client/adapters/TxEventQAdapter.ts @@ -0,0 +1,232 @@ +import * as oracledb from "oracledb"; + +import { EventAdapter } from "../types/types"; + +export class TxEventQAdapter implements EventAdapter { + private connection: oracledb.Connection | null = null; + private queue: oracledb.AdvancedQueue | null = null; + private queueCache: Map> = new Map(); + private messageHandler?: (type: string, payload: object) => void; + private isRunning: boolean = false; + + constructor( + private readonly options: { + connectString: string; + user: string; + password: string; + instantClientPath?: string; + walletPath?: string; + consumerName?: string; + batchSize?: number; + waitTime?: number; + autoCommit?: boolean; + } + ) {} + + async connect(): Promise { + try { + if (this.options.instantClientPath && oracledb.thin) { + try { + oracledb.initOracleClient({ + libDir: this.options.instantClientPath, + configDir: this.options.walletPath, + walletPath: this.options.walletPath, + }); + console.log("Oracle Thick client initialized"); + } catch (initError: any) { + if (initError.code !== "NJS-509") { + throw initError; + } + console.log("Oracle Thick client already initialized"); + } + } + + this.connection = await oracledb.getConnection({ + connectString: this.options.connectString, + user: this.options.user, + password: this.options.password, + configDir: this.options.walletPath, + walletPath: this.options.walletPath, + }); + + this.isRunning = true; + + console.log("TxEventQ adapter connected successfully"); + } catch (error: any) { + console.error("Failed to connect to TxEventQ:", error.message); + throw error; + } + } + + async disconnect(): Promise { + this.isRunning = false; + + if (this.connection) { + try { + this.queueCache.clear(); + + await this.connection.close(); + console.log("TxEventQ connection closed"); + } catch (error) { + console.error("Error closing TxEventQ connection:", error); + } + this.connection = null; + this.queue = null; + } + } + + private async getOrCreateQueue( + queueName: string, + options: any + ): Promise> { + if (!this.connection) { + throw new Error("TxEventQAdapter not connected"); + } + + if (this.queueCache.has(queueName)) { + return this.queueCache.get(queueName)!; + } + + const queue = await this.connection.getQueue(queueName, options); + this.queueCache.set(queueName, queue); + + console.log(`Queue ${queueName} cached`); + + return queue; + } + + async publish(type: string, payload: T): Promise { + if (!this.connection) { + throw new Error("TxEventQAdapter not connected"); + } + + const queueName = type; + + this.queue = await this.getOrCreateQueue(queueName, { + payloadType: oracledb.DB_TYPE_JSON, + } as any); + + const message = { + topic: type, + payload: payload, + }; + + this.queue + .enqOne({ + payload: message, + correlation: type, + priority: 0, + delay: 0, + expiration: -1, + exceptionQueue: "", + } as any) + .then(() => { + this.connection.commit(); + }); + } + + async subscribe(type: string): Promise { + if (!this.connection) { + throw new Error("Subscriber not initialized"); + } + this.isRunning = true; + + const queueName = `TXEVENTQ_USER.${type}`; + + this.queue = await this.getOrCreateQueue(queueName, { + payloadType: oracledb.DB_TYPE_JSON, + }); + + this.queue.deqOptions.wait = 5000; + this.queue.deqOptions.consumerName = + this.options.consumerName || `${type.toLowerCase()}_subscriber`; + try { + while (this.isRunning) { + let messages: oracledb.AdvancedQueueMessage[] = []; + + const message = await this.queue.deqOne(); + if (message) { + messages = [message]; + } + if (messages && messages.length > 0) { + if (this.messageHandler) { + try { + const payload = message.payload.payload || {}; + this.messageHandler(type, payload); + } catch (error) { + console.error( + `Error processing message for topic ${type}:`, + error + ); + } + } + if (this.options.autoCommit) { + await this.connection.commit(); + console.log( + `Transaction committed for ${messages.length} message(s)` + ); + } + } + } + } catch (error) { + console.error("Fatal error during consumption:", error); + throw error; + } + } + + async unsubscribe(type: string): Promise { + if (!this.connection) { + throw new Error("Subscriber not initialized"); + } + this.isRunning = false; + this.queue = null; + } + + onMessage(handler: (type: string, payload: object) => void): void { + this.messageHandler = handler; + } + + async getBacklog(topics: string[]): Promise> { + const backlogMap = new Map(); + if (!this.connection || !topics?.length) return backlogMap; + + const sql = ` + SELECT NVL(SUM(s.ENQUEUED_MSGS - s.DEQUEUED_MSGS), 0) AS BACKLOG + FROM GV$AQ_SHARDED_SUBSCRIBER_STAT s + JOIN USER_QUEUES q + ON q.QID = s.QUEUE_ID + JOIN USER_QUEUE_SUBSCRIBERS sub + ON sub.SUBSCRIBER_ID = s.SUBSCRIBER_ID + AND sub.QUEUE_NAME = q.NAME + WHERE q.NAME IN (:queueName1, :queueName2) + AND (:consumerName IS NULL OR sub.CONSUMER_NAME = :consumerName) + `; + + const consumerName = + typeof this.options.consumerName === "string" + ? this.options.consumerName + : null; + + for (const topic of topics) { + const queueName1 = `TXEVENTQ_USER.${topic}`; + const queueName2 = topic; + + try { + const result = await this.connection.execute( + sql, + { queueName1, queueName2, consumerName }, + { outFormat: oracledb.OUT_FORMAT_OBJECT } + ); + + const rows = (result.rows || []) as Array<{ BACKLOG: number }>; + const val = Number(rows?.[0]?.BACKLOG ?? 0); + backlogMap.set(topic, isNaN(val) ? 0 : val); + } catch (err) { + console.error(`Backlog query failed for topic ${topic}:`, err); + backlogMap.set(topic, 0); + } + } + + return backlogMap; + } +} diff --git a/client/adapters/types.d.ts b/client/adapters/types.d.ts deleted file mode 100644 index 29a057c..0000000 --- a/client/adapters/types.d.ts +++ /dev/null @@ -1,17 +0,0 @@ -export type Callback = (payload: T) => void; -export interface BaseInitOptions { - type: "inMemory" | "kafka"; -} -export interface InMemoryOptions extends BaseInitOptions { - type: "inMemory"; - host: string; - port?: number; - protocol: string; -} -export interface KafkaOptions extends BaseInitOptions { - type: "kafka"; - clientId: string; - brokers: string[]; - groupId: string; -} -export type InitOptions = InMemoryOptions | KafkaOptions; diff --git a/client/adapters/types.ts b/client/adapters/types.ts deleted file mode 100644 index 1911d9e..0000000 --- a/client/adapters/types.ts +++ /dev/null @@ -1,24 +0,0 @@ -export type Callback = (payload: T) => void; - -export interface BaseInitOptions { - type: "inMemory" | "kafka"; -} - -export interface InMemoryOptions extends BaseInitOptions { - type: "inMemory"; - host: string; - port?: number; - protocol: string; -} - -export interface KafkaOptions extends BaseInitOptions { - type: "kafka"; - clientId: string; - brokers: string[]; - groupId: string; -} - -export type InitOptions = InMemoryOptions | KafkaOptions; - - - diff --git a/client/eventManager.d.ts b/client/eventManager.d.ts new file mode 100644 index 0000000..3a67e84 --- /dev/null +++ b/client/eventManager.d.ts @@ -0,0 +1,23 @@ +import { Callback, InitOptions } from "./types/types"; +import { PushgatewayConfig } from "./metrics"; +export declare class EventManager { + private adapter; + private callbacks; + private metrics; + private backlogInterval; + init(options: InitOptions): Promise; + publish(...args: [...string[], T]): Promise; + subscribe(type: string, callback: Callback): Promise<() => void>; + disconnect(): Promise; + private handleIncomingMessage; + private executeCallbacks; + private validateEventType; + private startBacklogMonitoring; + private stopBacklogMonitoring; + private updateBacklogMetrics; + checkBacklog(): Promise; + startPushgateway(config?: PushgatewayConfig): void; + stopPushgateway(): void; + pushMetricsToGateway(): Promise; + getPushgatewayConfig(): PushgatewayConfig | undefined; +} diff --git a/client/eventManager.js b/client/eventManager.js new file mode 100644 index 0000000..02d74ac --- /dev/null +++ b/client/eventManager.js @@ -0,0 +1,208 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.EventManager = void 0; +const metrics_1 = require("./metrics"); +const KafkaAdapter_1 = require("./adapters/KafkaAdapter"); +const SocketAdapter_1 = require("./adapters/SocketAdapter"); +const TxEventQAdapter_1 = require("./adapters/TxEventQAdapter"); +const TOPICS = [ + "KNOWLEDGE_CREATED", + "MESSAGE_USER_MESSAGED", + "SESSION_USER_MESSAGED", + "TASK_CREATED", + "STEP_ADDED", + "STEP_COMPLETED", + "MESSAGE_USER_MESSAGED", + "MESSAGE_ASSISTANT_MESSAGED", + "RESPONSIBILITY_CREATED", + "RESPONSIBILITY_DESCRIPTION_GENERATED", + "SESSION_INITIATED", + "SESSION_USER_MESSAGED", + "SESSION_AI_MESSAGED", + "SUPERVISING_RAISED", + "SUPERVISING_ANSWERED", + "TASK_COMPLETED", + "KNOWLEDGES_LOADED", + "MESSAGES_LOADED", +]; +class EventManager { + adapter = null; + callbacks = new Map(); + metrics = new metrics_1.EventMetrics(); + backlogInterval = null; + async init(options) { + if (this.adapter) { + await this.disconnect(); + } + switch (options.type) { + case "inMemory": + this.adapter = new SocketAdapter_1.SocketAdapter({ + host: options.host, + port: options.port, + protocol: options.protocol, + }); + break; + case "kafka": + this.adapter = new KafkaAdapter_1.KafkaAdapter({ + clientId: options.clientId, + brokers: options.brokers, + groupId: options.groupId, + topics: TOPICS, + }); + this.startBacklogMonitoring(); + break; + case "txeventq": + this.adapter = new TxEventQAdapter_1.TxEventQAdapter({ + connectString: options.connectString, + user: options.user, + password: options.password, + instantClientPath: options.instantClientPath, + walletPath: options.walletPath, + consumerName: options.consumerName, + batchSize: options.batchSize, + waitTime: options.waitTime, + }); + this.startBacklogMonitoring(); + break; + default: + throw new Error(`Unknown adapter type`); + } + await this.adapter.connect(); + this.adapter.onMessage((type, payload) => { + this.handleIncomingMessage(type, payload); + }); + } + async publish(...args) { + if (args.length < 1) { + throw new Error("publish requires at least one event type and a payload"); + } + if (!this.adapter) { + throw new Error("Event system not initialized"); + } + const payload = args[args.length - 1]; + const type = args.slice(0, -1); + const mergedType = type.join("_"); + this.validateEventType(mergedType); + const payloadSize = JSON.stringify(payload).length; + const endTimer = this.metrics.recordPublish(mergedType, payloadSize); + try { + await this.adapter.publish(mergedType, payload); + this.executeCallbacks(mergedType, payload); + endTimer(); + } + catch (error) { + this.metrics.recordPublishError(mergedType, "publish_error"); + endTimer(); + throw error; + } + } + async subscribe(type, callback) { + if (!this.callbacks.has(type)) { + this.callbacks.set(type, new Set()); + } + const callbackSet = this.callbacks.get(type); + callbackSet.add(callback); + this.metrics.updateSubscriptions(type, callbackSet.size); + if (this.adapter && callbackSet.size === 1) { + await this.adapter.subscribe(type); + } + return async () => { + callbackSet.delete(callback); + if (callbackSet.size === 0) { + this.callbacks.delete(type); + if (this.adapter) { + await this.adapter.unsubscribe(type); + } + } + this.metrics.updateSubscriptions(type, callbackSet.size); + }; + } + async disconnect() { + this.stopBacklogMonitoring(); + if (this.adapter) { + await this.adapter.disconnect(); + this.adapter = null; + } + this.callbacks.clear(); + } + handleIncomingMessage(type, payload) { + this.executeCallbacks(type, payload); + } + executeCallbacks(type, payload) { + const callbackSet = this.callbacks.get(type); + if (!callbackSet) + return; // No callbacks for this topic - message ignored + callbackSet.forEach((callback) => { + setTimeout(() => { + const endTimer = this.metrics.recordCallback(type); + try { + callback(payload); + } + catch (error) { + console.error(`Error in callback for ${type}:`, error); + } + endTimer(); + }, 0); + }); + } + validateEventType(type) { + if (type === "__proto__" || + type === "constructor" || + type === "prototype") { + throw new Error("Invalid event type"); + } + } + startBacklogMonitoring(intervalMs = 60000) { + if (!this.adapter) + return; + // Only monitor for adapters that implement meaningful backlog + const supportsBacklog = this.adapter instanceof KafkaAdapter_1.KafkaAdapter || + this.adapter instanceof TxEventQAdapter_1.TxEventQAdapter; + if (!supportsBacklog) + return; + this.updateBacklogMetrics(); + this.backlogInterval = setInterval(() => { + this.updateBacklogMetrics(); + }, intervalMs); + } + stopBacklogMonitoring() { + if (this.backlogInterval) { + clearInterval(this.backlogInterval); + this.backlogInterval = null; + } + } + async updateBacklogMetrics() { + if (!this.adapter) + return; + const supportsBacklog = this.adapter instanceof KafkaAdapter_1.KafkaAdapter || + this.adapter instanceof TxEventQAdapter_1.TxEventQAdapter; + if (!supportsBacklog) + return; + try { + const backlog = await this.adapter.getBacklog(TOPICS); + backlog.forEach((size, topic) => { + this.metrics.updateEventBacklog(topic, size); + console.log(`Backlog for topic ${topic}: ${size} messages`); + }); + } + catch (error) { + console.error("Error updating backlog metrics:", error); + } + } + async checkBacklog() { + await this.updateBacklogMetrics(); + } + startPushgateway(config) { + this.metrics.startPushgateway(config); + } + stopPushgateway() { + this.metrics.stopPushgateway(); + } + async pushMetricsToGateway() { + await this.metrics.pushMetricsToGateway(); + } + getPushgatewayConfig() { + return this.metrics.getPushgatewayConfig(); + } +} +exports.EventManager = EventManager; diff --git a/client/eventManager.ts b/client/eventManager.ts new file mode 100644 index 0000000..b783c86 --- /dev/null +++ b/client/eventManager.ts @@ -0,0 +1,244 @@ +import { Callback, EventAdapter, InitOptions } from "./types/types"; +import { EventMetrics, PushgatewayConfig } from "./metrics"; + +import { KafkaAdapter } from "./adapters/KafkaAdapter"; +import { SocketAdapter } from "./adapters/SocketAdapter"; +import { TxEventQAdapter } from "./adapters/TxEventQAdapter"; + +const TOPICS = [ + "KNOWLEDGE_CREATED", + "MESSAGE_USER_MESSAGED", + "SESSION_USER_MESSAGED", + "TASK_CREATED", + "STEP_ADDED", + "STEP_COMPLETED", + "MESSAGE_USER_MESSAGED", + "MESSAGE_ASSISTANT_MESSAGED", + "RESPONSIBILITY_CREATED", + "RESPONSIBILITY_DESCRIPTION_GENERATED", + "SESSION_INITIATED", + "SESSION_USER_MESSAGED", + "SESSION_AI_MESSAGED", + "SUPERVISING_RAISED", + "SUPERVISING_ANSWERED", + "TASK_COMPLETED", + "KNOWLEDGES_LOADED", + "MESSAGES_LOADED", +]; +export class EventManager { + private adapter: EventAdapter | null = null; + private callbacks: Map> = new Map(); + private metrics = new EventMetrics(); + private backlogInterval: NodeJS.Timeout | null = null; + + async init(options: InitOptions): Promise { + if (this.adapter) { + await this.disconnect(); + } + switch (options.type) { + case "inMemory": + this.adapter = new SocketAdapter({ + host: options.host, + port: options.port, + protocol: options.protocol, + }); + break; + case "kafka": + this.adapter = new KafkaAdapter({ + clientId: options.clientId, + brokers: options.brokers, + groupId: options.groupId, + topics: TOPICS, + }); + this.startBacklogMonitoring(); + break; + + case "txeventq": + this.adapter = new TxEventQAdapter({ + connectString: options.connectString, + user: options.user, + password: options.password, + instantClientPath: options.instantClientPath, + walletPath: options.walletPath, + consumerName: options.consumerName, + batchSize: options.batchSize, + waitTime: options.waitTime, + }); + this.startBacklogMonitoring(); + break; + + default: + throw new Error(`Unknown adapter type`); + } + await this.adapter.connect(); + + this.adapter.onMessage((type, payload) => { + this.handleIncomingMessage(type, payload); + }); + } + + async publish( + ...args: [...string[], T] + ): Promise { + if (args.length < 1) { + throw new Error("publish requires at least one event type and a payload"); + } + if (!this.adapter) { + throw new Error("Event system not initialized"); + } + const payload = args[args.length - 1] as T; + const type = args.slice(0, -1) as string[]; + const mergedType = type.join("_"); + this.validateEventType(mergedType); + const payloadSize = JSON.stringify(payload).length; + const endTimer = this.metrics.recordPublish(mergedType, payloadSize); + try { + await this.adapter.publish(mergedType, payload); + this.executeCallbacks(mergedType, payload); + endTimer(); + } catch (error) { + this.metrics.recordPublishError(mergedType, "publish_error"); + endTimer(); + throw error; + } + } + + async subscribe( + type: string, + callback: Callback + ): Promise<() => void> { + if (!this.callbacks.has(type)) { + this.callbacks.set(type, new Set()); + } + + const callbackSet = this.callbacks.get(type)!; + callbackSet.add(callback as Callback); + + this.metrics.updateSubscriptions(type, callbackSet.size); + + if (this.adapter && callbackSet.size === 1) { + await this.adapter.subscribe(type); + } + + return async () => { + callbackSet.delete(callback as Callback); + + if (callbackSet.size === 0) { + this.callbacks.delete(type); + if (this.adapter) { + await this.adapter.unsubscribe(type); + } + } + + this.metrics.updateSubscriptions(type, callbackSet.size); + }; + } + + async disconnect(): Promise { + this.stopBacklogMonitoring(); + + if (this.adapter) { + await this.adapter.disconnect(); + this.adapter = null; + } + + this.callbacks.clear(); + } + + private handleIncomingMessage(type: string, payload: object): void { + this.executeCallbacks(type, payload); + } + + private executeCallbacks(type: string, payload: object): void { + const callbackSet = this.callbacks.get(type); + if (!callbackSet) return; // No callbacks for this topic - message ignored + + callbackSet.forEach((callback) => { + setTimeout(() => { + const endTimer = this.metrics.recordCallback(type); + try { + callback(payload); + } catch (error) { + console.error(`Error in callback for ${type}:`, error); + } + endTimer(); + }, 0); + }); + } + + private validateEventType(type: string): void { + if ( + type === "__proto__" || + type === "constructor" || + type === "prototype" + ) { + throw new Error("Invalid event type"); + } + } + + private startBacklogMonitoring(intervalMs: number = 60000): void { + if (!this.adapter) return; + + // Only monitor for adapters that implement meaningful backlog + const supportsBacklog = + this.adapter instanceof KafkaAdapter || + this.adapter instanceof TxEventQAdapter; + + if (!supportsBacklog) return; + + this.updateBacklogMetrics(); + + this.backlogInterval = setInterval(() => { + this.updateBacklogMetrics(); + }, intervalMs); + } + + private stopBacklogMonitoring(): void { + if (this.backlogInterval) { + clearInterval(this.backlogInterval); + this.backlogInterval = null; + } + } + + private async updateBacklogMetrics(): Promise { + if (!this.adapter) return; + + const supportsBacklog = + this.adapter instanceof KafkaAdapter || + this.adapter instanceof TxEventQAdapter; + + if (!supportsBacklog) return; + + try { + const backlog = await ( + this.adapter as KafkaAdapter | TxEventQAdapter + ).getBacklog(TOPICS); + backlog.forEach((size, topic) => { + this.metrics.updateEventBacklog(topic, size); + console.log(`Backlog for topic ${topic}: ${size} messages`); + }); + } catch (error) { + console.error("Error updating backlog metrics:", error); + } + } + + async checkBacklog(): Promise { + await this.updateBacklogMetrics(); + } + + startPushgateway(config?: PushgatewayConfig): void { + this.metrics.startPushgateway(config); + } + + stopPushgateway(): void { + this.metrics.stopPushgateway(); + } + + async pushMetricsToGateway(): Promise { + await this.metrics.pushMetricsToGateway(); + } + + getPushgatewayConfig(): PushgatewayConfig | undefined { + return this.metrics.getPushgatewayConfig(); + } +} diff --git a/client/index.d.ts b/client/index.d.ts new file mode 100644 index 0000000..7561f88 --- /dev/null +++ b/client/index.d.ts @@ -0,0 +1,25 @@ +import * as client from "prom-client"; +import { Callback, InitOptions } from "./types/types"; +import { EventMetrics, PushgatewayConfig } from "./metrics"; +import { EventManager } from "./eventManager"; +import { KafkaAdapter } from "./adapters/KafkaAdapter"; +import { SocketAdapter } from "./adapters/SocketAdapter"; +import { TxEventQAdapter } from "./adapters/TxEventQAdapter"; +export declare const event: { + init: (options: InitOptions) => Promise; + publish: (...args: [...string[], T]) => Promise; + subscribe: (type: string, callback: Callback) => Promise<() => void>; + disconnect: () => Promise; + checkBacklog: () => Promise; + startBacklogMonitoring: () => void; + stopBacklogMonitoring: () => void; + restartKafkaConsumer: () => Promise; + startPushgateway: (config?: PushgatewayConfig) => void; + stopPushgateway: () => void; + pushMetricsToGateway: () => Promise; + getPushgatewayConfig: () => PushgatewayConfig | undefined; +}; +export { client }; +export { EventManager, EventMetrics, SocketAdapter, KafkaAdapter, TxEventQAdapter, }; +export type { PushgatewayConfig }; +export * from "./types/types"; diff --git a/client/index.js b/client/index.js new file mode 100644 index 0000000..f3a4ce8 --- /dev/null +++ b/client/index.js @@ -0,0 +1,73 @@ +"use strict"; +var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + var desc = Object.getOwnPropertyDescriptor(m, k); + if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { + desc = { enumerable: true, get: function() { return m[k]; } }; + } + Object.defineProperty(o, k2, desc); +}) : (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + o[k2] = m[k]; +})); +var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { + Object.defineProperty(o, "default", { enumerable: true, value: v }); +}) : function(o, v) { + o["default"] = v; +}); +var __importStar = (this && this.__importStar) || (function () { + var ownKeys = function(o) { + ownKeys = Object.getOwnPropertyNames || function (o) { + var ar = []; + for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k; + return ar; + }; + return ownKeys(o); + }; + return function (mod) { + if (mod && mod.__esModule) return mod; + var result = {}; + if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]); + __setModuleDefault(result, mod); + return result; + }; +})(); +var __exportStar = (this && this.__exportStar) || function(m, exports) { + for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p); +}; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.TxEventQAdapter = exports.KafkaAdapter = exports.SocketAdapter = exports.EventMetrics = exports.EventManager = exports.client = exports.event = void 0; +const client = __importStar(require("prom-client")); +exports.client = client; +const metrics_1 = require("./metrics"); +Object.defineProperty(exports, "EventMetrics", { enumerable: true, get: function () { return metrics_1.EventMetrics; } }); +const eventManager_1 = require("./eventManager"); +Object.defineProperty(exports, "EventManager", { enumerable: true, get: function () { return eventManager_1.EventManager; } }); +const KafkaAdapter_1 = require("./adapters/KafkaAdapter"); +Object.defineProperty(exports, "KafkaAdapter", { enumerable: true, get: function () { return KafkaAdapter_1.KafkaAdapter; } }); +const SocketAdapter_1 = require("./adapters/SocketAdapter"); +Object.defineProperty(exports, "SocketAdapter", { enumerable: true, get: function () { return SocketAdapter_1.SocketAdapter; } }); +const TxEventQAdapter_1 = require("./adapters/TxEventQAdapter"); +Object.defineProperty(exports, "TxEventQAdapter", { enumerable: true, get: function () { return TxEventQAdapter_1.TxEventQAdapter; } }); +const manager = new eventManager_1.EventManager(); +exports.event = { + init: (options) => manager.init(options), + publish: (...args) => manager.publish(...args), + subscribe: (type, callback) => manager.subscribe(type, callback), + disconnect: () => manager.disconnect(), + checkBacklog: () => manager.checkBacklog(), + startBacklogMonitoring: () => { + console.log("Backlog monitoring starts automatically with Kafka adapter"); + }, + stopBacklogMonitoring: () => { + console.log("Backlog monitoring stops automatically on disconnect"); + }, + restartKafkaConsumer: async () => { + console.log("Consumer restart is handled automatically"); + }, + startPushgateway: (config) => manager.startPushgateway(config), + stopPushgateway: () => manager.stopPushgateway(), + pushMetricsToGateway: () => manager.pushMetricsToGateway(), + getPushgatewayConfig: () => manager.getPushgatewayConfig(), +}; +__exportStar(require("./types/types"), exports); diff --git a/client/index.ts b/client/index.ts new file mode 100644 index 0000000..0dbacdb --- /dev/null +++ b/client/index.ts @@ -0,0 +1,50 @@ +import * as client from "prom-client"; + +import { Callback, InitOptions } from "./types/types"; +import { EventMetrics, PushgatewayConfig } from "./metrics"; + +import { EventManager } from "./eventManager"; +import { KafkaAdapter } from "./adapters/KafkaAdapter"; +import { SocketAdapter } from "./adapters/SocketAdapter"; +import { TxEventQAdapter } from "./adapters/TxEventQAdapter"; + +const manager = new EventManager(); + +export const event = { + init: (options: InitOptions) => manager.init(options), + publish: (...args: [...string[], T]) => + manager.publish(...args), + subscribe: (type: string, callback: Callback) => + manager.subscribe(type, callback), + disconnect: () => manager.disconnect(), + checkBacklog: () => manager.checkBacklog(), + + startBacklogMonitoring: () => { + console.log("Backlog monitoring starts automatically with Kafka adapter"); + }, + stopBacklogMonitoring: () => { + console.log("Backlog monitoring stops automatically on disconnect"); + }, + restartKafkaConsumer: async () => { + console.log("Consumer restart is handled automatically"); + }, + + startPushgateway: (config?: PushgatewayConfig) => + manager.startPushgateway(config), + stopPushgateway: () => manager.stopPushgateway(), + pushMetricsToGateway: () => manager.pushMetricsToGateway(), + getPushgatewayConfig: () => manager.getPushgatewayConfig(), +}; + +export { client }; + +export { + EventManager, + EventMetrics, + SocketAdapter, + KafkaAdapter, + TxEventQAdapter, +}; +export type { PushgatewayConfig }; +export * from "./types/types"; + diff --git a/client/metrics.d.ts b/client/metrics.d.ts new file mode 100644 index 0000000..ee1aa33 --- /dev/null +++ b/client/metrics.d.ts @@ -0,0 +1,29 @@ +export interface PushgatewayConfig { + url?: string; + jobName?: string; + instance?: string; + interval?: number; +} +export declare class EventMetrics { + private readonly registry; + private pushgatewayInterval?; + private pushgatewayConfig?; + private readonly publishCounter; + private readonly subscriptionGauge; + private readonly publishDuration; + private readonly payloadSize; + private readonly publishErrors; + private readonly callbackDuration; + private readonly throughput; + private readonly eventBacklog; + constructor(); + recordPublish(type: string, payloadSizeBytes: number): () => void; + recordPublishError(type: string, errorType: string): void; + recordCallback(type: string): () => void; + updateSubscriptions(type: string, count: number): void; + updateEventBacklog(topic: string, size: number): void; + startPushgateway(config?: PushgatewayConfig): void; + stopPushgateway(): void; + pushMetricsToGateway(): Promise; + getPushgatewayConfig(): PushgatewayConfig | undefined; +} diff --git a/client/metrics.js b/client/metrics.js new file mode 100644 index 0000000..82e3887 --- /dev/null +++ b/client/metrics.js @@ -0,0 +1,170 @@ +"use strict"; +var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + var desc = Object.getOwnPropertyDescriptor(m, k); + if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { + desc = { enumerable: true, get: function() { return m[k]; } }; + } + Object.defineProperty(o, k2, desc); +}) : (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + o[k2] = m[k]; +})); +var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { + Object.defineProperty(o, "default", { enumerable: true, value: v }); +}) : function(o, v) { + o["default"] = v; +}); +var __importStar = (this && this.__importStar) || (function () { + var ownKeys = function(o) { + ownKeys = Object.getOwnPropertyNames || function (o) { + var ar = []; + for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k; + return ar; + }; + return ownKeys(o); + }; + return function (mod) { + if (mod && mod.__esModule) return mod; + var result = {}; + if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]); + __setModuleDefault(result, mod); + return result; + }; +})(); +Object.defineProperty(exports, "__esModule", { value: true }); +exports.EventMetrics = void 0; +const client = __importStar(require("prom-client")); +class EventMetrics { + registry; + pushgatewayInterval; + pushgatewayConfig; + publishCounter; + subscriptionGauge; + publishDuration; + payloadSize; + publishErrors; + callbackDuration; + throughput; + eventBacklog; + constructor() { + this.registry = new client.Registry(); + this.publishCounter = new client.Counter({ + name: "events_published_total", + help: "Total number of events published", + labelNames: ["event_type"], + registers: [this.registry], + }); + this.subscriptionGauge = new client.Gauge({ + name: "active_event_subscriptions", + help: "Number of active event subscriptions", + labelNames: ["event_type"], + registers: [this.registry], + }); + this.publishDuration = new client.Histogram({ + name: "event_publish_duration_seconds", + help: "Time taken to publish events", + labelNames: ["event_type"], + buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], + registers: [this.registry], + }); + this.payloadSize = new client.Histogram({ + name: "event_payload_size_bytes", + help: "Size of event payloads in bytes", + labelNames: ["event_type"], + buckets: [10, 100, 1000, 10000, 100000, 1000000], + registers: [this.registry], + }); + this.publishErrors = new client.Counter({ + name: "event_publish_errors_total", + help: "Total number of event publish errors", + labelNames: ["event_type", "error_type"], + registers: [this.registry], + }); + this.callbackDuration = new client.Histogram({ + name: "event_callback_duration_seconds", + help: "Time taken to process event callbacks", + labelNames: ["event_type"], + buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], + registers: [this.registry], + }); + this.throughput = new client.Counter({ + name: "event_callbacks_processed_total", + help: "Total number of event callbacks processed successfully", + labelNames: ["event_type"], + registers: [this.registry], + }); + this.eventBacklog = new client.Gauge({ + name: "backlog_events_total", + help: "Total number of events waiting to be processed", + labelNames: ["topic"], + registers: [this.registry], + }); + } + recordPublish(type, payloadSizeBytes) { + this.publishCounter.labels(type).inc(); + this.payloadSize.labels(type).observe(payloadSizeBytes); + return this.publishDuration.labels(type).startTimer(); + } + recordPublishError(type, errorType) { + this.publishErrors.labels(type, errorType).inc(); + } + recordCallback(type) { + this.throughput.labels(type).inc(); + return this.callbackDuration.labels(type).startTimer(); + } + updateSubscriptions(type, count) { + this.subscriptionGauge.labels(type).set(count); + } + updateEventBacklog(topic, size) { + this.eventBacklog.labels(topic).set(size); + } + startPushgateway(config = {}) { + this.pushgatewayConfig = { + url: config.url || "http://localhost:9091", + jobName: config.jobName || "node_events", + instance: config.instance || "default_instance", + interval: config.interval || 15000, + }; + this.stopPushgateway(); + this.pushgatewayInterval = setInterval(() => { + this.pushMetricsToGateway(); + }, this.pushgatewayConfig.interval); + console.log(`Started pushing metrics to Pushgateway every ${this.pushgatewayConfig.interval}ms`); + } + stopPushgateway() { + if (this.pushgatewayInterval) { + clearInterval(this.pushgatewayInterval); + this.pushgatewayInterval = undefined; + console.log("Stopped pushing metrics to Pushgateway"); + } + } + async pushMetricsToGateway() { + if (!this.pushgatewayConfig) { + throw new Error("Pushgateway not configured. Call startPushgateway() first."); + } + try { + const body = await this.registry.metrics(); + let url = `${this.pushgatewayConfig.url}/metrics/job/${this.pushgatewayConfig.jobName}`; + if (this.pushgatewayConfig.instance) { + url += `/instance/${this.pushgatewayConfig.instance}`; + } + const response = await fetch(url, { + method: "POST", + headers: { "Content-Type": "text/plain" }, + body, + }); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + console.log("Metrics pushed to Pushgateway successfully"); + } + catch (err) { + console.error("Failed to push metrics to Pushgateway:", err); + } + } + getPushgatewayConfig() { + return this.pushgatewayConfig; + } +} +exports.EventMetrics = EventMetrics; diff --git a/client/metrics.ts b/client/metrics.ts new file mode 100644 index 0000000..b8794fc --- /dev/null +++ b/client/metrics.ts @@ -0,0 +1,171 @@ +import * as client from "prom-client"; + +export interface PushgatewayConfig { + url?: string; + jobName?: string; + instance?: string; + interval?: number; +} + +export class EventMetrics { + private readonly registry: client.Registry; + private pushgatewayInterval?: NodeJS.Timeout; + private pushgatewayConfig?: PushgatewayConfig; + + private readonly publishCounter: client.Counter; + private readonly subscriptionGauge: client.Gauge; + private readonly publishDuration: client.Histogram; + private readonly payloadSize: client.Histogram; + private readonly publishErrors: client.Counter; + private readonly callbackDuration: client.Histogram; + private readonly throughput: client.Counter; + private readonly eventBacklog: client.Gauge; + + constructor() { + this.registry = new client.Registry(); + + this.publishCounter = new client.Counter({ + name: "events_published_total", + help: "Total number of events published", + labelNames: ["event_type"], + registers: [this.registry], + }); + + this.subscriptionGauge = new client.Gauge({ + name: "active_event_subscriptions", + help: "Number of active event subscriptions", + labelNames: ["event_type"], + registers: [this.registry], + }); + + this.publishDuration = new client.Histogram({ + name: "event_publish_duration_seconds", + help: "Time taken to publish events", + labelNames: ["event_type"], + buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], + registers: [this.registry], + }); + + this.payloadSize = new client.Histogram({ + name: "event_payload_size_bytes", + help: "Size of event payloads in bytes", + labelNames: ["event_type"], + buckets: [10, 100, 1000, 10000, 100000, 1000000], + registers: [this.registry], + }); + + this.publishErrors = new client.Counter({ + name: "event_publish_errors_total", + help: "Total number of event publish errors", + labelNames: ["event_type", "error_type"], + registers: [this.registry], + }); + + this.callbackDuration = new client.Histogram({ + name: "event_callback_duration_seconds", + help: "Time taken to process event callbacks", + labelNames: ["event_type"], + buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], + registers: [this.registry], + }); + + this.throughput = new client.Counter({ + name: "event_callbacks_processed_total", + help: "Total number of event callbacks processed successfully", + labelNames: ["event_type"], + registers: [this.registry], + }); + + this.eventBacklog = new client.Gauge({ + name: "backlog_events_total", + help: "Total number of events waiting to be processed", + labelNames: ["topic"], + registers: [this.registry], + }); + } + + recordPublish(type: string, payloadSizeBytes: number): () => void { + this.publishCounter.labels(type).inc(); + this.payloadSize.labels(type).observe(payloadSizeBytes); + return this.publishDuration.labels(type).startTimer(); + } + + recordPublishError(type: string, errorType: string): void { + this.publishErrors.labels(type, errorType).inc(); + } + + recordCallback(type: string): () => void { + this.throughput.labels(type).inc(); + return this.callbackDuration.labels(type).startTimer(); + } + + updateSubscriptions(type: string, count: number): void { + this.subscriptionGauge.labels(type).set(count); + } + + updateEventBacklog(topic: string, size: number): void { + this.eventBacklog.labels(topic).set(size); + } + + startPushgateway(config: PushgatewayConfig = {}): void { + this.pushgatewayConfig = { + url: config.url || "http://localhost:9091", + jobName: config.jobName || "node_events", + instance: config.instance || "default_instance", + interval: config.interval || 15000, + }; + + this.stopPushgateway(); + + this.pushgatewayInterval = setInterval(() => { + this.pushMetricsToGateway(); + }, this.pushgatewayConfig.interval); + + console.log( + `Started pushing metrics to Pushgateway every ${this.pushgatewayConfig.interval}ms` + ); + } + + stopPushgateway(): void { + if (this.pushgatewayInterval) { + clearInterval(this.pushgatewayInterval); + this.pushgatewayInterval = undefined; + console.log("Stopped pushing metrics to Pushgateway"); + } + } + + async pushMetricsToGateway(): Promise { + if (!this.pushgatewayConfig) { + throw new Error( + "Pushgateway not configured. Call startPushgateway() first." + ); + } + + try { + const body = await this.registry.metrics(); + let url = `${this.pushgatewayConfig.url}/metrics/job/${this.pushgatewayConfig.jobName}`; + + if (this.pushgatewayConfig.instance) { + url += `/instance/${this.pushgatewayConfig.instance}`; + } + + const response = await fetch(url, { + method: "POST", + headers: { "Content-Type": "text/plain" }, + body, + }); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + console.log("Metrics pushed to Pushgateway successfully"); + } catch (err) { + console.error("Failed to push metrics to Pushgateway:", err); + } + } + + getPushgatewayConfig(): PushgatewayConfig | undefined { + return this.pushgatewayConfig; + } +} diff --git a/client/types/types.d.ts b/client/types/types.d.ts new file mode 100644 index 0000000..89cfef2 --- /dev/null +++ b/client/types/types.d.ts @@ -0,0 +1,37 @@ +export type Callback = (payload: T) => void; +export interface EventAdapter { + connect(): Promise; + disconnect(): Promise; + publish(type: string, payload: object): Promise; + subscribe(type: string): Promise; + unsubscribe(type: string): Promise; + onMessage(handler: (type: string, payload: object) => void): void; +} +export interface BaseInitOptions { + type: "inMemory" | "kafka" | "txeventq"; +} +export interface InMemoryOptions extends BaseInitOptions { + type: "inMemory"; + host: string; + port?: number; + protocol: string; +} +export interface KafkaOptions extends BaseInitOptions { + type: "kafka"; + clientId: string; + brokers: string[]; + groupId: string; +} +export interface TxEventQOptions extends BaseInitOptions { + type: "txeventq"; + connectString: string; + user: string; + password: string; + instantClientPath?: string; + walletPath?: string; + consumerName?: string; + batchSize?: number; + waitTime?: number; + topics?: string[]; +} +export type InitOptions = InMemoryOptions | KafkaOptions | TxEventQOptions; diff --git a/client/adapters/types.js b/client/types/types.js similarity index 100% rename from client/adapters/types.js rename to client/types/types.js diff --git a/client/types/types.ts b/client/types/types.ts new file mode 100644 index 0000000..5500182 --- /dev/null +++ b/client/types/types.ts @@ -0,0 +1,43 @@ +export type Callback = (payload: T) => void; + +export interface EventAdapter { + connect(): Promise; + disconnect(): Promise; + publish(type: string, payload: object): Promise; + subscribe(type: string): Promise; + unsubscribe(type: string): Promise; + onMessage(handler: (type: string, payload: object) => void): void; +} + +export interface BaseInitOptions { + type: "inMemory" | "kafka" | "txeventq"; +} + +export interface InMemoryOptions extends BaseInitOptions { + type: "inMemory"; + host: string; + port?: number; + protocol: string; +} + +export interface KafkaOptions extends BaseInitOptions { + type: "kafka"; + clientId: string; + brokers: string[]; + groupId: string; +} + +export interface TxEventQOptions extends BaseInitOptions { + type: "txeventq"; + connectString: string; + user: string; + password: string; + instantClientPath?: string; + walletPath?: string; + consumerName?: string; + batchSize?: number; + waitTime?: number; + topics?: string[]; +} + +export type InitOptions = InMemoryOptions | KafkaOptions | TxEventQOptions; diff --git a/docker-compose-oracle.yml b/docker-compose-oracle.yml new file mode 100644 index 0000000..378ef2c --- /dev/null +++ b/docker-compose-oracle.yml @@ -0,0 +1,37 @@ +services: + oracle-db: + image: container-registry.oracle.com/database/free:latest + container_name: oracle-txeventq-poc-db + ports: + - "1522:1521" + - "5501:5500" + environment: + - ORACLE_PWD=OraclePassword123 + - ORACLE_CHARACTERSET=AL32UTF8 + - ORACLE_EDITION=free + volumes: + - oracle-data:/opt/oracle/oradata + - ./sql-start-up-scripts:/opt/oracle/scripts/startup + networks: + - oracle-network + healthcheck: + test: + [ + "CMD", + "sqlplus", + "-L", + "system/OraclePassword123@//localhost:1521/FREEPDB1", + "@/dev/null", + ] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s + +volumes: + oracle-data: + driver: local + +networks: + oracle-network: + driver: bridge diff --git a/package-lock.json b/package-lock.json index a634ac6..9e3fd75 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,15 +1,17 @@ { - "name": "@nucleoidai/node-event", - "version": "1.1.5", + "name": "node-event-test-package", + "version": "1.1.39", "lockfileVersion": 3, "requires": true, "packages": { "": { - "name": "@nucleoidai/node-event", - "version": "1.1.5", + "name": "node-event-test-package", + "version": "1.1.39", "dependencies": { "chalk": "^4.1.2", "kafkajs": "^2.2.4", + "node-event-test-package": "^1.1.45", + "oracledb": "^6.9.0", "prom-client": "^15.1.3", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", @@ -4397,6 +4399,23 @@ "integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==", "dev": true }, + "node_modules/node-event-test-package": { + "version": "1.1.45", + "resolved": "https://registry.npmjs.org/node-event-test-package/-/node-event-test-package-1.1.45.tgz", + "integrity": "sha512-RzPYZZ4mUV6DF/ahkztFgvyFyLxop3fMpz4C/dJQOWr//RLlMWKoG1FFg6q32rXMaCH/jF768ak5TnDWQv0g+w==", + "dependencies": { + "chalk": "^4.1.2", + "kafkajs": "^2.2.4", + "oracledb": "^6.9.0", + "prom-client": "^15.1.3", + "socket.io": "^4.8.1", + "socket.io-client": "^4.8.1", + "uuid": "^9.0.0" + }, + "bin": { + "server": "server/server.js" + } + }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -4462,6 +4481,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/oracledb": { + "version": "6.9.0", + "resolved": "https://registry.npmjs.org/oracledb/-/oracledb-6.9.0.tgz", + "integrity": "sha512-NwPbIGPv6m0GTFSbyy4/5WEjsKMiiJRxztLmYUcfD3oyh/uXdmVmKOwEWr84wFwWJ/0wQrYQh4PjnzvShibRaA==", + "hasInstallScript": true, + "engines": { + "node": ">=14.17" + } + }, "node_modules/p-limit": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz", diff --git a/package.json b/package.json index bb678d9..3c7e8b7 100644 --- a/package.json +++ b/package.json @@ -1,22 +1,24 @@ { - "name": "@nucleoidai/node-event", - "version": "1.1.5", + "name": "node-event-test-package", + "version": "1.1.66", "description": "Event-driven Message Broker", "main": "index.js", "keywords": [ "event" ], "scripts": { - "test": "echo 'No tests'", "start": "node sample/publisher/index.js && node sample/subscriber/index.js && npx nuc-node-event-test/server", - "build": "npx tsc client/Event.ts --outDir client --declaration && npx tsc src/Event.ts --outDir src --declaration", + "build": "npx tsc --project tsconfig.json", "kafka:up": "docker-compose up -d", "kafka:down": "docker-compose down", - "kafka:test": "node examples/kafka-example.js" + "kafka:test": "node examples/kafka-example.js", + "test": "echo 'No tests specified'" }, "dependencies": { "chalk": "^4.1.2", "kafkajs": "^2.2.4", + "node-event-test-package": "^1.1.45", + "oracledb": "^6.9.0", "prom-client": "^15.1.3", "socket.io": "^4.8.1", "socket.io-client": "^4.8.1", @@ -53,9 +55,9 @@ "types": "./src/Event.d.ts" }, "./client": { - "import": "./client/Event.js", - "require": "./client/Event.js", - "types": "./client/Event.d.ts" + "import": "./client/index.js", + "require": "./client/index.js", + "types": "./client/index.d.ts" }, "./server": { "import": "./server/server.js", diff --git a/pub_test.js b/pub_test.js new file mode 100644 index 0000000..2787d17 --- /dev/null +++ b/pub_test.js @@ -0,0 +1,18 @@ +const { EventManager } = require("./client"); + +const eventManager = new EventManager(); + +eventManager.init({ + type: "txeventq", + connectString: "localhost:1522/FREEPDB1", + user: "txeventq_user", + password: "pass123", + instantClientPath: + "C:\\Users\\Halil\\Downloads\\instantclient-basic-windows.x64-23.9.0.25.07\\instantclient_23_9", + autoCommit: true, +}); + +eventManager.publish("test", { + message: "Hello, world!", +}); + diff --git a/server/server.ts b/server/server.ts index 9d5e3ad..6fe7a9f 100644 --- a/server/server.ts +++ b/server/server.ts @@ -29,7 +29,7 @@ io.on('connection', (socket: Socket) => { } }); - socket.on('publish', ({ type, payload }: { type: string; payload: any }) => { + socket.on('publish', ({ type, payload }: { type: string; payload: object }) => { console.log(`Publish: ${type}`, payload); if (subscriptions[type]) { subscriptions[type].forEach((sid) => { diff --git a/src/Event.ts b/src/Event.ts index 1d8ed16..f43ae26 100644 --- a/src/Event.ts +++ b/src/Event.ts @@ -1,5 +1,4 @@ -import * as client from "prom-client"; - +import client from "prom-client"; import { v4 as uuid } from "uuid"; const subscriptions = {}; @@ -24,6 +23,7 @@ const eventPublishDuration = new client.Histogram({ buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], }); +// Track payload size for analysis const eventPayloadSize = new client.Histogram({ name: "event_payload_size_bytes", help: "Size of event payloads in bytes", @@ -31,12 +31,14 @@ const eventPayloadSize = new client.Histogram({ buckets: [10, 100, 1000, 10000, 100000, 1000000], }); +// Track error rates const eventPublishErrors = new client.Counter({ name: "event_publish_errors_total", help: "Total number of event publish errors", labelNames: ["event_type", "error_type"], }); +// Track callback processing duration const callbackProcessingDuration = new client.Histogram({ name: "event_callback_duration_seconds", help: "Time taken to process event callbacks", @@ -44,18 +46,21 @@ const callbackProcessingDuration = new client.Histogram({ buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5], }); +// Track subscription rates const subscriptionRate = new client.Counter({ name: "event_subscriptions_total", help: "Total number of event subscriptions created", labelNames: ["event_type"], }); +// Track unsubscription rates const unsubscriptionRate = new client.Counter({ name: "event_unsubscriptions_total", help: "Total number of event unsubscriptions", labelNames: ["event_type"], }); +// Track throughput (events processed per second) const eventThroughput = new client.Counter({ name: "event_callbacks_processed_total", help: "Total number of event callbacks processed successfully", @@ -114,6 +119,7 @@ const subscribe = (...args) => { console.debug("node-event", "unsubscribe", type, id); delete subscriptions[type][id]; + // Track unsubscription unsubscriptionRate.labels(type).inc(); if (Object.keys(subscriptions[type]).length === 0) { @@ -129,6 +135,7 @@ const subscribe = (...args) => { subscriptions[type][id] = registry; + // Update subscription metrics subscriptionRate.labels(type).inc(); eventSubscriptionGauge .labels(type) @@ -152,9 +159,11 @@ const publish = (...args) => { throw new Error("Invalid publish type"); } + // Track metrics for event publishing const endTimer = eventPublishDuration.labels(type).startTimer(); eventPublishCounter.labels(type).inc(); + // Track payload size const payloadSize = JSON.stringify(payload).length; eventPayloadSize.labels(type).observe(payloadSize); diff --git a/test.js b/test.js new file mode 100644 index 0000000..9ae3bcb --- /dev/null +++ b/test.js @@ -0,0 +1,21 @@ +const { EventManager } = require("./client"); + +const eventManager = new EventManager(); + +(async () => { + await eventManager.init({ + type: "txeventq", + connectString: "localhost:1522/FREEPDB1", + user: "txeventq_user", + password: "pass123", + instantClientPath: + "C:\\Users\\Halil\\Downloads\\instantclient-basic-windows.x64-23.9.0.25.07\\instantclient_23_9", + autoCommit: true, + }); +})(); + +await (async () => { + await eventManager.subscribe("test", (payload) => { + console.log(payload); + }); +})(); diff --git a/tsconfig.json b/tsconfig.json index 64df4b9..9307acd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,11 +2,23 @@ "compilerOptions": { "target": "esnext", "module": "nodenext", + "moduleResolution": "NodeNext", "esModuleInterop": true, "forceConsistentCasingInFileNames": true, "strict": true, "skipLibCheck": true, "noImplicitAny": false, - "isolatedModules": true - } + "isolatedModules": true, + "declaration": true, + "outDir": "client", + "rootDir": "client" + }, + "include": [ + "client/**/*" + ], + "exclude": [ + "node_modules", + "**/*.test.ts", + "**/*.spec.ts" + ] }