Skip to content

Commit 1f06017

Browse files
committed
Update celery forwarder to use greenlets instead of processes
1 parent a33d5c2 commit 1f06017

File tree

1 file changed

+6
-8
lines changed

1 file changed

+6
-8
lines changed

model-engine/model_engine_server/inference/forwarding/celery_forwarder.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any, Dict, Optional, TypedDict, Union
55

66
from celery import Celery, Task, states
7+
from gevent import monkey
78
from model_engine_server.common.constants import DEFAULT_CELERY_TASK_NAME, LIRA_CELERY_TASK_NAME
89
from model_engine_server.common.dtos.model_endpoints import BrokerType
910
from model_engine_server.common.dtos.tasks import EndpointPredictV1Request
@@ -23,7 +24,9 @@
2324
from model_engine_server.inference.infra.gateways.datadog_inference_monitoring_metrics_gateway import (
2425
DatadogInferenceMonitoringMetricsGateway,
2526
)
26-
from requests import ConnectionError
27+
from request import ConnectionError
28+
29+
monkey.patch_all()
2730

2831
logger = make_logger(logger_name())
2932

@@ -144,7 +147,7 @@ def exec_func(payload, arrival_timestamp, *ignored_args, **ignored_kwargs):
144147
# Don't fail the celery task even if there's a status code
145148
# (otherwise we can't really control what gets put in the result attribute)
146149
# in the task (https://docs.celeryq.dev/en/stable/reference/celery.result.html#celery.result.AsyncResult.status)
147-
result = forwarder(payload)
150+
result = forwarder.forward(payload)
148151
request_duration = datetime.now() - arrival_timestamp
149152
if request_duration > timedelta(seconds=DEFAULT_TASK_VISIBILITY_SECONDS):
150153
monitoring_metrics_gateway.emit_async_task_stuck_metric(queue_name)
@@ -177,12 +180,7 @@ def start_celery_service(
177180
concurrency=concurrency,
178181
loglevel="INFO",
179182
optimization="fair",
180-
# Don't use pool="solo" so we can send multiple concurrent requests over
181-
# Historically, pool="solo" argument fixes the known issues of celery and some of the libraries.
182-
# Particularly asyncio and torchvision transformers. This isn't relevant since celery-forwarder
183-
# is quite lightweight
184-
# TODO: we should probably use eventlet or gevent for the pool, since
185-
# the forwarder is nearly the most extreme example of IO bound.
183+
pool="gevent",
186184
)
187185
worker.start()
188186

0 commit comments

Comments
 (0)