forked from microsoft/FluidFramework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoutbox.ts
More file actions
694 lines (621 loc) · 24.5 KB
/
outbox.ts
File metadata and controls
694 lines (621 loc) · 24.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
import type { IBatchMessage } from "@fluidframework/container-definitions/internal";
import type {
ITelemetryBaseLogger,
ITelemetryBaseProperties,
} from "@fluidframework/core-interfaces";
import { assert, Lazy } from "@fluidframework/core-utils/internal";
import {
DataProcessingError,
UsageError,
createChildLogger,
type IFluidErrorBase,
type ITelemetryLoggerExt,
} from "@fluidframework/telemetry-utils/internal";
import type { ICompressionRuntimeOptions } from "../compressionDefinitions.js";
import type {
PendingMessageResubmitData,
PendingStateManager,
} from "../pendingStateManager.js";
import {
BatchManager,
type BatchSequenceNumbers,
sequenceNumbersMatch,
type BatchId,
} from "./batchManager.js";
import type {
LocalBatchMessage,
IBatchCheckpoint,
OutboundBatchMessage,
OutboundSingletonBatch,
LocalBatch,
OutboundBatch,
} from "./definitions.js";
import type { OpCompressor } from "./opCompressor.js";
import type { OpGroupingManager } from "./opGroupingManager.js";
import { serializeOp } from "./opSerialization.js";
import type { OpSplitter } from "./opSplitter.js";
/**
 * Configuration knobs for the Outbox, fixed at construction time.
 */
export interface IOutboxConfig {
	// Runtime compression options; minimumBatchSizeInBytes gates whether a batch is compressed (see Outbox.virtualizeBatch).
	readonly compressionOptions: ICompressionRuntimeOptions;
	/**
	 * The maximum size of a batch that we can send over the wire.
	 */
	readonly maxBatchSizeInBytes: number;
	/**
	 * If true, maybeFlushPartialBatch will flush the batch if the reference sequence number changed
	 * since the batch started. Otherwise, it will throw in this case (apart from reentrancy which is handled elsewhere).
	 * Once the new throw-based flow is proved in a production environment, this option will be removed.
	 */
	readonly flushPartialBatches: boolean;
}
/**
 * Everything the Outbox needs from its host (ContainerRuntime) to do its job.
 */
export interface IOutboxParameters {
	// When false, flushed batches are not sent over the wire; they are still recorded
	// with the PendingStateManager for resubmission (see Outbox.flushInternal).
	readonly shouldSend: () => boolean;
	// Tracks flushed-but-unacked batches so they can be resubmitted on reconnect.
	readonly pendingStateManager: PendingStateManager;
	// Modern submit path. Undefined when the loader is too old to support batches,
	// in which case legacySendBatchFn is used instead (see Outbox.sendBatch).
	readonly submitBatchFn:
		| ((batch: IBatchMessage[], referenceSequenceNumber?: number) => number)
		| undefined;
	// Legacy submit path for old loaders (no batch/compression support).
	readonly legacySendBatchFn: (batch: OutboundBatch) => number;
	readonly config: IOutboxConfig;
	readonly compressor: OpCompressor;
	readonly splitter: OpSplitter;
	readonly logger: ITelemetryBaseLogger;
	readonly groupingManager: OpGroupingManager;
	// Latest reference/client sequence numbers, used to detect batch interruption.
	readonly getCurrentSequenceNumbers: () => BatchSequenceNumbers;
	// Resubmits a single message through the runtime; used when rebasing reentrant batches.
	readonly reSubmit: (message: PendingMessageResubmitData, squash: boolean) => void;
	// NOTE(review): presumably reports whether an op is being submitted while another op
	// is being processed (reentrancy) — confirm against ContainerRuntime's implementation.
	readonly opReentrancy: () => boolean;
}
/**
 * Info needed to correctly resubmit a batch
 */
