forked from microsoft/FluidFramework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopGroupingManager.ts
More file actions
198 lines (176 loc) · 6.5 KB
/
opGroupingManager.ts
File metadata and controls
198 lines (176 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
import type { ITelemetryBaseLogger } from "@fluidframework/core-interfaces";
import { assert } from "@fluidframework/core-utils/internal";
import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
import {
createChildLogger,
type ITelemetryLoggerExt,
} from "@fluidframework/telemetry-utils/internal";
import type {
LocalEmptyBatchPlaceholder,
OutboundBatch,
OutboundSingletonBatch,
} from "./definitions.js";
/**
* The number of ops in a batch above which the batch is considered "large"
* for telemetry purposes. Used by both {@link OpGroupingManager} (GroupLargeBatch event)
* and as the default staging-mode auto-flush threshold.
*/
export const largeBatchThreshold = 1000;
/**
* Grouping makes assumptions about the shape of message contents. This interface codifies those assumptions, but does not validate them.
*/
interface IGroupedBatchMessageContents {
type: typeof OpGroupingManager.groupedBatchOp;
contents: IGroupedMessage[];
}
interface IGroupedMessage {
contents?: unknown;
metadata?: Record<string, unknown>;
compression?: string;
}
function isGroupContents(opContents: unknown): opContents is IGroupedBatchMessageContents {
return (
(opContents as Partial<IGroupedBatchMessageContents>)?.type ===
OpGroupingManager.groupedBatchOp
);
}
export function isGroupedBatch(op: ISequencedDocumentMessage): boolean {
return isGroupContents(op.contents);
}
export interface OpGroupingManagerConfig {
readonly groupedBatchingEnabled: boolean;
}
/**
* This is the type of an empty grouped batch we send over the wire
* We also put this in the placeholder for an empty batch in the PendingStateManager.
* But most places throughout the ContainerRuntime, this will not be used (just as Grouped Batches in general don't appear outside opLifecycle dir)
*/
export interface EmptyGroupedBatch {
type: typeof OpGroupingManager.groupedBatchOp;
contents: readonly [];
}
export class OpGroupingManager {
static readonly groupedBatchOp = "groupedBatch";
private readonly logger: ITelemetryLoggerExt;
constructor(
private readonly config: OpGroupingManagerConfig,
logger: ITelemetryBaseLogger,
) {
this.logger = createChildLogger({ logger, namespace: "OpGroupingManager" });
}
/**
* Creates a new batch with a single message of type "groupedBatch" and empty contents.
* This is needed as a placeholder if a batch becomes empty on resubmit, but we are tracking batch IDs.
* @param resubmittingBatchId - batch ID of the resubmitting batch
* @param referenceSequenceNumber - reference sequence number
* @returns The outbound batch as well as the interior placeholder message
*/
public createEmptyGroupedBatch(
resubmittingBatchId: string,
referenceSequenceNumber: number,
): {
outboundBatch: OutboundSingletonBatch;
placeholderMessage: LocalEmptyBatchPlaceholder;
} {
assert(
this.config.groupedBatchingEnabled,
0xa00 /* cannot create empty grouped batch when grouped batching is disabled */,
);
const emptyGroupedBatch: EmptyGroupedBatch = {
type: "groupedBatch",
contents: [],
};
const serializedOp = JSON.stringify(emptyGroupedBatch);
const placeholderMessage: LocalEmptyBatchPlaceholder = {
metadata: { batchId: resubmittingBatchId },
localOpMetadata: { emptyBatch: true },
referenceSequenceNumber,
runtimeOp: emptyGroupedBatch,
};
const outboundBatch: OutboundSingletonBatch = {
contentSizeInBytes: 0,
messages: [{ ...placeholderMessage, runtimeOp: undefined, contents: serializedOp }],
referenceSequenceNumber,
};
return { outboundBatch, placeholderMessage };
}
/**
* Converts the given batch into a "grouped batch" - a batch with a single message of type "groupedBatch",
* with contents being an array of the original batch's messages.
*
* If the batch already has only 1 message, it is returned as-is.
*
* @remarks Remember that a BatchMessage has its content JSON serialized, so the incoming batch message contents
* must be parsed first, and then the type and contents mentioned above are hidden in that JSON serialization.
*/
public groupBatch(batch: OutboundBatch): OutboundSingletonBatch {
assert(this.groupedBatchingEnabled(), 0xb79 /* grouping disabled! */);
assert(batch.messages.length > 0, 0xb7a /* Unexpected attempt to group an empty batch */);
if (batch.messages.length === 1) {
return batch as OutboundSingletonBatch;
}
// Use > (not >=) so that batches flushed exactly at the staging-mode
// auto-flush threshold (which defaults to largeBatchThreshold) don't
// trigger this event. Only genuinely oversized batches are logged.
if (batch.messages.length > largeBatchThreshold) {
this.logger.sendTelemetryEvent({
eventName: "GroupLargeBatch",
length: batch.messages.length,
reentrant: batch.hasReentrantOps,
referenceSequenceNumber: batch.messages[0].referenceSequenceNumber,
});
}
// We expect this will be on the first message, if present at all.
let groupedBatchId: unknown;
for (const message of batch.messages) {
if (message.metadata !== undefined) {
const { batch: _batch, batchId, ...rest } = message.metadata;
if (batchId !== undefined) {
groupedBatchId = batchId;
}
assert(Object.keys(rest).length === 0, 0x5dd /* cannot group ops with metadata */);
}
}
const serializedContent = JSON.stringify({
type: OpGroupingManager.groupedBatchOp,
contents: batch.messages.map<IGroupedMessage>((message) => ({
contents:
message.contents === undefined
? undefined
: (JSON.parse(message.contents) as unknown),
metadata: message.metadata,
compression: message.compression,
})),
});
const groupedBatch: OutboundSingletonBatch = {
...batch,
messages: [
{
metadata: { batchId: groupedBatchId },
referenceSequenceNumber: batch.messages[0].referenceSequenceNumber,
contents: serializedContent,
},
],
};
return groupedBatch;
}
public ungroupOp(op: ISequencedDocumentMessage): ISequencedDocumentMessage[] {
assert(isGroupContents(op.contents), 0x947 /* can only ungroup a grouped batch */);
const contents: IGroupedBatchMessageContents = op.contents;
let fakeCsn = 1;
return contents.contents.map((subMessage) => ({
...op,
clientSequenceNumber: fakeCsn++,
contents: subMessage.contents,
metadata: subMessage.metadata,
compression: subMessage.compression,
}));
}
public groupedBatchingEnabled(): boolean {
return this.config.groupedBatchingEnabled;
}
}