@@ -63,6 +63,7 @@ import {
63
63
type AddSessionResponse ,
64
64
type ClientSessionCtx ,
65
65
type ConnectionSocket ,
66
+ type ConsumerHandle ,
66
67
type GetWorkspaceResponse ,
67
68
LOGGING_ENABLED ,
68
69
pingConst ,
@@ -72,6 +73,7 @@ import {
72
73
type PlatformQueueProducer ,
73
74
QueueTopic ,
74
75
type QueueUserMessage ,
76
+ QueueWorkspaceEvent ,
75
77
type QueueWorkspaceMessage ,
76
78
type Session ,
77
79
type SessionManager ,
@@ -116,6 +118,7 @@ export class TSessionManager implements SessionManager {
116
118
117
119
workspaceProducer : PlatformQueueProducer < QueueWorkspaceMessage >
118
120
usersProducer : PlatformQueueProducer < QueueUserMessage >
121
+ workspaceConsumer : ConsumerHandle
119
122
120
123
now : number = Date . now ( )
121
124
@@ -141,8 +144,28 @@ export class TSessionManager implements SessionManager {
141
144
this . handleTick ( )
142
145
} , 1000 / ticksPerSecond )
143
146
}
144
- this . workspaceProducer = this . queue . getProducer ( ctx . newChild ( 'queue' , { } , { span : false } ) , QueueTopic . Workspace )
145
- this . usersProducer = this . queue . getProducer ( ctx . newChild ( 'queue' , { } , { span : false } ) , QueueTopic . Users )
147
+ this . workspaceProducer = this . queue . getProducer ( ctx . newChild ( 'ws-queue' , { } , { span : false } ) , QueueTopic . Workspace )
148
+ this . usersProducer = this . queue . getProducer ( ctx . newChild ( 'user-queue' , { } , { span : false } ) , QueueTopic . Users )
149
+
150
+ this . workspaceConsumer = this . queue . createConsumer < QueueWorkspaceMessage > (
151
+ ctx . newChild ( 'ws-queue-consume' , { } , { span : false } ) ,
152
+ QueueTopic . Workspace ,
153
+ generateId ( ) ,
154
+ async ( messages ) => {
155
+ for ( const msg of messages ) {
156
+ for ( const m of msg . value ) {
157
+ if (
158
+ m . type === QueueWorkspaceEvent . Upgraded ||
159
+ m . type === QueueWorkspaceEvent . Restored ||
160
+ m . type === QueueWorkspaceEvent . Deleted
161
+ ) {
162
+ // Handle workspace messages
163
+ this . workspaceInfoCache . delete ( msg . workspace )
164
+ }
165
+ }
166
+ }
167
+ }
168
+ )
146
169
147
170
this . ticksContext = ctx . newChild ( 'ticks' , { } , { span : false } )
148
171
}
@@ -523,6 +546,8 @@ export class TSessionManager implements SessionManager {
523
546
524
547
maintenanceWorkspaces = new Set < WorkspaceUuid > ( )
525
548
549
+ workspaceInfoCache = new Map < WorkspaceUuid , WorkspaceInfoWithStatus > ( )
550
+
526
551
async addSession (
527
552
ctx : MeasureContext ,
528
553
ws : ConnectionSocket ,
@@ -552,10 +577,13 @@ export class TSessionManager implements SessionManager {
552
577
if ( wsInfo === undefined ) {
553
578
// In case of guest or system account
554
579
// We need to get workspace info for system account.
555
- const workspaceInfo = await this . getWorkspaceInfo ( ctx , rawToken , false )
580
+ const workspaceInfo =
581
+ this . workspaceInfoCache . get ( token . workspace ) ?? ( await this . getWorkspaceInfo ( ctx , rawToken , false ) )
556
582
if ( workspaceInfo === undefined ) {
557
583
return { error : new Error ( 'Workspace not found or not available' ) , terminate : true }
558
584
}
585
+ this . workspaceInfoCache . set ( token . workspace , workspaceInfo )
586
+
559
587
wsInfo = {
560
588
url : workspaceInfo . url ,
561
589
mode : workspaceInfo . mode ,
@@ -569,6 +597,8 @@ export class TSessionManager implements SessionManager {
569
597
endpoint : { externalUrl : '' , internalUrl : '' , region : workspaceInfo . region ?? '' } ,
570
598
progress : workspaceInfo . processingProgress
571
599
}
600
+ } else {
601
+ this . workspaceInfoCache . delete ( token . workspace )
572
602
}
573
603
const { workspace, resp } = await this . getWorkspace ( ctx . parent ?? ctx , token . workspace , wsInfo , token , ws )
574
604
if ( resp !== undefined ) {
@@ -961,6 +991,7 @@ export class TSessionManager implements SessionManager {
961
991
async forceClose ( wsId : WorkspaceUuid , ignoreSocket ?: ConnectionSocket ) : Promise < void > {
962
992
const ws = this . workspaces . get ( wsId )
963
993
this . maintenanceWorkspaces . delete ( wsId )
994
+ this . workspaceInfoCache . delete ( wsId )
964
995
if ( ws !== undefined ) {
965
996
this . ctx . warn ( 'force-close' , { name : ws . wsId . url } )
966
997
ws . maintenance = true // We need to similare upgrade to refresh all clients.
0 commit comments