feat(kafka): Implement basic async kafka consumer #17
Conversation
```rust
macro_rules! processing_strategy {
    (
        @reducers,
        ($reduce:expr),
        $prev_receiver:ident,
        $err_sender:ident,
        $shutdown_signal:ident,
        $handles:ident,
    ) => {{
```
I don't love this macro, but I can't figure out for the life of me how to create a function A that accepts another function B, where function B returns an `impl Trait`, without using `dyn`.
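For context, here is a minimal sketch of the pattern being described: a function A that is generic over the return type of a factory B, so B can return `impl Trait` without `dyn`. All trait and function names here are hypothetical, and this may well not capture the async/lifetime constraints that motivated the macro:

```rust
// Sketch only: `Reducer`, `SumReducer`, `make_reducer`, and `run_with` are
// hypothetical names, not part of this PR.
trait Reducer {
    fn reduce(&mut self, value: u64);
}

struct SumReducer(u64);

impl Reducer for SumReducer {
    fn reduce(&mut self, value: u64) {
        self.0 += value;
    }
}

// Function B: returns an opaque `impl Reducer`.
fn make_reducer() -> impl Reducer {
    SumReducer(0)
}

// Function A: generic over B's return type, so no `dyn Reducer` is needed.
// The opaque type returned by `make_reducer` is inferred as `R`.
fn run_with<F, R>(factory: F)
where
    F: Fn() -> R,
    R: Reducer,
{
    let mut reducer = factory();
    reducer.reduce(42);
}

fn main() {
    run_with(make_reducer);
}
```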
I don't have enough experience with Rust + rdkafka consumers to give useful detailed feedback yet. Building a custom macro strikes me as a complex solution though.
Force-pushed from 51bc418 to 7327f42
```rust
partition: msg.partition(),
offset: msg.offset(),
added_at: Utc::now(),
deadletter_at: None,
```
We'll eventually need to read this from configuration, but we can add that in later.
Okay, I made some changes to the typing, so now we can pass configs into the parsing function:

```rust
let deadletter_duration = ...;
processing_strategy!({
    map: deserialize_activation::new(DeserializerConfig {
        deadletter_duration,
    }),
    reduce: InflightTaskWriter::new(
        store.clone(),
        InflightTaskWriterConfig {
            max_buf_len: 2048,
            flush_interval: Some(Duration::from_secs(1)),
            when_full_behaviour: ReducerWhenFullBehaviour::Backpressure,
            shutdown_behaviour: ReduceShutdownBehaviour::Flush,
        }
    ),
    err: OsStreamWriter::new(
        Duration::from_secs(1),
        OsStream::StdErr,
    ),
}),
```
By the way, how come the deadletter duration is configured on the consumer, instead of being defined in Python and passed along in Kafka?
```rust
fn is_full(&self) -> bool {
    self.buffer.len() >= self.max_buf_len
```
Couldn't this change based on how full the inflight store is? I think we'd want the reduce batch to only take as many messages as we could store in sqlite.
Yes, I agree. I think this behaviour sort of depends on what kind of insert behaviour works best for sqlite. This was originally built with ClickHouse in mind, which cannot accept inserts faster than once per second and has no constraints on the number of rows already in the database, so this reduce step batches messages up to a predetermined amount and writes them in one go.
I see 2 options here:
- We batch to a predetermined amount. Each time we try to write (in the flush function), we check if the db is full; if it's not, we write, otherwise we sleep (which emits backpressure). See the sketch after this list.
- We batch to a dynamic amount. In this `is_full` function, we query the db to calculate how many rows we can accept, but I feel like this makes the code more unpredictable and we may need to cache the row count, because the `is_full` call is on a very hot path (it's called when each message from kafka is sent to the reducer).
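A rough, self-contained sketch of option 1 follows. All type and method names here are hypothetical, not this PR's API: the per-message `is_full` check only looks at the in-memory buffer, and the db capacity check happens once per flush, sleeping to emit backpressure when the store has no room.

```rust
use std::time::Duration;

struct Activation;

struct Store;

impl Store {
    async fn has_capacity(&self, _incoming: usize) -> bool {
        // Placeholder for a pending-row-count query against sqlite.
        true
    }

    async fn insert_batch(&self, _batch: Vec<Activation>) {
        // Placeholder for the actual INSERT.
    }
}

struct Batcher {
    buffer: Vec<Activation>,
    max_buf_len: usize,
}

impl Batcher {
    // Hot path: called for every message pulled from kafka, so it only
    // consults the in-memory buffer.
    fn is_full(&self) -> bool {
        self.buffer.len() >= self.max_buf_len
    }

    // Cold path: check db capacity once per flush; sleeping here is what
    // surfaces as backpressure to the consumer.
    async fn flush(&mut self, store: &Store) {
        while !store.has_capacity(self.buffer.len()).await {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        store.insert_batch(std::mem::take(&mut self.buffer)).await;
    }
}
```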
SQLite should be able to take row-by-row writes. We're likely going to have small sqlite stores (probably fewer than 1000 pending tasks at any given time). Batching pre-determined amounts into the batch buffer and then checking for db fullness on write seems like a good approach. That would let us limit how frequently the 'is-full' query is run, and if there is no room in the DB we can sleep/backpressure.
I've also been thinking about the partial batch fit problem we talked about last week. If we've built up a batch of messages in memory and, when we get to flushing to sqlite, the batch doesn't fully fit, I'm thinking we should proceed with the insert and overfill sqlite rather than give up on the batch and have to reprocess messages again. If, after flushing to sqlite, the db is at capacity, we could have the consumer sleep before consuming more messages.
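Reusing the hypothetical `Batcher` and `Store` types from the sketch above, this overfill-then-sleep variant would only change the flush step (again just a sketch, not this PR's implementation):

```rust
async fn flush_overfill(batcher: &mut Batcher, store: &Store) {
    // Always write the whole batch, even if it pushes the store past its
    // target, rather than dropping the batch and reprocessing it later.
    store.insert_batch(std::mem::take(&mut batcher.buffer)).await;

    // If the store is now at (or past) capacity, pause before consuming
    // more messages from kafka.
    while !store.has_capacity(batcher.max_buf_len).await {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}
```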
Nice! Okay, so the insert config now looks like this:

```rust
reduce: InflightTaskWriter::new(
    store.clone(),
    InflightTaskWriterConfig {
        max_buf_len: 128,
        max_pending_tasks: 2048,
        flush_interval: None,
        when_full_behaviour: ReducerWhenFullBehaviour::Flush,
        shutdown_behaviour: ReduceShutdownBehaviour::Drop,
    }
),
```

It has an internal buffer of `max_buf_len`, and targets `max_pending_tasks` pending tasks in the storage.
The `is_full` function is re-defined as follows:

```rust
async fn is_full(&self) -> bool {
    self.buffer.len() >= self.config.max_buf_len
        || self
            .store
            .count_pending_activations()
            .await
            .expect("Error communicating with activation store")
            + self.buffer.len()
            >= self.config.max_pending_tasks
}
```

It accepts messages only while `is_full` returns false, and only flushes (writes to the db) when `is_full` returns true.
I added this logic as a separate commit, because there's a chance that I got something wrong somewhere.
Force-pushed from 9ba3600 to a9dc8ca
Force-pushed from 685d84c to 70b900a
Force-pushed from 980eeab to 80bb9de
evanh left a comment:
I'm going to rubber stamp this but I can't say I really understand a lot of what is happening here.
Co-authored-by: Evan Hicks <[email protected]>
```rust
pub struct Config {
    pub max_buf_len: usize,
    pub max_pending_tasks: usize,
    pub flush_interval: Option<Duration>,
    pub when_full_behaviour: ReducerWhenFullBehaviour,
    pub shutdown_behaviour: ReduceShutdownBehaviour,
}
```
Is your plan that we can generate these configuration structs from the application configuration?
Yes, it's being used like so right now in main.rs:

```rust
InflightActivationWriter::new(
    store.clone(),
    inflight_activation_writer::Config {
        max_buf_len: 128,
        max_pending_activations: 2048,
        flush_interval: None,
        when_full_behaviour: ReducerWhenFullBehaviour::Flush,
        shutdown_behaviour: ReduceShutdownBehaviour::Drop,
    }
)
```

In the future, we can add a CLI argument to supply `max_buf_len` and `max_pending_tasks`, so we can do:
```rust
InflightActivationWriter::new(
    store.clone(),
    inflight_activation_writer::Config {
        max_buf_len: config.max_batch_size,
        max_pending_activations: config.max_pending_activations,
        flush_interval: None,
        when_full_behaviour: ReducerWhenFullBehaviour::Flush,
        shutdown_behaviour: ReduceShutdownBehaviour::Drop,
    }
)
```
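As an illustration of that future step, one possible shape for those CLI flags (hypothetical flag names, and assuming clap with the derive feature, which this PR may not use):

```rust
use clap::Parser;

/// Hypothetical CLI for the consumer; not part of this PR.
#[derive(Parser, Debug)]
struct Cli {
    /// Maximum number of activations buffered in memory before a flush.
    #[arg(long, default_value_t = 128)]
    max_batch_size: usize,

    /// Target number of pending activations kept in the sqlite store.
    #[arg(long, default_value_t = 2048)]
    max_pending_activations: usize,
}

fn main() {
    let config = Cli::parse();
    // These values would then be passed into inflight_activation_writer::Config
    // as shown in the snippet above.
    println!(
        "max_batch_size={} max_pending_activations={}",
        config.max_batch_size, config.max_pending_activations
    );
}
```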
| "INSERT INTO inflight_taskactivations \ | ||
| (id, activation, offset, added_at, deadletter_at, processing_deadline_duration, status)", | ||
| (id, activation, partition, offset, added_at, deadletter_at, processing_deadline_duration, status)", |
Why do we need to store the partition? If a consumer is rebalanced, don't we still need to deliver all the tasks that were added to sqlite even if they came from historical partitions?
Yeah, I just thought that if we're storing the offset we should store the partition as well, because with just an offset we can't do much (an offset can belong to any number of partitions). I think this is useful in the future if we want to commit offsets when the tasks are complete.
But I don't feel really strongly either way.
markstory left a comment:
Let's move forward with this. We can adapt and evolve it as necessary.
Overview
Implements a basic asynchronous Kafka consumer.
TODO
- Clean up *.sqlite files after the consumer has shut down. It would be really great if we could automate this as part of our CI.

Local testing
Increase the number of partitions on the hackweek topic to 16 to observe more interesting rebalancing behaviour (it defaults to 1):
```sh
docker exec -it sentry_kafka bash
kafka-topics --bootstrap-server localhost:9092 --alter --topic hackweek --partitions 16
```

Start a django shell (make sure to use the hackweek demo branch):
```sh
sentry django shell
```

Send some messages to kafka.
Start the consumer in multiple terminals.
You should see some output about inserting into the store.
Use this sqlite query to check for exactly-once delivery: delta=0 for a partition means that (maximum offset - minimum offset + 1) is equal to the number of messages.
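The query itself isn't shown above, but based on that description a check along these lines should work. This is a sketch only: it assumes rusqlite and a hypothetical db path, while the table and column names come from this PR's `inflight_taskactivations` schema.

```rust
use rusqlite::{Connection, Result};

fn main() -> Result<()> {
    // Hypothetical path to one consumer's sqlite store.
    let conn = Connection::open("consumer.sqlite")?;
    let mut stmt = conn.prepare(
        "SELECT \"partition\",
                COUNT(*) AS messages,
                COUNT(*) - (MAX(\"offset\") - MIN(\"offset\") + 1) AS delta
         FROM inflight_taskactivations
         GROUP BY \"partition\"",
    )?;
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let partition: i64 = row.get(0)?;
        let messages: i64 = row.get(1)?;
        let delta: i64 = row.get(2)?;
        // delta should be 0 for every partition if each offset was stored
        // exactly once.
        println!("partition={partition} messages={messages} delta={delta}");
    }
    Ok(())
}
```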