Skip to content

Commit 671cb2b

Browse files
committed
fixing for lint and progress outputs
1 parent 1856f5c commit 671cb2b

File tree

3 files changed

+38
-57
lines changed

3 files changed

+38
-57
lines changed

dash/_callback.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ def _set_side_update(ctx, response) -> bool:
300300
return False
301301

302302

303-
# pylint: disable=too-many-branches,too-many-statements
303+
# pylint: disable=too-many-branches,too-many-statements,too-many-locals
304304
def register_callback(
305305
callback_list, callback_map, config_prevent_initial_callbacks, *_args, **_kwargs
306306
):
@@ -475,24 +475,9 @@ def update_long_callback(error_handler, callback_ctx, response, kwargs):
475475
cache_key = flask.request.args.get("cacheKey")
476476
job_id = flask.request.args.get("job")
477477

478-
output_value = callback_manager.get_result(cache_key, job_id)
479-
480478
progress_long_callback(response, callback_manager)
481479

482-
return handle_rest_long_callback(
483-
output_value, callback_manager, response, error_handler, callback_ctx
484-
)
485-
486-
async def async_update_long_callback(error_handler, callback_ctx, response, kwargs):
487-
"""Set up the long callback and manage jobs."""
488-
callback_manager = get_callback_manager(kwargs)
489-
490-
cache_key = flask.request.args.get("cacheKey")
491-
job_id = flask.request.args.get("job")
492-
493-
output_value = await callback_manager.async_get_result(cache_key, job_id)
494-
495-
progress_long_callback(response, callback_manager)
480+
output_value = callback_manager.get_result(cache_key, job_id)
496481

497482
return handle_rest_long_callback(
498483
output_value, callback_manager, response, error_handler, callback_ctx

dash/long_callback/managers/celery_manager.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import json
22
import traceback
33
from contextvars import copy_context
4+
import asyncio
5+
from functools import partial
46

57
from _plotly_utils.utils import PlotlyJSONEncoder
68

@@ -9,8 +11,6 @@
911
from dash.exceptions import PreventUpdate
1012
from dash.long_callback._proxy_set_props import ProxySetProps
1113
from dash.long_callback.managers import BaseLongCallbackManager
12-
import asyncio
13-
from functools import partial
1414

1515

1616
class CeleryManager(BaseLongCallbackManager):
@@ -92,14 +92,7 @@ def clear_cache_entry(self, key):
9292
self.handle.backend.delete(key)
9393

9494
def call_job_fn(self, key, job_fn, args, context):
95-
if asyncio.iscoroutinefunction(job_fn):
96-
# pylint: disable-next=import-outside-toplevel,no-name-in-module,import-error
97-
from asgiref.sync import async_to_sync
98-
99-
new_job_fun = async_to_sync(job_fn)
100-
task = new_job_fun.delay(key, self._make_progress_key(key), args, context)
101-
else:
102-
task = job_fn.delay(key, self._make_progress_key(key), args, context)
95+
task = job_fn.delay(key, self._make_progress_key(key), args, context)
10396
return task.task_id
10497

10598
def get_progress(self, key):
@@ -144,11 +137,13 @@ def get_updated_props(self, key):
144137
return json.loads(updated_props)
145138

146139

147-
def _make_job_fn(fn, celery_app, progress, key):
140+
def _make_job_fn(fn, celery_app, progress, key): # pylint: disable=too-many-statements
148141
cache = celery_app.backend
149142

150143
@celery_app.task(name=f"long_callback_{key}")
151-
def job_fn(result_key, progress_key, user_callback_args, context=None):
144+
def job_fn(
145+
result_key, progress_key, user_callback_args, context=None
146+
): # pylint: disable=too-many-statements
152147
def _set_progress(progress_value):
153148
if not isinstance(progress_value, (list, tuple)):
154149
progress_value = [progress_value]
@@ -224,21 +219,31 @@ async def async_run():
224219
else:
225220
user_callback_output = await fn(*maybe_progress, user_callback_args)
226221
except PreventUpdate:
222+
# Put NoUpdate dict directly to avoid circular imports.
227223
errored = True
228-
cache.set(result_key, {"_dash_no_update": "_dash_no_update"})
224+
cache.set(
225+
result_key,
226+
json.dumps(
227+
{"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder
228+
),
229+
)
229230
except Exception as err: # pylint: disable=broad-except
230231
errored = True
231232
cache.set(
232233
result_key,
233-
{
234-
"long_callback_error": {
235-
"msg": str(err),
236-
"tb": traceback.format_exc(),
237-
}
238-
},
234+
json.dumps(
235+
{
236+
"long_callback_error": {
237+
"msg": str(err),
238+
"tb": traceback.format_exc(),
239+
}
240+
},
241+
),
239242
)
243+
240244
if asyncio.iscoroutine(user_callback_output):
241245
user_callback_output = await user_callback_output
246+
242247
if not errored:
243248
cache.set(
244249
result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder)

dash/long_callback/managers/diskcache_manager.py

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import traceback
22
from contextvars import copy_context
3+
import asyncio
4+
from functools import partial
35

46
from . import BaseLongCallbackManager
57
from .._proxy_set_props import ProxySetProps
68
from ..._callback_context import context_value
79
from ..._utils import AttributeDict
810
from ...exceptions import PreventUpdate
9-
import asyncio
10-
from functools import partial
1111

1212
_pending_value = "__$pending__"
1313

@@ -131,24 +131,13 @@ def call_job_fn(self, key, job_fn, args, context):
131131
# pylint: disable-next=import-outside-toplevel,no-name-in-module,import-error
132132
from multiprocess import Process
133133

134-
# Check if the job is asynchronous
135-
if asyncio.iscoroutinefunction(job_fn):
136-
# For async jobs, run in an event loop in a new process
137-
process = Process(
138-
target=self._run_async_in_process,
139-
args=(job_fn, key, args, context),
140-
)
141-
process.start()
142-
return process.pid
143-
else:
144-
# For sync jobs, use the existing implementation
145-
# pylint: disable-next=not-callable
146-
process = Process(
147-
target=job_fn,
148-
args=(key, self._make_progress_key(key), args, context),
149-
)
150-
process.start()
151-
return process.pid
134+
# pylint: disable-next=not-callable
135+
process = Process(
136+
target=job_fn,
137+
args=(key, self._make_progress_key(key), args, context),
138+
)
139+
process.start()
140+
return process.pid
152141

153142
@staticmethod
154143
def _run_async_in_process(job_fn, key, args, context):
@@ -172,7 +161,7 @@ def _run_async_in_process(job_fn, key, args, context):
172161
loop.run_until_complete(async_job())
173162
except Exception as e:
174163
# Handle errors, log them, and cache if necessary
175-
raise str(e)
164+
raise Exception(str(e)) from e
176165
finally:
177166
loop.close()
178167

@@ -217,7 +206,9 @@ def get_updated_props(self, key):
217206
return result
218207

219208

209+
# pylint: disable-next=too-many-statements
220210
def _make_job_fn(fn, cache, progress):
211+
# pylint: disable-next=too-many-statements
221212
def job_fn(result_key, progress_key, user_callback_args, context):
222213
def _set_progress(progress_value):
223214
if not isinstance(progress_value, (list, tuple)):

0 commit comments

Comments
 (0)