Skip to content

Commit 2c4b6b6

Browse files
Propogate parent spans for dataloader
1 parent 8f2e1f0 commit 2c4b6b6

File tree

1 file changed

+81
-33
lines changed
  • processed_data/src/graphql

1 file changed

+81
-33
lines changed

processed_data/src/graphql/mod.rs

Lines changed: 81 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use models::{
1818
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
1919
use std::collections::HashMap;
2020
use std::time::Duration;
21-
use tracing::instrument;
21+
use tracing::{instrument, Span};
2222
use url::Url;
2323

2424
use self::entities::AutoProcProgram;
@@ -31,7 +31,6 @@ pub trait AddDataLoadersExt {
3131
}
3232

3333
impl AddDataLoadersExt for async_graphql::Request {
34-
#[instrument(name = "add_data_loaders", skip(self))]
3534
fn add_data_loaders(self, database: DatabaseConnection) -> Self {
3635
self.data(DataLoader::new(
3736
ProcessedDataLoader::new(database.clone()),
@@ -86,88 +85,119 @@ pub fn root_schema_builder() -> SchemaBuilder<Query, EmptyMutation, EmptySubscri
8685
#[derive(Debug, Clone, Default)]
8786
pub struct Query;
8887

89-
pub struct ProcessedDataLoader(DatabaseConnection);
90-
pub struct ProcessingJobDataLoader(DatabaseConnection);
91-
pub struct ProcessingJobParameterDataLoader(DatabaseConnection);
92-
pub struct AutoProcIntegrationDataLoader(DatabaseConnection);
93-
pub struct AutoProcProgramDataLoader(DatabaseConnection);
94-
pub struct AutoProcDataLoader(DatabaseConnection);
95-
pub struct AutoProcScalingDataLoader(DatabaseConnection);
96-
pub struct AutoProcScalingOverall(DatabaseConnection);
97-
pub struct AutoProcScalingInnerShell(DatabaseConnection);
98-
pub struct AutoProcScalingOuterShell(DatabaseConnection);
88+
pub struct ProcessedDataLoader {
89+
database: DatabaseConnection,
90+
parent_span: Span,
91+
}
92+
pub struct ProcessingJobDataLoader {
93+
database: DatabaseConnection,
94+
parent_span: Span,
95+
}
96+
pub struct ProcessingJobParameterDataLoader {
97+
database: DatabaseConnection,
98+
parent_span: Span,
99+
}
100+
pub struct AutoProcIntegrationDataLoader {
101+
database: DatabaseConnection,
102+
parent_span: Span,
103+
}
104+
pub struct AutoProcProgramDataLoader {
105+
database: DatabaseConnection,
106+
parent_span: Span,
107+
}
108+
pub struct AutoProcDataLoader {
109+
database: DatabaseConnection,
110+
parent_span: Span,
111+
}
112+
pub struct AutoProcScalingDataLoader {
113+
database: DatabaseConnection,
114+
parent_span: Span,
115+
}
116+
pub struct AutoProcScalingOverall {
117+
database: DatabaseConnection,
118+
parent_span: Span,
119+
}
120+
pub struct AutoProcScalingInnerShell {
121+
database: DatabaseConnection,
122+
parent_span: Span,
123+
}
124+
pub struct AutoProcScalingOuterShell {
125+
database: DatabaseConnection,
126+
parent_span: Span,
127+
}
99128

100129
impl ProcessingJobDataLoader {
101130
fn new(database: DatabaseConnection) -> Self {
102-
Self(database)
131+
Self{database, parent_span: Span::current()}
103132
}
104133
}
105134

106135
impl ProcessedDataLoader {
107136
fn new(database: DatabaseConnection) -> Self {
108-
Self(database)
137+
Self{database, parent_span: Span::current()}
109138
}
110139
}
111140

112141
impl ProcessingJobParameterDataLoader {
113142
fn new(database: DatabaseConnection) -> Self {
114-
Self(database)
143+
Self{database, parent_span: Span::current()}
115144
}
116145
}
117146

118147
impl AutoProcIntegrationDataLoader {
119148
fn new(database: DatabaseConnection) -> Self {
120-
Self(database)
149+
Self{database, parent_span: Span::current()}
121150
}
122151
}
123152

124153
impl AutoProcProgramDataLoader {
125154
fn new(database: DatabaseConnection) -> Self {
126-
Self(database)
155+
Self{database, parent_span: Span::current()}
127156
}
128157
}
129158

130159
impl AutoProcDataLoader {
131160
fn new(database: DatabaseConnection) -> Self {
132-
Self(database)
161+
Self{database, parent_span: Span::current()}
133162
}
134163
}
135164

136165
impl AutoProcScalingDataLoader {
137166
fn new(database: DatabaseConnection) -> Self {
138-
Self(database)
167+
Self{database, parent_span: Span::current()}
139168
}
140169
}
141170

142171
impl AutoProcScalingOverall {
143172
fn new(database: DatabaseConnection) -> Self {
144-
Self(database)
173+
Self{database, parent_span: Span::current()}
145174
}
146175
}
147176

148177
impl AutoProcScalingInnerShell {
149178
fn new(database: DatabaseConnection) -> Self {
150-
Self(database)
179+
Self{database, parent_span: Span::current()}
151180
}
152181
}
153182

154183
impl AutoProcScalingOuterShell {
155184
fn new(database: DatabaseConnection) -> Self {
156-
Self(database)
185+
Self{database, parent_span: Span::current()}
157186
}
158187
}
159188

160189
impl Loader<u32> for ProcessedDataLoader {
161190
type Value = DataProcessing;
162191
type Error = async_graphql::Error;
163192

164-
#[instrument(name = "load_processed_data", skip(self))]
165193
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
194+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
195+
let _span = span.enter();
166196
let mut results = HashMap::new();
167197
let keys_vec: Vec<u32> = keys.to_vec();
168198
let records = data_collection_file_attachment::Entity::find()
169199
.filter(data_collection_file_attachment::Column::DataCollectionId.is_in(keys_vec))
170-
.all(&self.0)
200+
.all(&self.database)
171201
.await?;
172202

173203
for record in records {
@@ -187,11 +217,13 @@ impl Loader<u32> for ProcessingJobDataLoader {
187217

188218
#[instrument(name = "load_processing_job", skip(self))]
189219
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
220+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
221+
let _span = span.enter();
190222
let mut results = HashMap::new();
191223
let keys_vec: Vec<u32> = keys.to_vec();
192224
let records = processing_job::Entity::find()
193225
.filter(processing_job::Column::DataCollectionId.is_in(keys_vec))
194-
.all(&self.0)
226+
.all(&self.database)
195227
.await?;
196228

197229
for record in records {
@@ -213,11 +245,13 @@ impl Loader<u32> for ProcessingJobParameterDataLoader {
213245

214246
#[instrument(name = "load_processing_job_parameter", skip(self))]
215247
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
248+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
249+
let _span = span.enter();
216250
let mut results = HashMap::new();
217251
let keys_vec: Vec<u32> = keys.to_vec();
218252
let records = processing_job_parameter::Entity::find()
219253
.filter(processing_job_parameter::Column::ProcessingJobId.is_in(keys_vec))
220-
.all(&self.0)
254+
.all(&self.database)
221255
.await?;
222256

223257
for record in records {
@@ -239,11 +273,13 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {
239273

240274
#[instrument(name = "load_auto_proc_integration", skip(self))]
241275
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
276+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
277+
let _span = span.enter();
242278
let mut results = HashMap::new();
243279
let keys_vec: Vec<u32> = keys.to_vec();
244280
let records = auto_proc_integration::Entity::find()
245281
.filter(auto_proc_integration::Column::DataCollectionId.is_in(keys_vec))
246-
.all(&self.0)
282+
.all(&self.database)
247283
.await?;
248284

249285
for record in records {
@@ -265,11 +301,13 @@ impl Loader<u32> for AutoProcProgramDataLoader {
265301

266302
#[instrument(name = "load_auto_proc_program", skip(self))]
267303
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
304+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
305+
let _span = span.enter();
268306
let mut results = HashMap::new();
269307
let keys_vec: Vec<u32> = keys.to_vec();
270308
let records = auto_proc_program::Entity::find()
271309
.filter(auto_proc_program::Column::AutoProcProgramId.is_in(keys_vec))
272-
.all(&self.0)
310+
.all(&self.database)
273311
.await?;
274312

275313
for record in records {
@@ -288,11 +326,13 @@ impl Loader<u32> for AutoProcDataLoader {
288326

289327
#[instrument(name = "load_auto_proc", skip(self))]
290328
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
329+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
330+
let _span = span.enter();
291331
let mut results = HashMap::new();
292332
let keys_vec: Vec<u32> = keys.to_vec();
293333
let records = auto_proc::Entity::find()
294334
.filter(auto_proc::Column::AutoProcProgramId.is_in(keys_vec))
295-
.all(&self.0)
335+
.all(&self.database)
296336
.await?;
297337

298338
for record in records {
@@ -311,11 +351,13 @@ impl Loader<u32> for AutoProcScalingDataLoader {
311351

312352
#[instrument(name = "load_auto_proc_scaling", skip(self))]
313353
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
354+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
355+
let _span = span.enter();
314356
let mut results = HashMap::new();
315357
let keys_vec: Vec<u32> = keys.to_vec();
316358
let records = auto_proc_scaling::Entity::find()
317359
.filter(auto_proc_scaling::Column::AutoProcId.is_in(keys_vec))
318-
.all(&self.0)
360+
.all(&self.database)
319361
.await?;
320362

321363
for record in records {
@@ -334,12 +376,14 @@ impl Loader<u32> for AutoProcScalingOverall {
334376

335377
#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
336378
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
379+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
380+
let _span = span.enter();
337381
let mut results = HashMap::new();
338382
let keys_vec: Vec<u32> = keys.to_vec();
339383
let records = auto_proc_scaling_statistics::Entity::find()
340384
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
341385
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("overall"))
342-
.all(&self.0)
386+
.all(&self.database)
343387
.await?;
344388

345389
for record in records {
@@ -358,12 +402,14 @@ impl Loader<u32> for AutoProcScalingInnerShell {
358402

359403
#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
360404
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
405+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
406+
let _span = span.enter();
361407
let mut results = HashMap::new();
362408
let keys_vec: Vec<u32> = keys.to_vec();
363409
let records = auto_proc_scaling_statistics::Entity::find()
364410
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
365411
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("innerShell"))
366-
.all(&self.0)
412+
.all(&self.database)
367413
.await?;
368414

369415
for record in records {
@@ -382,12 +428,14 @@ impl Loader<u32> for AutoProcScalingOuterShell {
382428

383429
#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
384430
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
431+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
432+
let _span = span.enter();
385433
let mut results = HashMap::new();
386434
let keys_vec: Vec<u32> = keys.to_vec();
387435
let records = auto_proc_scaling_statistics::Entity::find()
388436
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
389437
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("outerShell"))
390-
.all(&self.0)
438+
.all(&self.database)
391439
.await?;
392440

393441
for record in records {

0 commit comments

Comments
 (0)