Skip to content

Commit 89c534b

Browse files
committed
Merge branch 'release/0.1.7'
2 parents 600db86 + 00e724c commit 89c534b

File tree

5 files changed

+59
-53
lines changed

5 files changed

+59
-53
lines changed

docs/extending-taskiq/resutl-backend.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@ Here's a minimal example of a result backend:
1414
::: info Cool tip!
1515
It's a good practice to skip fetching logs from the storage unless `with_logs=True` is explicitly specified.
1616
:::
17+
18+
19+
::: danger Important note!
20+
`with_logs` param is now deprecated. It will be removed in future releases.
21+
:::

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.1.6"
3+
version = "0.1.7"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]

taskiq/cli/worker/receiver.py

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import inspect
3-
import io
43
from concurrent.futures import ThreadPoolExecutor
54
from logging import getLogger
65
from time import time
@@ -11,7 +10,6 @@
1110
from taskiq.abc.broker import AsyncBroker
1211
from taskiq.abc.middleware import TaskiqMiddleware
1312
from taskiq.cli.worker.args import WorkerArgs
14-
from taskiq.cli.worker.log_collector import log_collector
1513
from taskiq.cli.worker.params_parser import parse_params
1614
from taskiq.context import Context
1715
from taskiq.message import BrokerMessage, TaskiqMessage
@@ -150,8 +148,6 @@ async def run_task( # noqa: C901, WPS210
150148
:return: result of execution.
151149
"""
152150
loop = asyncio.get_running_loop()
153-
# Buffer to capture logs.
154-
logs = io.StringIO()
155151
returned = None
156152
found_exception = None
157153
signature = self.task_signatures.get(message.task_name)
@@ -160,55 +156,51 @@ async def run_task( # noqa: C901, WPS210
160156
signature = None
161157
parse_params(signature, self.task_hints.get(message.task_name) or {}, message)
162158

163-
# Captures function's logs.
164-
with log_collector(logs, self.cli_args.log_collector_format):
165-
dep_ctx = None
166-
if dependency_graph:
167-
# Create a context for dependency resolving.
168-
dep_ctx = dependency_graph.async_ctx(
169-
{
170-
Context: Context(message, self.broker),
171-
TaskiqState: self.broker.state,
172-
},
173-
)
174-
# Resolve all function's dependencies.
175-
dep_kwargs = await dep_ctx.resolve_kwargs()
176-
for key, val in dep_kwargs.items():
177-
if key not in message.kwargs:
178-
message.kwargs[key] = val
179-
# Start a timer.
180-
start_time = time()
181-
try:
182-
# If the function is a coroutine we await it.
183-
if asyncio.iscoroutinefunction(target):
184-
returned = await target(*message.args, **message.kwargs)
185-
else:
186-
# If this is a synchronous function we
187-
# run it in executor.
188-
returned = await loop.run_in_executor(
189-
self.executor,
190-
_run_sync,
191-
target,
192-
message,
193-
)
194-
except Exception as exc:
195-
found_exception = exc
196-
logger.error(
197-
"Exception found while executing function: %s",
198-
exc,
199-
exc_info=True,
159+
dep_ctx = None
160+
if dependency_graph:
161+
# Create a context for dependency resolving.
162+
dep_ctx = dependency_graph.async_ctx(
163+
{
164+
Context: Context(message, self.broker),
165+
TaskiqState: self.broker.state,
166+
},
167+
)
168+
# Resolve all function's dependencies.
169+
dep_kwargs = await dep_ctx.resolve_kwargs()
170+
for key, val in dep_kwargs.items():
171+
if key not in message.kwargs:
172+
message.kwargs[key] = val
173+
# Start a timer.
174+
start_time = time()
175+
try:
176+
# If the function is a coroutine we await it.
177+
if asyncio.iscoroutinefunction(target):
178+
returned = await target(*message.args, **message.kwargs)
179+
else:
180+
# If this is a synchronous function we
181+
# run it in executor.
182+
returned = await loop.run_in_executor(
183+
self.executor,
184+
_run_sync,
185+
target,
186+
message,
200187
)
201-
# Stop the timer.
202-
execution_time = time() - start_time
203-
if dep_ctx:
204-
await dep_ctx.close()
188+
except Exception as exc:
189+
found_exception = exc
190+
logger.error(
191+
"Exception found while executing function: %s",
192+
exc,
193+
exc_info=True,
194+
)
195+
# Stop the timer.
196+
execution_time = time() - start_time
197+
if dep_ctx:
198+
await dep_ctx.close()
205199

206-
raw_logs = logs.getvalue()
207-
logs.close()
208200
# Assemble result.
209201
result: "TaskiqResult[Any]" = TaskiqResult(
210202
is_err=found_exception is not None,
211-
log=raw_logs,
203+
log=None,
212204
return_value=returned,
213205
execution_time=execution_time,
214206
)

taskiq/cli/worker/run.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import asyncio
22
import os
33
import signal
4-
from logging import basicConfig, getLevelName, getLogger
5-
from multiprocessing import Process
6-
from queue import Queue
4+
import sys
5+
from logging import StreamHandler, basicConfig, getLevelName, getLogger
6+
from logging.handlers import QueueHandler, QueueListener
7+
from multiprocessing import Process, Queue
78
from time import sleep
89
from typing import Any, List
910

@@ -223,10 +224,14 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
223224
224225
:param args: CLI arguments.
225226
"""
227+
logging_queue = Queue(-1) # type: ignore
228+
listener = QueueListener(logging_queue, StreamHandler(sys.stdout))
226229
basicConfig(
227230
level=getLevelName(args.log_level),
228-
format=("[%(asctime)s][%(levelname)-7s][%(processName)s] %(message)s"),
231+
format="[%(asctime)s][%(levelname)-7s][%(processName)s] %(message)s",
232+
handlers=[QueueHandler(logging_queue)],
229233
)
234+
listener.start()
230235
logger.info("Starting %s worker processes.", args.workers)
231236

232237
global worker_processes # noqa: WPS420
@@ -259,3 +264,4 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
259264
signal.signal(signal.SIGTERM, signal_handler)
260265

261266
watcher_loop(args=args)
267+
listener.stop()

taskiq/result.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ class TaskiqResult(GenericModel, Generic[_ReturnType]):
99
"""Result of a remote task invocation."""
1010

1111
is_err: bool
12+
# Log is a deprecated field. It would be removed in future
13+
# releases of not, if we find a way to capture logs in async
14+
# environment.
1215
log: Optional[str]
1316
return_value: _ReturnType
1417
execution_time: float

0 commit comments

Comments
 (0)