@@ -4,207 +4,71 @@ use std::time::Instant;
44
55use async_trait:: async_trait;
66
7- use graph:: data:: subgraph:: schema:: * ;
8- use graph:: data:: subgraph:: UnresolvedDataSource ;
9- use graph:: prelude:: { DataSourceLoader as DataSourceLoaderTrait , GraphQlRunner , * } ;
10- use graph_graphql:: graphql_parser:: { parse_query, query as q} ;
7+ use graph:: components:: store:: StoredDynamicDataSource ;
8+ use graph:: prelude:: { DataSourceLoader as DataSourceLoaderTrait , * } ;
119
12- pub struct DataSourceLoader < L , Q , S > {
10+ pub struct DataSourceLoader < S > {
1311 store : Arc < S > ,
14- link_resolver : Arc < L > ,
15- graphql_runner : Arc < Q > ,
1612}
1713
18- impl < L , Q , S > DataSourceLoader < L , Q , S >
14+ impl < S > DataSourceLoader < S >
1915where
20- L : LinkResolver ,
2116 S : Store + SubgraphDeploymentStore ,
22- Q : GraphQlRunner ,
2317{
24- pub fn new ( store : Arc < S > , link_resolver : Arc < L > , graphql_runner : Arc < Q > ) -> Self {
25- Self {
26- store,
27- link_resolver,
28- graphql_runner,
29- }
30- }
31-
32- fn dynamic_data_sources_query (
33- & self ,
34- deployment : & SubgraphDeploymentId ,
35- skip : i32 ,
36- ) -> Result < Query , Error > {
37- // Obtain the "subgraphs" schema
38- let schema = self . store . api_schema ( & SUBGRAPHS_ID ) ?;
39-
40- // Construct a query for the subgraph deployment and all its
41- // dynamic data sources
42- //
43- // See also: ed42d219c6704a4aab57ce1ea66698e7.
44- // Note: This query needs to be in sync with the metadata schema.
45- let document = parse_query (
46- r#"
47- query deployment($id: ID!, $skip: Int!) {
48- subgraphDeployment(id: $id) {
49- dynamicDataSources(orderBy: id, skip: $skip) {
50- kind
51- network
52- name
53- context
54- source { address abi }
55- mapping {
56- kind
57- apiVersion
58- language
59- file
60- entities
61- abis { name file }
62- blockHandlers { handler filter }
63- callHandlers { function handler }
64- eventHandlers { event handler topic0 }
65- }
66- templates {
67- kind
68- network
69- name
70- source { abi }
71- mapping {
72- kind
73- apiVersion
74- language
75- file
76- entities
77- abis { name file }
78- blockHandlers { handler filter }
79- callHandlers { function handler }
80- eventHandlers { event handler topic0 }
81- }
82- }
83- }
84- }
85- }
86- "# ,
87- )
88- . expect ( "invalid query for dynamic data sources" ) ;
89- let variables = Some ( QueryVariables :: new ( HashMap :: from_iter (
90- vec ! [
91- ( String :: from( "id" ) , q:: Value :: String ( deployment. to_string( ) ) ) ,
92- ( String :: from( "skip" ) , q:: Value :: Int ( skip. into( ) ) ) ,
93- ]
94- . into_iter ( ) ,
95- ) ) ) ;
96-
97- Ok ( Query :: new ( schema, document, variables, None ) )
98- }
99-
100- fn parse_data_sources (
101- & self ,
102- deployment_id : & SubgraphDeploymentId ,
103- query_result : q:: Value ,
104- ) -> Result < Vec < UnresolvedDataSource > , Error > {
105- let data = match query_result {
106- q:: Value :: Object ( obj) => Ok ( obj) ,
107- _ => Err ( format_err ! (
108- "Query result for deployment `{}` is not an on object" ,
109- deployment_id,
110- ) ) ,
111- } ?;
112-
113- // Extract the deployment from the query result
114- let deployment = match data. get ( "subgraphDeployment" ) {
115- Some ( q:: Value :: Object ( obj) ) => Ok ( obj) ,
116- _ => Err ( format_err ! (
117- "Deployment `{}` is not an object" ,
118- deployment_id,
119- ) ) ,
120- } ?;
121-
122- // Extract the dynamic data sources from the query result
123- let values = match deployment. get ( "dynamicDataSources" ) {
124- Some ( q:: Value :: List ( objs) ) => {
125- if objs. iter ( ) . all ( |obj| match obj {
126- q:: Value :: Object ( _) => true ,
127- _ => false ,
128- } ) {
129- Ok ( objs)
130- } else {
131- Err ( format_err ! (
132- "Not all dynamic data sources of deployment `{}` are objects" ,
133- deployment_id
134- ) )
135- }
136- }
137- _ => Err ( format_err ! (
138- "Dynamic data sources of deployment `{}` are not a list" ,
139- deployment_id
140- ) ) ,
141- } ?;
142-
143- // Parse the raw data sources into typed entities
144- let entities = values. iter ( ) . try_fold ( vec ! [ ] , |mut entities, value| {
145- entities. push ( UnresolvedDataSource :: try_from_value ( value) ?) ;
146- Ok ( entities)
147- } ) ;
148-
149- entities. map_err ( |e : Error | {
150- format_err ! (
151- "Failed to parse dynamic data source entities of deployment `{}`: {}" ,
152- deployment_id,
153- e
154- )
155- } )
156- }
157-
158- async fn resolve_data_sources (
159- & self ,
160- unresolved_data_sources : Vec < UnresolvedDataSource > ,
161- logger : & Logger ,
162- ) -> Result < Vec < DataSource > , Error > {
163- // Resolve the data sources and return them
164- let mut result = Vec :: new ( ) ;
165- for item in unresolved_data_sources. into_iter ( ) {
166- let resolved = item. resolve ( & * self . link_resolver , logger) . await ?;
167- result. push ( resolved) ;
168- }
169- Ok ( result)
18+ pub fn new ( store : Arc < S > ) -> Self {
19+ Self { store }
17020 }
17121}
17222
17323#[ async_trait]
174- impl < L , Q , S > DataSourceLoaderTrait for DataSourceLoader < L , Q , S >
24+ impl < S > DataSourceLoaderTrait for DataSourceLoader < S >
17525where
176- L : LinkResolver ,
177- Q : GraphQlRunner ,
17826 S : Store + SubgraphDeploymentStore ,
17927{
18028 async fn load_dynamic_data_sources (
18129 & self ,
18230 deployment_id : SubgraphDeploymentId ,
18331 logger : Logger ,
32+ manifest : SubgraphManifest ,
18433 ) -> Result < Vec < DataSource > , Error > {
18534 let start_time = Instant :: now ( ) ;
18635
36+ let template_map: HashMap < & str , & DataSourceTemplate > = HashMap :: from_iter (
37+ manifest
38+ . templates
39+ . iter ( )
40+ . map ( |template| ( template. name . as_str ( ) , template) ) ,
41+ ) ;
18742 let mut data_sources = vec ! [ ] ;
18843
189- loop {
190- let skip = data_sources. len ( ) as i32 ;
191- let query = self . dynamic_data_sources_query ( & deployment_id, skip) ?;
192- let query_result = self
193- . graphql_runner
194- . cheap_clone ( )
195- . query_metadata ( query)
196- . await ?;
197- let unresolved_data_sources =
198- self . parse_data_sources ( & deployment_id, query_result. as_ref ( ) . clone ( ) ) ?;
199- let next_data_sources = self
200- . resolve_data_sources ( unresolved_data_sources, & logger)
201- . await ?;
202-
203- if next_data_sources. is_empty ( ) {
204- break ;
205- }
206-
207- data_sources. extend ( next_data_sources) ;
44+ for stored in self . store . load_dynamic_data_sources ( & deployment_id) ? {
45+ let StoredDynamicDataSource {
46+ name,
47+ source,
48+ context,
49+ } = stored;
50+
51+ let template = template_map. get ( name. as_str ( ) ) . ok_or_else ( || {
52+ format_err ! (
53+ "deployment `{}` does not have a template called `{}`" ,
54+ deployment_id. as_str( ) ,
55+ name
56+ )
57+ } ) ?;
58+ let context = context
59+ . map ( |ctx| serde_json:: from_str :: < Entity > ( & ctx) )
60+ . transpose ( ) ?;
61+
62+ let ds = DataSource {
63+ kind : template. kind . clone ( ) ,
64+ network : template. network . clone ( ) ,
65+ name,
66+ source,
67+ mapping : template. mapping . clone ( ) ,
68+ context,
69+ templates : Vec :: new ( ) ,
70+ } ;
71+ data_sources. push ( ds) ;
20872 }
20973
21074 trace ! (
0 commit comments