Skip to content

Commit eed8ea4

Browse files
Dataloader testing
1 parent 295a3ea commit eed8ea4

File tree

5 files changed

+72
-14
lines changed

5 files changed

+72
-14
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/processed_data/charts/processed_data/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ type: application
55

66
version: 0.1.0
77

8-
appVersion: 0.1.0-rc4
8+
appVersion: 0.1.0-rc5

processed_data/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ async-graphql = { version = "7.0.2", default-features = false, features = [
1111
"chrono",
1212
"graphiql",
1313
"tracing",
14+
"dataloader",
1415
] }
1516
async-graphql-axum = { version = "7.0.2" }
1617
aws-credential-types = { version = "0.56.0" }

processed_data/src/graphql/mod.rs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
mod entities;
33
use crate::S3Bucket;
44
use async_graphql::{
5+
dataloader::{DataLoader, Loader},
56
ComplexObject, Context, EmptyMutation, EmptySubscription, Object, Schema, SchemaBuilder,
67
};
78
use aws_sdk_s3::presigning::PresigningConfig;
@@ -15,7 +16,9 @@ use models::{
1516
processing_job_parameter,
1617
};
1718
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
19+
use std::collections::HashMap;
1820
use std::time::Duration;
21+
use tracing::instrument;
1922
use url::Url;
2023

2124
use self::entities::AutoProcProgram;
@@ -24,29 +27,62 @@ use self::entities::AutoProcProgram;
2427
pub type RootSchema = Schema<Query, EmptyMutation, EmptySubscription>;
2528

2629
/// A schema builder for the service
27-
pub fn root_schema_builder() -> SchemaBuilder<Query, EmptyMutation, EmptySubscription> {
28-
Schema::build(Query, EmptyMutation, EmptySubscription).enable_federation()
30+
pub fn root_schema_builder(
31+
database: DatabaseConnection,
32+
) -> SchemaBuilder<Query, EmptyMutation, EmptySubscription> {
33+
Schema::build(Query, EmptyMutation, EmptySubscription)
34+
.data(DataLoader::new(
35+
DataCollectionLoader::new(database.clone()),
36+
tokio::spawn,
37+
))
38+
.data(database)
39+
.enable_federation()
2940
}
3041

3142
/// The root query of the service
3243
#[derive(Debug, Clone, Default)]
3344
pub struct Query;
3445

46+
pub struct DataCollectionLoader(DatabaseConnection);
47+
48+
impl DataCollectionLoader {
49+
fn new(database: DatabaseConnection) -> Self {
50+
Self(database)
51+
}
52+
}
53+
54+
impl Loader<u32> for DataCollectionLoader {
55+
type Value = DataProcessing;
56+
type Error = async_graphql::Error;
57+
58+
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
59+
let mut results = HashMap::new();
60+
let keys_vec: Vec<u32> = keys.iter().cloned().collect();
61+
let records = data_collection_file_attachment::Entity::find()
62+
.filter(data_collection_file_attachment::Column::DataCollectionId.is_in(keys_vec))
63+
.all(&self.0)
64+
.await?;
65+
66+
for record in records {
67+
let data_collection_id = record.data_collection_id;
68+
let data = DataProcessing::from(record);
69+
70+
results.insert(data_collection_id, data);
71+
}
72+
73+
Ok(results)
74+
}
75+
}
76+
3577
#[ComplexObject]
3678
impl DataCollection {
3779
/// Fetched all the processed data from data collection during a session
3880
async fn processed_data(
3981
&self,
4082
ctx: &Context<'_>,
41-
) -> Result<Vec<DataProcessing>, async_graphql::Error> {
42-
let database = ctx.data::<DatabaseConnection>()?;
43-
Ok(data_collection_file_attachment::Entity::find()
44-
.filter(data_collection_file_attachment::Column::DataCollectionId.eq(self.id))
45-
.all(database)
46-
.await?
47-
.into_iter()
48-
.map(DataProcessing::from)
49-
.collect())
83+
) -> Result<Option<DataProcessing>, async_graphql::Error> {
84+
let loader = ctx.data_unchecked::<DataLoader<DataCollectionLoader>>();
85+
Ok(loader.load_one(self.id).await?)
5086
}
5187

5288
/// Fetched all the processing jobs

processed_data/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ struct SchemaArgs {
126126
/// The path to write the schema to, if not set the schema will be printed to stdout
127127
#[arg(short, long)]
128128
path: Option<PathBuf>,
129+
#[arg(long, env = "DATABASE_URL")]
130+
database_url: Url,
129131
}
130132

131133
/// Creates a connection pool to access the database
@@ -239,12 +241,13 @@ async fn main() {
239241
Cli::Serve(args) => {
240242
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();
241243
let database = setup_database(args.database_url).await.unwrap();
242-
let schema = root_schema_builder().data(database).finish();
244+
let schema = root_schema_builder(database).finish();
243245
let router = setup_router(schema);
244246
serve(router, args.port).await.unwrap();
245247
}
246248
Cli::Schema(args) => {
247-
let schema = root_schema_builder().finish();
249+
let database = setup_database(args.database_url).await.unwrap();
250+
let schema = root_schema_builder(database).finish();
248251
let schema_string = schema.sdl_with_options(SDLExportOptions::new().federation());
249252
if let Some(path) = args.path {
250253
let mut file = File::create(path).unwrap();

0 commit comments

Comments
 (0)