diff --git a/src/client.ts b/src/client.ts index de6009c7..22acd5aa 100644 --- a/src/client.ts +++ b/src/client.ts @@ -43,6 +43,7 @@ import { SuperStreamConsumer } from "./super_stream_consumer" import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher" import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util" import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy" +import { Locator } from "./locator" export type ConnectionClosedListener = (hadError: boolean) => void @@ -62,37 +63,191 @@ type DeliverData = { subscriptionId: number consumerId: string } + +export type ClientListenersParams = { + metadata_update?: MetadataUpdateListener + publish_confirm?: PublishConfirmListener + publish_error?: PublishErrorListener + connection_closed?: ConnectionClosedListener +} + +export interface SSLConnectionParams { + key?: string + cert?: string + ca?: string + rejectUnauthorized?: boolean +} + +export type AddressResolverParams = + | { + enabled: true + endpoint?: { host: string; port: number } + } + | { enabled: false } + +//TODO: Create some documentation for this interface +export interface ClientParams { + hostname: string + port: number + username: string + password: string + mechanism?: "PLAIN" | "EXTERNAL" + vhost: string + frameMax?: number + heartbeat?: number + listeners?: ClientListenersParams + ssl?: SSLConnectionParams | boolean + bufferSizeSettings?: BufferSizeSettings + socketTimeout?: number + addressResolver?: AddressResolverParams + leader?: boolean // what does this mean? + streamName?: string + connectionName?: string +} + +export interface DeclarePublisherParams { + stream: string + publisherRef?: string + maxChunkLength?: number + connectionClosedListener?: ConnectionClosedListener +} + +export type RoutingStrategy = "key" | "hash" + +export interface DeclareSuperStreamPublisherParams { + superStream: string + publisherRef?: string + routingStrategy?: RoutingStrategy +} + +export type MessageFilter = (msg: Message) => boolean + +export interface ConsumerFilter { + values: string[] + postFilterFunc: MessageFilter + matchUnfiltered: boolean +} + +export interface DeclareConsumerParams { + stream: string + consumerRef?: string + offset: Offset + connectionClosedListener?: ConnectionClosedListener + consumerUpdateListener?: ConsumerUpdateListener + singleActive?: boolean + filter?: ConsumerFilter + creditPolicy?: ConsumerCreditPolicy + consumerTag?: string +} + +export interface DeclareSuperStreamConsumerParams { + superStream: string + consumerRef?: string + offset?: Offset + creditPolicy?: ConsumerCreditPolicy +} + +export interface SubscribeParams { + subscriptionId: number + stream: string + credit: number + offset: Offset +} + +export interface StoreOffsetParams { + reference: string + stream: string + offsetValue: bigint +} + +export interface QueryOffsetParams { + reference: string + stream: string +} + +export interface QueryMetadataParams { + streams: string[] +} + +export interface QueryPartitionsParams { + superStream: string +} + +export function connect(params: ClientParams, logger?: Logger): Promise { + return Client.connect(params, logger) +} + +class LocatorConnection implements Locator { + constructor( + private locatorConnection: Connection, + private logger: Logger + ) {} + + public async routeQuery(params: { routingKey: string; superStream: string }) { + const res = await this.locatorConnection.sendAndWait(new RouteQuery(params)) + if (!res.ok) { + throw new Error(`Route Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + } + this.logger.info(`Route Response for super stream ${params.superStream}, ${res.streams}`) + return res.streams + } + + public async queryPartitions(params: QueryPartitionsParams): Promise { + const { superStream } = params + const res = await this.locatorConnection.sendAndWait(new PartitionsQuery({ superStream })) + if (!res.ok) { + throw new Error(`Query Partitions command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + } + this.logger.info(`Returned superstream partitions for superstream ${superStream}`) + return res.streams + } + + queryPublisherSequence(params: { stream: string; publisherRef: string }) { + return this.locatorConnection.queryPublisherSequence(params) + } + + start() { + return this.locatorConnection.start() + } + + close(params: ClosingParams = { closingCode: 0, closingReason: "", manuallyClose: false }) { + return this.locatorConnection.close(params) + } +} + export class Client { public readonly id: string = randomUUID() private consumers = new Map() private publishers = new Map() private compressions = new Map() - private locatorConnection: Connection + private locatorConnection: LocatorConnection private pool: ConnectionPool + public static async connect(params: ClientParams, logger?: Logger): Promise { + return new Client(logger ?? new NullLogger(), { + ...params, + vhost: getVhostOrDefault(params.vhost), + }).start() + } + private constructor( private readonly logger: Logger, private readonly params: ClientParams ) { this.compressions.set(CompressionType.None, NoneCompression.create()) this.compressions.set(CompressionType.Gzip, GzipCompression.create()) - this.locatorConnection = this.getLocatorConnection() + const connectionParams = this.buildConnectionParams({ + leader: false, + streamName: "", + connectionClosedListener: this.params.listeners?.connection_closed, + }) + this.locatorConnection = new LocatorConnection(Connection.create(connectionParams, this.logger), this.logger) this.pool = new ConnectionPool(logger) } - getCompression(compressionType: CompressionType) { - return this.locatorConnection.getCompression(compressionType) - } - - registerCompression(compression: Compression) { - this.locatorConnection.registerCompression(compression) - } - - public start(): Promise { + private start(): Promise { return this.locatorConnection.start().then( - (_res) => { - return this - }, + (_res) => this, (rej) => { if (rej instanceof Error) throw rej throw new Error(`${inspect(rej)}`) @@ -113,6 +268,14 @@ export class Client { await this.locatorConnection.close({ ...params, manuallyClose: true }) } + public getCompression(compressionType: CompressionType) { + return this.locatorConnection.getCompression(compressionType) + } + + public registerCompression(compression: Compression) { + this.locatorConnection.registerCompression(compression) + } + public async queryMetadata(params: QueryMetadataParams): Promise { const { streams } = params const res = await this.locatorConnection.sendAndWait(new MetadataRequest({ streams })) @@ -125,16 +288,6 @@ export class Client { return streamInfos } - public async queryPartitions(params: QueryPartitionsParams): Promise { - const { superStream } = params - const res = await this.locatorConnection.sendAndWait(new PartitionsQuery({ superStream })) - if (!res.ok) { - throw new Error(`Query Partitions command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) - } - this.logger.info(`Returned superstream partitions for superstream ${superStream}`) - return res.streams - } - public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise { const connection = await this.getConnection(params.stream, "publisher", params.connectionClosedListener) const publisherId = connection.getNextPublisherId() @@ -167,6 +320,7 @@ export class Client { return publisher } + // TODO: Why don't move this move to the publisher? public async deletePublisher(extendedPublisherId: string) { const { publisher, connection } = this.publishers.get(extendedPublisherId) ?? { publisher: undefined, @@ -225,6 +379,8 @@ export class Client { return consumer } + // TODO: We have closeConsumer vs deletePublisher, should we unify them? + // TODO: What is the lifecycle of the consumer? Should we close it here or we can close the close on the object? public async closeConsumer(extendedConsumerId: string) { const activeConsumer = this.consumers.get(extendedConsumerId) if (!activeConsumer) { @@ -243,6 +399,7 @@ export class Client { return true } + // TODO: Why do we have the declareSuperStreamConsumer and declareSuperStreamPublisher but not the close? public async declareSuperStreamConsumer( { superStream, offset, consumerRef, creditPolicy }: DeclareSuperStreamConsumerParams, handle: ConsumerFunc @@ -271,20 +428,12 @@ export class Client { }) } + // why make the query here instead of in the consumer? public queryOffset(params: QueryOffsetParams) { return this.locatorConnection.queryOffset(params) } - private async closeAllConsumers() { - await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close())) - this.consumers = new Map() - } - - private async closeAllPublishers() { - await Promise.all([...this.publishers.values()].map((c) => c.publisher.close())) - this.publishers = new Map() - } - + // TODO: VVV --- Why these type of methods? What are the purpose? --- VVV public consumerCounts() { return this.consumers.size } @@ -296,7 +445,9 @@ export class Client { public getConsumers() { return Array.from(this.consumers.values()) } + // ^^^ --- ^^^ + // TODO: VVV --- STREAM MANAGEMENT --- VVV public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise { this.logger.debug(`Create Stream...`) const res = await this.locatorConnection.sendAndWait(new CreateStreamRequest(params)) @@ -381,11 +532,13 @@ export class Client { this.logger.info(`Statistics for stream name ${streamName}, ${res.statistics}`) return res.statistics } + // ^^^ --- ^^^ public getConnectionInfo(): ConnectionInfo { return this.locatorConnection.getConnectionInfo() } + // TODO: Where is it used this function? public async subscribe(params: SubscribeParams): Promise { const res = await this.locatorConnection.sendAndWait(new SubscribeRequest({ ...params })) if (!res.ok) { @@ -394,12 +547,13 @@ export class Client { return res } + // TODO: Write some documentation about this method public async restart() { this.logger.info(`Restarting client connection ${this.locatorConnection.connectionId}`) const uniqueConnectionIds = new Set() uniqueConnectionIds.add(this.locatorConnection.connectionId) - await wait(5000) + await wait(5000) // Can we use something we can verify instead of a fixed wait? await this.locatorConnection.restart() for (const { consumer, connection, params } of this.consumers.values()) { @@ -434,15 +588,6 @@ export class Client { return this.locatorConnection.rabbitManagementVersion } - public async routeQuery(params: { routingKey: string; superStream: string }) { - const res = await this.locatorConnection.sendAndWait(new RouteQuery(params)) - if (!res.ok) { - throw new Error(`Route Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) - } - this.logger.info(`Route Response for super stream ${params.superStream}, ${res.streams}`) - return res.streams - } - public async partitionsQuery(params: { superStream: string }) { const res = await this.locatorConnection.sendAndWait(new PartitionsQuery(params)) if (!res.ok) { @@ -452,6 +597,16 @@ export class Client { return res.streams } + private async closeAllConsumers() { + await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close())) + this.consumers = new Map() + } + + private async closeAllPublishers() { + await Promise.all([...this.publishers.values()].map((c) => c.publisher.close())) + this.publishers = new Map() + } + private async declarePublisherOnConnection( params: DeclarePublisherParams, publisherId: number, @@ -544,7 +699,7 @@ export class Client { } } - private handleDelivery = async (deliverData: DeliverData) => { + private async handleDelivery(deliverData: DeliverData) { const { messages, subscriptionId, consumerId, messageFilteringSupported } = deliverData const { consumer, connection } = this.consumers.get(consumerId) ?? { consumer: undefined, @@ -610,11 +765,6 @@ export class Client { return consumer.offset } - private getLocatorConnection() { - const connectionParams = this.buildConnectionParams(false, "", this.params.listeners?.connection_closed) - return Connection.create(connectionParams, this.logger) - } - private async getConnection( streamName: string, purpose: ConnectionPurpose, @@ -661,24 +811,23 @@ export class Client { return { partitions, streamBindingKeys: bindingKeys } } - private buildConnectionParams( - leader: boolean, - streamName: string, + private buildConnectionParams(opts: { + leader: boolean + streamName: string connectionClosedListener?: ConnectionClosedListener - ): ConnectionParams { + }): ConnectionParams { const connectionId = randomUUID() - const connectionListeners = { - ...this.params.listeners, - connection_closed: connectionClosedListener, - deliverV1: this.getDeliverV1Callback(connectionId), - deliverV2: this.getDeliverV2Callback(connectionId), - consumer_update_query: this.getConsumerUpdateCallback(connectionId), - } return { ...this.params, - listeners: connectionListeners, - leader: leader, - streamName: streamName, + listeners: { + ...this.params.listeners, + connection_closed: opts.connectionClosedListener, + deliverV1: this.getDeliverV1Callback(connectionId), + deliverV2: this.getDeliverV2Callback(connectionId), + consumer_update_query: this.getConsumerUpdateCallback(connectionId), + }, + leader: opts.leader, + streamName: opts.streamName, connectionId, } } @@ -690,7 +839,7 @@ export class Client { metadata: StreamMetadata, connectionClosedListener?: ConnectionClosedListener ): Promise { - const connectionParams = this.buildConnectionParams(isPublisher, streamName, connectionClosedListener) + const connectionParams = this.buildConnectionParams({ leader: isPublisher, streamName, connectionClosedListener }) if (this.params.addressResolver && this.params.addressResolver.enabled) { const maxAttempts = computeMaxAttempts(metadata) const resolver = this.params.addressResolver @@ -727,125 +876,6 @@ export class Client { this.consumers.delete(extendedConsumerId) this.logger.info(`Closed consumer with id: ${extendedConsumerId}`) } - - static async connect(params: ClientParams, logger?: Logger): Promise { - return new Client(logger ?? new NullLogger(), { - ...params, - vhost: getVhostOrDefault(params.vhost), - }).start() - } -} - -export type ClientListenersParams = { - metadata_update?: MetadataUpdateListener - publish_confirm?: PublishConfirmListener - publish_error?: PublishErrorListener - connection_closed?: ConnectionClosedListener -} - -export interface SSLConnectionParams { - key?: string - cert?: string - ca?: string - rejectUnauthorized?: boolean -} - -export type AddressResolverParams = - | { - enabled: true - endpoint?: { host: string; port: number } - } - | { enabled: false } - -export interface ClientParams { - hostname: string - port: number - username: string - password: string - mechanism?: "PLAIN" | "EXTERNAL" - vhost: string - frameMax?: number - heartbeat?: number - listeners?: ClientListenersParams - ssl?: SSLConnectionParams | boolean - bufferSizeSettings?: BufferSizeSettings - socketTimeout?: number - addressResolver?: AddressResolverParams - leader?: boolean - streamName?: string - connectionName?: string -} - -export interface DeclarePublisherParams { - stream: string - publisherRef?: string - maxChunkLength?: number - connectionClosedListener?: ConnectionClosedListener -} - -export type RoutingStrategy = "key" | "hash" - -export interface DeclareSuperStreamPublisherParams { - superStream: string - publisherRef?: string - routingStrategy?: RoutingStrategy -} - -export type MessageFilter = (msg: Message) => boolean - -export interface ConsumerFilter { - values: string[] - postFilterFunc: MessageFilter - matchUnfiltered: boolean -} - -export interface DeclareConsumerParams { - stream: string - consumerRef?: string - offset: Offset - connectionClosedListener?: ConnectionClosedListener - consumerUpdateListener?: ConsumerUpdateListener - singleActive?: boolean - filter?: ConsumerFilter - creditPolicy?: ConsumerCreditPolicy - consumerTag?: string -} - -export interface DeclareSuperStreamConsumerParams { - superStream: string - consumerRef?: string - offset?: Offset - creditPolicy?: ConsumerCreditPolicy -} - -export interface SubscribeParams { - subscriptionId: number - stream: string - credit: number - offset: Offset -} - -export interface StoreOffsetParams { - reference: string - stream: string - offsetValue: bigint -} - -export interface QueryOffsetParams { - reference: string - stream: string -} - -export interface QueryMetadataParams { - streams: string[] -} - -export interface QueryPartitionsParams { - superStream: string -} - -export function connect(params: ClientParams, logger?: Logger): Promise { - return Client.connect(params, logger) } const chooseNode = (metadata: { leader?: Broker; replicas?: Broker[] }, leader: boolean): Broker | undefined => { diff --git a/src/locator.ts b/src/locator.ts new file mode 100644 index 00000000..5d725726 --- /dev/null +++ b/src/locator.ts @@ -0,0 +1,4 @@ +export interface Locator { + queryPartitions: (params: { superStream: string }) => Promise + routeQuery: (params: { routingKey: string; superStream: string }) => Promise +} diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 87fac430..94a95c81 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -1,13 +1,14 @@ import { Client } from "./client" import { Consumer, ConsumerFunc } from "./consumer" import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy" +import { Locator } from "./locator" import { Offset } from "./requests/subscribe_request" export class SuperStreamConsumer { private consumers: Map = new Map() public consumerRef: string readonly superStream: string - private locator: Client + private locator: Locator private partitions: string[] private offset: Offset private creditPolicy: ConsumerCreditPolicy @@ -16,7 +17,7 @@ export class SuperStreamConsumer { readonly handle: ConsumerFunc, params: { superStream: string - locator: Client + locator: Locator partitions: string[] consumerRef: string offset: Offset diff --git a/src/super_stream_publisher.ts b/src/super_stream_publisher.ts index bb1b4320..05d3b3b3 100644 --- a/src/super_stream_publisher.ts +++ b/src/super_stream_publisher.ts @@ -1,13 +1,14 @@ import { Client, RoutingStrategy } from "./client" import { CompressionType } from "./compression" import { murmur32 } from "./hash/murmur32" +import { Locator } from "./locator" import { Message, MessageOptions, Publisher, SendResult } from "./publisher" import { bigIntMax } from "./util" export type MessageKeyExtractorFunction = (content: string, opts: MessageOptions) => string | undefined type SuperStreamPublisherParams = { - locator: Client + locator: Locator superStream: string publisherRef?: string routingStrategy?: RoutingStrategy @@ -15,7 +16,7 @@ type SuperStreamPublisherParams = { } export class SuperStreamPublisher { - private locator: Client + private locator: Locator private partitions: string[] = [] private publishers: Map = new Map() private superStream: string