2
2
3
3
import asyncio
4
4
import itertools
5
+ import threading
5
6
import uuid
6
7
from dataclasses import dataclass , field
7
8
from enum import Enum
@@ -74,10 +75,34 @@ class WorkspaceDocumentsResult:
74
75
75
76
@dataclass
76
77
class DiagnosticsData :
77
- id : str
78
+ id : str = field ( default_factory = lambda : str ( uuid . uuid4 ()))
78
79
entries : Dict [Any , Optional [List [Diagnostic ]]] = field (default_factory = dict )
79
80
version : Optional [int ] = None
80
81
task : Optional [asyncio .Task [Any ]] = None
82
+ force : bool = False
83
+
84
+
85
+ def _cancel_all_tasks (loop : asyncio .AbstractEventLoop ) -> None :
86
+ to_cancel = asyncio .all_tasks (loop )
87
+ if not to_cancel :
88
+ return
89
+
90
+ for task in to_cancel :
91
+ task .cancel ()
92
+
93
+ loop .run_until_complete (asyncio .gather (* to_cancel , loop = loop , return_exceptions = True ))
94
+
95
+ for task in to_cancel :
96
+ if task .cancelled ():
97
+ continue
98
+ if task .exception () is not None :
99
+ loop .call_exception_handler (
100
+ {
101
+ "message" : "unhandled exception during asyncio.run() shutdown" ,
102
+ "exception" : task .exception (),
103
+ "task" : task ,
104
+ }
105
+ )
81
106
82
107
83
108
class DiagnosticsProtocolPart (LanguageServerProtocolPart , HasExtendCapabilities ):
@@ -97,18 +122,63 @@ def __init__(self, protocol: LanguageServerProtocol) -> None:
97
122
98
123
self ._current_workspace_task : Optional [asyncio .Task [WorkspaceDiagnosticReport ]] = None
99
124
125
+ self ._diagnostics_loop : Optional [asyncio .AbstractEventLoop ] = None
126
+ self ._diagnostics_loop_lock = threading .RLock ()
127
+ self ._diagnostics_started = threading .Event ()
128
+
129
+ self .parent .on_initialized .add (self .initialized )
130
+
100
131
self .in_get_workspace_diagnostics = Event (True )
101
132
133
+ async def initialized (self , sender : Any ) -> None :
134
+ self ._ensure_diagnostics_thread_started ()
135
+
136
+ @property
137
+ def diagnostics_loop (self ) -> asyncio .AbstractEventLoop :
138
+ if self ._diagnostics_loop is None :
139
+ self ._ensure_diagnostics_thread_started ()
140
+
141
+ assert self ._diagnostics_loop is not None
142
+
143
+ return self ._diagnostics_loop
144
+
145
+ def _run_diagnostics (self ) -> None :
146
+ loop = asyncio .new_event_loop ()
147
+ asyncio .set_event_loop (loop )
148
+ try :
149
+ self ._diagnostics_loop = loop
150
+ self ._diagnostics_started .set ()
151
+
152
+ loop .slow_callback_duration = 10
153
+
154
+ loop .run_forever ()
155
+ _cancel_all_tasks (loop )
156
+ loop .run_until_complete (loop .shutdown_asyncgens ())
157
+ finally :
158
+ asyncio .set_event_loop (None )
159
+ loop .close ()
160
+
161
+ def _ensure_diagnostics_thread_started (self ) -> None :
162
+ with self ._diagnostics_loop_lock :
163
+ if self ._diagnostics_loop is None :
164
+ self ._server_thread = threading .Thread (
165
+ name = "diagnostics_worker" , target = self ._run_diagnostics , daemon = True
166
+ )
167
+
168
+ self ._server_thread .start ()
169
+
170
+ if not self ._diagnostics_started .wait (10 ):
171
+ raise RuntimeError ("Can't start diagnostics worker thread." )
172
+
102
173
def extend_capabilities (self , capabilities : ServerCapabilities ) -> None :
103
174
if (
104
175
self .parent .client_capabilities is not None
105
176
and self .parent .client_capabilities .text_document is not None
106
177
and self .parent .client_capabilities .text_document .diagnostic is not None
107
178
):
108
- # capabilities.diagnostic_provider = None
109
179
capabilities .diagnostic_provider = DiagnosticOptions (
110
180
inter_file_dependencies = True ,
111
- workspace_diagnostics = True ,
181
+ workspace_diagnostics = False ,
112
182
identifier = f"robotcodelsp_{ uuid .uuid4 ()} " ,
113
183
work_done_progress = True ,
114
184
)
@@ -117,19 +187,6 @@ def extend_capabilities(self, capabilities: ServerCapabilities) -> None:
117
187
async def collect (sender , document : TextDocument ) -> DiagnosticsResult : # NOSONAR
118
188
...
119
189
120
- @async_tasking_event_iterator
121
- async def collect_document_has_diagnostics (sender , document : TextDocument ) -> bool : # NOSONAR
122
- ...
123
-
124
- async def document_has_diagnostics (self , document : TextDocument ) -> bool : # NOSONAR
125
- async for result in self .collect_document_has_diagnostics (
126
- self , document , callback_filter = language_id_filter (document )
127
- ):
128
- if result :
129
- return True
130
-
131
- return False
132
-
133
190
@async_tasking_event
134
191
async def load_workspace_documents (sender ) -> List [WorkspaceDocumentsResult ]: # NOSONAR
135
192
...
@@ -167,10 +224,22 @@ async def ensure_workspace_loaded(self) -> None:
167
224
self ._workspace_loaded = True
168
225
self .workspace_loaded_event .set ()
169
226
await self .on_workspace_loaded (self )
170
- await self .refresh ()
227
+ await self .force_refresh_all ()
228
+
229
+ async def force_refresh_all (self ) -> None :
230
+ for doc in self .parent .documents .documents :
231
+ self .get_diagnostics_data (doc ).force = True
232
+
233
+ await self .refresh ()
234
+
235
+ async def force_refresh_document (self , document : TextDocument ) -> None :
236
+ self .get_diagnostics_data (document ).force = True
237
+ if document .opened_in_editor :
238
+ await self .refresh ()
171
239
172
240
@_logger .call
173
- async def _get_diagnostics (self , document : TextDocument , data : DiagnosticsData ) -> None :
241
+ async def _get_diagnostics_for_document (self , document : TextDocument , data : DiagnosticsData ) -> None :
242
+ self ._logger .debug (lambda : f"Get diagnostics for { document } " )
174
243
175
244
await asyncio .sleep (0.75 )
176
245
@@ -193,12 +262,11 @@ async def _get_diagnostics(self, document: TextDocument, data: DiagnosticsData)
193
262
if result .diagnostics is not None :
194
263
collected_keys .append (result .key )
195
264
196
- await self .refresh ()
265
+ if document .opened_in_editor :
266
+ await self .refresh ()
197
267
198
268
except asyncio .CancelledError :
199
- self ._logger .critical (lambda : f"_get_diagnostics cancelled for { document } " )
200
- else :
201
- await self .refresh ()
269
+ self ._logger .debug (lambda : f"_get_diagnostics cancelled for { document } " )
202
270
finally :
203
271
for k in set (data .entries .keys ()) - set (collected_keys ):
204
272
data .entries .pop (k )
@@ -214,50 +282,41 @@ async def _text_document_diagnostic(
214
282
** kwargs : Any ,
215
283
) -> DocumentDiagnosticReport :
216
284
try :
217
- # if not self.workspace_loaded_event.is_set():
218
- # raise JsonRPCErrorException(
219
- # ErrorCodes.SERVER_CANCELLED,
220
- # "Workspace not loaded.",
221
- # data=DiagnosticServerCancellationData(True),
222
- # )
223
-
224
285
document = await self .parent .documents .get (text_document .uri )
225
286
if document is None :
226
- raise JsonRPCErrorException (ErrorCodes .INVALID_PARAMS , f"Document { text_document !r} not found." )
287
+ raise JsonRPCErrorException (ErrorCodes .SERVER_CANCELLED , f"Document { text_document !r} not found." )
227
288
228
- data : DiagnosticsData = document . get_data ( self , None )
289
+ data = self . get_diagnostics_data ( document )
229
290
230
- if data is None :
231
- data = DiagnosticsData (str (uuid .uuid4 ()))
232
- document .set_data (self , data )
233
-
234
- if (
235
- document .version != data .version
236
- or data .task is None
237
- or not await self .document_has_diagnostics (document )
238
- ):
291
+ if data .force or document .version != data .version or data .task is None :
239
292
240
293
task = data .task
241
294
242
- data = DiagnosticsData (str ( uuid . uuid4 ()) )
295
+ data = DiagnosticsData ()
243
296
document .set_data (self , data )
244
297
245
298
if task is not None and not task .done ():
246
- self ._logger .critical (lambda : f"try to cancel diagnostics for { document } " )
299
+ self ._logger .debug (lambda : f"try to cancel diagnostics for { document } " )
247
300
task .get_loop ().call_soon_threadsafe (task .cancel )
248
301
249
302
data .version = document .version
250
303
data .task = create_sub_task (
251
- self ._get_diagnostics (document , data ), loop = self .parent .loop , name = f"diagnostics ${ text_document } "
304
+ self ._get_diagnostics_for_document (document , data ),
305
+ loop = self .diagnostics_loop ,
306
+ name = f"diagnostics ${ text_document } " ,
252
307
)
253
308
254
309
def done (t : asyncio .Task [Any ]) -> None :
255
- if t .cancelled ():
256
- self ._logger .critical (lambda : f"diagnostics for { document } canceled" )
257
- try :
258
- t .exception ()
259
- except asyncio .CancelledError :
260
- pass
310
+
311
+ self ._logger .debug (lambda : f"diagnostics for { document } { 'canceled' if t .cancelled () else 'ended' } " )
312
+ try :
313
+ t .result ()
314
+ except asyncio .CancelledError :
315
+ pass
316
+ except (SystemExit , KeyboardInterrupt ):
317
+ raise
318
+ except BaseException as e :
319
+ self ._logger .exception (e )
261
320
262
321
data .task .add_done_callback (done )
263
322
@@ -268,9 +327,18 @@ def done(t: asyncio.Task[Any]) -> None:
268
327
list (itertools .chain (* (e for e in data .entries .values () if e is not None ))), result_id = data .id
269
328
)
270
329
except asyncio .CancelledError :
271
- self ._logger .critical ("canceled _text_document_diagnostic" )
330
+ self ._logger .debug ("canceled _text_document_diagnostic" )
272
331
raise
273
332
333
+ def get_diagnostics_data (self , document : TextDocument ) -> DiagnosticsData :
334
+ data : DiagnosticsData = document .get_data (self , None )
335
+
336
+ if data is None :
337
+ data = DiagnosticsData (str (uuid .uuid4 ()))
338
+ document .set_data (self , data )
339
+
340
+ return data
341
+
274
342
@rpc_method (name = "workspace/diagnostic" , param_type = WorkspaceDiagnosticParams )
275
343
@threaded ()
276
344
async def _workspace_diagnostic (
0 commit comments