@@ -8,6 +8,10 @@ import type { CancellationToken } from '../utils/cancellation.js';
88import type { LangiumCoreServices } from '../services.js' ;
99import type { AstNode } from '../syntax-tree.js' ;
1010import type { LangiumParser , ParseResult } from './langium-parser.js' ;
11+ import type { Hydrator } from '../serializer/hydrator.js' ;
12+ import type { Event } from '../utils/event.js' ;
13+ import { Deferred , OperationCancelled } from '../utils/promise-utils.js' ;
14+ import { Emitter } from '../utils/event.js' ;
1115
1216/**
1317 * Async parser that allows to cancel the current parsing process.
@@ -37,3 +41,156 @@ export class DefaultAsyncParser implements AsyncParser {
3741 return Promise . resolve ( this . syncParser . parse < T > ( text ) ) ;
3842 }
3943}
44+
45+ export abstract class AbstractThreadedAsyncParser implements AsyncParser {
46+
47+ /**
48+ * The thread count determines how many threads are used to parse files in parallel.
49+ * The default value is 8. Decreasing this value increases startup performance, but decreases parallel parsing performance.
50+ */
51+ protected threadCount = 8 ;
52+ /**
53+ * The termination delay determines how long the parser waits for a thread to finish after a cancellation request.
54+ * The default value is 200(ms).
55+ */
56+ protected terminationDelay = 200 ;
57+ protected workerPool : ParserWorker [ ] = [ ] ;
58+ protected queue : Array < Deferred < ParserWorker > > = [ ] ;
59+
60+ protected readonly hydrator : Hydrator ;
61+
62+ constructor ( services : LangiumCoreServices ) {
63+ this . hydrator = services . serializer . Hydrator ;
64+ }
65+
66+ protected initializeWorkers ( ) : void {
67+ while ( this . workerPool . length < this . threadCount ) {
68+ const worker = this . createWorker ( ) ;
69+ worker . onReady ( ( ) => {
70+ if ( this . queue . length > 0 ) {
71+ const deferred = this . queue . shift ( ) ;
72+ if ( deferred ) {
73+ worker . lock ( ) ;
74+ deferred . resolve ( worker ) ;
75+ }
76+ }
77+ } ) ;
78+ this . workerPool . push ( worker ) ;
79+ }
80+ }
81+
82+ async parse < T extends AstNode > ( text : string , cancelToken : CancellationToken ) : Promise < ParseResult < T > > {
83+ const worker = await this . acquireParserWorker ( cancelToken ) ;
84+ const deferred = new Deferred < ParseResult < T > > ( ) ;
85+ let timeout : NodeJS . Timeout | undefined ;
86+ // If the cancellation token is requested, we wait for a certain time before terminating the worker.
87+ // Since the cancellation token lives longer than the parsing process, we need to dispose the event listener.
88+ // Otherwise, we might accidentally terminate the worker after the parsing process has finished.
89+ const cancellation = cancelToken . onCancellationRequested ( ( ) => {
90+ timeout = setTimeout ( ( ) => {
91+ this . terminateWorker ( worker ) ;
92+ } , this . terminationDelay ) ;
93+ } ) ;
94+ worker . parse ( text ) . then ( result => {
95+ result . value = this . hydrator . hydrate ( result . value ) ;
96+ deferred . resolve ( result as ParseResult < T > ) ;
97+ } ) . catch ( err => {
98+ deferred . reject ( err ) ;
99+ } ) . finally ( ( ) => {
100+ cancellation . dispose ( ) ;
101+ clearTimeout ( timeout ) ;
102+ } ) ;
103+ return deferred . promise ;
104+ }
105+
106+ protected terminateWorker ( worker : ParserWorker ) : void {
107+ worker . terminate ( ) ;
108+ const index = this . workerPool . indexOf ( worker ) ;
109+ if ( index >= 0 ) {
110+ this . workerPool . splice ( index , 1 ) ;
111+ }
112+ }
113+
114+ protected async acquireParserWorker ( cancelToken : CancellationToken ) : Promise < ParserWorker > {
115+ this . initializeWorkers ( ) ;
116+ for ( const worker of this . workerPool ) {
117+ if ( worker . ready ) {
118+ worker . lock ( ) ;
119+ return worker ;
120+ }
121+ }
122+ const deferred = new Deferred < ParserWorker > ( ) ;
123+ cancelToken . onCancellationRequested ( ( ) => {
124+ const index = this . queue . indexOf ( deferred ) ;
125+ if ( index >= 0 ) {
126+ this . queue . splice ( index , 1 ) ;
127+ }
128+ deferred . reject ( 'OperationCancelled' ) ;
129+ } ) ;
130+ this . queue . push ( deferred ) ;
131+ return deferred . promise ;
132+ }
133+
134+ protected abstract createWorker ( ) : ParserWorker ;
135+ }
136+
137+ export type WorkerMessagePost = ( message : unknown ) => void ;
138+ export type WorkerMessageCallback = ( cb : ( message : unknown ) => void ) => void ;
139+
140+ export class ParserWorker {
141+
142+ protected readonly sendMessage : WorkerMessagePost ;
143+ protected readonly _terminate : ( ) => void ;
144+ protected readonly onReadyEmitter = new Emitter < void > ( ) ;
145+
146+ protected deferred = new Deferred < ParseResult > ( ) ;
147+ protected _ready = true ;
148+ protected _parsing = false ;
149+
150+ get ready ( ) : boolean {
151+ return this . _ready ;
152+ }
153+
154+ get onReady ( ) : Event < void > {
155+ return this . onReadyEmitter . event ;
156+ }
157+
158+ constructor ( sendMessage : WorkerMessagePost , onMessage : WorkerMessageCallback , onError : WorkerMessageCallback , terminate : ( ) => void ) {
159+ this . sendMessage = sendMessage ;
160+ this . _terminate = terminate ;
161+ onMessage ( result => {
162+ const parseResult = result as ParseResult ;
163+ this . deferred . resolve ( parseResult ) ;
164+ this . unlock ( ) ;
165+ } ) ;
166+ onError ( error => {
167+ this . deferred . reject ( error ) ;
168+ this . unlock ( ) ;
169+ } ) ;
170+ }
171+
172+ terminate ( ) : void {
173+ this . deferred . reject ( OperationCancelled ) ;
174+ this . _terminate ( ) ;
175+ }
176+
177+ lock ( ) : void {
178+ this . _ready = false ;
179+ }
180+
181+ unlock ( ) : void {
182+ this . _parsing = false ;
183+ this . _ready = true ;
184+ this . onReadyEmitter . fire ( ) ;
185+ }
186+
187+ parse ( text : string ) : Promise < ParseResult > {
188+ if ( this . _parsing ) {
189+ throw new Error ( 'Parser worker is busy' ) ;
190+ }
191+ this . _parsing = true ;
192+ this . deferred = new Deferred ( ) ;
193+ this . sendMessage ( text ) ;
194+ return this . deferred . promise ;
195+ }
196+ }
0 commit comments