forked from microsoft/FluidFramework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontainer.ts
More file actions
2686 lines (2412 loc) · 91.3 KB
/
container.ts
File metadata and controls
2686 lines (2412 loc) · 91.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
/* eslint-disable unicorn/consistent-function-scoping */
import {
TypedEventEmitter,
performanceNow,
type ILayerCompatDetails,
} from "@fluid-internal/client-utils";
import {
AttachState,
type IAudience,
type ICriticalContainerError,
} from "@fluidframework/container-definitions";
import type {
ContainerWarning,
IBatchMessage,
ICodeDetailsLoader,
IContainer,
IContainerEvents,
IContainerLoadMode,
IDeltaManager,
IFluidCodeDetails,
IFluidCodeDetailsComparer,
IFluidModuleWithDetails,
IProvideFluidCodeDetailsComparer,
IProvideRuntimeFactory,
IRuntime,
ReadOnlyInfo,
ILoader,
ILoaderOptions,
IContainerStorageService,
} from "@fluidframework/container-definitions/internal";
import { isFluidCodeDetails } from "@fluidframework/container-definitions/internal";
import {
type FluidObject,
type IEvent,
type IRequest,
type ITelemetryBaseProperties,
LogLevel,
} from "@fluidframework/core-interfaces";
import type { ISignalEnvelope } from "@fluidframework/core-interfaces/internal";
import { assert, isPromiseLike, unreachableCase } from "@fluidframework/core-utils/internal";
import {
type IClient,
type IClientDetails,
type IQuorumClients,
type ISequencedClient,
type ISummaryTree,
SummaryType,
} from "@fluidframework/driver-definitions";
import {
type IDocumentService,
type IDocumentServiceFactory,
type IResolvedUrl,
type ISnapshot,
type IThrottlingWarning,
type IUrlResolver,
type ICommittedProposal,
type IDocumentAttributes,
type IDocumentMessage,
type IQuorumProposals,
type ISequencedProposal,
type ISnapshotTree,
type ISummaryContent,
type IVersion,
MessageType,
type ISequencedDocumentMessage,
type ISignalMessage,
type ConnectionMode,
} from "@fluidframework/driver-definitions/internal";
import {
getSnapshotTree,
OnlineStatus,
isCombinedAppAndProtocolSummary,
isInstanceOfISnapshot,
isOnline,
readAndParse,
runWithRetry,
type CombinedAppAndProtocolSummary,
} from "@fluidframework/driver-utils/internal";
import {
type TelemetryEventCategory,
type ITelemetryLoggerExt,
EventEmitterWithErrorHandling,
GenericError,
type IFluidErrorBase,
type MonitoringContext,
PerformanceEvent,
UsageError,
connectedEventName,
createChildLogger,
createChildMonitoringContext,
formatTick,
normalizeError,
raiseConnectedEvent,
wrapError,
loggerToMonitoringContext,
type ITelemetryErrorEventExt,
} from "@fluidframework/telemetry-utils/internal";
import structuredClone from "@ungap/structured-clone";
import { v4 as uuid } from "uuid";
import {
type AttachProcessProps,
type AttachmentData,
runRetriableAttachProcess,
} from "./attachment.js";
import { Audience } from "./audience.js";
import { ConnectionManager } from "./connectionManager.js";
import { ConnectionState } from "./connectionState.js";
import {
type IConnectionStateHandler,
createConnectionStateHandler,
} from "./connectionStateHandler.js";
import { ContainerContext } from "./containerContext.js";
import { ContainerStorageAdapter } from "./containerStorageAdapter.js";
import {
type IConnectionDetailsInternal,
type IConnectionManagerFactoryArgs,
type IConnectionStateChangeReason,
ReconnectMode,
getPackageName,
} from "./contracts.js";
import { DeltaManager, type IConnectionArgs } from "./deltaManager.js";
import { RelativeLoader } from "./loader.js";
import {
validateDriverCompatibility,
validateRuntimeCompatibility,
} from "./loaderLayerCompatState.js";
import {
createMemoryDetachedBlobStorage,
tryInitializeMemoryDetachedBlobStorage,
type MemoryDetachedBlobStorage,
} from "./memoryBlobStorage.js";
import { NoopHeuristic } from "./noopHeuristic.js";
import { pkgVersion } from "./packageVersion.js";
import type { IQuorumSnapshot } from "./protocol/index.js";
import {
type InternalProtocolHandlerBuilder,
ProtocolHandler,
type ProtocolHandlerBuilder,
type ProtocolHandlerInternal,
protocolHandlerShouldProcessSignal,
wrapProtocolHandlerBuilder,
} from "./protocol.js";
import { initQuorumValuesFromCodeDetails } from "./quorum.js";
import {
type IPendingContainerState,
type IPendingDetachedContainerState,
SerializedStateManager,
} from "./serializedStateManager.js";
import {
combineAppAndProtocolSummary,
combineSnapshotTreeAndSnapshotBlobs,
getDetachedContainerStateFromSerializedContainer,
getDocumentAttributes,
getProtocolSnapshotTree,
getISnapshotFromSerializedContainer,
runSingle,
convertISnapshotToSnapshotWithBlobs,
convertSnapshotInfoToSnapshot,
} from "./utils.js";
const detachedContainerRefSeqNumber = 0;
const dirtyContainerEvent = "dirty";
const savedContainerEvent = "saved";
const packageNotFactoryError = "Code package does not implement IRuntimeFactory";
/**
* @internal
*/
export interface IContainerLoadProps {
/**
* The resolved url of the container being loaded
*/
readonly resolvedUrl: IResolvedUrl;
/**
* Control which snapshot version to load from. See IParsedUrl for detailed information.
*/
readonly version: string | undefined;
/**
* Loads the Container in paused state if true, unpaused otherwise.
*/
readonly loadMode?: IContainerLoadMode;
/**
* The pending state serialized from a previous container instance
*/
readonly pendingLocalState?: IPendingContainerState;
}
/**
* @internal
*/
export interface IContainerCreateProps {
/**
* Disables the Container from reconnecting if false, allows reconnect otherwise.
*/
readonly canReconnect?: boolean;
/**
* Client details provided in the override will be merged over the default client.
*/
readonly clientDetailsOverride?: IClientDetails;
/**
* The url resolver used by the loader for resolving external urls
* into Fluid urls such that the container specified by the
* external url can be loaded.
*/
readonly urlResolver: IUrlResolver;
/**
* The document service factory take the Fluid url provided
* by the resolved url and constructs all the necessary services
* for communication with the container's server.
*/
readonly documentServiceFactory: IDocumentServiceFactory;
/**
* The code loader handles loading the necessary code
* for running a container once it is loaded.
*/
readonly codeLoader: ICodeDetailsLoader;
/**
* A property bag of options used by various layers
* to control features
*/
readonly options: ILoaderOptions;
/**
* Scope is provided to all container and is a set of shared
* services for container's to integrate with their host environment.
*/
readonly scope: FluidObject;
/**
* The logger downstream consumers should construct their loggers from
*/
readonly subLogger: ITelemetryLoggerExt;
/**
* Optional property for allowing the container to use a custom
* protocol implementation for handling the quorum and/or the audience.
*/
readonly protocolHandlerBuilder?: ProtocolHandlerBuilder;
/**
* Optional property for specifying a timeout for retry connection loop.
*
* If provided, container will use this value as the maximum time to wait
* for a successful connection before giving up throwing the most recent error.
*
* If not provided, default behavior will be to retry until non-retryable error occurs.
*/
readonly retryConnectionTimeoutMs?: number;
}
/**
* Waits until container connects to delta storage and gets up-to-date.
*
* Useful when resolving URIs and hitting 404, due to container being loaded from (stale) snapshot and not being
* up to date. Host may chose to wait in such case and retry resolving URI.
*
* Warning: Will wait infinitely for connection to establish if there is no connection.
* May result in deadlock if Container.disconnect() is called and never followed by a call to Container.connect().
*
* @returns `true`: container is up to date, it processed all the ops that were know at the time of first connection.
*
* `false`: storage does not provide indication of how far the client is. Container processed all the ops known to it,
* but it maybe still behind.
*
* @throws an error beginning with `"Container closed"` if the container is closed before it catches up.
* @legacy @beta
*/
export async function waitContainerToCatchUp(container: IContainer): Promise<boolean> {
// Make sure we stop waiting if container is closed.
if (container.closed) {
throw new UsageError("waitContainerToCatchUp: Container closed");
}
return new Promise<boolean>((resolve, reject) => {
const deltaManager = container.deltaManager;
const closedCallback = (err?: ICriticalContainerError | undefined): void => {
container.off("closed", closedCallback);
const baseMessage = "Container closed while waiting to catch up";
reject(
err === undefined
? new GenericError(baseMessage)
: wrapError(
err,
(innerMessage) => new GenericError(`${baseMessage}: ${innerMessage}`),
),
);
};
container.on("closed", closedCallback);
// Depending on config, transition to "connected" state may include the guarantee
// that all known ops have been processed. If so, we may introduce additional wait here.
// Waiting for "connected" state in either case gets us at least to our own Join op
// which is a reasonable approximation of "caught up"
const waitForOps = (): void => {
assert(
container.connectionState === ConnectionState.CatchingUp ||
container.connectionState === ConnectionState.Connected,
0x0cd /* "Container disconnected while waiting for ops!" */,
);
const hasCheckpointSequenceNumber = deltaManager.hasCheckpointSequenceNumber;
const connectionOpSeqNumber = deltaManager.lastKnownSeqNumber;
assert(
deltaManager.lastSequenceNumber <= connectionOpSeqNumber,
0x266 /* "lastKnownSeqNumber should never be below last processed sequence number" */,
);
if (deltaManager.lastSequenceNumber === connectionOpSeqNumber) {
container.off("closed", closedCallback);
resolve(hasCheckpointSequenceNumber);
return;
}
const callbackOps = (message: ISequencedDocumentMessage): void => {
if (connectionOpSeqNumber <= message.sequenceNumber) {
container.off("closed", closedCallback);
resolve(hasCheckpointSequenceNumber);
deltaManager.off("op", callbackOps);
}
};
deltaManager.on("op", callbackOps);
};
// We can leverage DeltaManager's "connect" event here and test for ConnectionState.Disconnected
// But that works only if service provides us checkPointSequenceNumber
// Our internal testing is based on R11S that does not, but almost all tests connect as "write" and
// use this function to catch up, so leveraging our own join op as a fence/barrier
if (container.connectionState === ConnectionState.Connected) {
waitForOps();
return;
}
const callback = (): void => {
container.off(connectedEventName, callback);
waitForOps();
};
container.on(connectedEventName, callback);
if (container.connectionState === ConnectionState.Disconnected) {
container.connect();
}
});
}
const getCodeProposal = (quorum: IQuorumProposals): unknown =>
quorum.get("code") ?? quorum.get("code2");
/**
* Helper function to report to telemetry cases where operation takes longer than expected (200ms)
* @param logger - logger to use
* @param eventName - event name
* @param action - functor to call and measure
*/
export async function ReportIfTooLong(
logger: ITelemetryLoggerExt,
eventName: string,
action: () => Promise<ITelemetryBaseProperties>,
): Promise<void> {
const event = PerformanceEvent.start(logger, { eventName });
const props = await action();
if (event.duration > 200) {
event.end(props);
}
}
const summarizerClientType = "summarizer";
interface IContainerLifecycleEvents extends IEvent {
(event: "runtimeInstantiated", listener: () => void): void;
(event: "disposed", listener: () => void): void;
}
export class Container
extends EventEmitterWithErrorHandling<IContainerEvents>
implements IContainer, ContainerAlpha
{
/**
* Load an existing container.
*/
public static async load(
loadProps: IContainerLoadProps,
createProps: IContainerCreateProps,
): Promise<Container> {
const { version, pendingLocalState, loadMode, resolvedUrl } = loadProps;
const container = new Container(createProps, loadProps);
return PerformanceEvent.timedExecAsync(
container.mc.logger,
{ eventName: "Load", ...loadMode },
async (event) =>
new Promise<Container>((resolve, reject) => {
const defaultMode: IContainerLoadMode = { opsBeforeReturn: "cached" };
// if we have pendingLocalState, anything we cached is not useful and we shouldn't wait for connection
// to return container, so ignore this value and use undefined for opsBeforeReturn
const mode: IContainerLoadMode = pendingLocalState
? { ...(loadMode ?? defaultMode), opsBeforeReturn: undefined }
: (loadMode ?? defaultMode);
const onClosed = (err?: ICriticalContainerError): void => {
// pre-0.58 error message: containerClosedWithoutErrorDuringLoad
reject(err ?? new GenericError("Container closed without error during load"));
};
container.on("closed", onClosed);
container
.load(version, mode, resolvedUrl, pendingLocalState)
.finally(() => {
container.removeListener("closed", onClosed);
})
.then(
(props) => {
event.end({ ...props });
resolve(container);
},
(error) => {
const err = normalizeError(error);
// Depending where error happens, we can be attempting to connect to web socket
// and continuously retrying (consider offline mode)
// Host has no container to close, so it's prudent to do it here
// Note: We could only dispose the container instead of just close but that would
// the telemetry where users sometimes search for ContainerClose event to look
// for load failures. So not removing this at this time.
container.close(err);
container.dispose(err);
onClosed(err);
},
);
}),
{ start: true, end: true, cancel: "generic" },
);
}
/**
* Create a new container in a detached state.
*/
public static async createDetached(
createProps: IContainerCreateProps,
codeDetails: IFluidCodeDetails,
): Promise<Container> {
const container = new Container(createProps);
return PerformanceEvent.timedExecAsync(
container.mc.logger,
{ eventName: "CreateDetached" },
async (_event) => {
await container.createDetached(codeDetails);
return container;
},
{ start: true, end: true, cancel: "generic" },
);
}
/**
* Create a new container in a detached state that is initialized with a
* snapshot from a previous detached container.
* @param createProps - Config options for this new container instance
* @param snapshot - A stringified {@link IPendingDetachedContainerState}, e.g. generated via {@link serialize}
*/
public static async rehydrateDetachedFromSnapshot(
createProps: IContainerCreateProps,
snapshot: string,
): Promise<Container> {
const container = new Container(createProps);
return PerformanceEvent.timedExecAsync(
container.mc.logger,
{ eventName: "RehydrateDetachedFromSnapshot" },
async (_event) => {
const detachedContainerState: IPendingDetachedContainerState =
getDetachedContainerStateFromSerializedContainer(snapshot);
await container.rehydrateDetachedFromSnapshot(detachedContainerState);
return container;
},
{ start: true, end: true, cancel: "generic" },
);
}
// Tells if container can reconnect on losing fist connection
// If false, container gets closed on loss of connection.
private readonly _canReconnect: boolean;
private readonly clientDetailsOverride: IClientDetails | undefined;
private readonly urlResolver: IUrlResolver;
private readonly serviceFactory: IDocumentServiceFactory;
private readonly codeLoader: ICodeDetailsLoader;
private readonly options: ILoaderOptions;
private readonly scope: FluidObject;
private readonly subLogger: ITelemetryLoggerExt;
private readonly detachedBlobStorage: MemoryDetachedBlobStorage | undefined;
private readonly protocolHandlerBuilder: InternalProtocolHandlerBuilder;
private readonly signalAudience = new Audience();
private readonly client: IClient;
private readonly mc: MonitoringContext;
/**
* Used by the RelativeLoader to spawn a new Container for the same document. Used to create the summarizing client.
*/
public readonly clone: (
loadProps: IContainerLoadProps,
createParamOverrides: Partial<IContainerCreateProps>,
) => Promise<Container>;
/**
* Lifecycle state of the container, used mainly to prevent re-entrancy and telemetry
*
* States are allowed to progress to further states:
* "loading" - "loaded" - "closing" - "disposing" - "closed" - "disposed"
*
* For example, moving from "closed" to "disposing" is not allowed since it is an earlier state.
*
* loading: Container has been created, but is not yet in normal/loaded state
* loaded: Container is in normal/loaded state
* closing: Container has started closing process (for re-entrancy prevention)
* disposing: Container has started disposing process (for re-entrancy prevention)
* closed: Container has closed
* disposed: Container has been disposed
*/
private _lifecycleState:
| "loading"
| "loaded"
| "closing"
| "disposing"
| "closed"
| "disposed" = "loading";
private setLoaded(): void {
// It's conceivable the container could be closed when this is called
// Only transition states if currently loading
if (this._lifecycleState === "loading") {
this._lifecycleState = "loaded";
// Connections transitions are delayed till we are loaded.
// This is done by holding ops and signals until the end of load sequence
// (calling this.handleDeltaConnectionArg() after setLoaded() call)
// If this assert fires, it means our logic managing connection flow is wrong, and the logic below is also wrong.
assert(
this.connectionState !== ConnectionState.Connected,
0x969 /* not connected yet */,
);
// Track membership changes and update connection state accordingly
// We do this call here, instead of doing it in initializeProtocolState() due to pendingLocalState.
// When we load from stashed state, we let connectionStateHandler know about clientId from previous container instance.
// But we will play trailing ops from snapshot, including potentially playing join & leave ops for that same clientId!
// In other words, if connectionStateHandler has access to Quorum early in load sequence, it will see events (in stashed ops mode)
// in the order that is not possible in real life, that it may not expect.
// Ideally, we should supply pendingLocalState?.clientId here as well, not in constructor, but it does not matter (at least today)
this.connectionStateHandler.initProtocol(this.protocolHandler);
// This call does not look like needed any more, with delaying all connection-related events past loaded phase.
// Yet, there could be some customer code that would break if we do not deliver it.
// Will be removed in further PRs with proper changeset.
const runtime = this._runtime;
if (
runtime !== undefined &&
// Check for older runtime that may need this call
!("setConnectionStatus" in runtime) &&
runtime.disposed === false
) {
runtime.setConnectionState(false /* canSendOps */, this.clientId);
}
// Deliver delayed calls to DeltaManager - we ignored "connect" events while loading.
const cm = this._deltaManager.connectionManager;
if (cm.connected) {
const details = cm.connectionDetails;
assert(details !== undefined, 0x96a /* should have details if connected */);
this.connectionStateHandler.receivedConnectEvent(details);
}
}
}
public get closed(): boolean {
return (
this._lifecycleState === "closing" || this._lifecycleState === "closed" || this.disposed
);
}
protected get loaded(): boolean {
return this._lifecycleState === "loaded";
}
public get disposed(): boolean {
return this._lifecycleState === "disposing" || this._lifecycleState === "disposed";
}
private readonly storageAdapter: ContainerStorageAdapter;
private readonly _deltaManager: DeltaManager<ConnectionManager>;
private service: IDocumentService | undefined;
private _runtime: IRuntime | undefined;
private get runtime(): IRuntime {
if (this._runtime === undefined) {
throw new Error("Attempted to access runtime before it was defined");
}
return this._runtime;
}
private _protocolHandler: ProtocolHandlerInternal | undefined;
private get protocolHandler(): ProtocolHandlerInternal {
if (this._protocolHandler === undefined) {
throw new Error("Attempted to access protocolHandler before it was defined");
}
return this._protocolHandler;
}
/**
* During initialization we pause the inbound queues. We track this state to ensure we only call resume once
*/
private inboundQueuePausedFromInit = true;
private connectionCount = 0;
private readonly connectionTransitionTimes: number[] = [];
private _loadedFromVersion: IVersion | undefined;
private _dirtyContainer = false;
private attachmentData: AttachmentData = { state: AttachState.Detached };
private readonly serializedStateManager: SerializedStateManager;
private readonly _containerId: string;
private readonly _retryConnectionTimeoutMs: number | undefined;
private lastVisible: number | undefined;
private readonly visibilityEventHandler: (() => void) | undefined;
private readonly connectionStateHandler: IConnectionStateHandler;
private readonly clientsWhoShouldHaveLeft = new Set<string>();
private _containerMetadata: Readonly<Record<string, string>> = {};
private setAutoReconnectTime = performanceNow();
private noopHeuristic: NoopHeuristic | undefined;
private get connectionMode(): ConnectionMode {
return this._deltaManager.connectionManager.connectionMode;
}
public get resolvedUrl(): IResolvedUrl | undefined {
/**
* All attached containers will have a document service,
* this is required, as attached containers are attached to
* a service. Detached containers will neither have a document
* service or a resolved url as they only exist locally.
* in order to create a document service a resolved url must
* first be obtained, this is how the container is identified.
* Because of this, the document service's resolved url
* is always the same as the containers, as we had to
* obtain the resolved url, and then create the service from it.
*/
return this.service?.resolvedUrl;
}
public get readOnlyInfo(): ReadOnlyInfo {
return this._deltaManager.readOnlyInfo;
}
public get containerMetadata(): Record<string, string> {
return this._containerMetadata;
}
/**
* Sends signal to runtime (and data stores) to be read-only.
* Hosts may have read only views, indicating to data stores that no edits are allowed.
* This is independent from this._readonlyPermissions (permissions) and this.connectionMode
* (server can return "write" mode even when asked for "read")
* Leveraging same "readonly" event as runtime & data stores should behave the same in such case
* as in read-only permissions.
* But this.active can be used by some DDSes to figure out if ops can be sent
* (for example, read-only view still participates in code proposals / upgrades decisions)
*
* Forcing Readonly does not prevent DDS from generating ops. It is up to user code to honour
* the readonly flag. If ops are generated, they will accumulate locally and not be sent. If
* there are pending in the outbound queue, it will stop sending until force readonly is
* cleared.
*
* @param readonly - set or clear force readonly.
*/
public forceReadonly(readonly: boolean): void {
this._deltaManager.connectionManager.forceReadonly(readonly);
}
public get deltaManager(): IDeltaManager<ISequencedDocumentMessage, IDocumentMessage> {
return this._deltaManager;
}
public get connectionState(): ConnectionState {
return this.connectionStateHandler.connectionState;
}
private get connected(): boolean {
return this.connectionStateHandler.connectionState === ConnectionState.Connected;
}
/**
* clientId of the latest connection. Changes only once client is connected, caught up and fully loaded.
* Changes to clientId are delayed through container loading sequence and delived once container is fully loaded.
* clientId does not reset on lost connection - old value persists until new connection is fully established.
*/
public get clientId(): string | undefined {
return this.protocolHandler.audience.getSelf()?.clientId;
}
private get isInteractiveClient(): boolean {
return this.deltaManager.clientDetails.capabilities.interactive;
}
private supportGetSnapshotApi(): boolean {
const supportGetSnapshotApi: boolean =
this.mc.config.getBoolean("Fluid.Container.UseLoadingGroupIdForSnapshotFetch2") ===
true && this.service?.policies?.supportGetSnapshotApi === true;
return supportGetSnapshotApi;
}
/**
* Get the code details that are currently specified for the container.
* @returns The current code details if any are specified, undefined if none are specified.
*/
public getSpecifiedCodeDetails(): IFluidCodeDetails | undefined {
return this.getCodeDetailsFromQuorum();
}
private _loadedCodeDetails: IFluidCodeDetails | undefined;
/**
* Get the code details that were used to load the container.
* @returns The code details that were used to load the container if it is loaded, undefined if it is not yet
* loaded.
*/
public getLoadedCodeDetails(): IFluidCodeDetails | undefined {
return this._loadedCodeDetails;
}
private _loadedModule: IFluidModuleWithDetails | undefined;
/**
* Retrieves the audience associated with the document
*/
public get audience(): IAudience {
return this.protocolHandler.audience;
}
/**
* Returns true if container is dirty.
* Which means data loss if container is closed at that same moment
* Most likely that happens when there is no network connection to Relay Service
*/
public get isDirty(): boolean {
return this._dirtyContainer;
}
/**
* {@inheritDoc @fluidframework/container-definitions#IContainer.entryPoint}
*/
public async getEntryPoint(): Promise<FluidObject> {
if (this._disposed) {
throw new UsageError("The context is already disposed");
}
if (this._runtime !== undefined) {
return this._runtime.getEntryPoint?.();
}
return new Promise<FluidObject>((resolve, reject) => {
const runtimeInstantiatedHandler = (): void => {
assert(
this._runtime !== undefined,
0x5a3 /* runtimeInstantiated fired but runtime is still undefined */,
);
resolve(this._runtime.getEntryPoint?.());
this._lifecycleEvents.off("disposed", disposedHandler);
};
const disposedHandler = (): void => {
reject(new Error("ContainerContext was disposed"));
this._lifecycleEvents.off("runtimeInstantiated", runtimeInstantiatedHandler);
};
this._lifecycleEvents.once("runtimeInstantiated", runtimeInstantiatedHandler);
this._lifecycleEvents.once("disposed", disposedHandler);
});
}
private readonly _lifecycleEvents = new TypedEventEmitter<IContainerLifecycleEvents>();
constructor(
createProps: IContainerCreateProps,
loadProps?: Pick<IContainerLoadProps, "pendingLocalState">,
) {
super((name, error) => {
this.mc.logger.sendErrorEvent(
{
eventName: "ContainerEventHandlerException",
name: typeof name === "string" ? name : undefined,
},
error,
);
this.close(normalizeError(error));
});
const {
canReconnect,
clientDetailsOverride,
urlResolver,
documentServiceFactory,
codeLoader,
options,
scope,
subLogger,
protocolHandlerBuilder,
retryConnectionTimeoutMs,
} = createProps;
// Validate that the Driver is compatible with this Loader.
const maybeDriverCompatDetails =
documentServiceFactory as FluidObject<ILayerCompatDetails>;
validateDriverCompatibility(
maybeDriverCompatDetails.ILayerCompatDetails,
(error) => {} /* disposeFn */, // There is nothing to dispose here, so just ignore the error.
subLogger,
);
this.connectionTransitionTimes[ConnectionState.Disconnected] = performanceNow();
const pendingLocalState = loadProps?.pendingLocalState;
this._canReconnect = canReconnect ?? true;
this._retryConnectionTimeoutMs = retryConnectionTimeoutMs;
this.clientDetailsOverride = clientDetailsOverride;
this.urlResolver = urlResolver;
this.serviceFactory = documentServiceFactory;
this.codeLoader = codeLoader;
// Warning: this is only a shallow clone. Mutation of any individual loader option will mutate it for
// all clients that were loaded from the same loader (including summarizer clients).
// Tracking alternative ways to handle this in AB#4129.
this.options = { ...options };
this.scope = scope;
this.protocolHandlerBuilder = wrapProtocolHandlerBuilder(
protocolHandlerBuilder ??
((
attributes: IDocumentAttributes,
quorumSnapshot: IQuorumSnapshot,
sendProposal: (key: string, value: unknown) => number,
): ProtocolHandlerInternal =>
new ProtocolHandler(
attributes,
quorumSnapshot,
sendProposal,
new Audience(),
(clientId: string) => this.clientsWhoShouldHaveLeft.has(clientId),
)),
this.signalAudience,
);
// Note that we capture the createProps here so we can replicate the creation call when we want to clone.
this.clone = async (
_loadProps: IContainerLoadProps,
createParamOverrides: Partial<IContainerCreateProps>,
): Promise<Container> => {
return Container.load(_loadProps, {
...createProps,
...createParamOverrides,
});
};
this._containerId = uuid();
this.client = Container.setupClient(
this._containerId,
options.client,
this.clientDetailsOverride,
);
// Create logger for data stores to use
const type = this.client.details.type;
const interactive = this.client.details.capabilities.interactive;
const clientType = `${interactive ? "interactive" : "noninteractive"}${
type !== undefined && type !== "" ? `/${type}` : ""
}`;
// Need to use the property getter for docId because for detached flow we don't have the docId initially.
// We assign the id later so property getter is used.
this.subLogger = createChildLogger({
logger: subLogger,
properties: {
all: {
clientType, // Differentiating summarizer container from main container
containerId: this._containerId,
docId: () => this.resolvedUrl?.id,
containerAttachState: () => this.attachState,
containerLifecycleState: () => this._lifecycleState,
containerConnectionState: () => ConnectionState[this.connectionState],
serializedContainer: pendingLocalState !== undefined,
},
// we need to be judicious with our logging here to avoid generating too much data
// all data logged here should be broadly applicable, and not specific to a
// specific error or class of errors
error: {
// load information to associate errors with the specific load point
dmInitialSeqNumber: () => this._deltaManager?.initialSequenceNumber,
dmLastProcessedSeqNumber: () => this._deltaManager?.lastSequenceNumber,
dmLastKnownSeqNumber: () => this._deltaManager?.lastKnownSeqNumber,
containerLoadedFromVersionId: () => this._loadedFromVersion?.id,
containerLoadedFromVersionDate: () => this._loadedFromVersion?.date,
// message information to associate errors with the specific execution state
// dmLastMsqSeqNumber: if present, same as dmLastProcessedSeqNumber
dmLastMsqSeqNumber: () => this.deltaManager?.lastMessage?.sequenceNumber,
dmLastMsqSeqTimestamp: () => this.deltaManager?.lastMessage?.timestamp,
dmLastMsqSeqClientId: () =>
this.deltaManager?.lastMessage?.clientId === null
? "null"
: this.deltaManager?.lastMessage?.clientId,
dmLastMsgClientSeq: () => this.deltaManager?.lastMessage?.clientSequenceNumber,
connectionStateDuration: () =>
performanceNow() - this.connectionTransitionTimes[this.connectionState],
},
},
});
// Prefix all events in this file with container-loader
this.mc = createChildMonitoringContext({ logger: this.subLogger, namespace: "Container" });
this._deltaManager = this.createDeltaManager();
this.connectionStateHandler = createConnectionStateHandler(
{
logger: this.mc.logger,
// WARNING: logger on this context should not including getters like containerConnectionState above (on this.subLogger),
// as that will result in attempt to dereference this.connectionStateHandler from this call while it's still undefined.
mc: loggerToMonitoringContext(subLogger),
connectionStateChanged: (value, oldState, reason) => {
this.logConnectionStateChangeTelemetry(value, oldState, reason);
if (this.loaded) {
this.propagateConnectionState(
value === ConnectionState.Disconnected
? reason
: undefined /* disconnectedReason */,
);
}
},
shouldClientJoinWrite: () => this._deltaManager.connectionManager.shouldJoinWrite(),
maxClientLeaveWaitTime: options.maxClientLeaveWaitTime,
logConnectionIssue: (
eventName: string,
category: TelemetryEventCategory,
details?: ITelemetryBaseProperties,
) => {
const mode = this.connectionMode;
// We get here when socket does not receive any ops on "write" connection, including
// its own join op.
// Report issues only if we already loaded container - op processing is paused while container is loading,
// so we always time-out processing of join op in cases where fetching snapshot takes a minute.
// It's not a problem with op processing itself - such issues should be tracked as part of boot perf monitoring instead.
this._deltaManager.logConnectionIssue({
eventName,
mode,
category: this._lifecycleState === "loading" ? "generic" : category,
duration:
performanceNow() - this.connectionTransitionTimes[ConnectionState.CatchingUp],
...(details === undefined ? {} : { details: JSON.stringify(details) }),
});
// This assert is important for many reasons:
// 1) Cosmetic / OCE burden: It's useless to raise NoJoinOp error events, if we are loading, as that's most
// likely to happen if snapshot loading takes too long. During this time we are not processing ops so there is no
// way to move to "connected" state, and thus "NoJoin" timer would fire (see
// IConnectionStateHandler.logConnectionIssue() callback and related code in ConnectStateHandler class implementation).
// But these events do not tell us anything about connectivity pipeline / op processing pipeline,
// only that boot is slow, and we have events for that.
// 2) Doing recovery below is useless in loading mode, for the reasons described above. At the same time we can't
// not do it, as maybe we lost JoinSignal for "self", and when loading is done, we never move to connected
// state. So we would have to do (in most cases) useless infinite reconnect loop while we are loading.
assert(
this.loaded,
0x96b /* connection issues can be raised only after container is loaded */,
);
// If this is "write" connection, it took too long to receive join op. But in most cases that's due
// to very slow op fetches and we will eventually get there.
// For "read" connections, we get here due to join signal for "self" not arriving on time.
// Attempt to recover by reconnecting.
if (mode === "read" && category === "error") {
const reason = { text: "NoJoinSignal" };
this.disconnectInternal(reason);
this.connectInternal({ reason, fetchOpsFromStorage: false });
}
},
clientShouldHaveLeft: (clientId: string) => {
this.clientsWhoShouldHaveLeft.add(clientId);
},
onCriticalError: (error: unknown) => {
this.close(normalizeError(error));
},
},
this.deltaManager,
pendingLocalState?.clientId,
);
this.on(savedContainerEvent, () => {