Skip to content

Commit 2c6c11c

Browse files
committed
reimplement run locking and starting coroutines in another thread
1 parent 2f5f1c0 commit 2c6c11c

File tree

7 files changed

+265
-88
lines changed

7 files changed

+265
-88
lines changed

robotcode/jsonrpc2/protocol.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,12 @@ def handle_response(self, message: JsonRPCResponse) -> None:
595595
entry.future.set_result(res)
596596
else:
597597
if entry.future._loop.is_running():
598-
entry.future._loop.call_soon_threadsafe(entry.future.set_result, res)
598+
599+
def s(f: asyncio.Future[Any], r: Any) -> None:
600+
if not f.done():
601+
f.set_result(r)
602+
603+
entry.future._loop.call_soon_threadsafe(s, entry.future, res)
599604
else:
600605
self._logger.warning("Response loop is not running.")
601606

@@ -700,8 +705,8 @@ def done(t: asyncio.Task[Any]) -> None:
700705
self.send_error(JsonRPCErrors.REQUEST_CANCELLED, "Request canceled.", id=message.id)
701706
except (SystemExit, KeyboardInterrupt):
702707
raise
703-
except JsonRPCErrorException as ex:
704-
self.send_error(ex.code, ex.message, id=message.id, data=ex.data)
708+
except JsonRPCErrorException as e:
709+
self.send_error(e.code, e.message or f"{type(e).__name__}: {e}", id=message.id, data=e.data)
705710
except BaseException as e:
706711
self._logger.exception(e)
707712
self.send_error(JsonRPCErrors.INTERNAL_ERROR, f"{type(e).__name__}: {e}", id=message.id)

robotcode/language_server/common/parts/diagnostics.py

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ def __init__(self, protocol: LanguageServerProtocol) -> None:
9090
] = {}
9191

9292
self._current_workspace_task: Optional[asyncio.Task[WorkspaceDiagnosticReport]] = None
93+
self.in_get_document_diagnostics = Event(True)
94+
self.in_get_workspace_diagnostics = Event(True)
9395

