Skip to content

Commit a9dc8ca

Browse files
committed
feat(kafka): Implement basic async kafka consumer
1 parent 57fed8c commit a9dc8ca

File tree

11 files changed

+2219
-55
lines changed

11 files changed

+2219
-55
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22

33
# Editors
44
.DS_Store
5+
6+
# Sqlite artifacts
7+
*.sqlite
8+
*.sqlite-shm
9+
*.sqlite-wal

Cargo.lock

Lines changed: 140 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,23 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
sentry_protos = "0.1.33"
9+
sentry_protos = "0.1.34"
10+
prost = "0.13"
11+
prost-types = "0.13.3"
1012
anyhow = "1.0.92"
1113
chrono = { version = "0.4.26" }
12-
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
13-
prost = "0.13"
1414
tokio = { version = "1.41.0", features = ["full"] }
15-
prost-types = "0.13.3"
15+
tokio-util = "0.7.12"
16+
tokio-stream = { version = "0.1.16", features = ["full"] }
17+
async-stream = "0.3.5"
18+
futures = "0.3.31"
19+
tracing = "0.1.40"
20+
tracing-subscriber = "0.3.18"
1621
rdkafka = { version = "0.36.2", features = ["cmake-build"] }
1722
serde = "1.0.214"
1823
serde_yaml = "0.9.34"
1924
figment = { version = "0.10.19", features = ["env", "yaml", "test"] }
25+
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
2026

2127
[dev-dependencies]
2228
rand = "0.8.5"

migrations/0001_create_inflight_taskactivations.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
22
id UUID NOT NULL PRIMARY KEY,
33
activation BLOB NOT NULL,
4+
partition INTEGER NOT NULL,
45
offset BIGINTEGER NOT NULL,
56
added_at DATETIME NOT NULL,
67
deadletter_at DATETIME,
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use anyhow::{anyhow, Error};
4+
use chrono::Utc;
5+
use prost::Message as _;
6+
use rdkafka::{message::OwnedMessage, Message};
7+
use sentry_protos::sentry::v1::TaskActivation;
8+
9+
use crate::inflight_activation_store::{InflightActivation, TaskActivationStatus};
10+
11+
pub struct DeserializerConfig {
12+
pub deadletter_duration: Option<Duration>,
13+
}
14+
15+
pub fn new(
16+
config: DeserializerConfig,
17+
) -> impl Fn(Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
18+
move |msg: Arc<OwnedMessage>| {
19+
let Some(payload) = msg.payload() else {
20+
return Err(anyhow!("Message has no payload"));
21+
};
22+
let activation = TaskActivation::decode(payload)?;
23+
Ok(InflightActivation {
24+
activation,
25+
status: TaskActivationStatus::Pending,
26+
partition: msg.partition(),
27+
offset: msg.offset(),
28+
added_at: Utc::now(),
29+
deadletter_at: config
30+
.deadletter_duration
31+
.map(|duration| Utc::now() + duration),
32+
processing_deadline: None,
33+
})
34+
}
35+
}

0 commit comments

Comments
 (0)