export interface BatchResubmitInfo {
	/**
	 * If defined, indicates the Batch ID of the batch being resubmitted.
	 * This must be preserved on the new batch about to be submitted so they can be correlated/deduped in case both are sent.
	 */
	// NOTE(review): typed as string rather than the BatchId alias imported from batchManager.js —
	// presumably equivalent; confirm and consider using BatchId here for consistency.
	batchId?: string;
	/**
	 * Indicates whether or not this batch is "staged", meaning it should not be sent to the ordering service yet
	 * This is important on resubmit because we may be in Staging Mode for new changes,
	 * but resubmitting a non-staged change from before entering Staging Mode
	 */
	staged: boolean;
}
/**
 * Temporarily increase the stack limit while executing the provided action.
 * If a negative value is provided for `length`, no stack frames will be collected.
 * If Infinity is provided, all frames will be collected.
 *
 * ADO:4663 - add this to the common packages.
 *
 * @param action - action which returns an error
 * @param length - number of stack frames to collect, 50 if unspecified.
 * @returns the result of the action provided
 */
export function getLongStack<T>(action: () => T, length: number = 50): T {
	// V8 exposes a writable, numeric `stackTraceLimit` on the Error constructor.
	// It is a V8-specific extension not declared on ErrorConstructor, so we widen
	// the type with an optional property instead of resorting to `any`.
	const errorObj = Error as ErrorConstructor & { stackTraceLimit?: number };

	/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
	// ?? is not logically equivalent when the first clause returns false.
	const stackTraceLimitDescriptor =
		Object.getOwnPropertyDescriptor(errorObj, "stackTraceLimit") ||
		Object.getOwnPropertyDescriptor(Object.getPrototypeOf(errorObj), "stackTraceLimit");
	/* eslint-enable @typescript-eslint/prefer-nullish-coalescing */

	// If the property is absent or not writable (e.g. frozen by the host environment),
	// we can't adjust the limit — just run the action with default stack behavior.
	if (stackTraceLimitDescriptor?.writable !== true) {
		return action();
	}

	const originalStackTraceLimit = errorObj.stackTraceLimit;
	try {
		errorObj.stackTraceLimit = length;
		return action();
	} finally {
		// Always restore the original limit, even if the action throws.
		errorObj.stackTraceLimit = originalStackTraceLimit;
	}
}
/**
 * Convert from local batch to outbound batch, including computing contentSizeInBytes.
 */
export function localBatchToOutboundBatch({
	staged: _staged, // Peel this off the incoming batch, it's irrelevant (see Note below)
	...localBatch
}: LocalBatch): OutboundBatch {
	// Note: the staged property might be misleading here, in case a pre-staging batch is resubmitted during staging mode.
	// It will be set to true, but the batch was not actually staged.

	// Serialize each message's runtimeOp into `contents` (switching message type)
	// while accumulating the total serialized size in a single pass.
	let contentSizeInBytes = 0;
	const messages: OutboundBatchMessage[] = [];
	for (const { runtimeOp, ...rest } of localBatch.messages) {
		const outboundMessage: OutboundBatchMessage = {
			contents: serializeOp(runtimeOp),
			...rest,
		};
		contentSizeInBytes += outboundMessage.contents?.length ?? 0;
		messages.push(outboundMessage);
	}

	// Shallow copy of the local batch with outbound messages and the computed size.
	return {
		...localBatch,
		messages,
		contentSizeInBytes,
	};
}
/**
 * Estimated size of the stringification overhead for an op accumulated
 * from runtime to loader to the service.
 */
const opOverhead = 200;

/**
 * Estimates the real size in bytes on the socket for a given batch. It assumes that
 * the envelope size (and the size of an empty op) is 200 bytes, taking into account
 * extra overhead from stringification.
 *
 * @remarks
 * Also content will be stringified, and that adds a lot of overhead due to a lot of escape characters.
 * Not taking it into account, as compression work should help there - compressed payload will be
 * initially stored as base64, and that requires only 2 extra escape characters.
 *
 * @param batch - the batch to inspect
 * @returns An estimate of the payload size in bytes which will be produced when the batch is sent over the wire
 */
export const estimateSocketSize = (batch: OutboundBatch): number => {
	const { contentSizeInBytes, messages } = batch;
	// Fixed per-message envelope overhead plus the actual serialized content size.
	return messages.length * opOverhead + contentSizeInBytes;
};
/**
 * The Outbox collects messages submitted by the ContainerRuntime into a batch,
 * and then flushes the batch when requested.
 *
 * @remarks There are actually multiple independent batches (some are for a specific message type),
 * to support slight variation in semantics for each batch (e.g. support for rebasing or grouping).
 */
