@@ -21,6 +21,7 @@ import parseSchema from 'mongodb-schema';
21
21
import path from 'path' ;
22
22
import { signatures } from '@mongosh/shell-api' ;
23
23
import translator from '@mongosh/i18n' ;
24
+ import { isAtlasStream } from 'mongodb-build-info' ;
24
25
import { Worker as WorkerThreads } from 'worker_threads' ;
25
26
26
27
import { ExportToLanguageMode } from '../types/playgroundType' ;
@@ -65,6 +66,7 @@ export default class MongoDBService {
65
66
_currentConnectionOptions ?: MongoClientOptions ;
66
67
67
68
_databaseCompletionItems : CompletionItem [ ] = [ ] ;
69
+ _streamProcessorCompletionItems : CompletionItem [ ] = [ ] ;
68
70
_shellSymbolCompletionItems : { [ symbol : string ] : CompletionItem [ ] } = { } ;
69
71
_globalSymbolCompletionItems : CompletionItem [ ] = [ ] ;
70
72
_collections : { [ database : string ] : string [ ] } = { } ;
@@ -147,6 +149,7 @@ export default class MongoDBService {
147
149
databases : true ,
148
150
collections : true ,
149
151
fields : true ,
152
+ streamProcessors : true ,
150
153
} ) ;
151
154
await this . _closeCurrentConnection ( ) ;
152
155
}
@@ -174,6 +177,22 @@ export default class MongoDBService {
174
177
) ;
175
178
}
176
179
180
+ if ( isAtlasStream ( connectionString || '' ) ) {
181
+ await this . _getAndCacheStreamProcessors ( ) ;
182
+ } else {
183
+ await this . _getAndCacheDatabases ( ) ;
184
+ }
185
+
186
+ this . _connection . console . log (
187
+ `CliServiceProvider active connection has changed: { connectionId: ${ connectionId } }`
188
+ ) ;
189
+ return {
190
+ successfullyConnected : true ,
191
+ connectionId,
192
+ } ;
193
+ }
194
+
195
+ async _getAndCacheDatabases ( ) {
177
196
try {
178
197
// Get database names for the current connection.
179
198
const databases = await this . _getDatabases ( ) ;
@@ -184,14 +203,17 @@ export default class MongoDBService {
184
203
`LS get databases error: ${ util . inspect ( error ) } `
185
204
) ;
186
205
}
206
+ }
187
207
188
- this . _connection . console . log (
189
- `CliServiceProvider active connection has changed: { connectionId: ${ connectionId } }`
190
- ) ;
191
- return {
192
- successfullyConnected : true ,
193
- connectionId,
194
- } ;
208
+ async _getAndCacheStreamProcessors ( ) {
209
+ try {
210
+ const processors = await this . _getStreamProcessors ( ) ;
211
+ this . _cacheStreamProcessorCompletionItems ( processors ) ;
212
+ } catch ( error ) {
213
+ this . _connection . console . error (
214
+ `LS get stream processors error: ${ util . inspect ( error ) } `
215
+ ) ;
216
+ }
195
217
}
196
218
197
219
/**
@@ -245,9 +267,16 @@ export default class MongoDBService {
245
267
)
246
268
) ;
247
269
248
- worker ?. on (
249
- 'message' ,
250
- ( { error, data } : { data ?: ShellEvaluateResult ; error ?: any } ) => {
270
+ worker ?. on ( 'message' , ( { name, payload } ) => {
271
+ if ( name === ServerCommands . SHOW_CONSOLE_OUTPUT ) {
272
+ void this . _connection . sendNotification ( name , payload ) ;
273
+ }
274
+
275
+ if ( name === ServerCommands . CODE_EXECUTION_RESULT ) {
276
+ const { error, data } = payload as {
277
+ data ?: ShellEvaluateResult ;
278
+ error ?: any ;
279
+ } ;
251
280
if ( error ) {
252
281
this . _connection . console . error (
253
282
`WORKER error: ${ util . inspect ( error ) } `
@@ -261,7 +290,7 @@ export default class MongoDBService {
261
290
resolve ( data ) ;
262
291
} ) ;
263
292
}
264
- ) ;
293
+ } ) ;
265
294
266
295
worker . postMessage ( {
267
296
name : ServerCommands . EXECUTE_CODE_FROM_PLAYGROUND ,
@@ -294,6 +323,24 @@ export default class MongoDBService {
294
323
} ) ;
295
324
}
296
325
326
+ /**
327
+ * Get stream processors names for the current connection.
328
+ */
329
+ async _getStreamProcessors ( ) : Promise < Document [ ] > {
330
+ if ( this . _serviceProvider ) {
331
+ try {
332
+ const cmd = { listStreamProcessors : 1 } ;
333
+ const result = await this . _serviceProvider . runCommand ( 'admin' , cmd ) ;
334
+ return result . streamProcessors ?? [ ] ;
335
+ } catch ( error ) {
336
+ this . _connection . console . error (
337
+ `LS get stream processors error: ${ error } `
338
+ ) ;
339
+ }
340
+ }
341
+ return [ ] ;
342
+ }
343
+
297
344
/**
298
345
* Get database names for the current connection.
299
346
*/
@@ -377,7 +424,7 @@ export default class MongoDBService {
377
424
}
378
425
379
426
/**
380
- * Return 'db' and 'use' completion items.
427
+ * Return 'db', 'sp' and 'use' completion items.
381
428
*/
382
429
_cacheGlobalSymbolCompletionItems ( ) {
383
430
this . _globalSymbolCompletionItems = [
@@ -386,6 +433,11 @@ export default class MongoDBService {
386
433
kind : CompletionItemKind . Method ,
387
434
preselect : true ,
388
435
} ,
436
+ {
437
+ label : 'sp' ,
438
+ kind : CompletionItemKind . Method ,
439
+ preselect : true ,
440
+ } ,
389
441
{
390
442
label : 'use' ,
391
443
kind : CompletionItemKind . Function ,
@@ -783,6 +835,18 @@ export default class MongoDBService {
783
835
}
784
836
}
785
837
838
+ /**
839
+ * If the current node is 'sp.processor.<trigger>' or 'sp["processor"].<trigger>'.
840
+ */
841
+ _provideStreamProcessorSymbolCompletionItems ( state : CompletionState ) {
842
+ if ( state . isStreamProcessorSymbol ) {
843
+ this . _connection . console . log (
844
+ 'VISITOR found stream processor symbol completions'
845
+ ) ;
846
+ return this . _shellSymbolCompletionItems . StreamProcessor ;
847
+ }
848
+ }
849
+
786
850
/**
787
851
* If the current node is 'db.collection.find().<trigger>'.
788
852
*/
@@ -895,6 +959,37 @@ export default class MongoDBService {
895
959
}
896
960
}
897
961
962
+ /**
963
+ * If the current node is 'sp.<trigger>'.
964
+ */
965
+ _provideSpSymbolCompletionItems ( state : CompletionState ) {
966
+ if ( state . isSpSymbol ) {
967
+ if ( state . isStreamProcessorName ) {
968
+ this . _connection . console . log (
969
+ 'VISITOR found sp symbol and stream processor name completions'
970
+ ) ;
971
+ return this . _shellSymbolCompletionItems . Streams . concat (
972
+ this . _streamProcessorCompletionItems
973
+ ) ;
974
+ }
975
+
976
+ this . _connection . console . log ( 'VISITOR found sp symbol completions' ) ;
977
+ return this . _shellSymbolCompletionItems . Streams ;
978
+ }
979
+ }
980
+
981
+ /**
982
+ * If the current node is 'sp.get(<trigger>)'.
983
+ */
984
+ _provideStreamProcessorNameCompletionItems ( state : CompletionState ) {
985
+ if ( state . isStreamProcessorName ) {
986
+ this . _connection . console . log (
987
+ 'VISITOR found stream processor name completions'
988
+ ) ;
989
+ return this . _streamProcessorCompletionItems ;
990
+ }
991
+ }
992
+
898
993
/**
899
994
* If the current node can be used as a collection name
900
995
* e.g. 'db.<trigger>.find()' or 'let a = db.<trigger>'.
@@ -965,6 +1060,7 @@ export default class MongoDBService {
965
1060
this . _provideIdentifierObjectValueCompletionItems . bind ( this , state ) ,
966
1061
this . _provideTextObjectValueCompletionItems . bind ( this , state ) ,
967
1062
this . _provideCollectionSymbolCompletionItems . bind ( this , state ) ,
1063
+ this . _provideStreamProcessorSymbolCompletionItems . bind ( this , state ) ,
968
1064
this . _provideFindCursorCompletionItems . bind ( this , state ) ,
969
1065
this . _provideAggregationCursorCompletionItems . bind ( this , state ) ,
970
1066
this . _provideGlobalSymbolCompletionItems . bind ( this , state ) ,
@@ -974,13 +1070,15 @@ export default class MongoDBService {
974
1070
currentLineText ,
975
1071
position
976
1072
) ,
1073
+ this . _provideSpSymbolCompletionItems . bind ( this , state ) ,
977
1074
this . _provideCollectionNameCompletionItems . bind (
978
1075
this ,
979
1076
state ,
980
1077
currentLineText ,
981
1078
position
982
1079
) ,
983
1080
this . _provideDbNameCompletionItems . bind ( this , state ) ,
1081
+ this . _provideStreamProcessorNameCompletionItems . bind ( this , state ) ,
984
1082
] ;
985
1083
986
1084
for ( const func of completionOptions ) {
@@ -1117,6 +1215,18 @@ export default class MongoDBService {
1117
1215
this . _collections [ database ] = collections . map ( ( item ) => item . name ) ;
1118
1216
}
1119
1217
1218
+ _cacheStreamProcessorCompletionItems ( processors : Document [ ] ) : void {
1219
+ this . _streamProcessorCompletionItems = processors . map ( ( { name } ) => ( {
1220
+ kind : CompletionItemKind . Folder ,
1221
+ preselect : true ,
1222
+ label : name ,
1223
+ } ) ) ;
1224
+ }
1225
+
1226
+ clearCachedStreamProcessors ( ) : void {
1227
+ this . _streamProcessorCompletionItems = [ ] ;
1228
+ }
1229
+
1120
1230
clearCachedFields ( ) : void {
1121
1231
this . _fields = { } ;
1122
1232
}
@@ -1142,13 +1252,16 @@ export default class MongoDBService {
1142
1252
1143
1253
clearCachedCompletions ( clear : ClearCompletionsCache ) : void {
1144
1254
if ( clear . fields ) {
1145
- this . _fields = { } ;
1255
+ this . clearCachedFields ( ) ;
1146
1256
}
1147
1257
if ( clear . databases ) {
1148
- this . _databaseCompletionItems = [ ] ;
1258
+ this . clearCachedDatabases ( ) ;
1149
1259
}
1150
1260
if ( clear . collections ) {
1151
- this . _collections = { } ;
1261
+ this . clearCachedCollections ( ) ;
1262
+ }
1263
+ if ( clear . streamProcessors ) {
1264
+ this . clearCachedStreamProcessors ( ) ;
1152
1265
}
1153
1266
}
1154
1267
}
0 commit comments