Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
sentry_protos = "0.1.37"
sentry_protos = "0.1.39"
anyhow = "1.0.92"
chrono = { version = "0.4.26" }
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
Expand Down
4 changes: 3 additions & 1 deletion migrations/0001_create_inflight_taskactivations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations (
deadletter_at DATETIME,
processing_deadline_duration INTEGER NOT NULL,
processing_deadline DATETIME,
status INTEGER NOT NULL
status INTEGER NOT NULL,
at_most_once BOOLEAN NOT NULL DEFAULT FALSE,
namespace TEXT
);
1 change: 0 additions & 1 deletion migrations/0002_add_at_most_once.sql

This file was deleted.

2 changes: 1 addition & 1 deletion python/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ orjson>=3.10.10
protobuf>=5.28.3
pytest>=8.3.3
pyyaml>=6.0.2
sentry-protos>=0.1.37
sentry-protos>=0.1.39
2 changes: 2 additions & 0 deletions src/consumer/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub fn new(
return Err(anyhow!("Message has no payload"));
};
let activation = TaskActivation::decode(payload)?;
let namespace = activation.namespace.clone();

let mut at_most_once = false;
if let Some(ref retry_state) = activation.retry_state {
Expand Down Expand Up @@ -69,6 +70,7 @@ pub fn new(
deadletter_at: Some(deadletter_at),
processing_deadline: None,
at_most_once,
namespace,
})
}
}
Expand Down
33 changes: 20 additions & 13 deletions src/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ impl ConsumerService for MyConsumerService {
#[instrument(skip(self))]
async fn get_task(
&self,
_request: Request<GetTaskRequest>,
request: Request<GetTaskRequest>,
) -> Result<Response<GetTaskResponse>, Status> {
let start_time = Instant::now();
let namespace = &request.get_ref().namespace;
let inflight = self
.store
.get_pending_activation(namespace.as_deref())
.await;

let inflight = self.store.get_pending_activation().await;
match inflight {
Ok(Some(inflight)) => {
let resp = GetTaskResponse {
Expand Down Expand Up @@ -81,19 +85,22 @@ impl ConsumerService for MyConsumerService {
error: None,
};

if let Some(fetch_next) = request.get_ref().fetch_next {
if fetch_next {
let start_time = Instant::now();
let inflight = self.store.get_pending_activation().await;
metrics::histogram!("grpc_server.fetch_next.duration").record(start_time.elapsed());
let fetch_next = &request.get_ref().fetch_next_task;
if let Some(fetch_next) = fetch_next {
let start_time = Instant::now();
let namespace = &fetch_next.namespace;
let inflight = self
.store
.get_pending_activation(namespace.as_deref())
.await;
metrics::histogram!("grpc_server.fetch_next.duration").record(start_time.elapsed());

match inflight {
Ok(Some(inflight)) => {
response.task = Some(inflight.activation);
}
Ok(None) => return Err(Status::not_found("No pending activation")),
Err(e) => return Err(Status::internal(e.to_string())),
match inflight {
Ok(Some(inflight)) => {
response.task = Some(inflight.activation);
}
Ok(None) => return Err(Status::not_found("No pending activation")),
Err(e) => return Err(Status::internal(e.to_string())),
}
}

Expand Down
86 changes: 61 additions & 25 deletions src/inflight_activation_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct InflightActivation {
/// When enabled activations are not retried when processing_deadlines
/// are exceeded.
pub at_most_once: bool,
pub namespace: String,
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -106,6 +107,7 @@ struct TableRow {
processing_deadline: Option<DateTime<Utc>>,
status: InflightActivationStatus,
at_most_once: bool,
namespace: String,
}

impl TryFrom<InflightActivation> for TableRow {
Expand All @@ -123,6 +125,7 @@ impl TryFrom<InflightActivation> for TableRow {
processing_deadline: value.processing_deadline,
status: value.status,
at_most_once: value.at_most_once,
namespace: value.namespace.clone(),
})
}
}
Expand All @@ -140,6 +143,7 @@ impl From<TableRow> for InflightActivation {
deadletter_at: value.deadletter_at,
processing_deadline: value.processing_deadline,
at_most_once: value.at_most_once,
namespace: value.namespace,
}
}
}
Expand Down Expand Up @@ -179,7 +183,7 @@ impl InflightActivationStore {
}
let mut query_builder = QueryBuilder::<Sqlite>::new(
"INSERT INTO inflight_taskactivations \
(id, activation, partition, offset, added_at, deadletter_at, processing_deadline_duration, processing_deadline, status, at_most_once)",
(id, activation, partition, offset, added_at, deadletter_at, processing_deadline_duration, processing_deadline, status, at_most_once, namespace)",
);
let rows = batch
.into_iter()
Expand All @@ -202,35 +206,45 @@ impl InflightActivationStore {
}
b.push_bind(row.status);
b.push_bind(row.at_most_once);
b.push_bind(row.namespace);
})
.push(" ON CONFLICT(id) DO NOTHING")
.build();
Ok(query.execute(&self.sqlite_pool).await?.into())
}

pub async fn get_pending_activation(&self) -> Result<Option<InflightActivation>, Error> {
pub async fn get_pending_activation(
&self,
namespace: Option<&str>,
) -> Result<Option<InflightActivation>, Error> {
let now = Utc::now();
let result: Option<TableRow> = sqlx::query_as(

let mut query_builder = QueryBuilder::new(
"UPDATE inflight_taskactivations
SET
processing_deadline = datetime('now', '+' || processing_deadline_duration || ' seconds'),
status = $1
WHERE id = (
SELECT id
FROM inflight_taskactivations
WHERE status = $2
AND (deadletter_at IS NULL OR deadletter_at > $3)
ORDER BY added_at
LIMIT 1
)
RETURNING *",
)
.bind(InflightActivationStatus::Processing)
.bind(InflightActivationStatus::Pending)
.bind(now)
.fetch_optional(&self.sqlite_pool)
.await?;
SET processing_deadline = datetime('now', '+' || processing_deadline_duration || ' seconds'), status = ",
);
query_builder.push_bind(InflightActivationStatus::Processing);
query_builder.push(
" WHERE id = (
SELECT id
FROM inflight_taskactivations
WHERE status = ",
);
query_builder.push_bind(InflightActivationStatus::Pending);
query_builder.push(" AND (deadletter_at IS NULL OR deadletter_at > ");
query_builder.push_bind(now);
query_builder.push(")");

if let Some(namespace) = namespace {
query_builder.push(" AND namespace = ");
query_builder.push_bind(namespace);
}
query_builder.push(" ORDER BY added_at LIMIT 1) RETURNING *");

let result: Option<TableRow> = query_builder
.build_query_as::<TableRow>()
.fetch_optional(&self.sqlite_pool)
.await?;
let Some(row) = result else { return Ok(None) };

Ok(Some(row.into()))
Expand Down Expand Up @@ -624,14 +638,35 @@ mod tests {
let batch = make_activations(2);
assert!(store.store(batch.clone()).await.is_ok());

let result = store.get_pending_activation().await.unwrap().unwrap();
let result = store.get_pending_activation(None).await.unwrap().unwrap();

assert_eq!(result.activation.id, "id_0");
assert_eq!(result.status, InflightActivationStatus::Processing);
assert!(result.processing_deadline.unwrap() > Utc::now());
assert_count_by_status(&store, InflightActivationStatus::Pending, 1).await;
}

#[tokio::test]
async fn test_get_pending_activation_with_namespace() {
let url = generate_temp_filename();
let store = InflightActivationStore::new(&url).await.unwrap();

let mut batch = make_activations(2);
batch[1].namespace = "other_namespace".into();
assert!(store.store(batch.clone()).await.is_ok());

// Get activation from other namespace
let result = store
.get_pending_activation(Some("other_namespace"))
.await
.unwrap()
.unwrap();
assert_eq!(result.activation.id, "id_1");
assert_eq!(result.status, InflightActivationStatus::Processing);
assert!(result.processing_deadline.unwrap() > Utc::now());
assert_eq!(result.namespace, "other_namespace");
}

#[tokio::test]
async fn test_get_pending_activation_no_deadletter() {
let url = generate_temp_filename();
Expand All @@ -641,7 +676,7 @@ mod tests {
batch[0].deadletter_at = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap());
assert!(store.store(batch.clone()).await.is_ok());

let result = store.get_pending_activation().await;
let result = store.get_pending_activation(None).await;
assert!(result.is_ok());
let res_option = result.unwrap();
assert!(res_option.is_none());
Expand All @@ -658,7 +693,7 @@ mod tests {
batch[1].added_at = Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap();
assert!(store.store(batch.clone()).await.is_ok());

let result = store.get_pending_activation().await.unwrap().unwrap();
let result = store.get_pending_activation(None).await.unwrap().unwrap();
assert_eq!(
result.added_at,
Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap()
Expand Down Expand Up @@ -707,7 +742,7 @@ mod tests {
.await
.is_ok());
assert_eq!(store.count_pending_activations().await.unwrap(), 0);
assert!(store.get_pending_activation().await.unwrap().is_none());
assert!(store.get_pending_activation(None).await.unwrap().is_none());
}

#[tokio::test]
Expand Down Expand Up @@ -1152,6 +1187,7 @@ mod tests {
deadletter_at: None,
processing_deadline: None,
at_most_once: false,
namespace: "namespace".into(),
}];
assert!(store.store(batch).await.is_ok());
assert_eq!(store.count().await.unwrap(), 1);
Expand Down
1 change: 1 addition & 0 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub fn make_activations(count: u32) -> Vec<InflightActivation> {
deadletter_at: None,
processing_deadline: None,
at_most_once: false,
namespace: "namespace".into(),
};
records.push(item);
}
Expand Down
12 changes: 9 additions & 3 deletions tests/grpc_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn test_get_task() {
let url = generate_temp_filename();
let store = Arc::new(InflightActivationStore::new(&url).await.unwrap());
let service = MyConsumerService { store };
let request = GetTaskRequest {};
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_err());
let e = response.unwrap_err();
Expand All @@ -27,14 +27,17 @@ async fn test_get_task() {
}

#[tokio::test]
#[allow(deprecated)]
async fn test_set_task_status() {
let url = generate_temp_filename();
let store = Arc::new(InflightActivationStore::new(&url).await.unwrap());
let service = MyConsumerService { store };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
status: 5, // Complete
fetch_next: Some(false),
fetch_next_task: None,
fetch_next: None,
fetch_next_namespace: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand All @@ -44,14 +47,17 @@ async fn test_set_task_status() {
}

#[tokio::test]
#[allow(deprecated)]
async fn test_set_task_status_invalid() {
let url = generate_temp_filename();
let store = Arc::new(InflightActivationStore::new(&url).await.unwrap());
let service = MyConsumerService { store };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
status: 1, // Invalid
fetch_next: Some(false),
fetch_next_task: None,
fetch_next: None,
fetch_next_namespace: None,
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_err());
Expand Down
Loading