|
33 | 33 | from .initialization_options import HoverDisableOptions, InitializationOptions
|
34 | 34 | from .type_map import get_lsp_completion_type, get_lsp_symbol_type
|
35 | 35 |
|
| 36 | +import functools |
| 37 | +import threading |
| 38 | +import inspect |
| 39 | + |
| 40 | +from ast import PyCF_ONLY_AST |
| 41 | + |
| 42 | +def debounce(interval_s, keyed_by=None): |
| 43 | + """ |
| 44 | + Debounce calls to this function until interval_s seconds have passed. |
| 45 | + Decorator copied from https://github.com/python-lsp/python-lsp-server |
| 46 | + """ |
| 47 | + def wrapper(func): |
| 48 | + timers = {} |
| 49 | + lock = threading.Lock() |
| 50 | + |
| 51 | + @functools.wraps(func) |
| 52 | + def debounced(*args, **kwargs): |
| 53 | + sig = inspect.signature(func) |
| 54 | + call_args = sig.bind(*args, **kwargs) |
| 55 | + key = call_args.arguments[keyed_by] if keyed_by else None |
| 56 | + |
| 57 | + def run(): |
| 58 | + with lock: |
| 59 | + del timers[key] |
| 60 | + return func(*args, **kwargs) |
| 61 | + |
| 62 | + with lock: |
| 63 | + old_timer = timers.get(key) |
| 64 | + if old_timer: |
| 65 | + old_timer.cancel() |
| 66 | + |
| 67 | + timer = threading.Timer(interval_s, run) |
| 68 | + timers[key] = timer |
| 69 | + timer.start() |
| 70 | + return debounced |
| 71 | + return wrapper |
| 72 | + |
36 | 73 |
|
37 | 74 | def _jedi_debug_function(
|
38 | 75 | color: str, # pylint: disable=unused-argument
|
@@ -238,6 +275,31 @@ def lsp_diagnostic(error: jedi.api.errors.SyntaxError) -> Diagnostic:
|
238 | 275 | )
|
239 | 276 |
|
240 | 277 |
|
| 278 | +def lsp_python_diagnostic(uri: str, source: str) -> Diagnostic: |
| 279 | + """Get LSP Diagnostic using the compile builtin.""" |
| 280 | + try: |
| 281 | + compile(source, uri, "exec", PyCF_ONLY_AST) |
| 282 | + return None |
| 283 | + except SyntaxError as err: |
| 284 | + column, line = err.offset - 1, err.lineno - 1 |
| 285 | + until_column = getattr(err, "end_offset", 0) - 1 |
| 286 | + until_line = getattr(err, "end_lineno", 0) - 1 |
| 287 | + |
| 288 | + if (line, column) >= (until_line, until_column): |
| 289 | + until_column, until_line = column, line |
| 290 | + column = 0 |
| 291 | + |
| 292 | + return Diagnostic( |
| 293 | + range=Range( |
| 294 | + start=Position(line=line, character=column), |
| 295 | + end=Position(line=until_line, character=until_column), |
| 296 | + ), |
| 297 | + message=err.__class__.__name__ + ': ' + str(err), |
| 298 | + severity=DiagnosticSeverity.Error, |
| 299 | + source="compile", |
| 300 | + ) |
| 301 | + |
| 302 | + |
241 | 303 | def line_column(position: Position) -> Tuple[int, int]:
|
242 | 304 | """Translate pygls Position to Jedi's line/column.
|
243 | 305 |
|
|
0 commit comments