Skip to content

Commit 4915b6f

Browse files
authored
Add IPFS usage metrics / extend logging / extend supported content path formats (#6058)
* add ipfs metrics * add ipfs context and use metrics * extend ipfs content path to support multiple input formats * fix release build * fix ipfs gateway tests * fix(ipfs): make IPFS accept a reference to the context * fix(ipfs): create test context and metrics using dedicated methods * fix(ipfs): use type name when creating a default deployment hash for tests * fix(ipfs): update tests after rebase * fix(tests): fix release build
1 parent 5820151 commit 4915b6f

File tree

40 files changed

+1218
-460
lines changed

40 files changed

+1218
-460
lines changed

chain/ethereum/src/data_source.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use anyhow::{anyhow, Error};
22
use anyhow::{ensure, Context};
33
use graph::blockchain::{BlockPtr, TriggerWithHandler};
4+
use graph::components::link_resolver::LinkResolverContext;
45
use graph::components::metrics::subgraph::SubgraphInstanceMetrics;
56
use graph::components::store::{EthereumCallCache, StoredDynamicDataSource};
67
use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError};
78
use graph::components::trigger_processor::RunnableTriggers;
9+
use graph::data::subgraph::DeploymentHash;
810
use graph::data_source::common::{
911
AbiJson, CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedCallDecls,
1012
UnresolvedMappingABI,
@@ -1198,6 +1200,7 @@ pub struct UnresolvedDataSource {
11981200
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
11991201
async fn resolve(
12001202
self,
1203+
deployment_hash: &DeploymentHash,
12011204
resolver: &Arc<dyn LinkResolver>,
12021205
logger: &Logger,
12031206
manifest_idx: u32,
@@ -1212,7 +1215,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
12121215
context,
12131216
} = self;
12141217

1215-
let mapping = mapping.resolve(resolver, logger, spec_version).await.with_context(|| {
1218+
let mapping = mapping.resolve(deployment_hash, resolver, logger, spec_version).await.with_context(|| {
12161219
format!(
12171220
"failed to resolve data source {} with source_address {:?} and source_start_block {}",
12181221
name, source.address, source.start_block
@@ -1246,6 +1249,7 @@ pub struct DataSourceTemplate {
12461249
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
12471250
async fn resolve(
12481251
self,
1252+
deployment_hash: &DeploymentHash,
12491253
resolver: &Arc<dyn LinkResolver>,
12501254
logger: &Logger,
12511255
manifest_idx: u32,
@@ -1260,7 +1264,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
12601264
} = self;
12611265

12621266
let mapping = mapping
1263-
.resolve(resolver, logger, spec_version)
1267+
.resolve(deployment_hash, resolver, logger, spec_version)
12641268
.await
12651269
.with_context(|| format!("failed to resolve data source template {}", name))?;
12661270

@@ -1358,6 +1362,7 @@ impl FindMappingABI for Mapping {
13581362
impl UnresolvedMapping {
13591363
pub async fn resolve(
13601364
self,
1365+
deployment_hash: &DeploymentHash,
13611366
resolver: &Arc<dyn LinkResolver>,
13621367
logger: &Logger,
13631368
spec_version: &semver::Version,
@@ -1380,12 +1385,18 @@ impl UnresolvedMapping {
13801385
// resolve each abi
13811386
abis.into_iter()
13821387
.map(|unresolved_abi| async {
1383-
Result::<_, Error>::Ok(unresolved_abi.resolve(resolver, logger).await?)
1388+
Result::<_, Error>::Ok(
1389+
unresolved_abi
1390+
.resolve(deployment_hash, resolver, logger)
1391+
.await?,
1392+
)
13841393
})
13851394
.collect::<FuturesOrdered<_>>()
13861395
.try_collect::<Vec<_>>(),
13871396
async {
1388-
let module_bytes = resolver.cat(logger, &link).await?;
1397+
let module_bytes = resolver
1398+
.cat(&LinkResolverContext::new(deployment_hash, logger), &link)
1399+
.await?;
13891400
Ok(Arc::new(module_bytes))
13901401
},
13911402
)

chain/near/src/data_source.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use graph::anyhow::Context;
22
use graph::blockchain::{Block, TriggerWithHandler};
3+
use graph::components::link_resolver::LinkResolverContext;
34
use graph::components::store::StoredDynamicDataSource;
45
use graph::components::subgraph::InstanceDSTemplateInfo;
5-
use graph::data::subgraph::DataSourceContext;
6+
use graph::data::subgraph::{DataSourceContext, DeploymentHash};
67
use graph::prelude::SubgraphManifestValidationError;
78
use graph::{
89
anyhow::{anyhow, Error},
@@ -330,6 +331,7 @@ pub struct UnresolvedDataSource {
330331
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
331332
async fn resolve(
332333
self,
334+
deployment_hash: &DeploymentHash,
333335
resolver: &Arc<dyn LinkResolver>,
334336
logger: &Logger,
335337
_manifest_idx: u32,
@@ -344,7 +346,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
344346
context,
345347
} = self;
346348

347-
let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
349+
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
348350
format!(
349351
"failed to resolve data source {} with source_account {:?} and source_start_block {}",
350352
name, source.account, source.start_block
@@ -370,6 +372,7 @@ pub type DataSourceTemplate = BaseDataSourceTemplate<Mapping>;
370372
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
371373
async fn resolve(
372374
self,
375+
deployment_hash: &DeploymentHash,
373376
resolver: &Arc<dyn LinkResolver>,
374377
logger: &Logger,
375378
_manifest_idx: u32,
@@ -383,7 +386,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
383386
} = self;
384387

385388
let mapping = mapping
386-
.resolve(resolver, logger)
389+
.resolve(deployment_hash, resolver, logger)
387390
.await
388391
.with_context(|| format!("failed to resolve data source template {}", name))?;
389392

@@ -434,6 +437,7 @@ pub struct UnresolvedMapping {
434437
impl UnresolvedMapping {
435438
pub async fn resolve(
436439
self,
440+
deployment_hash: &DeploymentHash,
437441
resolver: &Arc<dyn LinkResolver>,
438442
logger: &Logger,
439443
) -> Result<Mapping, Error> {
@@ -449,7 +453,7 @@ impl UnresolvedMapping {
449453
let api_version = semver::Version::parse(&api_version)?;
450454

451455
let module_bytes = resolver
452-
.cat(logger, &link)
456+
.cat(&LinkResolverContext::new(deployment_hash, logger), &link)
453457
.await
454458
.with_context(|| format!("failed to resolve mapping {}", link.link))?;
455459

chain/substreams/src/data_source.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error};
44
use graph::{
55
blockchain,
66
cheap_clone::CheapClone,
7-
components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo},
7+
components::{
8+
link_resolver::{LinkResolver, LinkResolverContext},
9+
subgraph::InstanceDSTemplateInfo,
10+
},
11+
data::subgraph::DeploymentHash,
812
prelude::{async_trait, BlockNumber, Link},
913
slog::Logger,
1014
};
@@ -184,12 +188,18 @@ pub struct UnresolvedMapping {
184188
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
185189
async fn resolve(
186190
self,
191+
deployment_hash: &DeploymentHash,
187192
resolver: &Arc<dyn LinkResolver>,
188193
logger: &Logger,
189194
_manifest_idx: u32,
190195
_spec_version: &semver::Version,
191196
) -> Result<DataSource, Error> {
192-
let content = resolver.cat(logger, &self.source.package.file).await?;
197+
let content = resolver
198+
.cat(
199+
&LinkResolverContext::new(deployment_hash, logger),
200+
&self.source.package.file,
201+
)
202+
.await?;
193203

194204
let mut package = graph::substreams::Package::decode(content.as_ref())?;
195205

@@ -235,7 +245,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
235245
let handler = match (self.mapping.handler, self.mapping.file) {
236246
(Some(handler), Some(file)) => {
237247
let module_bytes = resolver
238-
.cat(logger, &file)
248+
.cat(&LinkResolverContext::new(deployment_hash, logger), &file)
239249
.await
240250
.with_context(|| format!("failed to resolve mapping {}", file.link))?;
241251

@@ -315,6 +325,7 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate {
315325
impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate {
316326
async fn resolve(
317327
self,
328+
_deployment_hash: &DeploymentHash,
318329
_resolver: &Arc<dyn LinkResolver>,
319330
_logger: &Logger,
320331
_manifest_idx: u32,
@@ -331,8 +342,8 @@ mod test {
331342
use anyhow::Error;
332343
use graph::{
333344
blockchain::{DataSource as _, UnresolvedDataSource as _},
334-
components::link_resolver::LinkResolver,
335-
data::subgraph::{LATEST_VERSION, SPEC_VERSION_1_2_0},
345+
components::link_resolver::{LinkResolver, LinkResolverContext},
346+
data::subgraph::{DeploymentHash, LATEST_VERSION, SPEC_VERSION_1_2_0},
336347
prelude::{async_trait, serde_yaml, JsonValueStream, Link},
337348
slog::{o, Discard, Logger},
338349
substreams::{
@@ -436,7 +447,13 @@ mod test {
436447
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
437448
let logger = Logger::root(Discard, o!());
438449
let ds: DataSource = ds
439-
.resolve(&link_resolver, &logger, 0, &SPEC_VERSION_1_2_0)
450+
.resolve(
451+
&DeploymentHash::default(),
452+
&link_resolver,
453+
&logger,
454+
0,
455+
&SPEC_VERSION_1_2_0,
456+
)
440457
.await
441458
.unwrap();
442459
let expected = DataSource {
@@ -476,7 +493,13 @@ mod test {
476493
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
477494
let logger = Logger::root(Discard, o!());
478495
let ds: DataSource = ds
479-
.resolve(&link_resolver, &logger, 0, &SPEC_VERSION_1_2_0)
496+
.resolve(
497+
&DeploymentHash::default(),
498+
&link_resolver,
499+
&logger,
500+
0,
501+
&SPEC_VERSION_1_2_0,
502+
)
480503
.await
481504
.unwrap();
482505
let expected = DataSource {
@@ -717,17 +740,21 @@ mod test {
717740
unimplemented!()
718741
}
719742

720-
async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
743+
async fn cat(&self, _ctx: &LinkResolverContext, _link: &Link) -> Result<Vec<u8>, Error> {
721744
Ok(gen_package().encode_to_vec())
722745
}
723746

724-
async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
747+
async fn get_block(
748+
&self,
749+
_ctx: &LinkResolverContext,
750+
_link: &Link,
751+
) -> Result<Vec<u8>, Error> {
725752
unimplemented!()
726753
}
727754

728755
async fn json_stream(
729756
&self,
730-
_logger: &Logger,
757+
_ctx: &LinkResolverContext,
731758
_link: &Link,
732759
) -> Result<JsonValueStream, Error> {
733760
unimplemented!()

core/src/polling_monitor/ipfs_service.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@ use anyhow::anyhow;
55
use anyhow::Error;
66
use bytes::Bytes;
77
use graph::futures03::future::BoxFuture;
8-
use graph::ipfs::ContentPath;
9-
use graph::ipfs::IpfsClient;
10-
use graph::ipfs::RetryPolicy;
8+
use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy};
119
use graph::{derive::CheapClone, prelude::CheapClone};
1210
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
1311

14-
pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
12+
pub type IpfsService = Buffer<IpfsRequest, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
13+
14+
#[derive(Debug, Clone, CheapClone)]
15+
pub struct IpfsRequest {
16+
pub ctx: IpfsContext,
17+
pub path: ContentPath,
18+
}
1519

1620
pub fn ipfs_service(
1721
client: Arc<dyn IpfsClient>,
@@ -43,7 +47,10 @@ struct IpfsServiceInner {
4347
}
4448

4549
impl IpfsServiceInner {
46-
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
50+
async fn call_inner(
51+
self,
52+
IpfsRequest { ctx, path }: IpfsRequest,
53+
) -> Result<Option<Bytes>, Error> {
4754
let multihash = path.cid().hash().code();
4855
if !SAFE_MULTIHASHES.contains(&multihash) {
4956
return Err(anyhow!("CID multihash {} is not allowed", multihash));
@@ -52,6 +59,7 @@ impl IpfsServiceInner {
5259
let res = self
5360
.client
5461
.cat(
62+
&ctx,
5563
&path,
5664
self.max_file_size,
5765
Some(self.timeout),
@@ -99,8 +107,7 @@ mod test {
99107
use graph::components::link_resolver::ArweaveResolver;
100108
use graph::data::value::Word;
101109
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
102-
use graph::ipfs::IpfsRpcClient;
103-
use graph::ipfs::ServerAddress;
110+
use graph::ipfs::{IpfsContext, IpfsMetrics, IpfsRpcClient, ServerAddress};
104111
use graph::log::discard;
105112
use graph::tokio;
106113
use tower::ServiceExt;
@@ -126,14 +133,24 @@ mod test {
126133

127134
let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;
128135

129-
let client =
130-
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
131-
.unwrap();
136+
let client = IpfsRpcClient::new_unchecked(
137+
ServerAddress::local_rpc_api(),
138+
IpfsMetrics::test(),
139+
&graph::log::discard(),
140+
)
141+
.unwrap();
132142

133143
let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);
134144

135145
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
136-
let content = svc.oneshot(path).await.unwrap().unwrap();
146+
let content = svc
147+
.oneshot(IpfsRequest {
148+
ctx: IpfsContext::test(),
149+
path,
150+
})
151+
.await
152+
.unwrap()
153+
.unwrap();
137154

138155
assert_eq!(content.to_vec(), random_bytes);
139156
}
@@ -157,7 +174,8 @@ mod test {
157174
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
158175

159176
let server = MockServer::start().await;
160-
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
177+
let ipfs_client =
178+
IpfsRpcClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()).unwrap();
161179
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
162180
let path = ContentPath::new(CID).unwrap();
163181

@@ -179,6 +197,12 @@ mod test {
179197
.await;
180198

181199
// This means that we never reached the successful response.
182-
ipfs_service.oneshot(path).await.unwrap_err();
200+
ipfs_service
201+
.oneshot(IpfsRequest {
202+
ctx: IpfsContext::test(),
203+
path,
204+
})
205+
.await
206+
.unwrap_err();
183207
}
184208
}

0 commit comments

Comments
 (0)