Skip to content

Commit 6e0d1c7

Browse files
l4mbymagne
andauthored
[IS-6/feat]: add publisher support (#26)
* feat: add publisher support * fix: sender error test and remove ts-node * fix: skipped test --------- Co-authored-by: magne <[email protected]>
1 parent 4bde5c3 commit 6e0d1c7

16 files changed

+810
-289
lines changed

package-lock.json

Lines changed: 14 additions & 170 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
"globals": "^16.2.0",
5656
"got": "^14.4.7",
5757
"prettier": "3.5.3",
58-
"ts-node": "^10.9.2",
5958
"tsup": "^8.5.0",
6059
"typescript": "^5.8.3",
6160
"typescript-eslint": "^8.33.1",

src/connection.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea"
22
import { AmqpManagement, Management } from "./management.js"
33
import { EnvironmentParams } from "./environment.js"
4+
import { AmqpPublisher, Publisher } from "./publisher.js"
5+
import { DestinationOptions } from "./message.js"
46

57
export interface Connection {
68
close(): Promise<boolean>
79
isOpen(): boolean
810
management(): Management
11+
createPublisher(options?: DestinationOptions): Promise<Publisher>
12+
get publishers(): Map<string, Publisher>
913
}
1014

1115
export class AmqpConnection implements Connection {
16+
private _publishers: Map<string, Publisher> = new Map<string, Publisher>()
17+
1218
static async create(params: EnvironmentParams) {
1319
const connection = await AmqpConnection.open(params)
1420
const topologyManagement = await AmqpManagement.create(connection)
@@ -43,6 +49,7 @@ export class AmqpConnection implements Connection {
4349
return rej(new Error("Connection error: " + context.connection.error))
4450
})
4551

52+
this._publishers.forEach((p) => p.close())
4653
this.connection.close()
4754
})
4855
}
@@ -51,6 +58,16 @@ export class AmqpConnection implements Connection {
5158
return this.topologyManagement
5259
}
5360

61+
async createPublisher(options?: DestinationOptions): Promise<Publisher> {
62+
const publisher = await AmqpPublisher.createFrom(this.connection, this._publishers, options)
63+
this._publishers.set(publisher.id, publisher)
64+
return publisher
65+
}
66+
67+
public get publishers(): Map<string, Publisher> {
68+
return this._publishers
69+
}
70+
5471
public isOpen(): boolean {
5572
return this.connection ? this.connection.is_open() : false
5673
}

src/message_builder.ts renamed to src/link_message_builder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export enum AmqpEndpoints {
1515

1616
export const ME = "$me"
1717

18-
export class MessageBuilder {
18+
export class LinkMessageBuilder {
1919
private messageId: string = generate_uuid()
2020
private to: string = ""
2121
private replyTo: string = ME

0 commit comments

Comments
 (0)