Skip to content

Commit 659f088

Browse files
committed
Update celery forwarder to use greenlets instead of processes
1 parent a33d5c2 commit 659f088

File tree

1 file changed

+6
-8
lines changed

1 file changed

+6
-8
lines changed

model-engine/model_engine_server/inference/forwarding/celery_forwarder.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
from datetime import datetime, timedelta
44
from typing import Any, Dict, Optional, TypedDict, Union
55

6+
from aiohttp import ClientConnectionError
67
from celery import Celery, Task, states
8+
from gevent import monkey
79
from model_engine_server.common.constants import DEFAULT_CELERY_TASK_NAME, LIRA_CELERY_TASK_NAME
810
from model_engine_server.common.dtos.model_endpoints import BrokerType
911
from model_engine_server.common.dtos.tasks import EndpointPredictV1Request
@@ -23,7 +25,8 @@
2325
from model_engine_server.inference.infra.gateways.datadog_inference_monitoring_metrics_gateway import (
2426
DatadogInferenceMonitoringMetricsGateway,
2527
)
26-
from requests import ConnectionError
28+
29+
monkey.patch_all()
2730

2831
logger = make_logger(logger_name())
2932

@@ -132,7 +135,7 @@ def after_return(
132135
base=ErrorHandlingTask,
133136
name=LIRA_CELERY_TASK_NAME,
134137
track_started=True,
135-
autoretry_for=(ConnectionError,),
138+
autoretry_for=(ClientConnectionError,),
136139
)
137140
def exec_func(payload, arrival_timestamp, *ignored_args, **ignored_kwargs):
138141
if len(ignored_args) > 0:
@@ -177,12 +180,7 @@ def start_celery_service(
177180
concurrency=concurrency,
178181
loglevel="INFO",
179182
optimization="fair",
180-
# Don't use pool="solo" so we can send multiple concurrent requests over
181-
# Historically, pool="solo" argument fixes the known issues of celery and some of the libraries.
182-
# Particularly asyncio and torchvision transformers. This isn't relevant since celery-forwarder
183-
# is quite lightweight
184-
# TODO: we should probably use eventlet or gevent for the pool, since
185-
# the forwarder is nearly the most extreme example of IO bound.
183+
pool="gevent",
186184
)
187185
worker.start()
188186

0 commit comments

Comments
 (0)