1- import { writeAll } from " https://deno.land/std@0.216.0/io/write_all.ts" ;
1+ import { writeAll } from ' https://deno.land/std@0.216.0/io/write_all.ts' ;
22
33import * as jsonrpc from 'jsonrpc-lite' ;
44
@@ -33,26 +33,52 @@ const COMMAND_PONG = '_zPONG';
3333
3434export const RPCResponseObserver = new EventTarget ( ) ;
3535
36+ class ProcessingLock {
37+ private isProcessing = false ;
38+
39+ lock ( ) {
40+ const granted = this . isProcessing === false ;
41+
42+ if ( ! granted ) {
43+ return {
44+ granted,
45+ [ Symbol . dispose ] : ( ) => { } ,
46+ } ;
47+ }
48+
49+ this . isProcessing = true ;
50+
51+ return {
52+ granted,
53+ [ Symbol . dispose ] : ( ) => {
54+ this . isProcessing = false ;
55+ } ,
56+ } ;
57+ }
58+ [ Symbol . dispose ] ( ) {
59+ this . isProcessing = false ;
60+ }
61+ }
62+
3663export const Queue = new ( class Queue {
3764 private queue : Uint8Array [ ] = [ ] ;
38- private isProcessing = false ;
65+ private isProcessingLock = new ProcessingLock ( ) ;
3966
4067 private async processQueue ( ) {
41- if ( this . isProcessing ) {
68+ using processing = this . isProcessingLock . lock ( ) ;
69+
70+ if ( ! processing . granted ) {
4271 return ;
4372 }
4473
45- this . isProcessing = true ;
46-
4774 while ( this . queue . length ) {
48- const message = this . queue . shift ( ) ;
75+ const [ message ] = this . queue ;
4976
5077 if ( message ) {
5178 await Transport . send ( message ) ;
5279 }
80+ this . queue . shift ( ) ;
5381 }
54-
55- this . isProcessing = false ;
5682 }
5783
5884 public enqueue ( message : jsonrpc . JsonRpc | typeof COMMAND_PONG ) {
@@ -63,7 +89,7 @@ export const Queue = new (class Queue {
6389 public getCurrentSize ( ) {
6490 return this . queue . length ;
6591 }
66- } ) ;
92+ } ) ( ) ;
6793
6894export const Transport = new ( class Transporter {
6995 private selectedTransport : Transporter [ 'stdoutTransport' ] | Transporter [ 'noopTransport' ] ;
0 commit comments