forked from microsoft/FluidFramework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatchManager.ts
More file actions
221 lines (189 loc) · 6.81 KB
/
batchManager.ts
File metadata and controls
221 lines (189 loc) · 6.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
import { assert } from "@fluidframework/core-utils/internal";
import {
LoggingError,
tagData,
TelemetryDataTag,
} from "@fluidframework/telemetry-utils/internal";
import type { ICompressionRuntimeOptions } from "../compressionDefinitions.js";
import { isContainerMessageDirtyable } from "../containerRuntime.js";
import { asBatchMetadata, type IBatchMetadata } from "../metadata.js";
import type { IPendingMessage } from "../pendingStateManager.js";
import type { LocalBatchMessage, IBatchCheckpoint, LocalBatch } from "./definitions.js";
import { serializeOp } from "./opSerialization.js";
import type { BatchStartInfo } from "./remoteMessageProcessor.js";
/**
* Options supplied to the {@link BatchManager} constructor, which exposes them unchanged
* through its public readonly `options` property.
*/
export interface IBatchManagerOptions {
/**
* Optional compression configuration.
* NOTE(review): no code visible in this file reads this field; it is presumably consumed by
* whoever owns the BatchManager - confirm against the caller.
*/
readonly compressionOptions?: ICompressionRuntimeOptions;
/**
* If true, the outbox is allowed to rebase the batch during flushing.
*/
readonly canRebase: boolean;
}
/**
* The pair of sequence numbers reported by {@link BatchManager.sequenceNumbers}.
* Both members are optional: BatchManager reports them as undefined when it has no pending
* messages, and {@link sequenceNumbersMatch} treats an undefined value on either side as a match.
*/
export interface BatchSequenceNumbers {
/**
* In BatchManager: the reference sequence number of the most recently pushed pending message.
*/
referenceSequenceNumber?: number;
/**
* In BatchManager: the client sequence number passed to `push` when the current batch was started.
*/
clientSequenceNumber?: number;
}
/**
* Type alias for the batchId stored in batch metadata.
*
* @remarks This is a plain string with no compile-time branding. {@link generateBatchId} builds
* one in the form `<originalClientId>_[<batchStartCsn>]`.
*/
export type BatchId = string;
/**
 * Build the BatchId that is stamped on a message during reconnect.
 *
 * @param originalClientId - ID of the client that originally produced the batch.
 * @param batchStartCsn - Client sequence number at which the batch started.
 * @returns The ID in the form `<originalClientId>_[<batchStartCsn>]`.
 */
export function generateBatchId(originalClientId: string, batchStartCsn: number): BatchId {
	// The CSN is wrapped in square brackets and joined to the client ID with an underscore.
	const bracketedCsn = `[${batchStartCsn}]`;
	return `${originalClientId}_${bracketedCsn}`;
}
/**
 * Resolve the batch ID that applies to the given input.
 *
 * Accepts either an IPendingMessage or a BatchStartInfo; the two are told apart by the presence
 * of a `localOpMetadata` key, which only the pending-message shape carries.
 * An explicitly recorded batch ID always wins. When none is recorded, one is derived from the
 * client ID and the batch's starting CSN via {@link generateBatchId}.
 *
 * @param pendingMessageOrBatchStartInfo - The pending message or batch start info to inspect.
 * @returns The explicit batch ID if present, otherwise a generated one.
 */
export function getEffectiveBatchId(
	pendingMessageOrBatchStartInfo: IPendingMessage | BatchStartInfo,
): string {
	if (!("localOpMetadata" in pendingMessageOrBatchStartInfo)) {
		// BatchStartInfo: the explicit ID (if any) sits directly on the object.
		const { batchId, clientId, batchStartCsn } = pendingMessageOrBatchStartInfo;
		return batchId ?? generateBatchId(clientId, batchStartCsn);
	}

	// IPendingMessage: the explicit ID (if any) lives in the op metadata,
	// and the fallback inputs come from the message's batchInfo.
	const { opMetadata, batchInfo } = pendingMessageOrBatchStartInfo;
	const stampedBatchId = asBatchMetadata(opMetadata)?.batchId;
	return stampedBatchId ?? generateBatchId(batchInfo.clientId, batchInfo.batchStartCsn);
}
/**
 * Helper class that manages partial batch & rollback.
 *
 * @remarks Messages are accumulated with {@link BatchManager.push}, handed off as a unit with
 * {@link BatchManager.popBatch}, and can be unwound back to an earlier point with the object
 * returned by {@link BatchManager.checkpoint}.
 */
