Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,14 @@ defmodule Broadway do
Sends a list of `Broadway.Message`s to the Broadway pipeline.

The producer is randomly chosen among all sets of producers/stages.
This is used to send out of band data to a Broadway pipeline.
This is used to send out of band data to a Broadway pipeline, regardless
of which producer module you are using.

> #### Sync Operation {: .info}
>
> This function is synchronous, that is, it waits for the producer
> to enqueue `messages` before returning. It's a "call".

"""
@spec push_messages(broadway :: name(), messages :: [Message.t()]) :: :ok
def push_messages(broadway, messages) when is_broadway_name(broadway) and is_list(messages) do
Expand Down
10 changes: 10 additions & 0 deletions lib/broadway/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ defmodule Broadway.Options do
_message, count -> {:cont, count - 1}
end}

We start with the batch size as the accumulator, and then we go down for every
event. When we get down to `1`, we emit the batch and *reset* the accumulator
to the batch size. That's because when returning `{:emit, acc}`, `acc` is
used for the next call to the `:batch_size` function.

> #### When is this called {: .info}
>
> If you pass a function as the batch size, that function is invoked *after*
> `c:handle_message/3`.

"""
],
max_demand: [
Expand Down