Skip to content

Commit 92d1689

Browse files
tunniclmMike Tunnicliffe
andauthored
Make publisher confirms more usable with super streams (coders51#276)
* Make publisher confirms more usable with super streams - In order to wait for a publisher confirm reliably using the listener registered against a Client, we need access to the publisherId and connectionId to properly scope a publishingId - PublishConfirmResponse and PublishErrorResponse changed so that publisherId is public (readonly) - Client publish_confirm and publish_error listeners are now passed an extra connectionId argument. Combined with the now visible publisherId, a scoped publishingId can be constructed - SendResult changed to include a reference to the publisher, which allows the caller of SuperStreamPublisher#send() to know which StreamPublisher the message was sent to allowing construction of a scoped publishingId. * Use publisherId and connectionId in SendResult Incorporating feedback that including publisherId and connectionId in SendResult is preferred over the full publisher object. --------- Co-authored-by: Mike Tunnicliffe <[email protected]>
1 parent 87c8f48 commit 92d1689

File tree

5 files changed

+24
-9
lines changed

5 files changed

+24
-9
lines changed

src/client.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import { RouteQuery } from "./requests/route_query"
2323
import { StreamStatsRequest } from "./requests/stream_stats_request"
2424
import { Offset, SubscribeRequest } from "./requests/subscribe_request"
2525
import { UnsubscribeRequest } from "./requests/unsubscribe_request"
26-
import { MetadataUpdateListener, PublishConfirmListener, PublishErrorListener } from "./response_decoder"
26+
import { MetadataUpdateListener } from "./response_decoder"
2727
import { ConsumerUpdateQuery } from "./responses/consumer_update_query"
2828
import { CreateStreamResponse } from "./responses/create_stream_response"
2929
import { CreateSuperStreamResponse } from "./responses/create_super_stream_response"
@@ -43,8 +43,12 @@ import { SuperStreamConsumer } from "./super_stream_consumer"
4343
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
4444
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util"
4545
import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy"
46+
import { PublishConfirmResponse } from "./responses/publish_confirm_response"
47+
import { PublishErrorResponse } from "./responses/publish_error_response"
4648

4749
export type ConnectionClosedListener = (hadError: boolean) => void
50+
export type ConnectionPublishConfirmListener = (confirm: PublishConfirmResponse, connectionId: string) => void
51+
export type ConnectionPublishErrorListener = (confirm: PublishErrorResponse, connectionId: string) => void
4852

4953
export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean }
5054

@@ -738,8 +742,8 @@ export class Client {
738742

739743
export type ClientListenersParams = {
740744
metadata_update?: MetadataUpdateListener
741-
publish_confirm?: PublishConfirmListener
742-
publish_error?: PublishErrorListener
745+
publish_confirm?: ConnectionPublishConfirmListener
746+
publish_error?: ConnectionPublishErrorListener
743747
connection_closed?: ConnectionClosedListener
744748
}
745749

src/connection.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,14 @@ export class Connection {
304304
})
305305

306306
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
307-
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
308-
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
307+
if (listeners?.publish_confirm) {
308+
const publishConfirmListener = listeners.publish_confirm
309+
this.decoder.on("publish_confirm", (confirm) => publishConfirmListener(confirm, this.connectionId))
310+
}
311+
if (listeners?.publish_error) {
312+
const publishErrorListener = listeners.publish_error
313+
this.decoder.on("publish_error", (confirm) => publishErrorListener(confirm, this.connectionId))
314+
}
309315
if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1)
310316
if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2)
311317
if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query)

src/publisher.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ export interface Publisher {
155155

156156
export type FilterFunc = (msg: Message) => string | undefined
157157
type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void
158-
export type SendResult = { sent: boolean; publishingId: bigint }
158+
export type SendResult = { sent: boolean; publishingId: bigint; publisherId: number; connectionId: string }
159159
export class StreamPublisher implements Publisher {
160160
private connection: Connection
161161
private stream: string
@@ -304,7 +304,12 @@ export class StreamPublisher implements Publisher {
304304
}
305305
this.checkMessageSize(publishRequestMessage)
306306
const sendCycleNeeded = this.add(publishRequestMessage)
307-
const result = { sent: false, publishingId: publishRequestMessage.publishingId }
307+
const result = {
308+
sent: false,
309+
publishingId: publishRequestMessage.publishingId,
310+
publisherId: this.publisherId,
311+
connectionId: this.connection.connectionId,
312+
}
308313
if (sendCycleNeeded) {
309314
result.sent = await this.sendBuffer()
310315
}

src/responses/publish_confirm_response.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export class PublishConfirmResponse implements Response {
77
static readonly Version = 1
88

99
public publishingIds: bigint[]
10-
private publisherId: number
10+
readonly publisherId: number
1111
constructor(private response: RawPublishConfirmResponse) {
1212
if (this.response.key !== PublishConfirmResponse.key) {
1313
throw new Error(`Unable to create ${PublishConfirmResponse.name} from data of type ${this.response.key}`)

src/responses/publish_error_response.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export class PublishErrorResponse implements Response {
1111
static key = 0x0004
1212
static readonly Version = 1
1313

14-
private publisherId: number
14+
readonly publisherId: number
1515
public publishingError: PublishingError
1616
constructor(private response: RawPublishErrorResponse) {
1717
if (this.response.key !== PublishErrorResponse.key) {

0 commit comments

Comments
 (0)