
Conversation

@james-mcnulty james-mcnulty commented Jan 3, 2026

Description

Currently, workers pull tasks from the brokers via gRPC. This model works, but it makes scaling tricky. For this reason, we want the taskbroker to push tasks to the workers instead. Read this article on Notion for more information.

Each taskworker now has its own gRPC server with a PushTask method that accepts a task and returns a response that indicates (1) whether the task was accepted, and (2) the current size of that worker's queue.

service WorkerService {
  // Receives a task to execute.
  rpc PushTask(PushTaskRequest) returns (PushTaskResponse) {}
}

message PushTaskRequest {
  TaskActivation task = 1;
  string callback_url = 2;
}

message PushTaskResponse {
  bool added = 1;
  uint32 queue_size = 2;
}

The taskbroker now spawns a thread that repeatedly (1) queries the store for a pending activation, (2) pushes the activation to a worker, and (3) marks the activation as processing if added is true.
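
Roughly, the loop looks like the following sketch. The store type (ActivationStore) and the method names used here (get_pending_activation, push_task, mark_processing) are illustrative assumptions, not the exact API in this PR; WorkerPool is the pool described in the next section.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// Minimal sketch of the push loop; method names are assumptions for illustration.
async fn push_loop(store: Arc<ActivationStore>, pool: Arc<RwLock<WorkerPool>>) {
    loop {
        // (1) Query the store for a pending activation.
        let Some(activation) = store.get_pending_activation().await else {
            // Nothing to push yet; back off briefly before checking again.
            tokio::time::sleep(Duration::from_millis(50)).await;
            continue;
        };

        // (2) Push the activation to a worker selected by the pool.
        match pool.write().await.push_task(&activation).await {
            // (3) Mark the activation as processing only if the worker accepted it.
            Ok(response) if response.added => {
                store.mark_processing(&activation.id).await;
            }
            // Otherwise leave the activation pending so a later iteration can retry it.
            _ => {}
        }
    }
}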

Worker Selection

The broker now maintains a pool of worker connections in the WorkerPool structure. On initialization, the TaskPusher spawns a new thread that calls WorkerPool::update every second. This method attempts to reestablish dead worker connections, if there are any. If all workers still have connections, it does nothing.
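
A sketch of that maintenance loop, assuming the pool is shared as Arc<RwLock<WorkerPool>> and a tokio interval drives the tick; the reconnection logic inside WorkerPool::update is omitted here.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// Sketch of the periodic pool maintenance described above.
async fn maintain_pool(pool: Arc<RwLock<WorkerPool>>) {
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        // Re-establish any dead worker connections; a no-op when all are healthy.
        pool.write().await.update().await;
    }
}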

Notice that the PushTask response has a queue_size field. My initial approach to keeping the queue sizes updated was to call a dedicated GetQueueSize endpoint on a short interval to get the queue sizes of all active workers, but hammering the workers with that many RPC calls did not seem ideal.

For this reason, every time the broker pushes a task to a worker and receives a response, it updates the worker's estimated queue size from that response. Conversely, when a worker reports back via the SetTaskStatus endpoint, the broker decrements that worker's estimated queue size.
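
On the broker side, the bookkeeping amounts to something like the sketch below. WorkerState, the workers field, and record_push_response are illustrative assumptions rather than the exact code; the real pool also holds the gRPC connections for each worker.

// Simplified view of the pool for this sketch only.
struct WorkerState {
    address: String,
    estimated_queue_size: u32,
}

struct WorkerPool {
    workers: Vec<WorkerState>,
}

impl WorkerPool {
    // Called with the queue_size from a PushTaskResponse after a push.
    fn record_push_response(&mut self, address: &str, queue_size: u32) {
        if let Some(worker) = self.workers.iter_mut().find(|w| w.address == address) {
            worker.estimated_queue_size = queue_size;
        }
    }

    // Called when a worker reports a result through the SetTaskStatus endpoint.
    fn decrement_queue_size(&mut self, address: &str) {
        if let Some(worker) = self.workers.iter_mut().find(|w| w.address == address) {
            worker.estimated_queue_size = worker.estimated_queue_size.saturating_sub(1);
        }
    }
}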

This estimate is used to pick which worker receives a task. I use the Power of Two Choices approach: (1) randomly select two workers, and (2) pick the one with the smaller estimated queue size. If the two candidates have identical queue lengths, I pick one of them at random to avoid hammering the same worker over and over again; this can come up when multiple workers are full.
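
A sketch of the selection, continuing the hypothetical WorkerState/WorkerPool from the previous sketch and using the rand crate (the real selection code may be structured differently):

use rand::seq::SliceRandom;

impl WorkerPool {
    // Power-of-two-choices selection over the sketch pool above.
    fn select_worker(&self) -> Option<&WorkerState> {
        let mut rng = rand::thread_rng();

        // (1) Randomly sample two candidates (fewer if the pool is small).
        let candidates: Vec<&WorkerState> =
            self.workers.choose_multiple(&mut rng, 2).collect();

        match candidates.as_slice() {
            [] => None,
            [only] => Some(*only),
            [a, b] => {
                // (2) Take the smaller estimated queue; break ties randomly so the
                // same worker is not hammered when several workers are equally full.
                if a.estimated_queue_size == b.estimated_queue_size {
                    candidates.choose(&mut rng).copied()
                } else if a.estimated_queue_size < b.estimated_queue_size {
                    Some(*a)
                } else {
                    Some(*b)
                }
            }
            _ => None,
        }
    }
}

Sampling only two workers per decision keeps selection cheap while still strongly favoring the less loaded candidate, which is the usual argument for Power of Two Choices over scanning the entire pool.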

Worker Discovery

Workers are represented by address strings in the code. The taskbroker configuration has a workers field, which takes a list of address strings. You can start a broker with a certain set of workers by setting this field.

You can also use the new AddWorker and RemoveWorker endpoints to dynamically register and unregister workers at runtime.

service ConsumerService {
  ...

  // Add a worker to the broker's inner worker pool.
  rpc AddWorker(AddWorkerRequest) returns (AddWorkerResponse) {}

  // Remove a worker from the broker's inner worker pool.
  rpc RemoveWorker(RemoveWorkerRequest) returns (RemoveWorkerResponse) {}
}

message AddWorkerRequest {
  string address = 1;
}

message AddWorkerResponse {}

message RemoveWorkerRequest {
  string address = 1;
}

message RemoveWorkerResponse {}

Related Work

@james-mcnulty james-mcnulty changed the title [STREAM 640] Push-Based Broker / Worker POC [STREAM-640] Push-Based Broker / Worker POC Jan 3, 2026

@james-mcnulty james-mcnulty changed the title [STREAM-640] Push-Based Broker / Worker POC [STREAM-640] Push-Based Broker POC Jan 3, 2026
Comment on lines +221 to +222
/// Worker addresses.
pub workers: Vec<String>,
Member

Would we have a static list of workers? With the workers generally running behind a horizontal pod autoscaler, we may not know how many workers will be online, and the number and names of those workers will not be fixed.

Member Author

No, workers can use the AddWorker and RemoveWorker RPC endpoints to add themselves on startup and remove themselves on shutdown. This field is mostly something I've been using for testing, though it could also serve as a list of initial workers we definitely want to connect to.

Comment on lines +53 to +60
pub fn add_worker<T: Into<String>>(&mut self, address: T) {
    let address = address.into();
    info!("Adding worker {address}");
    self.addresses.insert(address);
}

/// Unregister worker address during execution.
pub fn remove_worker(&mut self, address: &String) {
Member

Why does add_worker accept Into<String>, while remove_worker takes &String? Shouldn't these methods support the same types?

Member Author

Good point - yes, probably!

Comment on lines +79 to +81
}

impl TaskPusher {
Member

Why are there two impl blocks for the same struct/type?

Member Author

One block has only public methods, the other has only private methods. Want me to consolidate them?

Member

I didn't catch that detail. We haven't been using separate impls so far, but I can see how it could be useful in larger structs.

Member

evanh commented Jan 6, 2026

So I think we will actually want a "GetCapacity" RPC on the workers. The reason is that we don't need to keep this internal list of workers if we have that endpoint: we can create a k8s service for all the workers (easy to do) and then the brokers can make RPC calls to that service. This simplifies service discovery enormously: k8s handles all the adding/removing of workers, and neither the brokers nor the workers need to keep internal state about which pods are available.

This all depends on two other changes though:

  • The brokers pass an address to the worker in the task activation, so the worker knows which broker to update afterwards.
  • The workers pass their address back to the broker when it calls GetCapacity, so that the broker can connect directly to its chosen worker after the power-of-two decision.

@fpacifici fpacifici left a comment

Please see my comment on load balancing. I think we may be overcomplicating the system by building our own queue when we can leverage the queue managed by the GRPC server.

Comment on lines +25 to +43
#[instrument(skip_all)]
async fn add_worker(
    &self,
    request: Request<AddWorkerRequest>,
) -> Result<Response<AddWorkerResponse>, Status> {
    let address = &request.get_ref().address;
    self.pool.write().await.add_worker(address);
    Ok(Response::new(AddWorkerResponse {}))
}

#[instrument(skip_all)]
async fn remove_worker(
    &self,
    request: Request<RemoveWorkerRequest>,
) -> Result<Response<RemoveWorkerResponse>, Status> {
    let address = &request.get_ref().address;
    self.pool.write().await.remove_worker(address);
    Ok(Response::new(RemoveWorkerResponse {}))
}


As the interface of the taskbroker service is considerably different between push and poll mode, I wonder whether it would be a good idea to simply have two separate GRPC server implementations (and even two GRPC service definitions). This would probably produce a more composable system with less coupling between the two modes.

Alternatively, we can keep a single service, but it then needs to reject calls that are not valid in its mode of operation: add_worker/remove_worker must immediately reject calls when in poll mode, and get_task should do the same when in push mode.

@evanh what do you think?

Member

I'm also not sure that having workers register with brokers is going to work out well. It is going to be hard to keep the worker lists in each broker replica current and there are edge cases like worker oomkills that will need to be handled.

Having the brokers send requests into a load balancer that handles forwarding to worker replicas seems simpler, and we can use response codes to signal that the worker that was reached is over capacity and have the broker retry.

let runtime_config_manager =
    Arc::new(RuntimeConfigManager::new(config.runtime_config_path.clone()).await);

let pool = Arc::new(RwLock::new(WorkerPool::new(config.workers.clone())));


Why does the main method need to create the pool for the TaskPusher? It is not shared by anybody else; can the TaskPusher create it on its own?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pool is also used by the gRPC server to execute AddWorker and RemoveWorker requests (so it is shared by both TaskbrokerServer and TaskPusher).

Comment on lines +56 to +58
_ = beep_interval.tick() => {
    pool.write().await.update().await;
}


Is there a specific reason to keep re-creating the connections in the background rather than re-creating them when issues happen? If we re-create a connection only when we fail to send tasks, we would not have to deal with the concurrency between the main pusher task and the updater. The system would be simpler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, not really. I'll try implementing that instead.

Comment on lines +94 to +98
// Update worker queue size estimate
// self.pool
//     .write()
//     .await
//     .decrement_queue_size(&request.get_ref().address);


I think you hit an interesting case here. I assume you commented this out because we risk decrementing too much, since this can happen:

  • push a task, get the queue size from the response and set the estimate
  • the task completes, but SetTaskStatus has not been called yet
  • push another task, get the new queue size (smaller than before)
  • the SetTaskStatus call arrives and we decrement further

We probably do not want to update the queue size in this case.

Comment on lines +64 to +73
tokio::select! {
    _ = guard.wait() => {
        info!("Push task loop received shutdown signal");
        break;
    }

    _ = async {
        self.process_next_task().await;
    } => {}
}


I wonder if the queue model in the worker is misguided, given that the GRPC server has a request queue of its own anyway.
So we would have:

  • a non-blocking queue inside the worker: the client sends a request, the request is enqueued, and we return immediately
  • the HTTP server's own request queue. This one is blocking though: if a request is sitting in that queue the client knows, and so does Envoy, and this information can be used for load balancing.

Let's pretend for a second that we set the worker queue to a size of 1. This means that:

  • tasks accumulate in the GRPC server queue while the client waits for them to be served.
  • as soon as the previous task is complete the worker picks up a task from the queue, starts execution and returns to the client.
  • The client acks immediately and commits.

In this model the broker could just look at the number of inflight requests to the worker to know the queue size; it would not have to ask the worker.
This behavior is, by the way, already implemented by Envoy https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/load_balancers#weighted-least-request.

I think we could achieve the same result with less complexity if we relied on the server to manage the queue and Envoy to do the load balancing. In theory we should reach the same result as if we managed the queue on our own.

@evanh what do you think?

Member

the broker could just look at the number of inflight requests to the worker to know the queue size; it would not have to ask the worker.

Unless the gRPC server somehow exposes that number the broker will need to ask, because there could be multiple brokers connected to the same worker.

It also means that the broker push thread will be blocked waiting for the worker queue to drain. For slow running/bursty tasks that's not what we want. We want to check to see if the worker queue is full, and if it is, return immediately to the broker so it can try another worker.

In a scenario where the worker queue is full, but we are immediately rejecting gRPC requests, the inflight gRPC requests will be low (signalling there is capacity) but the worker will actually be overloaded. I don't think inflight requests necessarily line up with a worker being at capacity.


I see two concerns from your comment:

  • The broker needing to keep track of the number of inflight requests to apply this algorithm
  • The broker being blocked

Overall, I think my proposal should provide equivalent results as we normally operate in one of these two states:

  • The queues are mostly empty and the tasks get picked up immediately and can go anywhere.
  • The queues are mostly full and the broker has to wait anyway (here we backlog).

All other states are transient. So in theory the queue inside the worker should provide the same semantics as the GRPC server queue, with less visibility from the client side.

Load balancers like Envoy use the server queue size to balance their load. Their proxy for the queue size is the number of inflight requests they have toward each worker. This is a well-known system and a problem Envoy solves well. If we build our own queue in the worker we are basically rebuilding this system.

The broker needing to keep track of the number of inflight requests to apply this algorithm

The broker does not need to keep this number. Each broker operates locally, as each broker gets relatively even load from Kafka. The broker itself does not care about the connections; it does not even know the identity of each worker. It just talks to the Envoy proxy locally, and Envoy keeps track of its in-flight connections. I will find you some resources on why the brokers, operating locally, can generate reasonably balanced load.

It also means that the broker push thread will be blocked waiting for the worker queue to drain. For slow running/bursty tasks that's not what we want. We want to check to see if the worker queue is full, and if it is, return immediately to the broker so it can try another worker.

If I understand this correctly, your concern is that the broker would be waiting on the slow task before freeing up the connection. I'd say that is not a problem:

  • Let's say we enqueue a fast task after a slow one. That task is waiting anyway, whether there is a queue on the worker or not. Actually, if there is no queue on the worker, the request from the broker may time out and the task may be routed somewhere else, thus being executed sooner. This is desirable.
  • The connection being open serves the same goal as the queue on the worker. The following task will be routed elsewhere, to a worker that does not have a running task, as there will be fewer in-flight connections there.

What I would want to stress is that, I believe, the outcome of the two solutions will be the same, but the queue-less one is simpler and more idiomatic.

In a scenario where the worker queue is full, but we are immediately rejecting gRPC requests, the inflight gRPC requests will be low (signalling there is capacity) but the worker will actually be overloaded. I don't think inflight requests necessarily line up with a worker being at capacity.

I am not following this point. If we have pending requests to a worker it means there is no available process to pick up new work -> worker at capacity.
