Skip to content

Commit 7392289

Browse files
author
Claude Code
committed
now everything gets committed properly
1 parent b40e567 commit 7392289

File tree

2 files changed

+36
-18
lines changed

2 files changed

+36
-18
lines changed

rust_snuba/src/factory_v2.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ use crate::strategies::replacements::ProduceReplacements;
4343
use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch};
4444

4545
// BLQ configuration
46-
const STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30);
47-
const STATIC_FRICTION: Option<TimeDelta> = Some(TimeDelta::minutes(2));
46+
const BLQ_STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30);
47+
const BLQ_STATIC_FRICTION: Option<TimeDelta> = Some(TimeDelta::minutes(2));
48+
4849
pub struct ConsumerStrategyFactoryV2 {
4950
pub storage_config: config::StorageConfig,
5051
pub env_config: config::EnvConfig,
@@ -283,17 +284,17 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
283284
) {
284285
tracing::info!(
285286
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
286-
STALE_THRESHOLD,
287+
BLQ_STALE_THRESHOLD,
287288
self.blq_topic,
288-
STATIC_FRICTION
289+
BLQ_STATIC_FRICTION
289290
);
290291
Box::new(
291292
BLQRouter::new(
292293
next_step,
293294
blq_producer_config.clone(),
294295
blq_topic,
295-
STALE_THRESHOLD,
296-
STATIC_FRICTION,
296+
BLQ_STALE_THRESHOLD,
297+
BLQ_STATIC_FRICTION,
297298
)
298299
.expect("invalid BLQRouter config"),
299300
)
@@ -446,17 +447,17 @@ impl ConsumerStrategyFactoryV2 {
446447
) {
447448
tracing::info!(
448449
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
449-
STALE_THRESHOLD,
450+
BLQ_STALE_THRESHOLD,
450451
self.blq_topic,
451-
STATIC_FRICTION,
452+
BLQ_STATIC_FRICTION,
452453
);
453454
Box::new(
454455
BLQRouter::new(
455456
next_step,
456457
blq_producer_config.clone(),
457458
blq_topic,
458-
STALE_THRESHOLD,
459-
STATIC_FRICTION,
459+
BLQ_STALE_THRESHOLD,
460+
BLQ_STATIC_FRICTION,
460461
)
461462
.expect("invalid BLQRouter config"),
462463
)

rust_snuba/src/strategies/blq_router.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ use sentry_arroyo::types::{Message, Topic, TopicOrPartition};
4747
enum State {
4848
Idle, // no messages have gone through the router yet
4949
RoutingStale, // router is directing stale messages to the backlog-queue (BLQ)
50-
Forwarding, // router is forwarding non-stale messages along to the next strategy
50+
// we have processed all stale messages and are now flushing (finishing producing to BLQ)
51+
// when we transition to this state we will have CommitRequest for what was flushed, and poll
52+
// will be responsible for returning it
53+
Flushing(Option<CommitRequest>),
54+
Forwarding, // router is forwarding non-stale messages along to the next strategy
5155
}
5256

5357
pub struct BLQRouter<Next, ProduceStrategy> {
@@ -151,9 +155,14 @@ where
151155
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
152156
let produce_result = self.producer.poll();
153157
let next_step_result = self.next_step.poll();
154-
match self.state {
158+
match &mut self.state {
155159
State::RoutingStale => produce_result,
156160
State::Forwarding | State::Idle => next_step_result,
161+
State::Flushing(commits) => {
162+
let commits = commits.take();
163+
self.state = State::Forwarding;
164+
Ok(commits)
165+
}
157166
}
158167
}
159168

@@ -192,12 +201,19 @@ where
192201
}
193202
(false, State::RoutingStale) => {
194203
// We hit a fresh message, so we are done routing the backlog.
195-
// Call join on the producer so all writes to the BLQ are committed.
196-
self.producer.join(Some(Duration::from_secs(5))).unwrap();
204+
// Finish producing and committing all the state messages and
205+
// then switch back to forwarding fresh.
197206

198-
// Now go back to forwarding non-stale messages as usual.
199-
self.state = State::Forwarding;
200-
self.next_step.submit(message)
207+
// i know i shouldnt be blocking in submit but there was no better way to do it
208+
// the pipeline cant make progress until this completes anyways so it should be fine
209+
let flush_results = self.producer.join(Some(Duration::from_secs(5))).unwrap();
210+
self.state = State::Flushing(flush_results);
211+
Ok(())
212+
}
213+
(true, State::Flushing(_)) | (false, State::Flushing(_)) => {
214+
return Err(SubmitError::MessageRejected(
215+
sentry_arroyo::processing::strategies::MessageRejected { message },
216+
));
201217
}
202218
}
203219
}
@@ -210,9 +226,10 @@ where
210226
fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
211227
let producer_result = self.producer.join(timeout);
212228
let next_step_result = self.next_step.join(timeout);
213-
match self.state {
229+
match &self.state {
214230
State::RoutingStale => producer_result,
215231
State::Forwarding | State::Idle => next_step_result,
232+
State::Flushing(commits) => Ok(commits.clone()),
216233
}
217234
}
218235
}

0 commit comments

Comments
 (0)