Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ metrics = "0.24.0"
metrics-exporter-statsd = "0.9.0"
prost = "0.13"
prost-types = "0.13.3"
rand = "0.8.5"
rand = "0.9.2"
rdkafka = { version = "0.37.0", features = ["cmake-build", "ssl"] }
sentry = { version = "0.41.0", default-features = false, features = [
# default features, except `release-health` is disabled
Expand All @@ -39,7 +39,8 @@ sentry = { version = "0.41.0", default-features = false, features = [
"tracing",
"logs"
] }
sentry_protos = "0.4.10"
sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "george/push-broker-worker" }
itertools = "0.14.0"
serde = "1.0.214"
serde_yaml = "0.9.34"
sha2 = "0.10.8"
Expand Down
8 changes: 4 additions & 4 deletions benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use tokio::task::JoinSet;

async fn get_pending_activations(num_activations: u32, num_workers: u32) {
let url = if cfg!(feature = "bench-with-mnt-disk") {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
format!(
"/mnt/disks/sqlite/{}-{}.sqlite",
Utc::now(),
rng.r#gen::<u64>()
rng.random::<u64>()
)
} else {
generate_temp_filename()
Expand Down Expand Up @@ -78,11 +78,11 @@ async fn set_status(num_activations: u32, num_workers: u32) {
assert!(num_activations.is_multiple_of(num_workers));

let url = if cfg!(feature = "bench-with-mnt-disk") {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
format!(
"/mnt/disks/sqlite/{}-{}.sqlite",
Utc::now(),
rng.r#gen::<u64>()
rng.random::<u64>()
)
} else {
generate_temp_filename()
Expand Down
6 changes: 6 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kafka_topic: "test-topic"
push: true
# workers:
# - "http://127.0.0.1:50052"
# - "http://127.0.0.1:50053"
# - "http://127.0.0.1:50054"
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ pub struct Config {

/// Enable additional metrics for the sqlite.
pub enable_sqlite_status_metrics: bool,

/// Enable push mode.
pub push: bool,

/// Worker addresses.
pub workers: Vec<String>,
Comment on lines +221 to +222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we have a static list of workers? With the workers generally running behind a horizontal pod autoscaler, we may not know how many workers will be online, and the number and names of those workers will not be fixed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, workers can use the AddWorker and RemoveWorker RPC endpoints to add themselves on startup and to remove themselves on shutdown. This field is just something I've been using for testing, or if it's useful this way, as a list of initial workers we definitely want to connect to.

}

impl Default for Config {
Expand Down Expand Up @@ -279,6 +285,8 @@ impl Default for Config {
full_vacuum_on_upkeep: true,
vacuum_interval_ms: 30000,
enable_sqlite_status_metrics: true,
push: false,
workers: vec![],
}
}
}
Expand Down
32 changes: 31 additions & 1 deletion src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,46 @@ use chrono::Utc;
use prost::Message;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
AddWorkerRequest, AddWorkerResponse, FetchNextTask, GetTaskRequest, GetTaskResponse,
RemoveWorkerRequest, RemoveWorkerResponse, SetTaskStatusRequest, SetTaskStatusResponse,
TaskActivation, TaskActivationStatus,
};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::pool::WorkerPool;
use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore};
use tracing::{error, instrument};

pub struct TaskbrokerServer {
pub store: Arc<InflightActivationStore>,
pub pool: Arc<RwLock<WorkerPool>>,
}

#[tonic::async_trait]
impl ConsumerService for TaskbrokerServer {
#[instrument(skip_all)]
async fn add_worker(
&self,
request: Request<AddWorkerRequest>,
) -> Result<Response<AddWorkerResponse>, Status> {
let address = &request.get_ref().address;
self.pool.write().await.add_worker(address);
Ok(Response::new(AddWorkerResponse {}))
}

#[instrument(skip_all)]
async fn remove_worker(
&self,
request: Request<RemoveWorkerRequest>,
) -> Result<Response<RemoveWorkerResponse>, Status> {
let address = &request.get_ref().address;
self.pool.write().await.remove_worker(address);
Ok(Response::new(RemoveWorkerResponse {}))
}
Comment on lines 25 to 43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the interface of the taskbroker service is considerably different between push and poll mode, I wonder whether it would be a good idea to simply have two separate gRPC server implementations (and even separate gRPC service definitions). This would probably produce a more composable system with less coupling between the two modes.

Alternatively, we can keep a single service, but it needs to reject calls that are not valid in the current mode of operation: add_worker/remove_worker must immediately reject calls when in poll mode, and get_task must do the same when in push mode.

@evanh what do you think ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure that having workers register with brokers is going to work out well. It is going to be hard to keep the worker lists in each broker replica current and there are edge cases like worker oomkills that will need to be handled.

Having the brokers send requests into a loadbalancer that handles forwarding to worker replicas seems simpler, and we can use response codes to signal that the worker that was reached is overcapacity and have the broker retry.


#[instrument(skip_all)]
async fn get_task(
&self,
Expand Down Expand Up @@ -67,6 +91,12 @@ impl ConsumerService for TaskbrokerServer {
let start_time = Instant::now();
let id = request.get_ref().id.clone();

// Update worker queue size estimate
// self.pool
// .write()
// .await
// .decrement_queue_size(&request.get_ref().address);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you hit an interesting case here. I assume you commented this because we risk decrementing too much because this can happen

  • push a task, get the queue size and set
  • task completes but set task status not yet called
  • push another task, get the new queue size (smaller than before)
  • get the call to set task status and decrement further.

We probably do not want to update the queue in this case


let status: InflightActivationStatus =
TaskActivationStatus::try_from(request.get_ref().status)
.map_err(|e| {
Expand Down
25 changes: 19 additions & 6 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use tonic::{Code, Request};

use crate::grpc::server::TaskbrokerServer;

use crate::test_utils::{create_test_store, make_activations};
use crate::test_utils::{create_pool, create_test_store, make_activations};

#[tokio::test]
async fn test_get_task() {
let store = create_test_store().await;
let service = TaskbrokerServer { store };
let pool = create_pool();

let service = TaskbrokerServer { store, pool };
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_err());
Expand All @@ -22,11 +24,14 @@ async fn test_get_task() {
#[allow(deprecated)]
async fn test_set_task_status() {
let store = create_test_store().await;
let service = TaskbrokerServer { store };
let pool = create_pool();

let service = TaskbrokerServer { store, pool };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
status: 5, // Complete
fetch_next_task: None,
address: "http://127.0.0.1:50052".into(),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand All @@ -38,11 +43,14 @@ async fn test_set_task_status() {
#[allow(deprecated)]
async fn test_set_task_status_invalid() {
let store = create_test_store().await;
let service = TaskbrokerServer { store };
let pool = create_pool();

let service = TaskbrokerServer { store, pool };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
status: 1, // Invalid
fetch_next_task: None,
address: "http://127.0.0.1:50052".into(),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_err());
Expand All @@ -58,10 +66,12 @@ async fn test_set_task_status_invalid() {
#[allow(deprecated)]
async fn test_get_task_success() {
let store = create_test_store().await;
let pool = create_pool();

let activations = make_activations(1);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store };
let service = TaskbrokerServer { store, pool };
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_ok());
Expand All @@ -75,10 +85,12 @@ async fn test_get_task_success() {
#[allow(deprecated)]
async fn test_set_task_status_success() {
let store = create_test_store().await;
let pool = create_pool();

let activations = make_activations(2);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store };
let service = TaskbrokerServer { store, pool };

let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
Expand All @@ -92,6 +104,7 @@ async fn test_set_task_status_success() {
id: "id_0".to_string(),
status: 5, // Complete
fetch_next_task: Some(FetchNextTask { namespace: None }),
address: "http://127.0.0.1:50052".into(),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod grpc;
pub mod kafka;
pub mod logging;
pub mod metrics;
pub mod pool;
pub mod push;
pub mod runtime_config;
pub mod store;
pub mod test_utils;
Expand Down
37 changes: 34 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use std::{sync::Arc, time::Duration};
use taskbroker::kafka::inflight_activation_batcher::{
ActivationBatcherConfig, InflightActivationBatcher,
};
use taskbroker::pool::WorkerPool;
use taskbroker::push::TaskPusher;
use taskbroker::upkeep::upkeep;
use tokio::signal::unix::SignalKind;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::{select, time};
use tonic::transport::Server;
Expand Down Expand Up @@ -57,6 +60,8 @@ async fn main() -> Result<(), Error> {
let runtime_config_manager =
Arc::new(RuntimeConfigManager::new(config.runtime_config_path.clone()).await);

let pool = Arc::new(RwLock::new(WorkerPool::new(config.workers.clone())));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the main method need to create the pool for the TaskPusher? It is not shared by anybody else, can the taskPusher create it on its own ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pool is also used by the gRPC server to execute AddWorker and RemoveWorker requests (so it is shared by both TaskbrokerServer and TaskPusher).


println!("taskbroker starting");
println!("version: {}", get_version().trim());

Expand Down Expand Up @@ -177,10 +182,29 @@ async fn main() -> Result<(), Error> {
}
});

// Push task loop (conditionally enabled)
let push_task = if config.push {
info!("Running in PUSH mode");

let push_task_store = store.clone();
let push_task_config = config.clone();
let push_task_pool = pool.clone();

Some(tokio::spawn(async move {
let pusher = TaskPusher::new(push_task_store, push_task_config, push_task_pool);
pusher.start().await
}))
} else {
info!("Running in PULL mode");
None
};

// GRPC server
let grpc_server_task = tokio::spawn({
let grpc_store = store.clone();
let grpc_config = config.clone();
let grpc_pool = pool.clone();

async move {
let addr = format!("{}:{}", grpc_config.grpc_addr, grpc_config.grpc_port)
.parse()
Expand All @@ -195,6 +219,7 @@ async fn main() -> Result<(), Error> {
.layer(layers)
.add_service(ConsumerServiceServer::new(TaskbrokerServer {
store: grpc_store,
pool: grpc_pool,
}))
.add_service(health_service.clone())
.serve(addr);
Expand Down Expand Up @@ -225,16 +250,22 @@ async fn main() -> Result<(), Error> {
}
});

elegant_departure::tokio::depart()
let mut depart = elegant_departure::tokio::depart()
.on_termination()
.on_sigint()
.on_signal(SignalKind::hangup())
.on_signal(SignalKind::quit())
.on_completion(log_task_completion("consumer", consumer_task))
.on_completion(log_task_completion("grpc_server", grpc_server_task))
.on_completion(log_task_completion("upkeep_task", upkeep_task))
.on_completion(log_task_completion("maintenance_task", maintenance_task))
.await;
.on_completion(log_task_completion("maintenance_task", maintenance_task));

// Only register push_task if it was spawned
if let Some(task) = push_task {
depart = depart.on_completion(log_task_completion("push_task", task));
}

depart.await;

Ok(())
}
Loading
Loading