Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions _templates/service/new/service.ejs.t
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ to: ee/apps/<%= name %>/src/service.ts
---
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -16,7 +16,7 @@ const PORT = process.env.PORT || <%= h.random() %>;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// need to import service after models are registered
const { <%= h.changeCase.pascalCase(name) %> } = await import('./<%= h.changeCase.pascalCase(name) %>');
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/ee/server/startup/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import { isRunningMs } from '../../../server/lib/isRunningMs';
export const registerEEBroker = async (): Promise<void> => {
// only starts network broker if running in micro services mode
if (isRunningMs()) {
const { broker } = await import('@rocket.chat/network-broker');
const { startBroker } = await import('@rocket.chat/network-broker');

api.setBroker(broker);
api.setBroker(startBroker());
void api.start();
} else {
require('./presence');
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/account-service/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3033;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// need to import service after models are registered
const { Account } = await import('./Account');
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/authorization-service/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3034;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// need to import service after models are registered
const { Authorization } = await import('../../../../apps/meteor/server/services/authorization/service');
Expand Down
11 changes: 9 additions & 2 deletions ee/apps/ddp-streamer/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os from 'os';

import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';

(async () => {
Expand All @@ -10,7 +13,11 @@ import { startTracing } from '@rocket.chat/tracing';

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(
startBroker({
nodeID: `${os.hostname().toLowerCase()}-${InstanceStatus.id()}`,
}),
);

// need to import service after models are registered
const { NotificationsModule } = await import('../../../../apps/meteor/server/modules/notifications/notifications.module');
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/omnichannel-transcript/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3036;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// need to import service after models are registered
const { OmnichannelTranscript } = await import('@rocket.chat/omnichannel-services');
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/presence-service/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3031;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// need to import Presence service after models are registered
const { Presence } = await import('@rocket.chat/presence');
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/queue-worker/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3038;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// need to import service after models are registeredpackagfe
const { QueueWorker } = await import('@rocket.chat/omnichannel-services');
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/stream-hub-service/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { DatabaseWatcher, registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';

Expand All @@ -16,7 +16,7 @@ const PORT = process.env.PORT || 3035;

registerServiceModels(db, await getTrashCollection());

api.setBroker(broker);
api.setBroker(startBroker());

// TODO having to import Logger to pass as a param is a temporary solution. logger should come from the service (either from broker or api)
const watcher = new DatabaseWatcher({ db, logger: Logger });
Expand Down
152 changes: 78 additions & 74 deletions ee/packages/network-broker/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isMeteorError, MeteorError } from '@rocket.chat/core-services';
import EJSON from 'ejson';
import type Moleculer from 'moleculer';
import { Errors, Serializers, ServiceBroker } from 'moleculer';
import { pino } from 'pino';

Expand Down Expand Up @@ -70,82 +71,85 @@ class EJSONSerializer extends Base {
}
}

const network = new ServiceBroker({
namespace: MS_NAMESPACE,
skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true',
transporter: TRANSPORTER,
metrics: {
enabled: MS_METRICS === 'true',
reporter: [
{
type: 'Prometheus',
options: {
port: MS_METRICS_PORT,
export function startBroker(options?: Moleculer.BrokerOptions): NetworkBroker {
const network = new ServiceBroker({
namespace: MS_NAMESPACE,
skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true',
transporter: TRANSPORTER,
metrics: {
enabled: MS_METRICS === 'true',
reporter: [
{
type: 'Prometheus',
options: {
port: MS_METRICS_PORT,
},
},
},
],
},
cacher: CACHE,
serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER,
logger: {
type: 'Pino',
options: {
level: MOLECULER_LOG_LEVEL,
pino: {
options: {
timestamp: pino.stdTimeFunctions.isoTime,
...(process.env.NODE_ENV !== 'production'
? {
transport: {
target: 'pino-pretty',
options: {
colorize: true,
],
},
cacher: CACHE,
serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER,
logger: {
type: 'Pino',
options: {
level: MOLECULER_LOG_LEVEL,
pino: {
options: {
timestamp: pino.stdTimeFunctions.isoTime,
...(process.env.NODE_ENV !== 'production'
? {
transport: {
target: 'pino-pretty',
options: {
colorize: true,
},
},
},
}
: {}),
}
: {}),
},
},
},
},
},
registry: {
strategy: BALANCE_STRATEGY,
preferLocal: BALANCE_PREFER_LOCAL !== 'false',
},

requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000,
retryPolicy: {
enabled: RETRY_ENABLED === 'true',
retries: parseInt(RETRY_RETRIES),
delay: parseInt(RETRY_DELAY),
maxDelay: parseInt(RETRY_MAX_DELAY),
factor: parseInt(RETRY_FACTOR),
check: (err: any): boolean => err && !!err.retryable,
},

maxCallLevel: 100,
heartbeatInterval: parseInt(HEARTBEAT_INTERVAL),
heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT),

// circuitBreaker: {
// enabled: false,
// threshold: 0.5,
// windowTime: 60,
// minRequestCount: 20,
// halfOpenTime: 10 * 1000,
// check: (err: any): boolean => err && err.code >= 500,
// },

bulkhead: {
enabled: BULKHEAD_ENABLED === 'true',
concurrency: parseInt(BULKHEAD_CONCURRENCY),
maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE),
},

errorRegenerator: new CustomRegenerator(),
started(): void {
console.log('NetworkBroker started successfully.');
},
});

export const broker = new NetworkBroker(network);
registry: {
strategy: BALANCE_STRATEGY,
preferLocal: BALANCE_PREFER_LOCAL !== 'false',
},

requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000,
retryPolicy: {
enabled: RETRY_ENABLED === 'true',
retries: parseInt(RETRY_RETRIES),
delay: parseInt(RETRY_DELAY),
maxDelay: parseInt(RETRY_MAX_DELAY),
factor: parseInt(RETRY_FACTOR),
check: (err: any): boolean => err && !!err.retryable,
},

maxCallLevel: 100,
heartbeatInterval: parseInt(HEARTBEAT_INTERVAL),
heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT),

// circuitBreaker: {
// enabled: false,
// threshold: 0.5,
// windowTime: 60,
// minRequestCount: 20,
// halfOpenTime: 10 * 1000,
// check: (err: any): boolean => err && err.code >= 500,
// },

bulkhead: {
enabled: BULKHEAD_ENABLED === 'true',
concurrency: parseInt(BULKHEAD_CONCURRENCY),
maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE),
},

errorRegenerator: new CustomRegenerator(),
started(): void {
console.log('NetworkBroker started successfully.');
},
...(options || {}),
});

return new NetworkBroker(network);
}
Loading