Skip to content

Commit f4526f6

Browse files
committed
fix: fix the majordomo example
1 parent 58438fa commit f4526f6

File tree

5 files changed

+92
-89
lines changed

5 files changed

+92
-89
lines changed

examples/majordomo/README.md

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ not send or listen to heartbeats.
66

77
## Running this example
88

9-
To run this example, install the example project depedencies and run the
10-
majordomo example script with `yarn`:
9+
To run this example, install the example project dependencies and run the
10+
majordomo example script with `pnpm`:
1111

1212
```
13-
> yarn install
14-
> yarn majordomo
13+
> pnpm install
14+
> pnpm run majordomo
1515
```
1616

1717
## Expected behaviour
@@ -20,7 +20,13 @@ The example will start a broker and some workers, then do some requests. The
2020
output will be similar to this:
2121

2222
```
23+
2324
starting broker on tcp://127.0.0.1:5555
25+
starting worker on tcp://127.0.0.1:5555
26+
starting worker on tcp://127.0.0.1:5555
27+
starting worker on tcp://127.0.0.1:5555
28+
---------- Started -----------
29+
requesting 'cola' from 'soda'
2430
requesting 'oolong' from 'tea'
2531
requesting 'sencha' from 'tea'
2632
requesting 'earl grey, with milk' from 'tea'
@@ -29,33 +35,33 @@ requesting 'cappuccino' from 'coffee'
2935
requesting 'latte, with soy milk' from 'coffee'
3036
requesting 'espresso' from 'coffee'
3137
requesting 'irish coffee' from 'coffee'
32-
registered worker 00800041af for 'coffee'
33-
dispatching 'coffee' 00800041ab req -> 00800041af
34-
registered worker 00800041b0 for 'tea'
35-
dispatching 'tea' 00800041a7 req -> 00800041b0
36-
registered worker 00800041b1 for 'tea'
37-
dispatching 'tea' 00800041a8 req -> 00800041b1
38-
dispatching 'tea' 00800041a7 <- rep 00800041b0
39-
dispatching 'tea' 00800041a9 req -> 00800041b0
40-
received 'oolong' from 'tea'
41-
dispatching 'coffee' 00800041ab <- rep 00800041af
42-
dispatching 'coffee' 00800041ac req -> 00800041af
43-
received 'cappuccino' from 'coffee'
44-
dispatching 'tea' 00800041a9 <- rep 00800041b0
45-
dispatching 'tea' 00800041aa req -> 00800041b0
46-
received 'earl grey, with milk' from 'tea'
47-
dispatching 'tea' 00800041a8 <- rep 00800041b1
38+
registered worker 00800041a7 for 'tea'
39+
registered worker 00800041a8 for 'coffee'
40+
registered worker 00800041a9 for 'tea'
41+
dispatching 'tea' 00800041ab req -> 00800041a7
42+
dispatching 'tea' 00800041ac req -> 00800041a9
43+
dispatching 'coffee' 00800041af req -> 00800041a8
44+
dispatching 'tea' 00800041ac <- rep 00800041a9
45+
dispatching 'tea' 00800041ad req -> 00800041a9
4846
received 'sencha' from 'tea'
49-
dispatching 'tea' 00800041aa <- rep 00800041b0
50-
received 'jasmine' from 'tea'
51-
dispatching 'coffee' 00800041ac <- rep 00800041af
52-
dispatching 'coffee' 00800041ad req -> 00800041af
47+
dispatching 'tea' 00800041ad <- rep 00800041a9
48+
dispatching 'tea' 00800041ae req -> 00800041a9
49+
received 'earl grey, with milk' from 'tea'
50+
dispatching 'coffee' 00800041af <- rep 00800041a8
51+
dispatching 'coffee' 00800041b0 req -> 00800041a8
52+
received 'cappuccino' from 'coffee'
53+
dispatching 'coffee' 00800041b0 <- rep 00800041a8
54+
dispatching 'coffee' 00800041b1 req -> 00800041a8
5355
received 'latte, with soy milk' from 'coffee'
54-
dispatching 'coffee' 00800041ad <- rep 00800041af
55-
dispatching 'coffee' 00800041ae req -> 00800041af
56+
dispatching 'coffee' 00800041b1 <- rep 00800041a8
57+
dispatching 'coffee' 00800041b2 req -> 00800041a8
5658
received 'espresso' from 'coffee'
57-
dispatching 'coffee' 00800041ae <- rep 00800041af
59+
dispatching 'tea' 00800041ae <- rep 00800041a9
60+
received 'jasmine' from 'tea'
61+
dispatching 'coffee' 00800041b2 <- rep 00800041a8
5862
received 'irish coffee' from 'coffee'
63+
dispatching 'tea' 00800041ab <- rep 00800041a7
64+
received 'oolong' from 'tea'
5965
timeout expired waiting for 'soda'
60-
deregistered worker 00800041b1 for 'tea'
66+
---------- Stopping -----------
6167
```