export class BatchManager {
	/**
	 * Messages pushed since the last popBatch, in push order.
	 */
	private pendingBatch: LocalBatchMessage[] = [];

	/**
	 * Becomes true once any message is pushed with `reentrant` set; reset only by popBatch.
	 */
	private hasReentrantOps = false;

	/**
	 * The last-processed CSN when this batch started.
	 * This is used to ensure that while the batch is open, no incoming ops are processed.
	 */
	private clientSequenceNumber: number | undefined;

	constructor(public readonly options: IBatchManagerOptions) {}

	/**
	 * Number of messages currently pending.
	 */
	public get length(): number {
		return this.pendingBatch.length;
	}

	/**
	 * True when there are no pending messages.
	 */
	public get empty(): boolean {
		return this.length === 0;
	}

	/**
	 * Snapshot of the reference and client sequence numbers of the pending batch.
	 * Both are undefined while the batch is empty and no CSN has been recorded.
	 */
	public get sequenceNumbers(): BatchSequenceNumbers {
		const { referenceSequenceNumber, clientSequenceNumber } = this;
		return { referenceSequenceNumber, clientSequenceNumber };
	}

	/**
	 * Reference sequence number of the most recently pushed message, or undefined when empty.
	 */
	private get referenceSequenceNumber(): number | undefined {
		const pendingCount = this.pendingBatch.length;
		if (pendingCount === 0) {
			return undefined;
		}
		// NOTE: In case of reentrant ops, there could be multiple reference sequence numbers, but we will rebase before submitting.
		return this.pendingBatch[pendingCount - 1].referenceSequenceNumber;
	}

	/**
	 * Append a message to the pending batch.
	 *
	 * @param message - The message to append.
	 * @param reentrant - Whether the message is reentrant; once true for any message,
	 * the batch is flagged as containing reentrant ops.
	 * @param currentClientSequenceNumber - Recorded as the batch's client sequence number, but
	 * only when this message is the first of the batch; ignored otherwise.
	 */
	public push(
		message: LocalBatchMessage,
		reentrant: boolean,
		currentClientSequenceNumber?: number,
	): void {
		if (!this.hasReentrantOps) {
			this.hasReentrantOps = reentrant;
		}
		if (this.empty) {
			// First message of a new batch: remember the CSN the batch started at.
			this.clientSequenceNumber = currentClientSequenceNumber;
		}
		this.pendingBatch.push(message);
	}

	/**
	 * Gets the pending batch and clears state for the next batch.
	 *
	 * @remarks The returned batch does not have batch metadata stamped (batch start/end markers, batchId).
	 * The caller is responsible for calling {@link addBatchMetadata} after any modifications (e.g. prepending messages).
	 * The batch's `staged` value is taken from its first message. Asserts if the batch is empty.
	 */
	public popBatch(): LocalBatch {
		const messages = this.pendingBatch;
		const head = messages[0];
		assert(head !== undefined, 0xb8a /* expected non-empty batch */);

		// Build the result before resetting, since the getter reads the pending array.
		const popped: LocalBatch = {
			messages,
			referenceSequenceNumber: this.referenceSequenceNumber,
			hasReentrantOps: this.hasReentrantOps,
			staged: head.staged,
		};

		this.pendingBatch = [];
		this.clientSequenceNumber = undefined;
		this.hasReentrantOps = false;
		return popped;
	}

