diff --git a/src/client.ts b/src/client.ts index de6009c7..2690b145 100644 --- a/src/client.ts +++ b/src/client.ts @@ -23,7 +23,7 @@ import { RouteQuery } from "./requests/route_query" import { StreamStatsRequest } from "./requests/stream_stats_request" import { Offset, SubscribeRequest } from "./requests/subscribe_request" import { UnsubscribeRequest } from "./requests/unsubscribe_request" -import { MetadataUpdateListener, PublishConfirmListener, PublishErrorListener } from "./response_decoder" +import { MetadataUpdateListener } from "./response_decoder" import { ConsumerUpdateQuery } from "./responses/consumer_update_query" import { CreateStreamResponse } from "./responses/create_stream_response" import { CreateSuperStreamResponse } from "./responses/create_super_stream_response" @@ -43,8 +43,12 @@ import { SuperStreamConsumer } from "./super_stream_consumer" import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher" import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util" import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy" +import { PublishConfirmResponse } from "./responses/publish_confirm_response" +import { PublishErrorResponse } from "./responses/publish_error_response" export type ConnectionClosedListener = (hadError: boolean) => void +export type ConnectionPublishConfirmListener = (confirm: PublishConfirmResponse, connectionId: string) => void +export type ConnectionPublishErrorListener = (confirm: PublishErrorResponse, connectionId: string) => void export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean } @@ -738,8 +742,8 @@ export class Client { export type ClientListenersParams = { metadata_update?: MetadataUpdateListener - publish_confirm?: PublishConfirmListener - publish_error?: PublishErrorListener + publish_confirm?: ConnectionPublishConfirmListener + publish_error?: ConnectionPublishErrorListener connection_closed?: ConnectionClosedListener } diff --git a/src/connection.ts b/src/connection.ts index 5ba90ba8..2e087398 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -304,8 +304,14 @@ export class Connection { }) if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update) - if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm) - if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error) + if (listeners?.publish_confirm) { + const publishConfirmListener = listeners.publish_confirm + this.decoder.on("publish_confirm", (confirm) => publishConfirmListener(confirm, this.connectionId)) + } + if (listeners?.publish_error) { + const publishErrorListener = listeners.publish_error + this.decoder.on("publish_error", (confirm) => publishErrorListener(confirm, this.connectionId)) + } if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1) if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2) if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query) diff --git a/src/publisher.ts b/src/publisher.ts index bc757828..8fee00f1 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -155,7 +155,7 @@ export interface Publisher { export type FilterFunc = (msg: Message) => string | undefined type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void -export type SendResult = { sent: boolean; publishingId: bigint } +export type SendResult = { sent: boolean; publishingId: bigint; publisherId: number; connectionId: string } export class StreamPublisher implements Publisher { private connection: Connection private stream: string @@ -304,7 +304,12 @@ export class StreamPublisher implements Publisher { } this.checkMessageSize(publishRequestMessage) const sendCycleNeeded = this.add(publishRequestMessage) - const result = { sent: false, publishingId: publishRequestMessage.publishingId } + const result = { + sent: false, + publishingId: publishRequestMessage.publishingId, + publisherId: this.publisherId, + connectionId: this.connection.connectionId, + } if (sendCycleNeeded) { result.sent = await this.sendBuffer() } diff --git a/src/responses/publish_confirm_response.ts b/src/responses/publish_confirm_response.ts index a5164db5..188ec7c8 100644 --- a/src/responses/publish_confirm_response.ts +++ b/src/responses/publish_confirm_response.ts @@ -7,7 +7,7 @@ export class PublishConfirmResponse implements Response { static readonly Version = 1 public publishingIds: bigint[] - private publisherId: number + readonly publisherId: number constructor(private response: RawPublishConfirmResponse) { if (this.response.key !== PublishConfirmResponse.key) { throw new Error(`Unable to create ${PublishConfirmResponse.name} from data of type ${this.response.key}`) diff --git a/src/responses/publish_error_response.ts b/src/responses/publish_error_response.ts index 56928feb..34a4a6de 100644 --- a/src/responses/publish_error_response.ts +++ b/src/responses/publish_error_response.ts @@ -11,7 +11,7 @@ export class PublishErrorResponse implements Response { static key = 0x0004 static readonly Version = 1 - private publisherId: number + readonly publisherId: number public publishingError: PublishingError constructor(private response: RawPublishErrorResponse) { if (this.response.key !== PublishErrorResponse.key) {