export class Outbox {
	private readonly logger: ITelemetryLoggerExt;
	// One BatchManager per message category. They accumulate independently but must
	// agree on reference sequence numbers (asserted in maybeFlushPartialBatch).
	private readonly mainBatch: BatchManager;
	private readonly blobAttachBatch: BatchManager;
	private readonly idAllocationBatch: BatchManager;
	// Self-throttle: only the first few batch rebases are reported via telemetry.
	private batchRebasesToReport = 5;
	// True while rebase() is resubmitting a batch's ops back through the runtime.
	private rebasing = false;

	/**
	 * Track the number of ops which were detected to have a mismatched
	 * reference sequence number, in order to self-throttle the telemetry events.
	 *
	 * This should be removed as part of ADO:2322
	 */
	private readonly maxMismatchedOpsToReport = 3;
	private mismatchedOpsReported = 0;

	constructor(private readonly params: IOutboxParameters) {
		this.logger = createChildLogger({ logger: params.logger, namespace: "Outbox" });
		this.mainBatch = new BatchManager({ canRebase: true });
		this.blobAttachBatch = new BatchManager({ canRebase: true });
		// ID Allocation ops are never rebased, and their batch IDs are ignored
		// (they are not directly resubmitted — see flushAll).
		this.idAllocationBatch = new BatchManager({
			canRebase: false,
			ignoreBatchId: true,
		});
	}

	/** Total number of messages currently accumulated across all three batches. */
	public get messageCount(): number {
		return this.mainBatch.length + this.blobAttachBatch.length + this.idAllocationBatch.length;
	}

	/** Number of messages accumulated in the main batch. */
	public get mainBatchMessageCount(): number {
		return this.mainBatch.length;
	}

	/** Number of messages accumulated in the blob attach batch. */
	public get blobAttachBatchMessageCount(): number {
		return this.blobAttachBatch.length;
	}

	/** Number of messages accumulated in the ID allocation batch. */
	public get idAllocationBatchMessageCount(): number {
		return this.idAllocationBatch.length;
	}

	/** True when no batch holds any accumulated messages. */
	public get isEmpty(): boolean {
		return this.messageCount === 0;
	}

	/** True if any accumulated message represents a user change (main or blob attach). */
	public containsUserChanges(): boolean {
		return (
			this.mainBatch.containsUserChanges() || this.blobAttachBatch.containsUserChanges()
			// ID Allocation ops are not user changes
		);
	}

	/**
	 * Detect whether batching has been interrupted by an incoming message being processed. In this case,
	 * we will flush the accumulated messages to account for that (if allowed) and create a new batch with the new
	 * message as the first message. If flushing partial batch is not enabled, we will throw (except for reentrant ops).
	 * This would indicate we expected this case to be precluded by logic elsewhere.
	 *
	 * @remarks To detect batch interruption, we compare both the reference sequence number
	 * (i.e. last message processed by DeltaManager) and the client sequence number of the
	 * last message processed by the ContainerRuntime. In the absence of op reentrancy, this
	 * pair will remain stable during a single JS turn during which the batch is being built up.
	 */
	private maybeFlushPartialBatch(): void {
		const mainBatchSeqNums = this.mainBatch.sequenceNumbers;
		const blobAttachSeqNums = this.blobAttachBatch.sequenceNumbers;
		const idAllocSeqNums = this.idAllocationBatch.sequenceNumbers;
		assert(
			sequenceNumbersMatch(mainBatchSeqNums, blobAttachSeqNums) &&
				sequenceNumbersMatch(mainBatchSeqNums, idAllocSeqNums),
			0x58d /* Reference sequence numbers from both batches must be in sync */,
		);

		const currentSequenceNumbers = this.params.getCurrentSequenceNumbers();

		if (
			sequenceNumbersMatch(mainBatchSeqNums, currentSequenceNumbers) &&
			sequenceNumbersMatch(blobAttachSeqNums, currentSequenceNumbers) &&
			sequenceNumbersMatch(idAllocSeqNums, currentSequenceNumbers)
		) {
			// The reference sequence numbers are stable, there is nothing to do
			return;
		}

		// Reference and/or Client sequence number will be advancing while processing this batch,
		// so we can't use this check to detect wrongdoing. But we will still log via telemetry.
		// This is rare, and the reentrancy will be handled during Flush.
		const expectedDueToReentrancy = this.isContextReentrant();

		// Lazily build the error (with a long stack) only if/when it's actually needed.
		const errorWrapper = new Lazy(() =>
			getLongStack(() =>
				DataProcessingError.create(
					"Sequence numbers advanced as if ops were processed while a batch is accumulating",
					"outboxSequenceNumberCoherencyCheck",
				),
			),
		);
		if (++this.mismatchedOpsReported <= this.maxMismatchedOpsToReport) {
			this.logger.sendTelemetryEvent(
				{
					// Only log error if this is truly unexpected
					category:
						expectedDueToReentrancy || this.params.config.flushPartialBatches
							? "generic"
							: "error",
					eventName: "ReferenceSequenceNumberMismatch",
					details: {
						expectedDueToReentrancy,
						mainReferenceSequenceNumber: mainBatchSeqNums.referenceSequenceNumber,
						mainClientSequenceNumber: mainBatchSeqNums.clientSequenceNumber,
						blobAttachReferenceSequenceNumber: blobAttachSeqNums.referenceSequenceNumber,
						blobAttachClientSequenceNumber: blobAttachSeqNums.clientSequenceNumber,
						currentReferenceSequenceNumber: currentSequenceNumbers.referenceSequenceNumber,
						currentClientSequenceNumber: currentSequenceNumbers.clientSequenceNumber,
					},
				},
				errorWrapper.value,
			);
		}

		// If we're configured to flush partial batches, do that now and return (don't throw)
		if (this.params.config.flushPartialBatches) {
			this.flushAll();
			return;
		}

		// If we are in a reentrant context, we know this can happen without causing any harm.
		if (expectedDueToReentrancy) {
			return;
		}

		throw errorWrapper.value;
	}

