Skip to content

Commit 51bc418

Browse files
committed
feat(kafka): Implement basic async kafka consumer
1 parent 86bd0c6 commit 51bc418

File tree

11 files changed

+2195
-41
lines changed

11 files changed

+2195
-41
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22

33
# Editors
44
.DS_Store
5+
6+
# Sqlite artifacts
7+
*.sqlite

Cargo.lock

Lines changed: 126 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,20 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
sentry_protos = "0.1.33"
9+
sentry_protos = "0.1.34"
10+
prost = "0.13"
11+
prost-types = "0.13.3"
1012
anyhow = "1.0.92"
1113
chrono = { version = "0.4.26" }
12-
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
13-
prost = "0.13"
1414
tokio = { version = "1.41.0", features = ["full"] }
15-
prost-types = "0.13.3"
15+
tokio-util = "0.7.12"
16+
tokio-stream = { version = "0.1.16", features = ["full"] }
17+
async-stream = "0.3.5"
18+
futures = "0.3.31"
19+
tracing = "0.1.40"
20+
tracing-subscriber = "0.3.18"
1621
rdkafka = { version = "0.36.2", features = ["cmake-build"] }
22+
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
1723

1824
[dev-dependencies]
1925
rand = "0.8.5"

migrations/0001_create_inflight_taskactivations.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
22
id UUID NOT NULL PRIMARY KEY,
33
activation BLOB NOT NULL,
4+
partition INTEGER NOT NULL,
45
offset BIGINTEGER NOT NULL,
56
added_at DATETIME NOT NULL,
67
deadletter_at DATETIME,
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::{anyhow, Error};
4+
use chrono::Utc;
5+
use prost::Message as _;
6+
use rdkafka::{message::OwnedMessage, Message};
7+
use sentry_protos::sentry::v1::TaskActivation;
8+
9+
use crate::inflight_activation_store::{InflightActivation, TaskActivationStatus};
10+
11+
pub async fn deserialize_activation(msg: Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
12+
let Some(payload) = msg.payload() else {
13+
return Err(anyhow!("Message has no payload"));
14+
};
15+
let activation = TaskActivation::decode(payload)?;
16+
Ok(InflightActivation {
17+
activation,
18+
status: TaskActivationStatus::Pending,
19+
partition: msg.partition(),
20+
offset: msg.offset(),
21+
added_at: Utc::now(),
22+
deadletter_at: None,
23+
processing_deadline: None,
24+
})
25+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use std::{mem::replace, sync::Arc, time::Duration};
2+
3+
use tracing::info;
4+
5+
use crate::inflight_activation_store::{InflightActivation, InflightActivationStore};
6+
7+
use super::kafka::{
8+
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
9+
ReducerWhenFullBehaviour,
10+
};
11+
12+
pub struct InflightTaskWriterConfig {
13+
pub max_buf_len: usize,
14+
pub flush_interval: Option<Duration>,
15+
pub when_full_behaviour: ReducerWhenFullBehaviour,
16+
pub shutdown_behaviour: ReduceShutdownBehaviour,
17+
}
18+
19+
pub struct InflightTaskWriter {
20+
store: Arc<InflightActivationStore>,
21+
buffer: Vec<InflightActivation>,
22+
max_buf_len: usize,
23+
reduce_config: ReduceConfig,
24+
}
25+
26+
impl InflightTaskWriter {
27+
pub fn new(store: Arc<InflightActivationStore>, config: InflightTaskWriterConfig) -> Self {
28+
Self {
29+
store,
30+
buffer: Vec::with_capacity(config.max_buf_len),
31+
max_buf_len: config.max_buf_len,
32+
reduce_config: ReduceConfig {
33+
shutdown_condition: ReduceShutdownCondition::Signal,
34+
shutdown_behaviour: ReduceShutdownBehaviour::Flush,
35+
when_full_behaviour: config.when_full_behaviour,
36+
flush_interval: config.flush_interval,
37+
},
38+
}
39+
}
40+
}
41+
42+
impl Reducer for InflightTaskWriter {
43+
type Input = InflightActivation;
44+
45+
type Output = ();
46+
47+
async fn reduce(&mut self, t: Self::Input) -> Result<(), anyhow::Error> {
48+
self.buffer.push(t);
49+
Ok(())
50+
}
51+
52+
async fn flush(&mut self) -> Result<Self::Output, anyhow::Error> {
53+
if self.buffer.is_empty() {
54+
return Ok(());
55+
}
56+
let res = self
57+
.store
58+
.store(replace(
59+
&mut self.buffer,
60+
Vec::with_capacity(self.max_buf_len),
61+
))
62+
.await?;
63+
info!("Inserted {:?} entries", res.rows_affected);
64+
Ok(())
65+
}
66+
67+
fn reset(&mut self) {
68+
self.buffer.clear();
69+
}
70+
71+
fn is_full(&self) -> bool {
72+
self.buffer.len() >= self.max_buf_len
73+
}
74+
75+
fn get_reduce_config(&self) -> ReduceConfig {
76+
self.reduce_config.clone()
77+
}
78+
}

0 commit comments

Comments
 (0)