Skip to content

Commit c10915f

Browse files
authored
chore: remove rocketchat:streamer meteor package (#38494)
1 parent 46bae8e commit c10915f

File tree

21 files changed

+587
-129
lines changed

21 files changed

+587
-129
lines changed

.worktrees/replies-refactor

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit e5b749fabc58569dd3d3d059ca1745d9606b661d

apps/meteor/.meteor/packages

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
rocketchat:mongo-config
77
rocketchat:livechat
8-
rocketchat:streamer
98
rocketchat:version
109

1110
accounts-base@3.1.2

apps/meteor/.meteor/versions

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ reload@1.3.2
7070
retry@1.1.1
7171
rocketchat:livechat@0.0.1
7272
rocketchat:mongo-config@0.0.1
73-
rocketchat:streamer@1.1.0
7473
rocketchat:version@1.0.0
7574
routepolicy@1.1.2
7675
service-configuration@1.3.5

apps/meteor/app/importer/server/classes/ImporterWebsocket.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { IImportProgress } from '@rocket.chat/core-typings';
2-
import type { IStreamer } from 'meteor/rocketchat:streamer';
32

3+
import type { IStreamer } from '../../../../server/modules/streamer/types';
44
import notifications from '../../../notifications/server/lib/Notifications';
55

66
class ImporterWebsocketDef {

apps/meteor/app/notifications/client/lib/Presence.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ import { UserStatus } from '@rocket.chat/core-typings';
22
import { Meteor } from 'meteor/meteor';
33

44
import { Presence } from '../../../../client/lib/presence';
5+
import { streamerCentral } from '../../../../client/lib/streamer';
56

67
// TODO implement API on Streamer to be able to listen to all streamed data
78
// this is a hacky way to listen to all streamed data from user-presence Streamer
89

9-
new Meteor.Streamer('user-presence');
10+
streamerCentral.getStreamer('user-presence', { ddpConnection: Meteor.connection });
1011

1112
type args = [username: string, statusChanged?: UserStatus, statusText?: string];
1213

1314
export const STATUS_MAP = [UserStatus.OFFLINE, UserStatus.ONLINE, UserStatus.AWAY, UserStatus.BUSY, UserStatus.DISABLED];
1415

15-
Meteor.StreamerCentral.on('stream-user-presence', (uid: string, [username, statusChanged, statusText]: args) => {
16+
streamerCentral.on('stream-user-presence', (uid: string, [username, statusChanged, statusText]: args) => {
1617
Presence.notify({ _id: uid, username, status: STATUS_MAP[statusChanged as any], statusText });
1718
});

apps/meteor/app/notifications/server/lib/Presence.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import type { IUser } from '@rocket.chat/core-typings';
22
import type { StreamerEvents } from '@rocket.chat/ddp-client';
33
import { Emitter } from '@rocket.chat/emitter';
4-
import type { IPublication, IStreamerConstructor, Connection, IStreamer } from 'meteor/rocketchat:streamer';
4+
5+
import type { IPublication, IStreamerConstructor, Connection, IStreamer } from '../../../../server/modules/streamer/types';
56

67
type UserPresenceStreamProps = {
78
added: IUser['_id'][];
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import * as z from 'zod/mini';
2+
3+
/**
4+
* DDP Message Specification
5+
* https://github.com/meteor/meteor/blob/devel/packages/ddp/DDP.md
6+
*/
7+
8+
// Establishing connection
9+
const ConnectSchema = z.strictObject({
10+
msg: z.literal('connect'),
11+
session: z.optional(z.string()),
12+
version: z.string(),
13+
support: z.array(z.string()),
14+
});
15+
16+
const ConnectedSchema = z.strictObject({
17+
msg: z.literal('connected'),
18+
session: z.string(),
19+
});
20+
21+
const FailedSchema = z.strictObject({
22+
msg: z.literal('failed'),
23+
version: z.string(),
24+
});
25+
26+
// Heartbeats
27+
const PingSchema = z.strictObject({
28+
msg: z.literal('ping'),
29+
id: z.optional(z.string()),
30+
});
31+
32+
const PongSchema = z.strictObject({
33+
msg: z.literal('pong'),
34+
id: z.optional(z.string()),
35+
});
36+
37+
// Managing data
38+
const AddedSchema = z.strictObject({
39+
msg: z.literal('added'),
40+
collection: z.string(),
41+
id: z.json(),
42+
fields: z.optional(z.record(z.string(), z.json())),
43+
// FIXME: Apparently Meteor can send `cleared` in `added`...
44+
cleared: z.optional(z.array(z.string())),
45+
});
46+
47+
const AddedBeforeSchema = z.strictObject({
48+
msg: z.literal('addedBefore'),
49+
collection: z.string(),
50+
id: z.json(),
51+
fields: z.optional(z.record(z.string(), z.json())),
52+
before: z.optional(z.string()),
53+
});
54+
55+
const ChangedSchema = z.strictObject({
56+
msg: z.literal('changed'),
57+
collection: z.string(),
58+
id: z.json(),
59+
fields: z.optional(z.record(z.string(), z.json())),
60+
cleared: z.optional(z.array(z.string())),
61+
});
62+
63+
const MovedBeforeSchema = z.strictObject({
64+
msg: z.literal('movedBefore'),
65+
collection: z.string(),
66+
id: z.json(),
67+
before: z.optional(z.string()),
68+
});
69+
70+
const RemovedSchema = z.strictObject({
71+
msg: z.literal('removed'),
72+
collection: z.string(),
73+
id: z.json(),
74+
});
75+
// Managing subscriptions
76+
const NosubSchema = z.strictObject({
77+
msg: z.literal('nosub'),
78+
id: z.string(),
79+
error: z.optional(z.json()),
80+
});
81+
const ReadySchema = z.strictObject({
82+
msg: z.literal('ready'),
83+
subs: z.array(z.string()),
84+
});
85+
const SubSchema = z.strictObject({
86+
msg: z.literal('sub'),
87+
id: z.string(),
88+
name: z.string(),
89+
params: z.optional(z.array(z.json())),
90+
});
91+
92+
const UnsubSchema = z.strictObject({
93+
msg: z.literal('unsub'),
94+
id: z.string(),
95+
});
96+
97+
// Remote procedure calls
98+
const MethodSchema = z.strictObject({
99+
msg: z.literal('method'),
100+
id: z.string(),
101+
method: z.string(),
102+
params: z.optional(z.array(z.json())),
103+
randomSeed: z.optional(z.string()),
104+
});
105+
106+
const ResultSchema = z.strictObject({
107+
msg: z.literal('result'),
108+
id: z.string(),
109+
error: z.optional(z.json()),
110+
result: z.optional(z.json()),
111+
});
112+
113+
const UpdatedSchema = z.strictObject({
114+
msg: z.literal('updated'),
115+
methods: z.array(z.string()),
116+
});
117+
118+
const ServerIdSchema = z.strictObject({
119+
msg: z.literal('server_id'),
120+
server_id: z.string(),
121+
});
122+
123+
/**
124+
* Main DDP Message Schema
125+
*/
126+
const DDPMessageSchema = z.discriminatedUnion('msg', [
127+
// Connection
128+
ConnectSchema,
129+
ConnectedSchema,
130+
FailedSchema,
131+
132+
// Heartbeats
133+
PingSchema,
134+
PongSchema,
135+
136+
// Data
137+
AddedSchema,
138+
AddedBeforeSchema,
139+
ChangedSchema,
140+
MovedBeforeSchema,
141+
RemovedSchema,
142+
143+
// Subscriptions
144+
NosubSchema,
145+
ReadySchema,
146+
SubSchema,
147+
UnsubSchema,
148+
149+
// RPC
150+
MethodSchema,
151+
ResultSchema,
152+
UpdatedSchema,
153+
154+
// Server ID
155+
ServerIdSchema,
156+
]);
157+
158+
/**
159+
* Inferred TypeScript Type
160+
* This is "erasable" as it compiles away completely.
161+
*/
162+
export type DDPMessage = z.infer<typeof DDPMessageSchema>;
163+
164+
/**
165+
* Parses a DDP message from JSON string
166+
* @param json The JSON string to parse
167+
* @returns The parsed DDP message
168+
* @throws If the JSON is invalid or does not conform to the DDP message schema
169+
*/
170+
export const parseDDPMessage = (json: string): DDPMessage => {
171+
return DDPMessageSchema.parse(JSON.parse(json));
172+
};
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
export type StreamArgs = unknown[];
2+
export type EventHandler<TArgs extends StreamArgs = any> = (...args: TArgs) => void;
3+
4+
type HandlerMap = {
5+
[event: string]: EventHandler[] | undefined;
6+
};
7+
8+
export class EV {
9+
private handlers: HandlerMap = {};
10+
11+
emit(event: string, ...args: StreamArgs): void {
12+
this.handlers[event]?.forEach((handler) => handler.apply(this, args));
13+
}
14+
15+
emitWithScope(event: string, scope: unknown, ...args: StreamArgs): void {
16+
this.handlers[event]?.forEach((handler) => handler.apply(scope, args));
17+
}
18+
19+
listenerCount(event: string): number {
20+
return this.handlers[event]?.length ?? 0;
21+
}
22+
23+
on<TArgs extends StreamArgs>(event: string, callback: EventHandler<TArgs>): void {
24+
if (!this.handlers[event]) {
25+
this.handlers[event] = [];
26+
}
27+
this.handlers[event].push(callback);
28+
}
29+
30+
once(event: string, callback: EventHandler): void {
31+
const onetimeCallback: EventHandler = (...args) => {
32+
this.removeListener(event, onetimeCallback);
33+
callback.apply(this, args);
34+
};
35+
this.on(event, onetimeCallback);
36+
}
37+
38+
removeListener(event: string, callback: EventHandler): void {
39+
const handlers = this.handlers[event];
40+
if (!handlers) {
41+
return;
42+
}
43+
const index = handlers.indexOf(callback);
44+
if (index > -1) {
45+
handlers.splice(index, 1);
46+
}
47+
}
48+
49+
removeAllListeners(event: string): void {
50+
this.handlers[event] = undefined;
51+
}
52+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import { StreamerCentral } from './streamer';
2+
3+
export const streamerCentral = new StreamerCentral();

0 commit comments

Comments
 (0)