@@ -9,6 +9,7 @@ import * as fs from 'fs-extra';
9
9
import { inject , injectable } from 'inversify' ;
10
10
import * as path from 'path' ;
11
11
import { Observable } from 'rxjs/Observable' ;
12
+ import { Subscriber } from 'rxjs/Subscriber' ;
12
13
import * as uuid from 'uuid/v4' ;
13
14
import * as vscode from 'vscode' ;
14
15
@@ -148,7 +149,7 @@ export class JupyterServer implements INotebookServer {
148
149
output = cells ;
149
150
} ,
150
151
( error ) => {
151
- deferred . resolve ( output ) ;
152
+ deferred . reject ( error ) ;
152
153
} ,
153
154
( ) => {
154
155
deferred . resolve ( output ) ;
@@ -197,23 +198,34 @@ export class JupyterServer implements INotebookServer {
197
198
}
198
199
199
200
public executeSilently = ( code : string ) : Promise < void > => {
200
- // If we have a session, execute the code now.
201
- if ( this . session ) {
202
- // Generate a new request and wrap it in a promise as we wait for it to finish
203
- const request = this . generateRequest ( code , true ) ;
204
-
205
- return new Promise ( ( resolve , reject ) => {
206
- // Just wait for our observable to finish
207
- const observable = this . generateExecuteObservable ( code , 'file' , - 1 , '0' , request ) ;
208
- // tslint:disable-next-line:no-empty
209
- observable . subscribe ( ( ) => {
210
- } ,
211
- reject ,
212
- resolve ) ;
213
- } ) ;
214
- }
215
-
216
- return Promise . reject ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
201
+ return new Promise ( ( resolve , reject ) => {
202
+ // If we have a session, execute the code now.
203
+ if ( this . session ) {
204
+ // Generate a new request and resolve when it's done.
205
+ const request = this . generateRequest ( code , true ) ;
206
+
207
+ if ( request ) {
208
+
209
+ // // For debugging purposes when silently is failing.
210
+ // request.onIOPub = (msg: KernelMessage.IIOPubMessage) => {
211
+ // try {
212
+ // this.logger.logInformation(`Execute silently message ${msg.header.msg_type} : hasData=${'data' in msg.content}`);
213
+ // } catch (err) {
214
+ // this.logger.logError(err);
215
+ // }
216
+ // };
217
+
218
+ request . done . then ( ( ) => {
219
+ this . logger . logInformation ( `Execute for ${ code } silently finished.` ) ;
220
+ resolve ( ) ;
221
+ } ) . catch ( reject ) ;
222
+ } else {
223
+ reject ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
224
+ }
225
+ } else {
226
+ reject ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
227
+ }
228
+ } ) ;
217
229
}
218
230
219
231
public get onStatusChanged ( ) : vscode . Event < boolean > {
@@ -300,7 +312,7 @@ export class JupyterServer implements INotebookServer {
300
312
return this . session . kernel . requestExecute (
301
313
{
302
314
// Replace windows line endings with unix line endings.
303
- code : code . replace ( ' \r\n' , '\n' ) ,
315
+ code : code . replace ( / \r \n / g , '\n' ) ,
304
316
stop_on_error : false ,
305
317
allow_stdin : false ,
306
318
silent : silent
@@ -422,68 +434,6 @@ export class JupyterServer implements INotebookServer {
422
434
} ) ;
423
435
}
424
436
425
- private changeDirectoryObservable = ( file : string ) : Observable < boolean > => {
426
- return new Observable < boolean > ( subscriber => {
427
- // Execute some code and when its done, finish our subscriber
428
- const dir = path . dirname ( file ) ;
429
- this . executeSilently ( `%cd "${ dir } "` )
430
- . then ( ( ) => {
431
- subscriber . next ( true ) ;
432
- subscriber . complete ( ) ;
433
- } )
434
- . catch ( err => subscriber . error ( err ) ) ;
435
- } ) ;
436
- }
437
-
438
- private chainObservables < T > ( first : Observable < T > , second : ( ) => Observable < ICell > ) : Observable < ICell > {
439
- return new Observable < ICell > ( subscriber => {
440
- first . subscribe (
441
- ( ) => { return ; } ,
442
- ( err ) => subscriber . error ( err ) ,
443
- ( ) => {
444
- // When the first completes, tell the second to go
445
- second ( ) . subscribe ( ( cell : ICell ) => {
446
- subscriber . next ( cell ) ;
447
- } ,
448
- ( err ) => {
449
- subscriber . error ( err ) ;
450
- } ,
451
- ( ) => {
452
- subscriber . complete ( ) ;
453
- } ) ;
454
- }
455
- ) ;
456
- } ) ;
457
- }
458
-
459
- private executeCodeObservable = ( code : string , file : string , line : number ) : Observable < ICell > => {
460
-
461
- if ( this . session ) {
462
- // Send a magic that changes the current directory if we aren't already sending a magic
463
- if ( line >= 0 && fs . existsSync ( file ) ) {
464
- return this . chainObservables (
465
- this . changeDirectoryObservable ( file ) ,
466
- ( ) => this . executeCodeObservableInternal ( code , file , line ) ) ;
467
- } else {
468
- // We're inside of an execute silently already, don't recurse
469
- return this . executeCodeObservableInternal ( code , file , line ) ;
470
- }
471
- }
472
-
473
- return new Observable < ICell > ( subscriber => {
474
- subscriber . error ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
475
- subscriber . complete ( ) ;
476
- } ) ;
477
- }
478
-
479
- private executeCodeObservableInternal = ( code : string , file : string , line : number ) : Observable < ICell > => {
480
- // Send an execute request with this code
481
- const id = uuid ( ) ;
482
- const request = this . session ? this . generateRequest ( code , false ) : undefined ;
483
-
484
- return this . generateExecuteObservable ( code , file , line , id , request ) ;
485
- }
486
-
487
437
private appendLineFeed ( arr : string [ ] , modifier ? : ( s : string ) => string ) {
488
438
return arr . map ( ( s : string , i : number ) => {
489
439
const out = modifier ? modifier ( s ) : s ;
@@ -514,7 +464,76 @@ export class JupyterServer implements INotebookServer {
514
464
} ) ;
515
465
}
516
466
517
- private generateExecuteObservable ( code : string , file : string , line : number , id : string , request : Kernel . IFuture | undefined ) : Observable < ICell > {
467
+ private changeDirectoryIfPossible = async ( file : string , line : number ) : Promise < void > => {
468
+ if ( line >= 0 && await fs . pathExists ( file ) ) {
469
+ const dir = path . dirname ( file ) ;
470
+ await this . executeSilently ( `%cd "${ dir } "` ) ;
471
+ }
472
+ }
473
+
474
+ private handleCodeRequest = ( subscriber : Subscriber < ICell > , startTime : number , cell : ICell , code : string ) => {
475
+ // Generate a new request.
476
+ const request = this . generateRequest ( code , false ) ;
477
+
478
+ // Transition to the busy stage
479
+ cell . state = CellState . executing ;
480
+
481
+ // Listen to the reponse messages and update state as we go
482
+ if ( request ) {
483
+ request . onIOPub = ( msg : KernelMessage . IIOPubMessage ) => {
484
+ try {
485
+ if ( KernelMessage . isExecuteResultMsg ( msg ) ) {
486
+ this . handleExecuteResult ( msg as KernelMessage . IExecuteResultMsg , cell ) ;
487
+ } else if ( KernelMessage . isExecuteInputMsg ( msg ) ) {
488
+ this . handleExecuteInput ( msg as KernelMessage . IExecuteInputMsg , cell ) ;
489
+ } else if ( KernelMessage . isStatusMsg ( msg ) ) {
490
+ this . handleStatusMessage ( msg as KernelMessage . IStatusMsg ) ;
491
+ } else if ( KernelMessage . isStreamMsg ( msg ) ) {
492
+ this . handleStreamMesssage ( msg as KernelMessage . IStreamMsg , cell ) ;
493
+ } else if ( KernelMessage . isDisplayDataMsg ( msg ) ) {
494
+ this . handleDisplayData ( msg as KernelMessage . IDisplayDataMsg , cell ) ;
495
+ } else if ( KernelMessage . isErrorMsg ( msg ) ) {
496
+ this . handleError ( msg as KernelMessage . IErrorMsg , cell ) ;
497
+ } else {
498
+ this . logger . logWarning ( `Unknown message ${ msg . header . msg_type } : hasData=${ 'data' in msg . content } ` ) ;
499
+ }
500
+
501
+ // Set execution count, all messages should have it
502
+ if ( msg . content . execution_count ) {
503
+ cell . data . execution_count = msg . content . execution_count as number ;
504
+ }
505
+
506
+ // Show our update if any new output
507
+ subscriber . next ( cell ) ;
508
+ } catch ( err ) {
509
+ // If not a restart error, then tell the subscriber
510
+ if ( startTime > this . sessionStartTime ) {
511
+ this . logger . logError ( `Error during message ${ msg . header . msg_type } ` ) ;
512
+ subscriber . error ( err ) ;
513
+ }
514
+ }
515
+ } ;
516
+
517
+ // Create completion and error functions so we can bind our cell object
518
+ // tslint:disable-next-line:no-any
519
+ const completion = ( error ?: any ) => {
520
+ cell . state = error ? CellState . error : CellState . finished ;
521
+ // Only do this if start time is still valid. Dont log an error to the subscriber. Error
522
+ // state should end up in the cell output.
523
+ if ( startTime > this . sessionStartTime ) {
524
+ subscriber . next ( cell ) ;
525
+ }
526
+ subscriber . complete ( ) ;
527
+ } ;
528
+
529
+ // When the request finishes we are done
530
+ request . done . then ( completion ) . catch ( completion ) ;
531
+ } else {
532
+ subscriber . error ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
533
+ }
534
+ }
535
+
536
+ private executeCodeObservable ( code : string , file : string , line : number ) : Observable < ICell > {
518
537
return new Observable < ICell > ( subscriber => {
519
538
// Start out empty;
520
539
const cell : ICell = {
@@ -525,7 +544,7 @@ export class JupyterServer implements INotebookServer {
525
544
metadata : { } ,
526
545
execution_count : 0
527
546
} ,
528
- id : id ,
547
+ id : uuid ( ) ,
529
548
file : file ,
530
549
line : line ,
531
550
state : CellState . init
@@ -534,64 +553,20 @@ export class JupyterServer implements INotebookServer {
534
553
// Keep track of when we started.
535
554
const startTime = Date . now ( ) ;
536
555
537
- // Tell our listener.
556
+ // Tell our listener. NOTE: have to do this asap so that markdown cells don't get
557
+ // run before our cells.
538
558
subscriber . next ( cell ) ;
539
559
540
- // Transition to the busy stage
541
- cell . state = CellState . executing ;
542
-
543
- // Listen to the reponse messages and update state as we go
544
- if ( request ) {
545
- request . onIOPub = ( msg : KernelMessage . IIOPubMessage ) => {
546
- try {
547
- if ( KernelMessage . isExecuteResultMsg ( msg ) ) {
548
- this . handleExecuteResult ( msg as KernelMessage . IExecuteResultMsg , cell ) ;
549
- } else if ( KernelMessage . isExecuteInputMsg ( msg ) ) {
550
- this . handleExecuteInput ( msg as KernelMessage . IExecuteInputMsg , cell ) ;
551
- } else if ( KernelMessage . isStatusMsg ( msg ) ) {
552
- this . handleStatusMessage ( msg as KernelMessage . IStatusMsg ) ;
553
- } else if ( KernelMessage . isStreamMsg ( msg ) ) {
554
- this . handleStreamMesssage ( msg as KernelMessage . IStreamMsg , cell ) ;
555
- } else if ( KernelMessage . isDisplayDataMsg ( msg ) ) {
556
- this . handleDisplayData ( msg as KernelMessage . IDisplayDataMsg , cell ) ;
557
- } else if ( KernelMessage . isErrorMsg ( msg ) ) {
558
- this . handleError ( msg as KernelMessage . IErrorMsg , cell ) ;
559
- } else {
560
- this . logger . logWarning ( `Unknown message ${ msg . header . msg_type } : hasData=${ 'data' in msg . content } ` ) ;
561
- }
562
-
563
- // Set execution count, all messages should have it
564
- if ( msg . content . execution_count ) {
565
- cell . data . execution_count = msg . content . execution_count as number ;
566
- }
567
-
568
- // Show our update if any new output
569
- subscriber . next ( cell ) ;
570
- } catch ( err ) {
571
- // If not a restart error, then tell the subscriber
572
- if ( startTime > this . sessionStartTime ) {
573
- this . logger . logError ( `Error during message ${ msg . header . msg_type } ` ) ;
574
- subscriber . error ( err ) ;
575
- }
576
- }
577
- } ;
578
-
579
- // Create completion and error functions so we can bind our cell object
580
- const completion = ( error : boolean ) => {
581
- cell . state = error ? CellState . error : CellState . finished ;
582
- // Only do this if start time is still valid
583
- if ( startTime > this . sessionStartTime ) {
584
- subscriber . next ( cell ) ;
585
- }
586
- subscriber . complete ( ) ;
587
- } ;
588
-
589
- // When the request finishes we are done
590
- request . done . then ( ( ) => completion ( false ) , ( ) => completion ( true ) ) ;
591
- } else {
592
- subscriber . error ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
593
- }
594
-
560
+ // Attempt to change to the current directory. When that finishes
561
+ // send our real request
562
+ this . changeDirectoryIfPossible ( file , line )
563
+ . then ( ( ) => {
564
+ this . handleCodeRequest ( subscriber , startTime , cell , code ) ;
565
+ } )
566
+ . catch ( ( ) => {
567
+ // Ignore errors if they occur. Just execute normally
568
+ this . handleCodeRequest ( subscriber , startTime , cell , code ) ;
569
+ } ) ;
595
570
} ) ;
596
571
}
597
572
0 commit comments