Skip to content

Commit 02b9038

Browse files
author
magne
committed
chore: add error if receiver link is closed
1 parent 45c68b5 commit 02b9038

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

src/consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export class AmqpConsumer implements Consumer {
104104
start() {
105105
this.receiverLink.on(ReceiverEvents.message, (context: EventContext) => {
106106
if (context.message && context.delivery) {
107-
const deliveryContext = new AmqpDeliveryContext(context.delivery)
107+
const deliveryContext = new AmqpDeliveryContext(context.delivery, this.receiverLink)
108108
this.params.messageHandler(deliveryContext, context.message)
109109
}
110110
})

src/delivery_context.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Delivery } from "rhea"
1+
import { Delivery, Receiver } from "rhea"
22

33
export interface DeliveryContext {
44
accept(): void
@@ -7,17 +7,26 @@ export interface DeliveryContext {
77
}
88

99
export class AmqpDeliveryContext implements DeliveryContext {
10-
constructor(private readonly delivery: Delivery) {}
10+
constructor(
11+
private readonly delivery: Delivery,
12+
private readonly receiverLink: Receiver
13+
) {}
1114

1215
accept(): void {
16+
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
17+
1318
this.delivery.accept()
1419
}
1520

1621
discard(): void {
22+
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
23+
1724
this.delivery.reject()
1825
}
1926

2027
requeue(): void {
28+
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
29+
2130
this.delivery.release()
2231
}
2332
}

0 commit comments

Comments
 (0)