	/**
	 * Capture the pending state at this point.
	 *
	 * @returns An object whose `rollback` restores the recorded client sequence number, removes
	 * every message pushed since the checkpoint, and passes each removed message to the supplied
	 * callback in last-in-first-out order.
	 * `rollback` throws a LoggingError if the pending batch length afterwards differs from the
	 * length at checkpoint time (e.g. because the callback pushed new ops).
	 *
	 * NOTE(review): rollback does not restore `hasReentrantOps`; it keeps whatever value it had
	 * when rollback was invoked.
	 */
	public checkpoint(): IBatchCheckpoint {
		const savedClientSequenceNumber = this.clientSequenceNumber;
		const savedLength = this.pendingBatch.length;

		const rollback = (process: (message: LocalBatchMessage) => void): void => {
			this.clientSequenceNumber = savedClientSequenceNumber;

			// Detach everything added after the checkpoint, then undo newest-first.
			const removed = this.pendingBatch.splice(savedLength);
			for (let index = removed.length - 1; index >= 0; index--) {
				process(removed[index]);
			}

			// Any difference from the checkpointed length means rollback did not land cleanly.
			const count = this.pendingBatch.length - savedLength;
			if (count === 0) {
				return;
			}
			const strayOps = this.pendingBatch.slice(savedLength).map((b) => b.runtimeOp);
			const taggedOps = tagData(TelemetryDataTag.UserData, {
				ops: serializeOp(strayOps),
			});
			throw new LoggingError("Ops generated during rollback", {
				count,
				...taggedOps,
			});
		};

		return { rollback };
	}

	/**
	 * Does this batch currently contain user changes ("dirtyable" ops)?
	 */
	public containsUserChanges(): boolean {
		for (const { runtimeOp } of this.pendingBatch) {
			if (isContainerMessageDirtyable(runtimeOp)) {
				return true;
			}
		}
		return false;
	}
}
/**
 * Stamp batch metadata onto the messages of a batch, mutating them in place.
 *
 * - For batches of more than one message, the first message's metadata gets `batch: true` and
 * the last message's metadata gets `batch: false`, marking batch start and end.
 * - When `batchId` is supplied (e.g. in case of resubmit), it is written to the first message's metadata.
 *
 * Existing metadata objects are reused and mutated; a new object is attached only when a message
 * has no metadata and something needs to be written to it.
 *
 * @param batch - The batch to stamp. Must contain at least one message (asserted).
 * @param batchId - Optional batch ID to record on the first message.
 * @returns The same `batch` object that was passed in.
 */
export const addBatchMetadata = (batch: LocalBatch, batchId?: BatchId): LocalBatch => {
	const { messages } = batch;
	const head = messages[0];
	const tail = messages[messages.length - 1];
	assert(
		head !== undefined && tail !== undefined,
		0x9d1 /* expected non-empty batch */,
	);

	const headMetadata: Partial<IBatchMetadata> = head.metadata ?? {};
	const tailMetadata: Partial<IBatchMetadata> = tail.metadata ?? {};

	// Multi-message batches: flag the first and last messages as batch start/end.
	const isMultiMessage = messages.length > 1;
	if (isMultiMessage) {
		headMetadata.batch = true;
		tailMetadata.batch = false;
		head.metadata = headMetadata;
		tail.metadata = tailMetadata;
	}

	// An explicit batchId always goes on the first message.
	if (batchId !== undefined) {
		headMetadata.batchId = batchId;
		head.metadata = headMetadata;
	}

	return batch;
};
/**
 * Check whether two sets of batch sequence numbers are compatible.
 *
 * The reference sequence numbers and the client sequence numbers are compared independently.
 * For each pair, an undefined value on either side counts as a match; otherwise the two values
 * must be strictly equal.
 *
 * @param seqNums - One set of sequence numbers.
 * @param otherSeqNums - The set to compare against.
 * @returns True only if both the reference and the client sequence numbers are compatible.
 */
export const sequenceNumbersMatch = (
	seqNums: BatchSequenceNumbers,
	otherSeqNums: BatchSequenceNumbers,
): boolean => {
	// A missing value is a wildcard; two present values must be identical.
	const compatible = (mine: number | undefined, theirs: number | undefined): boolean =>
		mine === undefined || theirs === undefined || mine === theirs;

	if (!compatible(seqNums.referenceSequenceNumber, otherSeqNums.referenceSequenceNumber)) {
		return false;
	}
	return compatible(seqNums.clientSequenceNumber, otherSeqNums.clientSequenceNumber);
};