forked from microsoft/FluidFramework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda.ts
More file actions
151 lines (125 loc) · 4.94 KB
/
lambda.ts
File metadata and controls
151 lines (125 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import { INack, ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import {
extractBoxcar,
IContext,
INackMessage,
IPartitionLambda,
IPublisher,
ISequencedOperationMessage,
NackOperationType,
SequencedOperationType,
IQueuedMessage,
} from "@fluidframework/server-services-core";
/**
 * Accumulates the operations (sequenced ops or nacks) that will be published together
 * for a single topic, along with the identifiers and event name used when publishing.
 */
class BroadcasterBatch {
    public documentId: string;
    public tenantId: string;
    public event: string;
    public messages: (ISequencedDocumentMessage | INack)[];

    /**
     * @param documentId - Document the batched messages belong to
     * @param tenantId - Tenant that owns the document
     * @param event - Event name the batch is emitted under
     */
    constructor(documentId: string, tenantId: string, event: string) {
        this.documentId = documentId;
        this.tenantId = tenantId;
        this.event = event;
        // Every batch starts out empty and owns its own array
        this.messages = [];
    }
}
// setImmediate does not exist in every environment (notably, browsers do not provide it).
// Detect it once and fall back to the setTimeout / clearTimeout pair when it is missing.
const immediateAvailable = typeof setImmediate === "function";
const taskScheduleFunction: (cb: () => any) => unknown =
    immediateAvailable ? setImmediate : setTimeout;
const clearTaskScheduleTimerFunction: (timer: any) => void =
    immediateAvailable ? clearImmediate : clearTimeout;
/**
 * Partition lambda that groups incoming sequenced ops and nacks into per-topic batches,
 * publishes each batch through the supplied publisher, and checkpoints the queue offset
 * once the batches covering it have been published.
 */
export class BroadcasterLambda implements IPartitionLambda {
    // Batches filled by handler() that have not yet been handed to the publisher, keyed by topic
    private pending = new Map<string, BroadcasterBatch>();
    // Latest queued message seen by handler() that has not been checkpointed yet
    private pendingOffset: IQueuedMessage | undefined;
    // Batches owned by the send that is currently scheduled / in flight
    private current = new Map<string, BroadcasterBatch>();
    // Handle of the scheduled send. It remains set until the send finishes successfully,
    // which is what prevents overlapping sends.
    private messageSendingTimerId: unknown | undefined;

    /**
     * @param publisher - Destination the batches are emitted to
     * @param context - Lambda context used for checkpointing and error reporting
     */
    constructor(private readonly publisher: IPublisher, protected context: IContext) {
    }

    /**
     * Adds every sequenced op / nack contained in the queued message to the pending batch of
     * its topic, records the message as the latest offset, and triggers a send.
     * Messages of any other type are ignored.
     *
     * @param message - Queued message whose boxcar contents are to be broadcast
     */
    public handler(message: IQueuedMessage): void {
        const boxcar = extractBoxcar(message);

        for (const baseMessage of boxcar.contents) {
            let topic: string | undefined;
            let event: string | undefined;

            if (baseMessage.type === SequencedOperationType) {
                // Sequenced ops are grouped per document
                const value = baseMessage as ISequencedOperationMessage;
                topic = `${value.tenantId}/${value.documentId}`;
                event = "op";
            } else if (baseMessage.type === NackOperationType) {
                // Nacks are grouped per client
                const value = baseMessage as INackMessage;
                topic = `client#${value.clientId}`;
                event = "nack";
            }

            if (topic && event) {
                const value = baseMessage as INackMessage | ISequencedOperationMessage;

                let pendingBatch = this.pending.get(topic);
                if (!pendingBatch) {
                    pendingBatch = new BroadcasterBatch(value.documentId, value.tenantId, event);
                    this.pending.set(topic, pendingBatch);
                }

                pendingBatch.messages.push(value.operation);
            }
        }

        this.pendingOffset = message;
        this.sendPending();
    }

    /**
     * Drops all batched work and the pending offset, and cancels the scheduled send if there is one.
     */
    public close() {
        this.pending.clear();
        this.current.clear();
        this.pendingOffset = undefined;

        if (this.messageSendingTimerId !== undefined) {
            clearTaskScheduleTimerFunction(this.messageSendingTimerId);
            this.messageSendingTimerId = undefined;
        }
    }

    /**
     * @returns true while any batch is still waiting to be sent or is being sent
     */
    public hasPendingWork() {
        return this.pending.size !== 0 || this.current.size !== 0;
    }

    /**
     * Schedules a send of the pending batches unless one is already scheduled or in flight.
     * When there is nothing to send, the pending offset (if any) is checkpointed right away.
     */
    private sendPending() {
        if (this.messageSendingTimerId !== undefined) {
            // a send is in progress
            return;
        }

        if (this.pending.size === 0) {
            // no pending work. checkpoint now if we have a pending offset
            if (this.pendingOffset) {
                this.context.checkpoint(this.pendingOffset);
                this.pendingOffset = undefined;
            }
            return;
        }

        // Invoke the next send after a delay to give IO time to create more batches
        this.messageSendingTimerId = taskScheduleFunction(async () => {
            // Take ownership of everything batched so far; handler() keeps filling a fresh map
            const batchOffset = this.pendingOffset;
            this.current = this.pending;
            this.pending = new Map<string, BroadcasterBatch>();
            this.pendingOffset = undefined;

            // Process all the batches. Both publish paths run inside the try so that a
            // synchronous throw from the publisher is reported through the context instead of
            // escaping this async callback as an unhandled promise rejection.
            try {
                if (this.publisher.emit) {
                    const promises: Promise<void>[] = [];
                    for (const [topic, batch] of this.current) {
                        promises.push(this.publisher.emit(topic, batch.event, batch.documentId, batch.messages));
                    }
                    await Promise.all(promises);
                } else {
                    for (const [topic, batch] of this.current) {
                        this.publisher.to(topic).emit(batch.event, batch.documentId, batch.messages);
                    }
                }
            } catch (ex) {
                // messageSendingTimerId is left set here, so this instance schedules no further
                // sends after reporting the failure (until close() resets it).
                this.context.error(ex, { restart: true });
                return;
            }

            this.messageSendingTimerId = undefined;

            // handler() always records an offset before a send is scheduled, so it is set here
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.context.checkpoint(batchOffset!);
            this.current.clear();

            // Pick up anything that was batched while this send was in flight
            this.sendPending();
        });
    }
}