	/** Queue a message on the main batch (flushing first if batching was interrupted). */
	public submit(message: LocalBatchMessage): void {
		this.maybeFlushPartialBatch();
		this.addMessageToBatchManager(this.mainBatch, message);
	}

	/** Queue a blob attach message (flushing first if batching was interrupted). */
	public submitBlobAttach(message: LocalBatchMessage): void {
		this.maybeFlushPartialBatch();
		this.addMessageToBatchManager(this.blobAttachBatch, message);
	}

	/** Queue an ID allocation message (flushing first if batching was interrupted). */
	public submitIdAllocation(message: LocalBatchMessage): void {
		this.maybeFlushPartialBatch();
		this.addMessageToBatchManager(this.idAllocationBatch, message);
	}

	/**
	 * Push the message onto the given BatchManager, recording current reentrancy
	 * state and client sequence number alongside it.
	 */
	private addMessageToBatchManager(
		batchManager: BatchManager,
		message: LocalBatchMessage,
	): void {
		batchManager.push(
			message,
			this.isContextReentrant(),
			this.params.getCurrentSequenceNumbers().clientSequenceNumber,
		);
	}

	/**
	 * Flush all the batches to the ordering service.
	 * This method is expected to be called at the end of a batch.
	 *
	 * @throws If called from a reentrant context, or if the batch being flushed is too large.
	 * @param resubmitInfo - Key information when flushing a resubmitted batch. Undefined means this is not resubmit.
	 */
	public flush(resubmitInfo?: BatchResubmitInfo): void {
		// We have nothing to flush if all batchManagers are empty, and we're not needing to resubmit an empty batch placeholder
		if (
			this.idAllocationBatch.empty &&
			this.blobAttachBatch.empty &&
			this.mainBatch.empty &&
			resubmitInfo?.batchId === undefined
		) {
			return;
		}
		assert(
			!this.isContextReentrant(),
			0xb7b /* Flushing must not happen while incoming changes are being processed */,
		);

		this.flushAll(resubmitInfo);
	}