examples/majordomo/broker.ts

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,18 @@ export class Broker {
1717
console.log(`starting broker on ${this.address}`)
1818
await this.socket.bind(this.address)
1919

20-
const loop = async () => {
21-
for await (const [sender, _blank, header, ...rest] of this.socket) {
22-
switch (header.toString()) {
23-
case Header.Client:
24-
this.handleClient(sender, ...rest)
25-
break
26-
case Header.Worker:
27-
this.handleWorker(sender, ...rest)
28-
break
29-
default:
30-
console.error(`invalid message header: ${header}`)
31-
}
20+
for await (const [sender, _blank, header, ...rest] of this.socket) {
21+
switch (header.toString()) {
22+
case Header.Client:
23+
await this.handleClient(sender as Buffer, ...(rest as Buffer[]))
24+
break
25+
case Header.Worker:
26+
await this.handleWorker(sender as Buffer, ...(rest as Buffer[]))
27+
break
28+
default:
29+
console.error(`invalid message header: ${header}`)
3230
}
3331
}
34-
35-
return loop()
3632
}
3733

3834
async stop() {
@@ -43,33 +39,30 @@ export class Broker {
4339

4440
handleClient(client: Buffer, service?: Buffer, ...req: Buffer[]) {
4541
if (service) {
46-
this.dispatchRequest(client, service, ...req)
42+
return this.dispatchRequest(client, service, ...req)
4743
}
4844
}
4945

5046
handleWorker(worker: Buffer, type?: Buffer, ...rest: Buffer[]) {
5147
switch (type?.toString()) {
5248
case Message.Ready: {
5349
const [service] = rest
54-
this.register(worker, service)
55-
break
50+
return this.register(worker, service)
5651
}
5752

5853
case Message.Reply: {
5954
const [client, _blank, ...rep] = rest
60-
this.dispatchReply(worker, client, ...rep).catch(err => {
55+
return this.dispatchReply(worker, client, ...rep).catch(err => {
6156
console.error(err)
6257
})
63-
break
6458
}
6559

6660
case Message.Heartbeat:
6761
/* Heartbeats not implemented yet. */
6862
break
6963

7064
case Message.Disconnect:
71-
this.deregister(worker)
72-
break
65+
return this.deregister(worker)
7366

7467
default:
7568
console.error(`invalid worker message type: ${type}`)
@@ -78,11 +71,11 @@ export class Broker {
7871

7972
register(worker: Buffer, service: Buffer) {
8073
this.setWorkerService(worker, service)
81-
this.getService(service).register(worker)
74+
return this.getService(service).register(worker)
8275
}
8376

8477
dispatchRequest(client: Buffer, service: Buffer, ...req: Buffer[]) {
85-
this.getService(service).dispatchRequest(client, ...req)
78+
return this.getService(service).dispatchRequest(client, ...req)
8679
}
8780

8881
dispatchReply(worker: Buffer, client: Buffer, ...rep: Buffer[]) {
@@ -92,7 +85,7 @@ export class Broker {
9285

9386
deregister(worker: Buffer) {
9487
const service = this.getWorkerService(worker)
95-
this.getService(service).deregister(worker)
88+
return this.getService(service).deregister(worker)
9689
}
9790

9891
getService(name: Buffer): Service {

examples/majordomo/index.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ async function sleep(msec: number) {
1212
class TeaWorker extends Worker {
1313
service = "tea"
1414

15-
async process(...msgs: Buffer[]): Promise<Buffer[]> {
15+
override async process(...msgs: Buffer[]): Promise<Buffer[]> {
1616
await sleep(Math.random() * 500)
1717
return msgs
1818
}
@@ -21,7 +21,7 @@ class TeaWorker extends Worker {
2121
class CoffeeWorker extends Worker {
2222
service = "coffee"
2323

24-
async process(...msgs: Buffer[]): Promise<Buffer[]> {
24+
override async process(...msgs: Buffer[]): Promise<Buffer[]> {
2525
await sleep(Math.random() * 200)
2626
return msgs
2727
}
@@ -51,10 +51,14 @@ async function request(
5151
}
5252

5353
async function main() {
54-
for (const worker of workers) {
55-
await worker.start()
56-
}
57-
await broker.start()
54+
const started = Promise.all([
55+
// start the broker
56+
broker.start(),
57+
// start the workers
58+
...workers.map(worker => worker.start()),
59+
])
60+
61+
console.log("---------- Started -----------")
5862

5963
/* Requests are issued in parallel. */
6064
await Promise.all([
@@ -69,10 +73,16 @@ async function main() {
6973
request("coffee", "irish coffee"),
7074
])
7175

72-
for (const worker of workers) {
73-
await worker.stop()
74-
}
75-
await broker.stop()
76+
console.log("---------- Stopping -----------")
77+
78+
await Promise.all([
79+
// stop the broker
80+
broker.stop(),
81+
// stop the workers
82+
...workers.map(worker => worker.stop()),
83+
])
84+
// await outstanding promises
85+
await started
7686
}
7787

7888
main().catch(err => {

examples/majordomo/worker.ts

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,25 @@ export class Worker {
1313
}
1414

1515
async start() {
16+
console.log(`starting worker on ${this.address}`)
1617
await this.socket.send([null, Header.Worker, Message.Ready, this.service])
1718

18-
const loop = async () => {
19-
for await (const [
20-
_blank1,
21-
_header,
22-
_type,
23-
client,
24-
_blank2,
25-
...req
26-
] of this.socket) {
27-
const rep = await this.process(...req)
28-
try {
29-
await this.socket.send([
30-
null,
31-
Header.Worker,
32-
Message.Reply,
33-
client,
34-
null,
35-
...rep,
36-
])
37-
} catch (err) {
38-
console.error(`unable to send reply for ${this.address}`)
39-
}
19+
for await (const [_blank1, _header, _type, client, _blank2, ...req] of this
20+
.socket) {
21+
const rep = await this.process(...req)
22+
try {
23+
await this.socket.send([
24+
null,
25+
Header.Worker,
26+
Message.Reply,
27+
client,
28+
null,
29+
...rep,
30+
])
31+
} catch (err) {
32+
console.error(`unable to send reply for ${this.address}`)
4033
}
4134
}
42-
43-
return loop()
4435
}
4536

4637
async stop() {
@@ -55,6 +46,9 @@ export class Worker {
5546
}
5647
}
5748

49+
/**
50+
* @virtual
51+
*/
5852
async process(...req: Buffer[]): Promise<Buffer[]> {
5953
return req
6054
}

examples/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
"ts-node": ">= 0"
99
},
1010
"scripts": {
11-
"majordomo": "ts-node majordomo",
12-
"queue": "ts-node queue",
13-
"threaded-worker": "ts-node threaded-worker"
11+
"majordomo": "ts-node ./majordomo/index.ts",
12+
"queue": "ts-node ./queue/index.ts",
13+
"threaded-worker": "ts-node ./threaded-worker/index.ts"
1414
}
1515
}

0 commit comments

Comments
 (0)