fix(blq): Fix forward loop in demoted namespaces #504
Conversation
Codecov Report ❌ Patch coverage is
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #504      +/-   ##
==========================================
+ Coverage   88.80%   88.83%   +0.03%
==========================================
  Files          20       20
  Lines        5787     5867      +80
==========================================
+ Hits         5139     5212      +73
- Misses        648      655       +7
```

View full report in Codecov by Sentry.
evanh left a comment
Can you add some tests for the issues you mentioned?
```rust
pub struct InflightActivationBatcher {
    batch: Vec<InflightActivation>,
    batch_size: usize,
    forwarded_count: usize,
```
This is just to report metrics properly in case some tasks are demoted, right?
I also used this in flush so that offsets can be committed for forwarded tasks even if batch is empty
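A minimal sketch of how that can work, with simplified field types; the struct below is a stand-in for the PR's `InflightActivationBatcher`, not its actual definition:

```rust
// Sketch only: a simplified stand-in for InflightActivationBatcher.
struct Batcher {
    batch: Vec<Vec<u8>>,    // batched activations (payload type simplified)
    forwarded_count: usize, // activations forwarded to another topic
}

impl Batcher {
    // Offsets for forwarded activations still need to be committed, so a
    // flush is warranted even when the batch itself is empty.
    fn should_flush(&self) -> bool {
        !self.batch.is_empty() || self.forwarded_count > 0
    }
}
```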
src/kafka/utils.rs
Outdated
```rust
/// * `Err(DecodeError)` - If the protobuf cannot be decoded
pub fn modify_activation_namespace(activation_bytes: &[u8]) -> Result<Vec<u8>, prost::DecodeError> {
    let mut activation = TaskActivation::decode(activation_bytes)?;
    activation.namespace += DEFAULT_DEMOTED_NAMESPACE_SUFFIX;
```
This isn't a safe change to make. We won't have task definitions bound to these generated namespaces, and tasks will fail because they are undefined.
I've switched to headers to achieve the same result.
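A minimal sketch of the header-based approach, assuming `TaskActivation` (the generated protobuf type used above) is in scope and its `headers` field is a string map, as the later diff suggests:

```rust
use prost::Message;

// Sketch only: mark the activation as forwarded via a header instead of
// rewriting its namespace, then re-encode it for the produce call.
fn mark_forwarded(activation_bytes: &[u8]) -> Result<Vec<u8>, prost::DecodeError> {
    let mut activation = TaskActivation::decode(activation_bytes)?;
    activation
        .headers
        .insert("forwarded".to_string(), "true".to_string());
    Ok(activation.encode_to_vec())
}
```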
```rust
let results = join_all(sends).await;
let success_count = results.iter().filter(|r| r.is_ok()).count();

metrics::histogram!("consumer.forwarded_rows").record(success_count as f64);
```
Can you add a metric for failed produces as well, please?
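A minimal sketch of what that could look like; the failure metric name is a placeholder, and `results` refers to the `join_all(sends)` output in the snippet above:

```rust
// Sketch only: record failed produces alongside the successful ones.
fn record_forward_metrics<T, E>(results: &[Result<T, E>]) {
    let success_count = results.iter().filter(|r| r.is_ok()).count();
    let failure_count = results.len() - success_count;
    metrics::histogram!("consumer.forwarded_rows").record(success_count as f64);
    // Placeholder metric name for failed forwards.
    metrics::counter!("consumer.forward_failures").increment(failure_count as u64);
}
```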
```rust
        // Already forwarded, fall through to add to batch
    }
    Err(_) => {
        // Decode error, fall through to add to batch to handle in upkeep
```
We should have a metric at least for this case, so we can tell if something weird is happening.
```rust
        return Ok(());
    }
    Ok(None) => {
        // Already forwarded, fall through to add to batch
```
We should have a metric here as well, as this shouldn't happen in normal scenarios.
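A minimal sketch of counters for the two unusual arms called out above (decode error and already-forwarded); the function and metric names are placeholders:

```rust
// Sketch only: placeholder names, illustrating counters for the two arms
// that fall through to the batch in unusual circumstances.
fn record_unusual_forward_result<T>(result: &Result<Option<T>, prost::DecodeError>) {
    match result {
        Ok(Some(_)) => {} // normal forward path, nothing to record here
        Ok(None) => {
            // Already forwarded: should not happen in normal scenarios.
            metrics::counter!("consumer.forward.already_forwarded").increment(1);
        }
        Err(_) => {
            // Decode error: counted so upkeep-handled payloads stay visible.
            metrics::counter!("consumer.forward.decode_error").increment(1);
        }
    }
}
```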
```rust
if !self.batch.is_empty() {
    metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64);
    metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64);
}
```
Why not always record the metrics? The zeros are meaningful.
```rust
let sends = self.forward_batch.iter().map(|payload| {
    self.producer.send(
        FutureRecord::<(), Vec<u8>>::to(&topic).payload(payload),
        Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
    )
});
```
One concern I have with demoted namespaces is that the producer cluster won't always have a long topic deployed on it. For example, if a broker is connected to kafka-events and we need to demote to the long/limited topic, that topic only exists on kafka-small. The current logic will result in the broker failing to produce.
I think we would need to expand the application configuration to have broker configuration for the demoted namespace flow. That would also imply that the topic we demote to is fixed and could be removed from runtime config.
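A hedged sketch of what that configuration expansion might look like; the field names are assumptions for illustration, not taskbroker's actual config:

```rust
// Sketch only: hypothetical fields for a dedicated producer configuration
// for the demoted-namespace flow, so forwards can target a cluster/topic
// independent of the cluster the broker itself consumes from.
#[derive(Debug, Clone)]
pub struct DemotedNamespaceConfig {
    /// Bootstrap servers of the cluster hosting the long/limited topic.
    pub bootstrap_servers: String,
    /// Fixed topic that demoted tasks are forwarded to.
    pub topic: String,
}
```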
```rust
    )
});

let results = join_all(sends).await;
```
Much better 👏
src/kafka/utils.rs
Outdated
```rust
let mut activation = TaskActivation::decode(activation_bytes)?;
if activation.headers.get("forwarded").map(|v| v.as_str()) == Some("true") {
    return Ok(None);
}
```
I like the safety property that this provides. Would it be possible to simplify this and avoid deserializing and mutating the activation? We'll have the namespace on the InflightActivation; wouldn't skipping forwarding when the task's namespace matches the demoted namespace option provide the same safety against loops?
I think that wouldn't provide safety against loops, since all brokers share the same runtime config. However, something similar that I think would work is to compare TASKBROKER_LONG_TOPIC with the topic we are trying to forward tasks to. If they are the same, we skip the forward.
> I think that wouldn't provide safety against loops since all brokers share the same runtime config
Each replica in a processing pool does have the same runtime config, but each pool has configuration that is separate from other pools. We could have config to demote tasks in the default pool and not have that config present on the long or limited pools. Comparing the topic names, as we talked about today offline, is another solution that would prevent loops.
Switched to comparing topic values to determine whether the task needs forwarding. Also added more metrics for visibility.
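A minimal sketch of that topic comparison and the kind of loop-prevention test it allows; the helper name and topic strings are illustrative, not the PR's exact code:

```rust
// Sketch only: skip forwarding when the consumed topic is already the
// forward target, which is what prevents a forward loop across pools.
fn needs_forwarding(consumed_topic: &str, forward_topic: &str) -> bool {
    consumed_topic != forward_topic
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn skips_forward_when_topics_match() {
        assert!(!needs_forwarding("taskworker-long", "taskworker-long"));
        assert!(needs_forwarding("taskworker", "taskworker-long"));
    }
}
```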
markstory left a comment
Looks good to me. We can address the demoted topic cluster connection in a follow up pull request.
```rust
println!("kafka_topic: {:?}", config.kafka_topic);
println!("kafka_long_topic: {:?}", config.kafka_long_topic);
```
Some stray prints.
Checks whether the topics that taskbroker is consuming from and producing to are the same before forwarding tasks.
Added metrics for forwarded tasks and fixed the inflated completed-tasks metrics.
Also handles the edge case where all tasks are forwarded and the batch is empty.
Tested and working in sandbox.
Refs STREAM-279