	/**
	 * Flush each batch in turn (ID allocation, then blob attach, then main),
	 * or flush an empty placeholder batch when resubmitting and nothing is queued.
	 */
	private flushAll(resubmitInfo?: BatchResubmitInfo): void {
		const allBatchesEmpty =
			this.idAllocationBatch.empty && this.blobAttachBatch.empty && this.mainBatch.empty;
		if (allBatchesEmpty) {
			// If we're resubmitting with a batchId and all batches are empty, we need to flush an empty batch.
			// Note that we currently resubmit one batch at a time, so on resubmit, 1 of the 2 batches will *always* be empty.
			// It's theoretically possible that we don't *need* to resubmit this empty batch, and in those cases, it'll safely be ignored
			// by the rest of the system, including remote clients.
			// In some cases we *must* resubmit the empty batch (to match up with a non-empty version tracked locally by a container fork), so we do it always.
			if (resubmitInfo?.batchId !== undefined) {
				this.flushEmptyBatch(resubmitInfo.batchId, resubmitInfo.staged);
			}
			return;
		}

		// Don't use resubmittingBatchId for idAllocationBatch.
		// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
		this.flushInternal({
			batchManager: this.idAllocationBatch,
			// Note: For now, we will never stage ID Allocation messages.
			// They won't contain personal info and no harm in extra allocations in case of discarding the staged changes
		});
		this.flushInternal({
			batchManager: this.blobAttachBatch,
			disableGroupedBatching: true,
			resubmitInfo,
		});
		this.flushInternal({
			batchManager: this.mainBatch,
			resubmitInfo,
		});
	}

	/**
	 * Flush an empty grouped-batch placeholder carrying the given batch ID, so it can be
	 * correlated/deduped against the original batch this resubmit replaces (see flushAll).
	 * Skips the wire when disconnected or when the original batch was staged.
	 */
	private flushEmptyBatch(
		resubmittingBatchId: BatchId,
		resubmittingStagedBatch: boolean,
	): void {
		const referenceSequenceNumber =
			this.params.getCurrentSequenceNumbers().referenceSequenceNumber;
		assert(
			referenceSequenceNumber !== undefined,
			0xa01 /* reference sequence number should be defined */,
		);

		const { outboundBatch, placeholderMessage } =
			this.params.groupingManager.createEmptyGroupedBatch(
				resubmittingBatchId,
				referenceSequenceNumber,
			);
		let clientSequenceNumber: number | undefined;
		if (this.params.shouldSend() && !resubmittingStagedBatch) {
			clientSequenceNumber = this.sendBatch(outboundBatch);
		}

		// Push the empty batch placeholder to the PendingStateManager
		this.params.pendingStateManager.onFlushEmptyBatch(
			placeholderMessage,
			clientSequenceNumber,
			resubmittingStagedBatch,
		);
		return;
	}

	/**
	 * Pop the accumulated batch from the given BatchManager and flush it:
	 * rebases it instead if it contains reentrant ops (when rebasing is supported),
	 * sends it over the wire unless staged or disconnected, and records the flush
	 * with the PendingStateManager.
	 */
	private flushInternal(params: {
		batchManager: BatchManager;
		disableGroupedBatching?: boolean;
		resubmitInfo?: BatchResubmitInfo; // undefined if not resubmitting
	}): void {
		const { batchManager, disableGroupedBatching = false, resubmitInfo } = params;
		if (batchManager.empty) {
			return;
		}

		const rawBatch = batchManager.popBatch(resubmitInfo?.batchId);
		// On resubmit we use the original batch's staged state, so these should match as well.
		const staged = rawBatch.staged === true;
		assert(
			resubmitInfo === undefined || resubmitInfo.staged === staged,
			0xba3 /* Mismatch in staged state tracking */,
		);
		const groupingEnabled =
			!disableGroupedBatching && this.params.groupingManager.groupedBatchingEnabled();
		if (
			batchManager.options.canRebase &&
			rawBatch.hasReentrantOps === true &&
			groupingEnabled
		) {
			assert(!this.rebasing, 0x6fa /* A rebased batch should never have reentrant ops */);
			// If a batch contains reentrant ops (ops created as a result from processing another op)
			// it needs to be rebased so that we can ensure consistent reference sequence numbers
			// and eventual consistency at the DDS level.
			// Note: Since this is happening in the same turn the ops were originally created with,
			// and they haven't gone to PendingStateManager yet, we can just let them respect
			// ContainerRuntime.inStagingMode. So we do not plumb local 'staged' variable through here.
			this.rebase(rawBatch, batchManager);
			return;
		}

		let clientSequenceNumber: number | undefined;

		// Did we disconnect? (i.e. is shouldSend false?)
		// If so, do nothing, as pending state manager will resubmit it correctly on reconnect.
		// Because flush() is a task that executes async (on clean stack), we can get here in disconnected state.
		if (this.params.shouldSend() && !staged) {
			const virtualizedBatch = this.virtualizeBatch(rawBatch, groupingEnabled);

			clientSequenceNumber = this.sendBatch(virtualizedBatch);
			assert(
				clientSequenceNumber === undefined || clientSequenceNumber >= 0,
				0x9d2 /* unexpected negative clientSequenceNumber (empty batch should yield undefined) */,
			);
		}

		this.params.pendingStateManager.onFlushBatch(
			rawBatch.messages,
			clientSequenceNumber,
			staged,
			batchManager.options.ignoreBatchId,
		);
	}

