-
Notifications
You must be signed in to change notification settings - Fork 571
Add stagingModeAutoFlushThreshold for staging mode batch control #26577
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b0f4781
0a3a73d
ebf2e66
a977adc
7ad4783
2e5e536
869a38c
7d02235
b4921d8
839491d
9c29e85
66cb285
c82ea67
fad8c0e
b026458
4f3aa38
4b4a868
a77b5c7
0dfedf7
5c284e4
fffd94f
bf6a381
8819175
d780ccd
c43a390
3ead929
d590f0e
fa989ed
96ac931
d2d96a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -431,7 +431,7 @@ export class PendingStateManager implements IDisposable { | |
| clientId !== undefined, | ||
| 0xa33 /* clientId (from stateHandler) could only be undefined if we've never connected, but we have a CSN so we know that's not the case */, | ||
| ); | ||
|
|
||
| const batchInfo = { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }; | ||
| for (const message of batch) { | ||
| const { | ||
| runtimeOp, | ||
|
|
@@ -446,8 +446,7 @@ export class PendingStateManager implements IDisposable { | |
| runtimeOp, | ||
| localOpMetadata, | ||
| opMetadata, | ||
| // Note: We only will read this off the first message, but put it on all for simplicity | ||
| batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }, | ||
| batchInfo, | ||
| }; | ||
| this.pendingMessages.push(pendingMessage); | ||
| } | ||
|
|
@@ -751,8 +750,12 @@ export class PendingStateManager implements IDisposable { | |
| * Called when the Container's connection state changes. If the Container gets connected, it replays all the pending | ||
| * states in its queue. This includes triggering resubmission of unacked ops. | ||
| * ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer) | ||
| * | ||
| * @returns The unique batch infos for all batches that were replayed. | ||
| */ | ||
anthony-murphy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public replayPendingStates(options?: ReplayPendingStateOptions): void { | ||
| public replayPendingStates( | ||
| options?: ReplayPendingStateOptions, | ||
| ): IPendingMessage["batchInfo"][] { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @anthony-murphy, @dannimad -- Side note, sometimes I think about changing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot create an issue for this |
||
| const { committingStagedBatches, squash } = { | ||
| ...defaultReplayPendingStatesOptions, | ||
| ...options, | ||
|
|
@@ -779,6 +782,7 @@ export class PendingStateManager implements IDisposable { | |
|
|
||
| const initialPendingMessagesCount = this.pendingMessages.length; | ||
| let remainingPendingMessagesCount = this.pendingMessages.length; | ||
| const replayedBatchSet = new Set<IPendingMessage["batchInfo"]>(); | ||
|
|
||
| let seenStagedBatch = false; | ||
|
|
||
|
|
@@ -816,6 +820,7 @@ export class PendingStateManager implements IDisposable { | |
| if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) { | ||
| // Resubmit no messages, with the batchId. Will result in another empty batch marker. | ||
| this.stateHandler.reSubmitBatch([], { batchId, staged, squash }); | ||
| replayedBatchSet.add(pendingMessage.batchInfo); | ||
| continue; | ||
| } | ||
|
|
||
|
|
@@ -842,6 +847,7 @@ export class PendingStateManager implements IDisposable { | |
| ], | ||
| { batchId, staged, squash }, | ||
| ); | ||
| replayedBatchSet.add(pendingMessage.batchInfo); | ||
| continue; | ||
| } | ||
| // else: batchMetadataFlag === true (It's a typical multi-message batch) | ||
|
|
@@ -881,6 +887,7 @@ export class PendingStateManager implements IDisposable { | |
| } | ||
|
|
||
| this.stateHandler.reSubmitBatch(batch, { batchId, staged, squash }); | ||
| replayedBatchSet.add(pendingMessage.batchInfo); | ||
| } | ||
|
|
||
| if (!committingStagedBatches) { | ||
|
|
@@ -898,6 +905,8 @@ export class PendingStateManager implements IDisposable { | |
| clientId: this.stateHandler.clientId(), | ||
| }); | ||
| } | ||
|
|
||
| return [...replayedBatchSet]; | ||
markfields marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -908,11 +917,13 @@ export class PendingStateManager implements IDisposable { | |
| // callback will only be given staged messages with a valid runtime op (i.e. not empty batch and not an initial message with only serialized content) | ||
| stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage }, | ||
| ) => void, | ||
| ): void { | ||
| ): IPendingMessage["batchInfo"][] { | ||
| const batchSet = new Set<IPendingMessage["batchInfo"]>(); | ||
| while (!this.pendingMessages.isEmpty()) { | ||
| const stagedMessage = this.pendingMessages.peekBack(); | ||
| if (stagedMessage?.batchInfo.staged === true) { | ||
| this.pendingMessages.pop(); | ||
| batchSet.add(stagedMessage.batchInfo); | ||
|
|
||
| if (hasTypicalRuntimeOp(stagedMessage)) { | ||
| callback(stagedMessage); | ||
|
|
@@ -925,6 +936,7 @@ export class PendingStateManager implements IDisposable { | |
| this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true), | ||
| 0xb89 /* Shouldn't be any more staged messages */, | ||
| ); | ||
| return [...batchSet]; | ||
markfields marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.