Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/528.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Change the internal structure used for the waiter in the Producer.send() call. There should
now be no performance degradation when a large backlog of messages is pending (issue #528)
140 changes: 96 additions & 44 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import collections
import copy
import time
from dataclasses import dataclass
from typing import List, Any

import async_timeout
from aiokafka.errors import (KafkaTimeoutError,
NotLeaderForPartitionError,
LeaderNotAvailableError,
Expand Down Expand Up @@ -241,6 +244,17 @@ def retry_count(self):
return self._retry_count


@dataclass
class WaitlistHandle():
    """A single entry in the per-partition waitlist of senders blocked on
    a full batch. Created by ``_add_to_waitlist`` and consumed by
    ``_process_waitlist`` once the partition's batch is drained.
    """

    # Positional payload to replay when the entry is processed: for a
    # message entry ``(key, value, timestamp_ms, headers)``, for a batch
    # entry ``(builder,)``.
    attrs: List[Any]
    # Waitlist items are either pending batches or pending messages
    is_message: bool
    # Future exposed to add_message. Is not shielded, so can be cancelled
    # before resolving.
    future: asyncio.Future


class MessageAccumulator:
"""Accumulator of messages batched by topic-partition

Expand All @@ -254,6 +268,7 @@ def __init__(
loop = get_running_loop()
self._loop = loop
self._batches = collections.defaultdict(collections.deque)
self._waitlist = collections.defaultdict(collections.deque)
self._pending_batches = set()
self._cluster = cluster
self._batch_size = batch_size
Expand Down Expand Up @@ -316,31 +331,67 @@ async def add_message(
If batch is already full this method waits (`timeout` seconds maximum)
until batch is drained by send task
"""
while True:
if self._closed:
# this can happen when producer is closing but try to send some
# messages in async task
raise ProducerClosed()
if self._exception is not None:
raise copy.copy(self._exception)

pending_batches = self._batches.get(tp)
if not pending_batches:
builder = self.create_builder()
batch = self._append_batch(builder, tp)
else:
batch = pending_batches[-1]
self._check_errors()

future = batch.append(key, value, timestamp_ms, headers=headers)
if not self._waitlist[tp]:
future = self._try_add_message(
tp, key, value, timestamp_ms, headers)
if future is not None:
return future
# Batch is full, can't append data atm,
# waiting until batch per topic-partition is drained
start = time.monotonic()
await batch.wait_drain(timeout)
timeout -= time.monotonic() - start
if timeout <= 0:
raise KafkaTimeoutError()

# Batch is full, can't append data atm, enqueue data to be sent
# after batch for this partition is drained.
handle = self._add_to_waitlist(
tp, True, key, value, timestamp_ms, headers)
try:
async with async_timeout.timeout(timeout):
return await handle.future
except asyncio.TimeoutError:
raise KafkaTimeoutError()

def _check_errors(self):
if self._closed:
# this can happen when producer is closing but try to send some
# messages in async task
raise ProducerClosed()
if self._exception is not None:
raise copy.copy(self._exception)

def _try_add_message(self, tp, key, value, timestamp_ms, headers):
pending_batches = self._batches.get(tp)
if not pending_batches:
builder = self.create_builder()
batch = self._append_batch(builder, tp)
else:
batch = pending_batches[-1]
return batch.append(key, value, timestamp_ms, headers=headers)

def _add_to_waitlist(self, tp, is_message, *attrs):
    """Enqueue a blocked sender for partition *tp* and return its handle.

    *attrs* is the payload to replay later (message fields or a builder,
    depending on *is_message*). The handle's future resolves once
    ``_process_waitlist`` manages to place the payload into a batch.
    """
    fut = self._loop.create_future()
    entry = WaitlistHandle(attrs, is_message, fut)
    self._waitlist[tp].append(entry)
    return entry

def _process_waitlist(self, tp):
while self._waitlist.get(tp):
handle = self._waitlist[tp].popleft()
# We do not send messages that are no longer waited for, just clean
# them up.
if handle.future.done():
continue

if handle.is_message:
future = self._try_add_message(tp, *handle.attrs)
if future is not None:
handle.future.set_result(future)
else:
if not self._batches.get(tp):
builder = handle.attrs[0]
batch = self._append_batch(builder, tp)
handle.future.set_result(batch.future)

# Return item to waitlist if it was not processed
if not handle.future.done():
self._waitlist.appendleft(handle)

def data_waiter(self):
""" Return waiter future that will be resolved when accumulator contain
Expand Down Expand Up @@ -370,6 +421,9 @@ def _pop_batch(self, tp):
def cb(fut, batch=batch, self=self):
self._pending_batches.remove(batch)
batch.future.add_done_callback(cb)

# Populate next batch based on waitlist items (if any)
self._process_waitlist(tp)
return batch

def reenqueue(self, batch):
Expand All @@ -382,6 +436,13 @@ def drain_by_nodes(self, ignore_nodes, muted_partitions=set()):
""" Group batches by leader to partition nodes. """
nodes = collections.defaultdict(dict)
unknown_leaders_exist = False

# Reset the data waiter before processing batches to allow waitlist
# processing to reset it.
if not self._wait_data_future.done():
self._wait_data_future.set_result(None)
self._wait_data_future = self._loop.create_future()

for tp in list(self._batches.keys()):
# Just ignoring by node is not enough, as leader can change during
# the cycle
Expand Down Expand Up @@ -413,13 +474,6 @@ def drain_by_nodes(self, ignore_nodes, muted_partitions=set()):
# delivery future here, no message futures.
batch.done_noack()

# all batches are drained from accumulator
# so create "wait data" future again for waiting new data in send
# task
if not self._wait_data_future.done():
self._wait_data_future.set_result(None)
self._wait_data_future = self._loop.create_future()

return nodes, unknown_leaders_exist

def create_builder(self):
Expand Down Expand Up @@ -467,18 +521,16 @@ async def add_batch(self, builder, tp, timeout):
aiokafka.errors.KafkaTimeoutError: the batch could not be added
within the specified timeout.
"""
if self._closed:
raise ProducerClosed()
if self._exception is not None:
raise copy.copy(self._exception)

start = time.monotonic()
while timeout > 0:
pending = self._batches.get(tp)
if pending:
await pending[-1].wait_drain(timeout=timeout)
timeout -= time.monotonic() - start
else:
batch = self._append_batch(builder, tp)
return asyncio.shield(batch.future)
raise KafkaTimeoutError()
self._check_errors()

pending = self._batches.get(tp)
if not pending:
batch = self._append_batch(builder, tp)
return asyncio.shield(batch.future)

handle = self._add_to_waitlist(tp, False, builder)
try:
async with async_timeout.timeout(timeout):
return asyncio.shield(await handle.future)
except asyncio.TimeoutError:
raise KafkaTimeoutError()