	/**
	 * Rebases a batch. All the ops in the batch are resubmitted to the runtime and
	 * they will end up back in the same batch manager they were flushed from and subsequently flushed.
	 *
	 * @param rawBatch - the batch to be rebased
	 */
	private rebase(rawBatch: LocalBatch, batchManager: BatchManager): void {
		assert(!this.rebasing, 0x6fb /* Reentrancy */);
		assert(batchManager.options.canRebase, 0x9a7 /* BatchManager does not support rebase */);

		this.rebasing = true;
		const squash = false;
		// Resubmitting pushes each op back into this same batchManager (see addMessageToBatchManager).
		for (const message of rawBatch.messages) {
			this.params.reSubmit(
				{
					runtimeOp: message.runtimeOp,
					localOpMetadata: message.localOpMetadata,
					opMetadata: message.metadata,
				},
				squash,
			);
		}

		if (this.batchRebasesToReport > 0) {
			this.logger.sendTelemetryEvent(
				{
					eventName: "BatchRebase",
					length: rawBatch.messages.length,
					referenceSequenceNumber: rawBatch.referenceSequenceNumber,
				},
				new UsageError("BatchRebase"),
			);
			this.batchRebasesToReport--;
		}

		this.flushInternal({ batchManager });

		this.rebasing = false;
	}

	// True if we're in a reentrant context, except while rebasing (rebasing is expected reentrancy).
	private isContextReentrant(): boolean {
		return this.params.opReentrancy() && !this.rebasing;
	}

	/**
	 * As necessary and enabled, groups / compresses / chunks the given batch.
	 *
	 * @remarks If chunking happens, a side effect here is that 1 or more chunks are queued immediately for sending in next JS turn.
	 *
	 * @param localBatch - Local Batch to be virtualized - i.e. transformed into an Outbound Batch
	 * @param groupingEnabled - If true, Grouped batching is enabled.
	 * @returns One of the following:
	 * - (A) The original batch (Based on what's enabled)
	 * - (B) A grouped batch (it's a singleton batch)
	 * - (C) A compressed singleton batch
	 * - (D) A singleton batch containing the last chunk.
	 */
	private virtualizeBatch(localBatch: LocalBatch, groupingEnabled: boolean): OutboundBatch {
		// Shallow copy the local batch, updating the messages to be outbound messages
		const originalBatch = localBatchToOutboundBatch(localBatch);
		const originalOrGroupedBatch = groupingEnabled
			? this.params.groupingManager.groupBatch(originalBatch)
			: originalBatch;

		if (originalOrGroupedBatch.messages.length !== 1) {
			// Compression requires a single message, so return early otherwise.
			return originalOrGroupedBatch;
		}

		// Regardless of whether we grouped or not, we now have a batch with a single message.
		// Now proceed to compress/chunk it if necessary.
		const singletonBatch = originalOrGroupedBatch as OutboundSingletonBatch;

		if (
			this.params.config.compressionOptions.minimumBatchSizeInBytes >
				singletonBatch.contentSizeInBytes ||
			this.params.submitBatchFn === undefined
		) {
			// Nothing to do if compression is disabled, unnecessary or unsupported.
			return singletonBatch;
		}

		const compressedBatch = this.params.compressor.compressBatch(singletonBatch);

		if (this.params.splitter.isBatchChunkingEnabled) {
			return compressedBatch.contentSizeInBytes <= this.params.splitter.chunkSizeInBytes
				? compressedBatch
				: this.params.splitter.splitSingletonBatchMessage(compressedBatch);
		}

		// We want to distinguish this "BatchTooLarge" case from the generic "BatchTooLarge" case in sendBatch
		if (compressedBatch.contentSizeInBytes >= this.params.config.maxBatchSizeInBytes) {
			throw this.makeBatchTooLargeError(compressedBatch, "CompressionInsufficient", {
				uncompressedSizeInBytes: singletonBatch.contentSizeInBytes,
			});
		}

		return compressedBatch;
	}

