Conversation

@jayshrivastava
Collaborator

@jayshrivastava jayshrivastava commented Sep 22, 2025

This change adds a new type called MetricsCollectingStream. It wraps a stream of FlightData
and collects any metrics that are passed in the app_metadata. This change also introduces
a FlightAppMetadata enum proto which can be used to define our app metadata protocol.
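
As a rough illustration of the idea (not the code in this PR), the sketch below wraps a sequence of messages, pulls any metrics out of the metadata, and forwards the messages downstream. It uses a plain `Iterator` from the standard library as a stand-in for an async `Stream`. `StageKey`, `AppMetadata`, `Message`, and `MetricsCollecting` are hypothetical, simplified types invented for this example, and stripping the metadata before forwarding is an assumption, not something the description states.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the key that metrics are grouped by.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StageKey {
    pub query_id: String,
    pub stage_id: u64,
    pub task_number: u64,
}

/// Hypothetical stand-in for the FlightAppMetadata enum proto.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppMetadata {
    /// Metrics reported for one stage key, as (name, value) pairs.
    Metrics { key: StageKey, metrics: Vec<(String, u64)> },
}

/// Simplified stand-in for FlightData: a payload plus optional metadata.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Message {
    pub body: Vec<u8>,
    pub app_metadata: Option<AppMetadata>,
}

/// Wraps an iterator of messages and records metrics found in the metadata.
pub struct MetricsCollecting<I> {
    inner: I,
    pub collected: HashMap<StageKey, Vec<(String, u64)>>,
}

impl<I: Iterator<Item = Message>> MetricsCollecting<I> {
    pub fn new(inner: I) -> Self {
        Self { inner, collected: HashMap::new() }
    }
}

impl<I: Iterator<Item = Message>> Iterator for MetricsCollecting<I> {
    type Item = Message;

    fn next(&mut self) -> Option<Message> {
        let mut msg = self.inner.next()?;
        // Assumption: metadata is removed before the message is forwarded.
        if let Some(AppMetadata::Metrics { key, metrics }) = msg.app_metadata.take() {
            self.collected.entry(key).or_default().extend(metrics);
        }
        Some(msg)
    }
}
```

The wrapper never interprets the key; it only groups by it, so callers decide what a key means.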

Informs #123

@jayshrivastava jayshrivastava force-pushed the js/mixed-message-streams branch 2 times, most recently from 7b98cac to 30f44db Compare September 22, 2025 20:53
@jayshrivastava jayshrivastava marked this pull request as ready for review September 22, 2025 20:55
Collaborator

@NGA-TRAN NGA-TRAN left a comment

The code looks good. I just have some minor suggestions for the tests.

stage_id: 2,
task_number: 2,
},
];
Collaborator

Can you describe the purpose of this test? Do you want to test collecting metrics for one query with 2 stages, or for 2 queries that each have one stage?

If you want to test both, you may want to split them into 2 separate tests and make that clear in the test names or in comments. Also, do we collect metrics for different queries in one pass, and do we need such tests?

Either way, you want to make the test valid by:

  • Using the same query_id when the test covers a single query, and different query_ids when it covers 2 different queries
  • Using a smaller number for the child stage_id

Collaborator Author

> Can you describe the purpose of this test?

I added a comment.

> Do you want to test collecting metrics for one query with 2 stages, or for 2 queries that each have one stage?

This metrics stream doesn't really know the semantics of how queries are executed. The only "guarantee" it offers is that it collects metrics by StageKey, but it doesn't know what those keys represent. That's why I just used random StageKeys. However, I do agree that it's confusing, so I updated the test to use the same query_id and stage_id with two different task IDs. This should make more sense.

panic!("expected Gauge metric");
}
}
}
Collaborator

This test is a bit long. What do you think about splitting it into separate functions for creating metrics, creating stages, and verifying results, and then creating a macro that uses them? There are many examples of that in the vanilla DF tests, and I think Gabriel has that in this repo, too. That way, you can reuse them and it is easier for us to review.

Collaborator

@gabotechs: My suggestion is a bit ambiguous here. It would be great if you could help point Jayant to some good examples of how to do so.

Collaborator Author

Fixed. Plus I made it much more concise.

Collaborator Author

Added test utils for metrics creation as well :) Now you can call make_test_metrics_set_proto_from_seed to make test data. Also, all metrics protos implement Eq now, so it's easy to assert metrics equality in tests.
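
For readers without the diff at hand, here is a minimal sketch of what a seed-based helper plus `Eq` buys in tests. Only the function name comes from the comment above. Its signature, the `MetricProto` and `MetricsSetProto` types, and the way values are derived from the seed are all assumptions made for illustration and may not match the real utility.

```rust
/// Hypothetical, simplified stand-in for a single metric proto.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MetricProto {
    Count { name: String, value: u64 },
    Gauge { name: String, value: u64 },
}

/// Hypothetical, simplified stand-in for a set of metric protos.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MetricsSetProto {
    pub metrics: Vec<MetricProto>,
}

/// Assumed signature: derives a metrics set deterministically from a seed,
/// so the same seed always yields equal test data.
pub fn make_test_metrics_set_proto_from_seed(seed: u64) -> MetricsSetProto {
    MetricsSetProto {
        metrics: vec![
            MetricProto::Count { name: format!("count_{}", seed), value: seed },
            MetricProto::Gauge { name: format!("gauge_{}", seed), value: seed.wrapping_mul(2) },
        ],
    }
}
```

Because the types derive `Eq`, a test can compare whole metrics sets with `assert_eq!` instead of matching on each variant and panicking on the unexpected ones.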

@jayshrivastava jayshrivastava force-pushed the js/mixed-message-streams branch from 30f44db to 0147b8d Compare September 24, 2025 14:37
Collaborator

@gabotechs gabotechs left a comment

Awesome! Not much to say, this looks just right.

where
    S: Stream<Item = Result<FlightData, FlightError>> + Send + Unpin,
{
    inner: S,
Collaborator

I wonder if this should instead be:

#[pin_project]
pub struct MetricsCollectingStream<S>
where
    S: Stream<Item = Result<FlightData, FlightError>> + Send + Unpin,
{
    #[pin]
    inner: S,

I don't know the details about when this is needed, but I usually see wrapper streams implemented this way in Rust.
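
Some background on when projection matters, as a sketch rather than a recommendation for this PR: when the inner type is bounded by `Unpin`, as in the snippet above, a wrapper can re-pin the field with `Pin::new(&mut this.inner)` in safe code, so `#[pin]` is not strictly required. Pin projection becomes necessary once the `Unpin` bound is dropped. The example below uses `std::future::Future` instead of `Stream` so that it compiles with the standard library alone; the same reasoning applies to `poll_next`. `CountingFuture`, `NoopWake`, and `poll_once` are hypothetical names used only here.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper that counts how often the inner future is polled.
pub struct CountingFuture<F> {
    inner: F,
    pub polls: usize,
}

impl<F> CountingFuture<F> {
    pub fn new(inner: F) -> Self {
        Self { inner, polls: 0 }
    }
}

impl<F: Future + Unpin> Future for CountingFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `F: Unpin` makes the whole wrapper `Unpin`, so `get_mut` is allowed.
        let this = self.get_mut();
        this.polls += 1;
        // Re-pinning the field is safe only because `F: Unpin`.
        Pin::new(&mut this.inner).poll(cx)
    }
}

/// Waker that does nothing; enough to poll a future once by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future a single time with a no-op waker.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

The trade-off is that the `Unpin` bound pushes the cost onto callers, who may have to box and pin a `!Unpin` stream before wrapping it, whereas a pin-projected field accepts such streams directly.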

Collaborator Author

👀 Will look into it.

@jayshrivastava jayshrivastava force-pushed the js/mixed-message-streams branch 2 times, most recently from 34185c1 to 147579f Compare September 25, 2025 16:50
@jayshrivastava jayshrivastava force-pushed the js/mixed-message-streams branch from 147579f to 19e5222 Compare September 25, 2025 16:56
@jayshrivastava jayshrivastava merged commit 64920af into main Sep 25, 2025
4 checks passed
@jayshrivastava jayshrivastava deleted the js/mixed-message-streams branch September 25, 2025 17:00