Skip to content

Commit dcf7ed1

Browse files
protobuf: move StageKey to protobuf module
The StageKey will be used by various modules due to metrics collection for EXPLAIN ANALYZE. It will be more organized to import from the protobuf module than flight_service, which is its old module.
1 parent acd974b commit dcf7ed1

File tree

6 files changed

+31
-20
lines changed

6 files changed

+31
-20
lines changed

src/execution_plans/arrow_flight_read.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
33
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
44
use crate::execution_plans::StageExec;
5-
use crate::flight_service::{DoGet, StageKey};
6-
use crate::protobuf::{proto_from_stage, DistributedCodec};
5+
use crate::flight_service::DoGet;
6+
use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
77
use crate::ChannelResolver;
88
use arrow_flight::decode::FlightRecordBatchStream;
99
use arrow_flight::error::FlightError;

src/flight_service/do_get.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
use super::service::StageKey;
21
use crate::config_extension_ext::ContextGrpcMetadata;
32
use crate::errors::datafusion_error_to_tonic_status;
43
use crate::execution_plans::{PartitionGroup, StageExec};
54
use crate::flight_service::service::ArrowFlightEndpoint;
65
use crate::flight_service::session_builder::DistributedSessionBuilderContext;
7-
use crate::protobuf::{stage_from_proto, DistributedCodec, StageExecProto};
6+
use crate::protobuf::{stage_from_proto, DistributedCodec, StageExecProto, StageKey};
87
use arrow_flight::encode::FlightDataEncoderBuilder;
98
use arrow_flight::error::FlightError;
109
use arrow_flight::flight_service_server::FlightService;

src/flight_service/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ mod session_builder;
44

55
pub(crate) use do_get::DoGet;
66

7-
pub use service::{ArrowFlightEndpoint, StageKey};
7+
pub use service::ArrowFlightEndpoint;
88
pub use session_builder::{
99
DefaultSessionBuilder, DistributedSessionBuilder, DistributedSessionBuilderContext,
1010
MappedDistributedSessionBuilder, MappedDistributedSessionBuilderExt,

src/flight_service/service.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::common::ttl_map::{TTLMap, TTLMapConfig};
22
use crate::flight_service::do_get::TaskData;
33
use crate::flight_service::DistributedSessionBuilder;
4+
use crate::protobuf::StageKey;
45
use arrow_flight::flight_service_server::FlightService;
56
use arrow_flight::{
67
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
@@ -14,20 +15,6 @@ use std::sync::Arc;
1415
use tokio::sync::OnceCell;
1516
use tonic::{Request, Response, Status, Streaming};
1617

17-
/// A key that uniquely identifies a stage in a query
18-
#[derive(Clone, Hash, Eq, PartialEq, ::prost::Message)]
19-
pub struct StageKey {
20-
/// Our query id
21-
#[prost(string, tag = "1")]
22-
pub query_id: String,
23-
/// Our stage id
24-
#[prost(uint64, tag = "2")]
25-
pub stage_id: u64,
26-
/// The task number within the stage
27-
#[prost(uint64, tag = "3")]
28-
pub task_number: u64,
29-
}
30-
3118
pub struct ArrowFlightEndpoint {
3219
pub(super) runtime: Arc<RuntimeEnv>,
3320
pub(super) task_data_entries: TTLMap<StageKey, Arc<OnceCell<TaskData>>>,

src/protobuf/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ mod stage_proto;
33
mod user_codec;
44

55
pub(crate) use distributed_codec::DistributedCodec;
6-
pub(crate) use stage_proto::{proto_from_stage, stage_from_proto, StageExecProto};
6+
pub(crate) use stage_proto::{proto_from_stage, stage_from_proto, StageExecProto, StageKey};
77
pub(crate) use user_codec::{get_distributed_user_codec, set_distributed_user_codec};

src/protobuf/stage_proto.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,34 @@ use datafusion_proto::{
99
physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
1010
protobuf::PhysicalPlanNode,
1111
};
12+
use std::fmt::Display;
1213
use std::sync::Arc;
1314
use url::Url;
1415

16+
/// A key that uniquely identifies a stage in a query
17+
#[derive(Clone, Hash, Eq, PartialEq, ::prost::Message)]
18+
pub struct StageKey {
19+
/// Our query id
20+
#[prost(string, tag = "1")]
21+
pub query_id: String,
22+
/// Our stage id
23+
#[prost(uint64, tag = "2")]
24+
pub stage_id: u64,
25+
/// The task number within the stage
26+
#[prost(uint64, tag = "3")]
27+
pub task_number: u64,
28+
}
29+
30+
impl Display for StageKey {
31+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32+
write!(
33+
f,
34+
"StageKey_QueryID_{}_StageID_{}_TaskNumber_{}",
35+
self.query_id, self.stage_id, self.task_number
36+
)
37+
}
38+
}
39+
1540
#[derive(Clone, PartialEq, ::prost::Message)]
1641
pub struct StageExecProto {
1742
/// Our query id

0 commit comments

Comments
 (0)