Skip to content

Commit bd63777

Browse files
author
magne
committed
chore: fix pr comments and add errors in promise rejection
1 parent a861fbd commit bd63777

File tree

3 files changed

+24
-19
lines changed

3 files changed

+24
-19
lines changed

src/connection.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { ConnectionEvents, Connection as RheaConnection } from "rhea"
22

33
export interface Connection {
44
close(): Promise<boolean>
5+
isOpen(): boolean
56
}
67

78
export class AmqpConnection implements Connection {
@@ -17,11 +18,14 @@ export class AmqpConnection implements Connection {
1718
return res(true)
1819
})
1920
this.rheaConnection.once(ConnectionEvents.connectionError, (context) => {
20-
console.error("ERROR: ", context.connection.error)
21-
return rej(false)
21+
return rej(new Error("Connection error: " + context.connection.error))
2222
})
2323

2424
this.rheaConnection.close()
2525
})
2626
}
27+
28+
public isOpen(): boolean {
29+
return this.rheaConnection.is_open()
30+
}
2731
}

src/environment.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export class AmqpEnvironment implements Environment {
2020
private readonly username: string
2121
private readonly password: string
2222
private readonly container: Container
23-
private readonly connections: Connection[] = []
23+
private connections: Connection[] = []
2424

2525
constructor({ host, port, username, password }: EnvironmentParams) {
2626
this.host = host
@@ -38,27 +38,31 @@ export class AmqpEnvironment implements Environment {
3838
return connection
3939
}
4040

41-
async close(): Promise<void> {
42-
await Promise.allSettled(
43-
this.connections.map(async (c) => {
44-
await c.close()
45-
})
46-
)
47-
}
48-
4941
private async openConnection(): Promise<RheaConnection> {
5042
return new Promise((res, rej) => {
5143
this.container.once(ConnectionEvents.connectionOpen, (context) => {
5244
return res(context.connection)
5345
})
5446
this.container.once(ConnectionEvents.error, (context) => {
55-
console.log(context)
56-
rej()
47+
return rej(context.error ?? new Error("Connection error occurred"))
5748
})
5849

5950
this.container.connect({ host: this.host, port: this.port, username: this.username, password: this.password })
6051
})
6152
}
53+
54+
async close(): Promise<void> {
55+
await this.closeConnections()
56+
this.connections = []
57+
}
58+
59+
private async closeConnections(): Promise<void> {
60+
await Promise.allSettled(
61+
this.connections.map(async (c) => {
62+
if (c.isOpen()) await c.close()
63+
})
64+
)
65+
}
6266
}
6367

6468
export function createEnvironment(params: EnvironmentParams): Environment {

test/unit/rhea/connection.test.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ async function open(container: Container, params: ConnectionOptions): Promise<Co
5050
return res(context.connection)
5151
})
5252
container.once("error", (context) => {
53-
console.log(context)
54-
rej()
53+
return rej(context.connection.error)
5554
})
5655
container.connect(params)
5756
})
@@ -63,8 +62,7 @@ async function openReceiver(connection: Connection) {
6362
return res(context.receiver)
6463
})
6564
connection.once("receiver_error", (context) => {
66-
console.log(context)
67-
rej()
65+
return rej(context.connection.error)
6866
})
6967
connection.open_receiver({
7068
snd_settle_mode: 1,
@@ -83,8 +81,7 @@ async function openSender(connection: Connection) {
8381
return res(context.sender)
8482
})
8583
connection.once("sender_error", (context) => {
86-
console.log(context)
87-
rej()
84+
return rej(context.connection.error)
8885
})
8986
connection.open_sender({
9087
snd_settle_mode: 1,

0 commit comments

Comments
 (0)