Skip to content

Commit 6110872

Browse files
fix: ddp-streamer restart keeping opened connections (#35220)
Co-authored-by: Diego Sampaio <chinello@gmail.com>
1 parent 876062a commit 6110872

File tree

10 files changed

+103
-92
lines changed

10 files changed

+103
-92
lines changed

_templates/service/new/service.ejs.t

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ to: ee/apps/<%= name %>/src/service.ts
33
---
44
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
55
import { registerServiceModels } from '@rocket.chat/models';
6-
import { broker } from '@rocket.chat/network-broker';
6+
import { startBroker } from '@rocket.chat/network-broker';
77
import { startTracing } from '@rocket.chat/tracing';
88
import polka from 'polka';
99

@@ -16,7 +16,7 @@ const PORT = process.env.PORT || <%= h.random() %>;
1616

1717
registerServiceModels(db, await getTrashCollection());
1818

19-
api.setBroker(broker);
19+
api.setBroker(startBroker());
2020

2121
// need to import service after models are registered
2222
const { <%= h.changeCase.pascalCase(name) %> } = await import('./<%= h.changeCase.pascalCase(name) %>');

apps/meteor/ee/server/startup/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import { isRunningMs } from '../../../server/lib/isRunningMs';
1212
export const registerEEBroker = async (): Promise<void> => {
1313
// only starts network broker if running in micro services mode
1414
if (isRunningMs()) {
15-
const { broker } = await import('@rocket.chat/network-broker');
15+
const { startBroker } = await import('@rocket.chat/network-broker');
1616

17-
api.setBroker(broker);
17+
api.setBroker(startBroker());
1818
void api.start();
1919
} else {
2020
require('./presence');

ee/apps/account-service/src/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
22
import { registerServiceModels } from '@rocket.chat/models';
3-
import { broker } from '@rocket.chat/network-broker';
3+
import { startBroker } from '@rocket.chat/network-broker';
44
import { startTracing } from '@rocket.chat/tracing';
55
import polka from 'polka';
66

@@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3033;
1313

1414
registerServiceModels(db, await getTrashCollection());
1515

16-
api.setBroker(broker);
16+
api.setBroker(startBroker());
1717

1818
// need to import service after models are registered
1919
const { Account } = await import('./Account');

ee/apps/authorization-service/src/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
22
import { registerServiceModels } from '@rocket.chat/models';
3-
import { broker } from '@rocket.chat/network-broker';
3+
import { startBroker } from '@rocket.chat/network-broker';
44
import { startTracing } from '@rocket.chat/tracing';
55
import polka from 'polka';
66

@@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3034;
1313

1414
registerServiceModels(db, await getTrashCollection());
1515

16-
api.setBroker(broker);
16+
api.setBroker(startBroker());
1717

1818
// need to import service after models are registered
1919
const { Authorization } = await import('../../../../apps/meteor/server/services/authorization/service');

ee/apps/ddp-streamer/src/service.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import os from 'os';
2+
13
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
4+
import { InstanceStatus } from '@rocket.chat/instance-status';
25
import { registerServiceModels } from '@rocket.chat/models';
3-
import { broker } from '@rocket.chat/network-broker';
6+
import { startBroker } from '@rocket.chat/network-broker';
47
import { startTracing } from '@rocket.chat/tracing';
58

69
(async () => {
@@ -10,7 +13,11 @@ import { startTracing } from '@rocket.chat/tracing';
1013

1114
registerServiceModels(db, await getTrashCollection());
1215

13-
api.setBroker(broker);
16+
api.setBroker(
17+
startBroker({
18+
nodeID: `${os.hostname().toLowerCase()}-${InstanceStatus.id()}`,
19+
}),
20+
);
1421

1522
// need to import service after models are registered
1623
const { NotificationsModule } = await import('../../../../apps/meteor/server/modules/notifications/notifications.module');

ee/apps/omnichannel-transcript/src/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
22
import { Logger } from '@rocket.chat/logger';
33
import { registerServiceModels } from '@rocket.chat/models';
4-
import { broker } from '@rocket.chat/network-broker';
4+
import { startBroker } from '@rocket.chat/network-broker';
55
import { startTracing } from '@rocket.chat/tracing';
66
import polka from 'polka';
77

@@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3036;
1414

1515
registerServiceModels(db, await getTrashCollection());
1616

17-
api.setBroker(broker);
17+
api.setBroker(startBroker());
1818

1919
// need to import service after models are registered
2020
const { OmnichannelTranscript } = await import('@rocket.chat/omnichannel-services');

ee/apps/presence-service/src/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
22
import { registerServiceModels } from '@rocket.chat/models';
3-
import { broker } from '@rocket.chat/network-broker';
3+
import { startBroker } from '@rocket.chat/network-broker';
44
import { startTracing } from '@rocket.chat/tracing';
55
import polka from 'polka';
66

@@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3031;
1313

1414
registerServiceModels(db, await getTrashCollection());
1515

16-
api.setBroker(broker);
16+
api.setBroker(startBroker());
1717

1818
// need to import Presence service after models are registered
1919
const { Presence } = await import('@rocket.chat/presence');

ee/apps/queue-worker/src/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
22
import { Logger } from '@rocket.chat/logger';
33
import { registerServiceModels } from '@rocket.chat/models';
4-
import { broker } from '@rocket.chat/network-broker';
4+
import { startBroker } from '@rocket.chat/network-broker';
55
import { startTracing } from '@rocket.chat/tracing';
66
import polka from 'polka';
77

@@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3038;
1414

1515
registerServiceModels(db, await getTrashCollection());
1616

17-
api.setBroker(broker);
17+
api.setBroker(startBroker());
1818

1919
// need to import service after models are registeredpackagfe
2020
const { QueueWorker } = await import('@rocket.chat/omnichannel-services');

ee/apps/stream-hub-service/src/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
22
import { Logger } from '@rocket.chat/logger';
33
import { DatabaseWatcher, registerServiceModels } from '@rocket.chat/models';
4-
import { broker } from '@rocket.chat/network-broker';
4+
import { startBroker } from '@rocket.chat/network-broker';
55
import { startTracing } from '@rocket.chat/tracing';
66
import polka from 'polka';
77

@@ -16,7 +16,7 @@ const PORT = process.env.PORT || 3035;
1616

1717
registerServiceModels(db, await getTrashCollection());
1818

19-
api.setBroker(broker);
19+
api.setBroker(startBroker());
2020

2121
// TODO having to import Logger to pass as a param is a temporary solution. logger should come from the service (either from broker or api)
2222
const watcher = new DatabaseWatcher({ db, logger: Logger });
Lines changed: 78 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { isMeteorError, MeteorError } from '@rocket.chat/core-services';
22
import EJSON from 'ejson';
3+
import type Moleculer from 'moleculer';
34
import { Errors, Serializers, ServiceBroker } from 'moleculer';
45
import { pino } from 'pino';
56

@@ -70,82 +71,85 @@ class EJSONSerializer extends Base {
7071
}
7172
}
7273

73-
const network = new ServiceBroker({
74-
namespace: MS_NAMESPACE,
75-
skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true',
76-
transporter: TRANSPORTER,
77-
metrics: {
78-
enabled: MS_METRICS === 'true',
79-
reporter: [
80-
{
81-
type: 'Prometheus',
82-
options: {
83-
port: MS_METRICS_PORT,
74+
export function startBroker(options: Moleculer.BrokerOptions = {}): NetworkBroker {
75+
const network = new ServiceBroker({
76+
namespace: MS_NAMESPACE,
77+
skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true',
78+
transporter: TRANSPORTER,
79+
metrics: {
80+
enabled: MS_METRICS === 'true',
81+
reporter: [
82+
{
83+
type: 'Prometheus',
84+
options: {
85+
port: MS_METRICS_PORT,
86+
},
8487
},
85-
},
86-
],
87-
},
88-
cacher: CACHE,
89-
serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER,
90-
logger: {
91-
type: 'Pino',
92-
options: {
93-
level: MOLECULER_LOG_LEVEL,
94-
pino: {
95-
options: {
96-
timestamp: pino.stdTimeFunctions.isoTime,
97-
...(process.env.NODE_ENV !== 'production'
98-
? {
99-
transport: {
100-
target: 'pino-pretty',
101-
options: {
102-
colorize: true,
88+
],
89+
},
90+
cacher: CACHE,
91+
serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER,
92+
logger: {
93+
type: 'Pino',
94+
options: {
95+
level: MOLECULER_LOG_LEVEL,
96+
pino: {
97+
options: {
98+
timestamp: pino.stdTimeFunctions.isoTime,
99+
...(process.env.NODE_ENV !== 'production'
100+
? {
101+
transport: {
102+
target: 'pino-pretty',
103+
options: {
104+
colorize: true,
105+
},
103106
},
104-
},
105-
}
106-
: {}),
107+
}
108+
: {}),
109+
},
107110
},
108111
},
109112
},
110-
},
111-
registry: {
112-
strategy: BALANCE_STRATEGY,
113-
preferLocal: BALANCE_PREFER_LOCAL !== 'false',
114-
},
115-
116-
requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000,
117-
retryPolicy: {
118-
enabled: RETRY_ENABLED === 'true',
119-
retries: parseInt(RETRY_RETRIES),
120-
delay: parseInt(RETRY_DELAY),
121-
maxDelay: parseInt(RETRY_MAX_DELAY),
122-
factor: parseInt(RETRY_FACTOR),
123-
check: (err: any): boolean => err && !!err.retryable,
124-
},
125-
126-
maxCallLevel: 100,
127-
heartbeatInterval: parseInt(HEARTBEAT_INTERVAL),
128-
heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT),
129-
130-
// circuitBreaker: {
131-
// enabled: false,
132-
// threshold: 0.5,
133-
// windowTime: 60,
134-
// minRequestCount: 20,
135-
// halfOpenTime: 10 * 1000,
136-
// check: (err: any): boolean => err && err.code >= 500,
137-
// },
138-
139-
bulkhead: {
140-
enabled: BULKHEAD_ENABLED === 'true',
141-
concurrency: parseInt(BULKHEAD_CONCURRENCY),
142-
maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE),
143-
},
144-
145-
errorRegenerator: new CustomRegenerator(),
146-
started(): void {
147-
console.log('NetworkBroker started successfully.');
148-
},
149-
});
150-
151-
export const broker = new NetworkBroker(network);
113+
registry: {
114+
strategy: BALANCE_STRATEGY,
115+
preferLocal: BALANCE_PREFER_LOCAL !== 'false',
116+
},
117+
118+
requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000,
119+
retryPolicy: {
120+
enabled: RETRY_ENABLED === 'true',
121+
retries: parseInt(RETRY_RETRIES),
122+
delay: parseInt(RETRY_DELAY),
123+
maxDelay: parseInt(RETRY_MAX_DELAY),
124+
factor: parseInt(RETRY_FACTOR),
125+
check: (err: any): boolean => err && !!err.retryable,
126+
},
127+
128+
maxCallLevel: 100,
129+
heartbeatInterval: parseInt(HEARTBEAT_INTERVAL),
130+
heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT),
131+
132+
// circuitBreaker: {
133+
// enabled: false,
134+
// threshold: 0.5,
135+
// windowTime: 60,
136+
// minRequestCount: 20,
137+
// halfOpenTime: 10 * 1000,
138+
// check: (err: any): boolean => err && err.code >= 500,
139+
// },
140+
141+
bulkhead: {
142+
enabled: BULKHEAD_ENABLED === 'true',
143+
concurrency: parseInt(BULKHEAD_CONCURRENCY),
144+
maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE),
145+
},
146+
147+
errorRegenerator: new CustomRegenerator(),
148+
started(): void {
149+
console.log('NetworkBroker started successfully.');
150+
},
151+
...options,
152+
});
153+
154+
return new NetworkBroker(network);
155+
}

0 commit comments

Comments
 (0)