	/**
	 * Sends the batch object to the container context to be sent over the wire.
	 *
	 * @param batch - batch to be sent
	 * @returns the clientSequenceNumber of the start of the batch, or undefined if nothing was sent
	 */
	private sendBatch(batch: OutboundBatch): number | undefined {
		const length = batch.messages.length;
		if (length === 0) {
			return undefined; // Nothing submitted
		}

		// Guard against batches too large to be accepted by the service.
		const socketSize = estimateSocketSize(batch);
		if (socketSize >= this.params.config.maxBatchSizeInBytes) {
			throw this.makeBatchTooLargeError(batch, "CannotSend");
		}

		let clientSequenceNumber: number;
		if (this.params.submitBatchFn === undefined) {
			// Legacy path - supporting old loader versions. Can be removed only when LTS moves above
			// version that has support for batches (submitBatchFn)
			assert(
				batch.messages[0].compression === undefined,
				0x5a6 /* Compression should not have happened if the loader does not support it */,
			);

			clientSequenceNumber = this.params.legacySendBatchFn(batch);
		} else {
			assert(batch.referenceSequenceNumber !== undefined, 0x58e /* Batch must not be empty */);
			clientSequenceNumber = this.params.submitBatchFn(
				batch.messages.map<IBatchMessage>((message) => ({
					contents: message.contents,
					metadata: message.metadata,
					compression: message.compression,
					referenceSequenceNumber: message.referenceSequenceNumber,
				})),
				batch.referenceSequenceNumber,
			);
		}

		// Convert from clientSequenceNumber of last message in the batch to clientSequenceNumber of first message.
		clientSequenceNumber -= length - 1;
		assert(clientSequenceNumber >= 0, 0x3d0 /* clientSequenceNumber can't be negative */);
		return clientSequenceNumber;
	}

	/**
	 * Build a "BatchTooLarge" DataProcessingError with sizing/config details for telemetry.
	 *
	 * @param batch - the offending batch
	 * @param codepath - which check produced the error (e.g. "CannotSend", "CompressionInsufficient")
	 * @param moreDetails - extra properties to merge into errorDetails
	 */
	private makeBatchTooLargeError(
		batch: OutboundBatch,
		codepath: string,
		moreDetails?: ITelemetryBaseProperties,
	): IFluidErrorBase {
		return DataProcessingError.create(
			"BatchTooLarge",
			codepath,
			/* sequencedMessage */ undefined,
			{
				errorDetails: {
					opCount: batch.messages.length,
					contentSizeInBytes: batch.contentSizeInBytes,
					socketSize: estimateSocketSize(batch),
					maxBatchSizeInBytes: this.params.config.maxBatchSizeInBytes,
					groupedBatchingEnabled: this.params.groupingManager.groupedBatchingEnabled(),
					compressionOptions: JSON.stringify(this.params.config.compressionOptions),
					chunkingEnabled: this.params.splitter.isBatchChunkingEnabled,
					chunkSizeInBytes: this.params.splitter.chunkSizeInBytes,
					...moreDetails,
				},
			},
		);
	}

	/**
	 * Gets a checkpoint object per batch that facilitates iterating over the batch messages when rolling back.
	 */
	public getBatchCheckpoints(): {
		mainBatch: IBatchCheckpoint;
		idAllocationBatch: IBatchCheckpoint;
		blobAttachBatch: IBatchCheckpoint;
	} {
		// This variable is declared with a specific type so that we have a standard import of the IBatchCheckpoint type.
		// When the type is inferred, the generated .d.ts uses a dynamic import which doesn't resolve.
		const mainBatch: IBatchCheckpoint = this.mainBatch.checkpoint();
		return {
			mainBatch,
			idAllocationBatch: this.idAllocationBatch.checkpoint(),
			blobAttachBatch: this.blobAttachBatch.checkpoint(),
		};
	}
}