Skip to content

Commit 9019744

Browse files
john-z-yangevanh
andauthored
feat(kafka): Implement basic async kafka consumer (#17)
Co-authored-by: Evan Hicks <evanh@users.noreply.github.com>
1 parent 804711c commit 9019744

File tree

11 files changed

+2161
-71
lines changed

11 files changed

+2161
-71
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22

33
# Editors
44
.DS_Store
5+
6+
# Sqlite artifacts
7+
*.sqlite
8+
*.sqlite-shm
9+
*.sqlite-wal

Cargo.lock

Lines changed: 63 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,25 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
sentry_protos = "0.1.33"
9+
sentry_protos = "0.1.34"
10+
prost = "0.13"
11+
prost-types = "0.13.3"
1012
anyhow = "1.0.92"
1113
chrono = { version = "0.4.26" }
12-
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
13-
prost = "0.13"
1414
tokio = { version = "1.41.0", features = ["full"] }
15-
prost-types = "0.13.3"
15+
tokio-util = "0.7.12"
16+
tokio-stream = { version = "0.1.16", features = ["full"] }
17+
async-stream = "0.3.5"
18+
futures = "0.3.31"
1619
rdkafka = { version = "0.36.2", features = ["cmake-build"] }
1720
serde = "1.0.214"
1821
serde_yaml = "0.9.34"
1922
figment = { version = "0.10.19", features = ["env", "yaml", "test"] }
23+
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
2024
clap = { version = "4.5.20", features = ["derive"] }
2125
sentry = { version = "0.34.0", features = ["tracing"] }
22-
tracing-subscriber = { version = "0.3.18", features = ["json"] }
2326
tracing = "0.1.40"
27+
tracing-subscriber = { version = "0.3.18", features = ["json"] }
2428
metrics-exporter-statsd = "0.9.0"
2529
metrics = "0.24.0"
2630

migrations/0001_create_inflight_taskactivations.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
22
id UUID NOT NULL PRIMARY KEY,
33
activation BLOB NOT NULL,
4+
partition INTEGER NOT NULL,
45
offset BIGINTEGER NOT NULL,
56
added_at DATETIME NOT NULL,
67
deadletter_at DATETIME,
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use anyhow::{anyhow, Error};
4+
use chrono::Utc;
5+
use prost::Message as _;
6+
use rdkafka::{message::OwnedMessage, Message};
7+
use sentry_protos::sentry::v1::TaskActivation;
8+
9+
use crate::inflight_activation_store::{InflightActivation, TaskActivationStatus};
10+
11+
pub struct Config {
12+
pub deadletter_duration: Option<Duration>,
13+
}
14+
15+
pub fn new(config: Config) -> impl Fn(Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
16+
move |msg: Arc<OwnedMessage>| {
17+
let Some(payload) = msg.payload() else {
18+
return Err(anyhow!("Message has no payload"));
19+
};
20+
let activation = TaskActivation::decode(payload)?;
21+
Ok(InflightActivation {
22+
activation,
23+
status: TaskActivationStatus::Pending,
24+
partition: msg.partition(),
25+
offset: msg.offset(),
26+
added_at: Utc::now(),
27+
deadletter_at: config
28+
.deadletter_duration
29+
.map(|duration| Utc::now() + duration),
30+
processing_deadline: None,
31+
})
32+
}
33+
}

0 commit comments

Comments
 (0)