-
Notifications
You must be signed in to change notification settings - Fork 103
ref(client-reports): Move Client Reports to the new processing #5338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
ref(client-reports): Move Client Reports to the new processing #5338
Conversation
| if (!config.emit_outcomes().any() || !config.emit_client_outcomes()) | ||
| && config.processing_enabled() | ||
| { | ||
| // if a processing relay has client outcomes disabled we drop them without processing. | ||
| return; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This used to be:
// if client outcomes are disabled we leave the client reports unprocessed
// and pass them on.
if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
// if a processing relay has client outcomes disabled we drop them.
if config.processing_enabled() {
managed_envelope.retain_items(|item| match item.ty() {
ItemType::ClientReport => ItemAction::DropSilently,
_ => ItemAction::Keep,
});
}
return;
}Not sure however we want to still drop them when processing is enabled or if we just want to get rid of this all together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a redundant check, if outcomes are disabled, the aggregator should drop them alltogether 🤔
@jjbayer wdyt, remove this either now or in a follow up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, emit_client_outcomes and emit_outcomes are different config flags so it's not entirely redundant. But I would be fine with making a breaking change and remove emit_client_outcomes from the config altogether.
|
|
||
| use super::*; | ||
|
|
||
| // FIXME: Ask if moving the tests over is worth the changes to ProcessEnvelopeGrouped and Submit or if they should just stay here (hard to find). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The remaining 2 client report test are not yet moved since doing so will require us to make ProcessEnvelopeGrouped and Submit public not sure if we want to do that to have the tests in a "better spot".
| /// At the moment client reports are primarily used to transfer outcomes from | ||
| /// client SDKs. The outcomes are removed here and sent directly to the outcomes | ||
| /// system. | ||
| pub fn process_client_reports( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is taken almost 1-1 from the old processing so if we think this is better broken up or rewritten to make it more elegant can still do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should do that, left an explaining comment on why here: #5338 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, rewrote the logic now to split it up ruffly into the individual parts that you mentioned. Not sure if the code is too cute now though.
| if (!config.emit_outcomes().any() || !config.emit_client_outcomes()) | ||
| && config.processing_enabled() | ||
| { | ||
| // if a processing relay has client outcomes disabled we drop them without processing. | ||
| return; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a redundant check, if outcomes are disabled, the aggregator should drop them alltogether 🤔
@jjbayer wdyt, remove this either now or in a follow up?
| && config.processing_enabled() | ||
| { | ||
| // if a processing relay has client outcomes disabled we drop them without processing. | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should return an error here, this only works because we do not log any outcomes for client reports, but dropping data should be explicit and go through Rejected.
| relay_log::trace!("ignored client outcome with an overlong reason"); | ||
| continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, this bypasses the accounting logic of Managed, it just happens to work because there are no outcomes for client reports, but the processing code should work independent of this.
With outcomes for client reports and in a debug build this would panic due to accounting errors.
If you follow the implementation of e.g. the logs processor, you can do the processing in multiple phases:
managed.map()-> here you expand the client reportsmanaged.retain()-> here you validate/normalize the client reportsmanaged.accept()-> here you send the client reports to the aggregator
For readability reasons you can also split these 3 phases into 3 different functions.
If you want we can pair next week on making these changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, if we make the effort of porting the code we might as well use the new bookkeeping mechanism.
| if (!config.emit_outcomes().any() || !config.emit_client_outcomes()) | ||
| && config.processing_enabled() | ||
| { | ||
| // if a processing relay has client outcomes disabled we drop them without processing. | ||
| return; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, emit_client_outcomes and emit_outcomes are different config flags so it's not entirely redundant. But I would be fine with making a breaking change and remove emit_client_outcomes from the config altogether.
| relay_log::trace!("ignored client outcome with an overlong reason"); | ||
| continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, if we make the effort of porting the code we might as well use the new bookkeeping mechanism.
| /// Emits the validated client report outcomes to the outcome aggregator. | ||
| pub fn emit( | ||
| client_reports: Managed<ValidatedClientReports>, | ||
| outcome_aggregator: &Addr<TrackOutcome>, | ||
| ) { | ||
| client_reports.accept(|client_reports| { | ||
| for outcome in client_reports.outcomes { | ||
| outcome_aggregator.send(outcome) | ||
| } | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we think this is worth its own function else can also move it into validate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can merge these if it makes sense, the additional step of converting to TrackedOutcome seems a bit weird.
What we can do is merge these two parts together into just an emit, there you can split out individual reports/outcomes using managed.split() then rejecting individual incorrect items with a try_accept.
Roughly:
for outcome in client_reports.split(|client_reports| client_reports.outcomes) {
outcome.try_accept(|...| {
})
}A lot of this is often just trying it out, see how it feels and reads and if whether you think this overall improves the situation if you compare the two options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above, I would even go further and split up validate, so you get
let client_reports = process::validate(...);
let tracked_outcomes = process::convert(_);
process::emit(tracked_outcomes)IMO these are all high-level building blocks of the processing pipeline.
| let mut output_events = BTreeMap::new(); | ||
|
|
||
| for item in client_reports.client_reports { | ||
| match ClientReport::parse(&item.payload()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd consider factoring this match block out in a separate function it reduces nesting overall and in general just increases readability.
You have the expansion function which deals with the container and many items, then you have a dedicated parsing function which deals with an individual item.
If you were to write unittests, that would also be much easier to unittest individually. Tests are often a good way to figure out code could be organized better/differently.
| /// Emits the validated client report outcomes to the outcome aggregator. | ||
| pub fn emit( | ||
| client_reports: Managed<ValidatedClientReports>, | ||
| outcome_aggregator: &Addr<TrackOutcome>, | ||
| ) { | ||
| client_reports.accept(|client_reports| { | ||
| for outcome in client_reports.outcomes { | ||
| outcome_aggregator.send(outcome) | ||
| } | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can merge these if it makes sense, the additional step of converting to TrackedOutcome seems a bit weird.
What we can do is merge these two parts together into just an emit, there you can split out individual reports/outcomes using managed.split() then rejecting individual incorrect items with a try_accept.
Roughly:
for outcome in client_reports.split(|client_reports| client_reports.outcomes) {
outcome.try_accept(|...| {
})
}A lot of this is often just trying it out, see how it feels and reads and if whether you think this overall improves the situation if you compare the two options.
| } | ||
|
|
||
| ValidatedClientReports { outcomes } | ||
| })) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Similar as what David mentioned above, I would move the body of the for loop to a separate function.
- I think readability would benefit from splitting the conversion to
TrackOutcomeinto a top-levelprocessfunction, so you get something like
let client_reports = process::validate(...);
let tracked_outcomes = process::convert(_);| /// Emits the validated client report outcomes to the outcome aggregator. | ||
| pub fn emit( | ||
| client_reports: Managed<ValidatedClientReports>, | ||
| outcome_aggregator: &Addr<TrackOutcome>, | ||
| ) { | ||
| client_reports.accept(|client_reports| { | ||
| for outcome in client_reports.outcomes { | ||
| outcome_aggregator.send(outcome) | ||
| } | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above, I would even go further and split up validate, so you get
let client_reports = process::validate(...);
let tracked_outcomes = process::convert(_);
process::emit(tracked_outcomes)IMO these are all high-level building blocks of the processing pipeline.
Co-authored-by: Joris Bayer <[email protected]>
Co-authored-by: Joris Bayer <[email protected]>
Co-authored-by: Joris Bayer <[email protected]>
Co-authored-by: Joris Bayer <[email protected]>
| if !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() { | ||
| // if a processing relay has client outcomes disabled we drop them without processing. | ||
| return Err(client_reports.reject_err(Error::OutcomesDisabled)); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The condition !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() is logically unreachable, preventing OutcomesDisabled error.
Severity: HIGH | Confidence: High
🔍 Detailed Analysis
The condition !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() at relay-server/src/processing/client_reports/mod.rs:80-82 is logically unreachable. When ctx.config.processing_enabled() is true, emit_outcomes().any() is always true, making the first part false. When ctx.config.processing_enabled() is false, the second part is false. Consequently, the OutcomesDisabled error path is never taken, causing client reports to always be processed, ignoring the emit_outcomes configuration. This is a functional regression.
💡 Suggested Fix
Correct the condition at relay-server/src/processing/client_reports/mod.rs:80-82 to reflect the intended logic, likely by negating ctx.config.processing_enabled() or reverting to the original condition (!config.emit_outcomes().any() || !config.emit_client_outcomes()) && !config.processing_enabled().
🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: relay-server/src/processing/client_reports/mod.rs#L80-L84
Potential issue: The condition `!ctx.config.emit_outcomes().any() &&
ctx.config.processing_enabled()` at
`relay-server/src/processing/client_reports/mod.rs:80-82` is logically unreachable. When
`ctx.config.processing_enabled()` is true, `emit_outcomes().any()` is always true,
making the first part false. When `ctx.config.processing_enabled()` is false, the second
part is false. Consequently, the `OutcomesDisabled` error path is never taken, causing
client reports to always be processed, ignoring the `emit_outcomes` configuration. This
is a functional regression.
Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 3173889
This ports the client_report processing logic over to the new processor. In doing so it refactors the logic to better align with the structure that can be found in the other processors. The logic should be however remain the same except for one breaking change. To simplify the logic, reports are now never forwarded but rather always processed or droped. Previously if
(!config.emit_outcomes().any() || !config.emit_client_outcomes()) && !config.processing_enabled()we would forward the reports instead of processing them.Fixes: INGEST-514