diff --git a/Cargo.lock b/Cargo.lock index d93551f9..e51e816b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2229,9 +2229,9 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.1.37" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b790b1a532c699eaff863e4258e1606fd589d35d6202631c9a95cb6146ce51" +checksum = "854737cda7b9a620f372837967f6695a07566626c6d3a5bfa356d17aa55f794f" dependencies = [ "glob", "prost", diff --git a/Cargo.toml b/Cargo.toml index aa2b1cf1..244fcf14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -sentry_protos = "0.1.37" +sentry_protos = "0.1.39" anyhow = "1.0.92" chrono = { version = "0.4.26" } sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] } diff --git a/migrations/0001_create_inflight_taskactivations.sql b/migrations/0001_create_inflight_taskactivations.sql index 3e2c3dc6..3f13d9d8 100644 --- a/migrations/0001_create_inflight_taskactivations.sql +++ b/migrations/0001_create_inflight_taskactivations.sql @@ -7,5 +7,7 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( deadletter_at DATETIME, processing_deadline_duration INTEGER NOT NULL, processing_deadline DATETIME, - status INTEGER NOT NULL + status INTEGER NOT NULL, + at_most_once BOOLEAN NOT NULL DEFAULT FALSE, + namespace TEXT ); diff --git a/migrations/0002_add_at_most_once.sql b/migrations/0002_add_at_most_once.sql deleted file mode 100644 index 10bca4ea..00000000 --- a/migrations/0002_add_at_most_once.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE inflight_taskactivations ADD COLUMN at_most_once BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt index 1853c2d8..9a79f8ce 100644 --- a/python/requirements-dev.txt +++ b/python/requirements-dev.txt @@ -3,4 +3,4 @@ orjson>=3.10.10 protobuf>=5.28.3 pytest>=8.3.3 pyyaml>=6.0.2 -sentry-protos>=0.1.37 +sentry-protos>=0.1.39 diff --git a/src/consumer/deserialize_activation.rs b/src/consumer/deserialize_activation.rs index 409963be..449b222d 100644 --- a/src/consumer/deserialize_activation.rs +++ b/src/consumer/deserialize_activation.rs @@ -33,6 +33,7 @@ pub fn new( return Err(anyhow!("Message has no payload")); }; let activation = TaskActivation::decode(payload)?; + let namespace = activation.namespace.clone(); let mut at_most_once = false; if let Some(ref retry_state) = activation.retry_state { @@ -69,6 +70,7 @@ pub fn new( deadletter_at: Some(deadletter_at), processing_deadline: None, at_most_once, + namespace, }) } } diff --git a/src/grpc_server.rs b/src/grpc_server.rs index 6885a9d1..3bcaf5d4 100644 --- a/src/grpc_server.rs +++ b/src/grpc_server.rs @@ -20,11 +20,15 @@ impl ConsumerService for MyConsumerService { #[instrument(skip(self))] async fn get_task( &self, - _request: Request, + request: Request, ) -> Result, Status> { let start_time = Instant::now(); + let namespace = &request.get_ref().namespace; + let inflight = self + .store + .get_pending_activation(namespace.as_deref()) + .await; - let inflight = self.store.get_pending_activation().await; match inflight { Ok(Some(inflight)) => { let resp = GetTaskResponse { @@ -81,19 +85,22 @@ impl ConsumerService for MyConsumerService { error: None, }; - if let Some(fetch_next) = request.get_ref().fetch_next { - if fetch_next { - let start_time = Instant::now(); - let inflight = self.store.get_pending_activation().await; - metrics::histogram!("grpc_server.fetch_next.duration").record(start_time.elapsed()); + let fetch_next = &request.get_ref().fetch_next_task; + if let Some(fetch_next) = fetch_next { + let start_time = Instant::now(); + let namespace = &fetch_next.namespace; + let inflight = self + .store + .get_pending_activation(namespace.as_deref()) + .await; + metrics::histogram!("grpc_server.fetch_next.duration").record(start_time.elapsed()); - match inflight { - Ok(Some(inflight)) => { - response.task = Some(inflight.activation); - } - Ok(None) => return Err(Status::not_found("No pending activation")), - Err(e) => return Err(Status::internal(e.to_string())), + match inflight { + Ok(Some(inflight)) => { + response.task = Some(inflight.activation); } + Ok(None) => return Err(Status::not_found("No pending activation")), + Err(e) => return Err(Status::internal(e.to_string())), } } diff --git a/src/inflight_activation_store.rs b/src/inflight_activation_store.rs index 91fa4230..7655e265 100644 --- a/src/inflight_activation_store.rs +++ b/src/inflight_activation_store.rs @@ -79,6 +79,7 @@ pub struct InflightActivation { /// When enabled activations are not retried when processing_deadlines /// are exceeded. pub at_most_once: bool, + pub namespace: String, } #[derive(Clone, Copy, Debug)] @@ -106,6 +107,7 @@ struct TableRow { processing_deadline: Option>, status: InflightActivationStatus, at_most_once: bool, + namespace: String, } impl TryFrom for TableRow { @@ -123,6 +125,7 @@ impl TryFrom for TableRow { processing_deadline: value.processing_deadline, status: value.status, at_most_once: value.at_most_once, + namespace: value.namespace.clone(), }) } } @@ -140,6 +143,7 @@ impl From for InflightActivation { deadletter_at: value.deadletter_at, processing_deadline: value.processing_deadline, at_most_once: value.at_most_once, + namespace: value.namespace, } } } @@ -179,7 +183,7 @@ impl InflightActivationStore { } let mut query_builder = QueryBuilder::::new( "INSERT INTO inflight_taskactivations \ - (id, activation, partition, offset, added_at, deadletter_at, processing_deadline_duration, processing_deadline, status, at_most_once)", + (id, activation, partition, offset, added_at, deadletter_at, processing_deadline_duration, processing_deadline, status, at_most_once, namespace)", ); let rows = batch .into_iter() @@ -202,35 +206,45 @@ impl InflightActivationStore { } b.push_bind(row.status); b.push_bind(row.at_most_once); + b.push_bind(row.namespace); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); Ok(query.execute(&self.sqlite_pool).await?.into()) } - pub async fn get_pending_activation(&self) -> Result, Error> { + pub async fn get_pending_activation( + &self, + namespace: Option<&str>, + ) -> Result, Error> { let now = Utc::now(); - let result: Option = sqlx::query_as( + + let mut query_builder = QueryBuilder::new( "UPDATE inflight_taskactivations - SET - processing_deadline = datetime('now', '+' || processing_deadline_duration || ' seconds'), - status = $1 - WHERE id = ( - SELECT id - FROM inflight_taskactivations - WHERE status = $2 - AND (deadletter_at IS NULL OR deadletter_at > $3) - ORDER BY added_at - LIMIT 1 - ) - RETURNING *", - ) - .bind(InflightActivationStatus::Processing) - .bind(InflightActivationStatus::Pending) - .bind(now) - .fetch_optional(&self.sqlite_pool) - .await?; + SET processing_deadline = datetime('now', '+' || processing_deadline_duration || ' seconds'), status = ", + ); + query_builder.push_bind(InflightActivationStatus::Processing); + query_builder.push( + " WHERE id = ( + SELECT id + FROM inflight_taskactivations + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending); + query_builder.push(" AND (deadletter_at IS NULL OR deadletter_at > "); + query_builder.push_bind(now); + query_builder.push(")"); + + if let Some(namespace) = namespace { + query_builder.push(" AND namespace = "); + query_builder.push_bind(namespace); + } + query_builder.push(" ORDER BY added_at LIMIT 1) RETURNING *"); + let result: Option = query_builder + .build_query_as::() + .fetch_optional(&self.sqlite_pool) + .await?; let Some(row) = result else { return Ok(None) }; Ok(Some(row.into())) @@ -624,7 +638,7 @@ mod tests { let batch = make_activations(2); assert!(store.store(batch.clone()).await.is_ok()); - let result = store.get_pending_activation().await.unwrap().unwrap(); + let result = store.get_pending_activation(None).await.unwrap().unwrap(); assert_eq!(result.activation.id, "id_0"); assert_eq!(result.status, InflightActivationStatus::Processing); @@ -632,6 +646,27 @@ mod tests { assert_count_by_status(&store, InflightActivationStatus::Pending, 1).await; } + #[tokio::test] + async fn test_get_pending_activation_with_namespace() { + let url = generate_temp_filename(); + let store = InflightActivationStore::new(&url).await.unwrap(); + + let mut batch = make_activations(2); + batch[1].namespace = "other_namespace".into(); + assert!(store.store(batch.clone()).await.is_ok()); + + // Get activation from other namespace + let result = store + .get_pending_activation(Some("other_namespace")) + .await + .unwrap() + .unwrap(); + assert_eq!(result.activation.id, "id_1"); + assert_eq!(result.status, InflightActivationStatus::Processing); + assert!(result.processing_deadline.unwrap() > Utc::now()); + assert_eq!(result.namespace, "other_namespace"); + } + #[tokio::test] async fn test_get_pending_activation_no_deadletter() { let url = generate_temp_filename(); @@ -641,7 +676,7 @@ mod tests { batch[0].deadletter_at = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); - let result = store.get_pending_activation().await; + let result = store.get_pending_activation(None).await; assert!(result.is_ok()); let res_option = result.unwrap(); assert!(res_option.is_none()); @@ -658,7 +693,7 @@ mod tests { batch[1].added_at = Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap(); assert!(store.store(batch.clone()).await.is_ok()); - let result = store.get_pending_activation().await.unwrap().unwrap(); + let result = store.get_pending_activation(None).await.unwrap().unwrap(); assert_eq!( result.added_at, Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap() @@ -707,7 +742,7 @@ mod tests { .await .is_ok()); assert_eq!(store.count_pending_activations().await.unwrap(), 0); - assert!(store.get_pending_activation().await.unwrap().is_none()); + assert!(store.get_pending_activation(None).await.unwrap().is_none()); } #[tokio::test] @@ -1152,6 +1187,7 @@ mod tests { deadletter_at: None, processing_deadline: None, at_most_once: false, + namespace: "namespace".into(), }]; assert!(store.store(batch).await.is_ok()); assert_eq!(store.count().await.unwrap(), 1); diff --git a/src/test_utils.rs b/src/test_utils.rs index 7703b71d..347be544 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -53,6 +53,7 @@ pub fn make_activations(count: u32) -> Vec { deadletter_at: None, processing_deadline: None, at_most_once: false, + namespace: "namespace".into(), }; records.push(item); } diff --git a/tests/grpc_server_test.rs b/tests/grpc_server_test.rs index ec946137..3a21ab2d 100644 --- a/tests/grpc_server_test.rs +++ b/tests/grpc_server_test.rs @@ -18,7 +18,7 @@ async fn test_get_task() { let url = generate_temp_filename(); let store = Arc::new(InflightActivationStore::new(&url).await.unwrap()); let service = MyConsumerService { store }; - let request = GetTaskRequest {}; + let request = GetTaskRequest { namespace: None }; let response = service.get_task(Request::new(request)).await; assert!(response.is_err()); let e = response.unwrap_err(); @@ -27,6 +27,7 @@ async fn test_get_task() { } #[tokio::test] +#[allow(deprecated)] async fn test_set_task_status() { let url = generate_temp_filename(); let store = Arc::new(InflightActivationStore::new(&url).await.unwrap()); @@ -34,7 +35,9 @@ async fn test_set_task_status() { let request = SetTaskStatusRequest { id: "test_task".to_string(), status: 5, // Complete - fetch_next: Some(false), + fetch_next_task: None, + fetch_next: None, + fetch_next_namespace: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -44,6 +47,7 @@ async fn test_set_task_status() { } #[tokio::test] +#[allow(deprecated)] async fn test_set_task_status_invalid() { let url = generate_temp_filename(); let store = Arc::new(InflightActivationStore::new(&url).await.unwrap()); @@ -51,7 +55,9 @@ async fn test_set_task_status_invalid() { let request = SetTaskStatusRequest { id: "test_task".to_string(), status: 1, // Invalid - fetch_next: Some(false), + fetch_next_task: None, + fetch_next: None, + fetch_next_namespace: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_err());