Skip to content

Commit e39250b

Browse files
ProcessingJob dataloader
1 parent eed8ea4 commit e39250b

File tree

4 files changed

+87
-40
lines changed

4 files changed

+87
-40
lines changed

Cargo.lock

Lines changed: 0 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/processed_data/charts/processed_data/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ type: application
55

66
version: 0.1.0
77

8-
appVersion: 0.1.0-rc5
8+
appVersion: 0.1.0-rc6

processed_data/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ anyhow = "1.0.81"
1010
async-graphql = { version = "7.0.2", default-features = false, features = [
1111
"chrono",
1212
"graphiql",
13-
"tracing",
1413
"dataloader",
1514
] }
1615
async-graphql-axum = { version = "7.0.2" }

processed_data/src/graphql/mod.rs

Lines changed: 86 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,15 @@ pub fn root_schema_builder(
3232
) -> SchemaBuilder<Query, EmptyMutation, EmptySubscription> {
3333
Schema::build(Query, EmptyMutation, EmptySubscription)
3434
.data(DataLoader::new(
35-
DataCollectionLoader::new(database.clone()),
35+
ProcessedDataLoader::new(database.clone()),
36+
tokio::spawn,
37+
))
38+
.data(DataLoader::new(
39+
ProcessingJobDataLoader::new(database.clone()),
40+
tokio::spawn,
41+
))
42+
.data(DataLoader::new(
43+
ProcessingJobParameterDataLoader::new(database.clone()),
3644
tokio::spawn,
3745
))
3846
.data(database)
@@ -43,18 +51,33 @@ pub fn root_schema_builder(
4351
#[derive(Debug, Clone, Default)]
4452
pub struct Query;
4553

46-
pub struct DataCollectionLoader(DatabaseConnection);
54+
pub struct ProcessedDataLoader(DatabaseConnection);
55+
pub struct ProcessingJobDataLoader(DatabaseConnection);
56+
pub struct ProcessingJobParameterDataLoader(DatabaseConnection);
4757

48-
impl DataCollectionLoader {
58+
impl ProcessingJobDataLoader {
4959
fn new(database: DatabaseConnection) -> Self {
5060
Self(database)
5161
}
5262
}
5363

54-
impl Loader<u32> for DataCollectionLoader {
64+
impl ProcessedDataLoader {
65+
fn new(database: DatabaseConnection) -> Self {
66+
Self(database)
67+
}
68+
}
69+
70+
impl ProcessingJobParameterDataLoader {
71+
fn new(database: DatabaseConnection) -> Self {
72+
Self(database)
73+
}
74+
}
75+
76+
impl Loader<u32> for ProcessedDataLoader {
5577
type Value = DataProcessing;
5678
type Error = async_graphql::Error;
5779

80+
#[instrument(name = "load_processed_data", skip(self))]
5881
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
5982
let mut results = HashMap::new();
6083
let keys_vec: Vec<u32> = keys.iter().cloned().collect();
@@ -74,30 +97,76 @@ impl Loader<u32> for DataCollectionLoader {
7497
}
7598
}
7699

100+
impl Loader<u32> for ProcessingJobDataLoader {
101+
type Value = Vec<ProcessingJob>;
102+
type Error = async_graphql::Error;
103+
104+
#[instrument(name = "load_processing_job", skip(self))]
105+
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
106+
let mut results = HashMap::new();
107+
let keys_vec: Vec<u32> = keys.iter().cloned().collect();
108+
let records = processing_job::Entity::find()
109+
.filter(processing_job::Column::DataCollectionId.is_in(keys_vec))
110+
.all(&self.0)
111+
.await?;
112+
113+
for record in records {
114+
let data_collection_id = record.data_collection_id.unwrap();
115+
let data = ProcessingJob::from(record);
116+
117+
results
118+
.entry(data_collection_id)
119+
.or_insert_with(Vec::new)
120+
.push(data)
121+
}
122+
Ok(results)
123+
}
124+
}
125+
126+
impl Loader<u32> for ProcessingJobParameterDataLoader {
127+
type Value = Vec<ProcessingJobParameter>;
128+
type Error = async_graphql::Error;
129+
130+
#[instrument(name = "load_processing_job_parameter", skip(self))]
131+
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
132+
let mut results = HashMap::new();
133+
let keys_vec: Vec<u32> = keys.iter().cloned().collect();
134+
let records = processing_job_parameter::Entity::find()
135+
.filter(processing_job_parameter::Column::ProcessingJobId.is_in(keys_vec))
136+
.all(&self.0)
137+
.await?;
138+
139+
for record in records {
140+
let processing_job_id = record.processing_job_id.unwrap();
141+
let data = ProcessingJobParameter::from(record);
142+
results
143+
.entry(processing_job_id)
144+
.or_insert_with(Vec::new)
145+
.push(data)
146+
}
147+
148+
Ok(results)
149+
}
150+
}
151+
77152
#[ComplexObject]
78153
impl DataCollection {
79154
/// Fetched all the processed data from data collection during a session
80155
async fn processed_data(
81156
&self,
82157
ctx: &Context<'_>,
83158
) -> Result<Option<DataProcessing>, async_graphql::Error> {
84-
let loader = ctx.data_unchecked::<DataLoader<DataCollectionLoader>>();
159+
let loader = ctx.data_unchecked::<DataLoader<ProcessedDataLoader>>();
85160
Ok(loader.load_one(self.id).await?)
86161
}
87162

88163
/// Fetched all the processing jobs
89164
async fn processing_jobs(
90165
&self,
91166
ctx: &Context<'_>,
92-
) -> async_graphql::Result<Vec<ProcessingJob>, async_graphql::Error> {
93-
let database = ctx.data::<DatabaseConnection>()?;
94-
Ok(processing_job::Entity::find()
95-
.filter(processing_job::Column::DataCollectionId.eq(self.id))
96-
.all(database)
97-
.await?
98-
.into_iter()
99-
.map(ProcessingJob::from)
100-
.collect())
167+
) -> async_graphql::Result<Option<Vec<ProcessingJob>>, async_graphql::Error> {
168+
let loader = ctx.data_unchecked::<DataLoader<ProcessingJobDataLoader>>();
169+
Ok(loader.load_one(self.id).await?)
101170
}
102171

103172
/// Fetches all the automatic process
@@ -141,15 +210,9 @@ impl ProcessingJob {
141210
async fn parameters(
142211
&self,
143212
ctx: &Context<'_>,
144-
) -> async_graphql::Result<Vec<ProcessingJobParameter>> {
145-
let database = ctx.data::<DatabaseConnection>()?;
146-
Ok(processing_job_parameter::Entity::find()
147-
.filter(processing_job_parameter::Column::ProcessingJobId.eq(self.processing_job_id))
148-
.all(database)
149-
.await?
150-
.into_iter()
151-
.map(ProcessingJobParameter::from)
152-
.collect())
213+
) -> async_graphql::Result<Option<Vec<ProcessingJobParameter>>> {
214+
let loader = ctx.data_unchecked::<DataLoader<ProcessingJobParameterDataLoader>>();
215+
Ok(loader.load_one(self.processing_job_id).await?)
153216
}
154217
}
155218

0 commit comments

Comments
 (0)