9496
def extend_capabilities(self, capabilities: ServerCapabilities) -> None:
9597
if (
@@ -108,14 +110,26 @@ def extend_capabilities(self, capabilities: ServerCapabilities) -> None:
108110
async def collect(sender, document: TextDocument) -> DiagnosticsResult: # NOSONAR
109111
...
110112

113+
@async_tasking_event_iterator
114+
async def collect_stage2(sender, document: TextDocument) -> DiagnosticsResult: # NOSONAR
115+
...
116+
111117
@async_tasking_event
112-
async def collect_workspace_documents(sender) -> List[WorkspaceDocumentsResult]: # NOSONAR
118+
async def load_workspace_documents(sender) -> List[WorkspaceDocumentsResult]: # NOSONAR
113119
...
114120

115121
@async_tasking_event
116122
async def on_workspace_loaded(sender) -> None: # NOSONAR
117123
...
118124

125+
@async_tasking_event
126+
async def on_workspace_diagnostics_ended(sender) -> None: # NOSONAR
127+
...
128+
129+
@async_tasking_event
130+
async def on_document_diagnostics_ended(sender) -> None: # NOSONAR
131+
...
132+
119133
@async_event
120134
async def on_get_analysis_progress_mode(sender, uri: Uri) -> Optional[AnalysisProgressMode]: # NOSONAR
121135
...
@@ -125,17 +139,18 @@ async def on_get_diagnostics_mode(sender, uri: Uri) -> Optional[DiagnosticsMode]
125139
...
126140

127141
async def ensure_workspace_loaded(self) -> None:
128-
async with self._workspace_load_lock:
129-
if not self._workspace_loaded:
130-
if self.workspace_loaded_event.is_set():
131-
return
132-
133-
try:
134-
await self.collect_workspace_documents(self)
135-
finally:
136-
self._workspace_loaded = True
137-
self.workspace_loaded_event.set()
138-
await self.on_workspace_loaded(self)
142+
if not self._workspace_loaded:
143+
async with self._workspace_load_lock:
144+
if not self._workspace_loaded:
145+
if self.workspace_loaded_event.is_set():
146+
return
147+
148+
try:
149+
await self.load_workspace_documents(self)
150+
finally:
151+
self._workspace_loaded = True
152+
self.workspace_loaded_event.set()
153+
await self.on_workspace_loaded(self)
139154

140155
async def get_document_diagnostics(self, document: TextDocument) -> RelatedFullDocumentDiagnosticReport:
141156
return await document.get_cache(self.__get_document_diagnostics)
@@ -171,11 +186,13 @@ async def _text_document_diagnostic(
171186
*args: Any,
172187
**kwargs: Any,
173188
) -> DocumentDiagnosticReport:
174-
self._logger.info(lambda: f"textDocument/diagnostic for {text_document}")
189+
self._logger.debug(lambda: f"textDocument/diagnostic for {text_document}")
175190

176-
await self.ensure_workspace_loaded()
191+
self.in_get_document_diagnostics.clear()
177192

178193
try:
194+
await self.ensure_workspace_loaded()
195+
179196
document = await self.parent.documents.get(text_document.uri)
180197
if document is None:
181198
raise JsonRPCErrorException(ErrorCodes.INVALID_PARAMS, f"Document {text_document!r} not found")
@@ -187,7 +204,7 @@ async def _get_diagnostics() -> Optional[RelatedFullDocumentDiagnosticReport]:
187204
return None
188205

189206
if document in self._current_document_tasks and not self._current_document_tasks[document].done():
190-
self._logger.info(lambda: f"textDocument/diagnostic cancel old task {text_document}")
207+
self._logger.debug(lambda: f"textDocument/diagnostic cancel old task {text_document}")
191208
self._current_document_tasks[document].cancel()
192209

193210
task = create_sub_task(_get_diagnostics())
@@ -199,7 +216,7 @@ async def _get_diagnostics() -> Optional[RelatedFullDocumentDiagnosticReport]:
199216
raise RuntimeError("Unexpected result.")
200217

201218
except asyncio.CancelledError as e:
202-
self._logger.info(lambda: f"textDocument/diagnostic canceled {text_document}")
219+
self._logger.debug(lambda: f"textDocument/diagnostic canceled {text_document}")
203220

204221
raise JsonRPCErrorException(
205222
ErrorCodes.SERVER_CANCELLED, "Cancelled", data=DiagnosticServerCancellationData(True)
@@ -213,7 +230,10 @@ async def _get_diagnostics() -> Optional[RelatedFullDocumentDiagnosticReport]:
213230

214231
return result
215232
finally:
216-
self._logger.info(lambda: f"textDocument/diagnostic ready {text_document}")
233+
self.in_get_document_diagnostics.set()
234+
await self.on_document_diagnostics_ended(self)
235+
236+
self._logger.debug(lambda: f"textDocument/diagnostic ready {text_document}")
217237

218238
@rpc_method(name="workspace/diagnostic", param_type=WorkspaceDiagnosticParams)
219239
@threaded()
@@ -226,9 +246,7 @@ async def _workspace_diagnostic(
226246
*args: Any,
227247
**kwargs: Any,
228248
) -> WorkspaceDiagnosticReport:
229-
self._logger.critical("workspace/diagnostic")
230-
231-
await self.ensure_workspace_loaded()
249+
self._logger.debug("workspace/diagnostic")
232250

233251
async def _get_diagnostics() -> WorkspaceDiagnosticReport:
234252
result: List[WorkspaceDocumentDiagnosticReport] = []
@@ -303,22 +321,30 @@ async def _task(doc: TextDocument) -> None:
303321

304322
return WorkspaceDiagnosticReport(items=[])
305323

306-
if self._current_workspace_task is not None:
307-
self._current_workspace_task.cancel()
308-
309-
task = create_sub_task(_get_diagnostics() if partial_result_token is None else _get_partial_diagnostics())
310-
self._current_workspace_task = task
324+
await self.in_get_document_diagnostics.wait()
325+
self.in_get_workspace_diagnostics.clear()
311326
try:
312-
return await task
313-
except asyncio.CancelledError as e:
314-
self._logger.critical("workspace/diagnostic canceled")
315-
raise JsonRPCErrorException(
316-
ErrorCodes.SERVER_CANCELLED, "Cancelled", data=DiagnosticServerCancellationData(True)
317-
) from e
327+
await self.ensure_workspace_loaded()
328+
329+
if self._current_workspace_task is not None:
330+
self._current_workspace_task.cancel()
331+
332+
task = create_sub_task(_get_diagnostics() if partial_result_token is None else _get_partial_diagnostics())
333+
self._current_workspace_task = task
334+
try:
335+
return await task
336+
except asyncio.CancelledError as e:
337+
self._logger.debug("workspace/diagnostic canceled")
338+
raise JsonRPCErrorException(
339+
ErrorCodes.SERVER_CANCELLED, "ServerCancelled", data=DiagnosticServerCancellationData(True)
340+
) from e
341+
finally:
342+
if self._current_workspace_task == task:
343+
self._current_workspace_task = None
344+
self._logger.debug("workspace/diagnostic ready")
318345
finally:
319-
if self._current_workspace_task == task:
320-
self._current_workspace_task = None
321-
self._logger.critical("workspace/diagnostic ready")
346+
self.in_get_workspace_diagnostics.set()
347+
await self.on_workspace_diagnostics_ended(self)
322348

323349
def cancel_workspace_diagnostics(self) -> None:
324350
if self._current_workspace_task is not None and not self._current_workspace_task.done():

robotcode/language_server/robotframework/parts/codelens.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import ast
44
from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, cast
55

6-
from ....utils.async_tools import create_sub_task, run_coroutine_in_thread, threaded
6+
from ....utils.async_tools import create_sub_task, threaded
77
from ....utils.logging import LoggingDescriptor
88
from ...common.decorators import language_id
99
from ...common.lsp_types import CodeLens, Command
@@ -28,16 +28,13 @@ def __init__(self, parent: RobotLanguageServerProtocol) -> None:
2828
parent.code_lens.collect.add(self.collect)
2929
parent.code_lens.resolve.add(self.resolve)
3030

31-
parent.robot_references.cache_cleared.add(self.robot_references_cache_cleared)
32-
3331
self._running_task: Set[Tuple[TextDocument, KeywordDoc]] = set()
3432

35-
parent.diagnostics.on_workspace_loaded.add(self.diagnostics_on_workspace_loaded)
36-
37-
async def robot_references_cache_cleared(self, sender: Any) -> None: # NOSONAR
38-
await self.parent.code_lens.refresh()
33+
parent.diagnostics.on_workspace_loaded.add(self.codelens_refresh)
34+
parent.diagnostics.on_document_diagnostics_ended.add(self.codelens_refresh)
35+
parent.diagnostics.on_workspace_diagnostics_ended.add(self.codelens_refresh)
3936

40-
async def diagnostics_on_workspace_loaded(self, sender: Any) -> None: # NOSONAR
37+
async def codelens_refresh(self, sender: Any) -> None: # NOSONAR
4138
await self.parent.code_lens.refresh()
4239

4340
@language_id("robotframework")
@@ -114,7 +111,11 @@ async def resolve(self, sender: Any, code_lens: CodeLens) -> Optional[CodeLens]:
114111
name = code_lens.data["name"]
115112
line = code_lens.data["line"]
116113

117-
if self.parent.diagnostics.workspace_loaded_event.is_set():
114+
if (
115+
self.parent.diagnostics.workspace_loaded_event.is_set()
116+
and self.parent.diagnostics.in_get_workspace_diagnostics.is_set()
117+
and self.parent.diagnostics.in_get_document_diagnostics.is_set()
118+
):
118119
kw_doc = await self.get_keyword_definition_at_line(namespace, name, line)
119120

120121
if kw_doc is not None and not kw_doc.is_error_handler:
@@ -131,17 +132,17 @@ async def find_refs() -> None:
131132
if document is None or kw_doc is None:
132133
return
133134

134-
await run_coroutine_in_thread(
135-
self.parent.robot_references.find_keyword_references,
136-
document,
137-
kw_doc,
138-
include_declaration=False,
139-
)
140-
141-
# await self.parent.robot_references.find_keyword_references(
142-
# document, kw_doc, include_declaration=False
135+
# await run_coroutine_in_thread(
136+
# self.parent.robot_references.find_keyword_references,
137+
# document,
138+
# kw_doc,
139+
# include_declaration=False,
143140
# )
144141

142+
await self.parent.robot_references.find_keyword_references(
143+
document, kw_doc, include_declaration=False
144+
)
145+
145146
await self.parent.code_lens.refresh()
146147

147148
key = (document, kw_doc)

robotcode/language_server/robotframework/parts/diagnostics.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,17 @@ def __init__(self, parent: RobotLanguageServerProtocol) -> None:
4343

4444
parent.diagnostics.collect.add(self.collect_namespace_diagnostics)
4545

46-
parent.diagnostics.collect.add(self.collect_unused_references)
46+
parent.diagnostics.collect_stage2.add(self.collect_unused_references)
4747

4848
parent.documents_cache.namespace_invalidated.add(self.namespace_invalidated)
4949

50-
async def cache_cleared(self, sender: Any) -> None:
51-
...
52-
5350
@language_id("robotframework")
5451
async def namespace_invalidated(self, sender: Any, document: TextDocument) -> None:
5552
self.parent.diagnostics.cancel_workspace_diagnostics()
5653

5754
@language_id("robotframework")
5855
@threaded()
56+
@_logger.call
5957
async def collect_namespace_diagnostics(self, sender: Any, document: TextDocument) -> DiagnosticsResult:
6058
try:
6159
namespace = await self.parent.documents_cache.get_namespace(document)
@@ -120,6 +118,7 @@ def _create_error_from_token(self, token: Token, source: Optional[str] = None) -
120118
)
121119

122120
@language_id("robotframework")
121+
@threaded()
123122
@_logger.call
124123
async def collect_token_errors(self, sender: Any, document: TextDocument) -> DiagnosticsResult:
125124
from robot.errors import VariableError

robotcode/language_server/robotframework/parts/robot_workspace.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class RobotWorkspaceProtocolPart(RobotLanguageServerProtocolPart):
3232
def __init__(self, parent: RobotLanguageServerProtocol) -> None:
3333
super().__init__(parent)
3434
self.parent.documents.on_read_document_text.add(self._on_read_document_text)
35-
self.parent.diagnostics.collect_workspace_documents.add(self._collect_workspace_documents)
35+
self.parent.diagnostics.load_workspace_documents.add(self._load_workspace_documents)
3636
self.parent.diagnostics.on_get_diagnostics_mode.add(self.on_get_diagnostics_mode)
3737
self.parent.diagnostics.on_get_analysis_progress_mode.add(self.on_get_analysis_progress_mode)
3838
self.workspace_loaded = Event()
@@ -53,7 +53,7 @@ async def on_get_analysis_progress_mode(self, sender: Any, uri: Uri) -> Optional
5353
return config.progress_mode
5454

5555
@threaded()
56-
async def _collect_workspace_documents(self, sender: Any) -> List[WorkspaceDocumentsResult]:
56+
async def _load_workspace_documents(self, sender: Any) -> List[WorkspaceDocumentsResult]:
5757

5858
result: List[WorkspaceDocumentsResult] = []
5959

robotcode/utils/async_cache.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,33 @@ def __init__(self, max_items: Optional[int] = 128) -> None:
2727
self._order: Optional[List[Tuple[Any, ...]]] = None
2828
if self.max_items:
2929
self._order = []
30+
self.lock: Lock = Lock()
3031

3132
async def has(self, *args: Any, **kwargs: Any) -> bool:
3233
key = self._make_key(*args, **kwargs)
33-
34-
if key in self._cache:
35-
return self._cache[key].has_data
34+
async with self.lock:
35+
if key in self._cache:
36+
return self._cache[key].has_data
3637

3738
return False
3839

3940
async def get(self, func: Callable[..., Awaitable[_T]], *args: Any, **kwargs: Any) -> _T:
4041
key = self._make_key(*args, **kwargs)
4142

42-
entry = self._cache[key]
43+
async with self.lock:
44+
entry = self._cache[key]
4345

4446
async with entry.lock:
4547
if not entry.has_data:
4648
entry.data = await func(*args, **kwargs)
4749
entry.has_data = True
4850

49-
if self._order is not None and self.max_items is not None:
50-
self._order.insert(0, key)
51+
async with self.lock:
52+
if self._order is not None and self.max_items is not None:
53+
self._order.insert(0, key)
5154

52-
if len(self._order) > self.max_items:
53-
del self._cache[self._order.pop()]
55+
if len(self._order) > self.max_items:
56+
del self._cache[self._order.pop()]
5457

5558
return cast(_T, entry.data)
5659

@@ -59,6 +62,7 @@ def _make_key(*args: Any, **kwargs: Any) -> Tuple[Any, ...]:
5962
return (tuple(_freeze(v) for v in args), hash(frozenset({k: _freeze(v) for k, v in kwargs.items()})))
6063

6164
async def clear(self) -> None:
62-
self._cache.clear()
63-
if self._order is not None:
64-
self._order.clear()
65+
async with self.lock:
66+
self._cache.clear()
67+
if self._order is not None:
68+
self._order.clear()

0 commit comments

Comments
 (0)