Skip to content

Commit a822fa8

Browse files
Propogate parent spans for dataloader
1 parent 8f2e1f0 commit a822fa8

File tree

2 files changed

+112
-34
lines changed
  • charts/processed_data/charts/processed_data
  • processed_data/src/graphql

2 files changed

+112
-34
lines changed

charts/processed_data/charts/processed_data/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ type: application
55

66
version: 0.1.0
77

8-
appVersion: 0.1.0-rc9
8+
appVersion: 0.1.0-rc10

processed_data/src/graphql/mod.rs

Lines changed: 111 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use models::{
1818
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
1919
use std::collections::HashMap;
2020
use std::time::Duration;
21-
use tracing::instrument;
21+
use tracing::{instrument, Span};
2222
use url::Url;
2323

2424
use self::entities::AutoProcProgram;
@@ -31,7 +31,6 @@ pub trait AddDataLoadersExt {
3131
}
3232

3333
impl AddDataLoadersExt for async_graphql::Request {
34-
#[instrument(name = "add_data_loaders", skip(self))]
3534
fn add_data_loaders(self, database: DatabaseConnection) -> Self {
3635
self.data(DataLoader::new(
3736
ProcessedDataLoader::new(database.clone()),
@@ -86,88 +85,149 @@ pub fn root_schema_builder() -> SchemaBuilder<Query, EmptyMutation, EmptySubscri
8685
#[derive(Debug, Clone, Default)]
8786
pub struct Query;
8887

89-
pub struct ProcessedDataLoader(DatabaseConnection);
90-
pub struct ProcessingJobDataLoader(DatabaseConnection);
91-
pub struct ProcessingJobParameterDataLoader(DatabaseConnection);
92-
pub struct AutoProcIntegrationDataLoader(DatabaseConnection);
93-
pub struct AutoProcProgramDataLoader(DatabaseConnection);
94-
pub struct AutoProcDataLoader(DatabaseConnection);
95-
pub struct AutoProcScalingDataLoader(DatabaseConnection);
96-
pub struct AutoProcScalingOverall(DatabaseConnection);
97-
pub struct AutoProcScalingInnerShell(DatabaseConnection);
98-
pub struct AutoProcScalingOuterShell(DatabaseConnection);
88+
pub struct ProcessedDataLoader {
89+
database: DatabaseConnection,
90+
parent_span: Span,
91+
}
92+
pub struct ProcessingJobDataLoader {
93+
database: DatabaseConnection,
94+
parent_span: Span,
95+
}
96+
pub struct ProcessingJobParameterDataLoader {
97+
database: DatabaseConnection,
98+
parent_span: Span,
99+
}
100+
pub struct AutoProcIntegrationDataLoader {
101+
database: DatabaseConnection,
102+
parent_span: Span,
103+
}
104+
pub struct AutoProcProgramDataLoader {
105+
database: DatabaseConnection,
106+
parent_span: Span,
107+
}
108+
pub struct AutoProcDataLoader {
109+
database: DatabaseConnection,
110+
parent_span: Span,
111+
}
112+
pub struct AutoProcScalingDataLoader {
113+
database: DatabaseConnection,
114+
parent_span: Span,
115+
}
116+
pub struct AutoProcScalingOverall {
117+
database: DatabaseConnection,
118+
parent_span: Span,
119+
}
120+
pub struct AutoProcScalingInnerShell {
121+
database: DatabaseConnection,
122+
parent_span: Span,
123+
}
124+
pub struct AutoProcScalingOuterShell {
125+
database: DatabaseConnection,
126+
parent_span: Span,
127+
}
99128

100129
impl ProcessingJobDataLoader {
101130
fn new(database: DatabaseConnection) -> Self {
102-
Self(database)
131+
Self {
132+
database,
133+
parent_span: Span::current(),
134+
}
103135
}
104136
}
105137

106138
impl ProcessedDataLoader {
107139
fn new(database: DatabaseConnection) -> Self {
108-
Self(database)
140+
Self {
141+
database,
142+
parent_span: Span::current(),
143+
}
109144
}
110145
}
111146

112147
impl ProcessingJobParameterDataLoader {
113148
fn new(database: DatabaseConnection) -> Self {
114-
Self(database)
149+
Self {
150+
database,
151+
parent_span: Span::current(),
152+
}
115153
}
116154
}
117155

118156
impl AutoProcIntegrationDataLoader {
119157
fn new(database: DatabaseConnection) -> Self {
120-
Self(database)
158+
Self {
159+
database,
160+
parent_span: Span::current(),
161+
}
121162
}
122163
}
123164

124165
impl AutoProcProgramDataLoader {
125166
fn new(database: DatabaseConnection) -> Self {
126-
Self(database)
167+
Self {
168+
database,
169+
parent_span: Span::current(),
170+
}
127171
}
128172
}
129173

130174
impl AutoProcDataLoader {
131175
fn new(database: DatabaseConnection) -> Self {
132-
Self(database)
176+
Self {
177+
database,
178+
parent_span: Span::current(),
179+
}
133180
}
134181
}
135182

136183
impl AutoProcScalingDataLoader {
137184
fn new(database: DatabaseConnection) -> Self {
138-
Self(database)
185+
Self {
186+
database,
187+
parent_span: Span::current(),
188+
}
139189
}
140190
}
141191

142192
impl AutoProcScalingOverall {
143193
fn new(database: DatabaseConnection) -> Self {
144-
Self(database)
194+
Self {
195+
database,
196+
parent_span: Span::current(),
197+
}
145198
}
146199
}
147200

148201
impl AutoProcScalingInnerShell {
149202
fn new(database: DatabaseConnection) -> Self {
150-
Self(database)
203+
Self {
204+
database,
205+
parent_span: Span::current(),
206+
}
151207
}
152208
}
153209

154210
impl AutoProcScalingOuterShell {
155211
fn new(database: DatabaseConnection) -> Self {
156-
Self(database)
212+
Self {
213+
database,
214+
parent_span: Span::current(),
215+
}
157216
}
158217
}
159218

160219
impl Loader<u32> for ProcessedDataLoader {
161220
type Value = DataProcessing;
162221
type Error = async_graphql::Error;
163222

164-
#[instrument(name = "load_processed_data", skip(self))]
165223
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
224+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
225+
let _span = span.enter();
166226
let mut results = HashMap::new();
167227
let keys_vec: Vec<u32> = keys.to_vec();
168228
let records = data_collection_file_attachment::Entity::find()
169229
.filter(data_collection_file_attachment::Column::DataCollectionId.is_in(keys_vec))
170-
.all(&self.0)
230+
.all(&self.database)
171231
.await?;
172232

173233
for record in records {
@@ -187,11 +247,13 @@ impl Loader<u32> for ProcessingJobDataLoader {
187247

188248
#[instrument(name = "load_processing_job", skip(self))]
189249
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
250+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
251+
let _span = span.enter();
190252
let mut results = HashMap::new();
191253
let keys_vec: Vec<u32> = keys.to_vec();
192254
let records = processing_job::Entity::find()
193255
.filter(processing_job::Column::DataCollectionId.is_in(keys_vec))
194-
.all(&self.0)
256+
.all(&self.database)
195257
.await?;
196258

197259
for record in records {
@@ -213,11 +275,13 @@ impl Loader<u32> for ProcessingJobParameterDataLoader {
213275

214276
#[instrument(name = "load_processing_job_parameter", skip(self))]
215277
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
278+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
279+
let _span = span.enter();
216280
let mut results = HashMap::new();
217281
let keys_vec: Vec<u32> = keys.to_vec();
218282
let records = processing_job_parameter::Entity::find()
219283
.filter(processing_job_parameter::Column::ProcessingJobId.is_in(keys_vec))
220-
.all(&self.0)
284+
.all(&self.database)
221285
.await?;
222286

223287
for record in records {
@@ -239,11 +303,13 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {
239303

240304
#[instrument(name = "load_auto_proc_integration", skip(self))]
241305
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
306+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
307+
let _span = span.enter();
242308
let mut results = HashMap::new();
243309
let keys_vec: Vec<u32> = keys.to_vec();
244310
let records = auto_proc_integration::Entity::find()
245311
.filter(auto_proc_integration::Column::DataCollectionId.is_in(keys_vec))
246-
.all(&self.0)
312+
.all(&self.database)
247313
.await?;
248314

249315
for record in records {
@@ -265,11 +331,13 @@ impl Loader<u32> for AutoProcProgramDataLoader {
265331

266332
#[instrument(name = "load_auto_proc_program", skip(self))]
267333
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
334+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
335+
let _span = span.enter();
268336
let mut results = HashMap::new();
269337
let keys_vec: Vec<u32> = keys.to_vec();
270338
let records = auto_proc_program::Entity::find()
271339
.filter(auto_proc_program::Column::AutoProcProgramId.is_in(keys_vec))
272-
.all(&self.0)
340+
.all(&self.database)
273341
.await?;
274342

275343
for record in records {
@@ -288,11 +356,13 @@ impl Loader<u32> for AutoProcDataLoader {
288356

289357
#[instrument(name = "load_auto_proc", skip(self))]
290358
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
359+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
360+
let _span = span.enter();
291361
let mut results = HashMap::new();
292362
let keys_vec: Vec<u32> = keys.to_vec();
293363
let records = auto_proc::Entity::find()
294364
.filter(auto_proc::Column::AutoProcProgramId.is_in(keys_vec))
295-
.all(&self.0)
365+
.all(&self.database)
296366
.await?;
297367

298368
for record in records {
@@ -311,11 +381,13 @@ impl Loader<u32> for AutoProcScalingDataLoader {
311381

312382
#[instrument(name = "load_auto_proc_scaling", skip(self))]
313383
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
384+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
385+
let _span = span.enter();
314386
let mut results = HashMap::new();
315387
let keys_vec: Vec<u32> = keys.to_vec();
316388
let records = auto_proc_scaling::Entity::find()
317389
.filter(auto_proc_scaling::Column::AutoProcId.is_in(keys_vec))
318-
.all(&self.0)
390+
.all(&self.database)
319391
.await?;
320392

321393
for record in records {
@@ -334,12 +406,14 @@ impl Loader<u32> for AutoProcScalingOverall {
334406

335407
#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
336408
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
409+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
410+
let _span = span.enter();
337411
let mut results = HashMap::new();
338412
let keys_vec: Vec<u32> = keys.to_vec();
339413
let records = auto_proc_scaling_statistics::Entity::find()
340414
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
341415
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("overall"))
342-
.all(&self.0)
416+
.all(&self.database)
343417
.await?;
344418

345419
for record in records {
@@ -358,12 +432,14 @@ impl Loader<u32> for AutoProcScalingInnerShell {
358432

359433
#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
360434
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
435+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
436+
let _span = span.enter();
361437
let mut results = HashMap::new();
362438
let keys_vec: Vec<u32> = keys.to_vec();
363439
let records = auto_proc_scaling_statistics::Entity::find()
364440
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
365441
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("innerShell"))
366-
.all(&self.0)
442+
.all(&self.database)
367443
.await?;
368444

369445
for record in records {
@@ -382,12 +458,14 @@ impl Loader<u32> for AutoProcScalingOuterShell {
382458

383459
#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
384460
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
461+
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
462+
let _span = span.enter();
385463
let mut results = HashMap::new();
386464
let keys_vec: Vec<u32> = keys.to_vec();
387465
let records = auto_proc_scaling_statistics::Entity::find()
388466
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
389467
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("outerShell"))
390-
.all(&self.0)
468+
.all(&self.database)
391469
.await?;
392470

393471
for record in records {

0 commit comments

Comments
 (0)