@@ -2,11 +2,14 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
22use graph:: blockchain:: client:: ChainClient ;
33use graph:: blockchain:: BlockHash ;
44use graph:: blockchain:: ChainIdentifier ;
5+ use graph:: blockchain:: SubgraphFilter ;
6+ use graph:: blockchain:: TriggerFilterWrapper ;
57use graph:: components:: transaction_receipt:: LightTransactionReceipt ;
68use graph:: data:: store:: ethereum:: call;
79use graph:: data:: store:: scalar;
810use graph:: data:: subgraph:: UnifiedMappingApiVersion ;
911use graph:: data:: subgraph:: API_VERSION_0_0_7 ;
12+ use graph:: data_source:: subgraph;
1013use graph:: futures01:: stream;
1114use graph:: futures01:: Future ;
1215use graph:: futures01:: Stream ;
@@ -18,6 +21,10 @@ use graph::prelude::ethabi::ParamType;
1821use graph:: prelude:: ethabi:: Token ;
1922use graph:: prelude:: tokio:: try_join;
2023use graph:: prelude:: web3:: types:: U256 ;
24+ use graph:: prelude:: DeploymentHash ;
25+ use graph:: prelude:: Entity ;
26+ use graph:: prelude:: Value ;
27+ use graph:: schema:: InputSchema ;
2128use graph:: slog:: o;
2229use graph:: tokio:: sync:: RwLock ;
2330use graph:: tokio:: time:: timeout;
@@ -66,7 +73,7 @@ use crate::{
6673 } ,
6774 transport:: Transport ,
6875 trigger:: { EthereumBlockTriggerType , EthereumTrigger } ,
69- TriggerFilter , ENV_VARS ,
76+ ENV_VARS ,
7077} ;
7178
7279#[ derive( Debug , Clone ) ]
@@ -1722,6 +1729,81 @@ impl EthereumAdapterTrait for EthereumAdapter {
17221729 }
17231730}
17241731
1732+ // TODO(krishna): Currently this is a mock implementation of subgraph triggers.
1733+ // This will be replaced with the actual implementation which will use the filters to
1734+ // query the database of the source subgraph and return the entity triggers.
1735+ async fn subgraph_triggers (
1736+ adapter : Arc < EthereumAdapter > ,
1737+ logger : Logger ,
1738+ chain_store : Arc < dyn ChainStore > ,
1739+ _subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
1740+ from : BlockNumber ,
1741+ to : BlockNumber ,
1742+ filter : & Arc < TriggerFilterWrapper < Chain > > ,
1743+ _unified_api_version : UnifiedMappingApiVersion ,
1744+ ) -> Result < ( Vec < BlockWithTriggers < crate :: Chain > > , BlockNumber ) , Error > {
1745+ let logger2 = logger. cheap_clone ( ) ;
1746+ let eth = adapter. clone ( ) ;
1747+ let to_ptr = eth. next_existing_ptr_to_number ( & logger, to) . await ?;
1748+ let to = to_ptr. block_number ( ) ;
1749+
1750+ let first_filter = filter. subgraph_filter . first ( ) . unwrap ( ) ;
1751+
1752+ let blocks = adapter
1753+ . load_blocks_by_numbers (
1754+ logger. cheap_clone ( ) ,
1755+ chain_store. clone ( ) ,
1756+ HashSet :: from_iter ( from..=to) ,
1757+ )
1758+ . await
1759+ . and_then ( move |block| {
1760+ Ok ( BlockWithTriggers :: < Chain > :: new_with_subgraph_triggers (
1761+ BlockFinality :: Final ( block. clone ( ) ) ,
1762+ vec ! [ create_mock_subgraph_trigger( first_filter, & block) ] ,
1763+ & logger2,
1764+ ) )
1765+ } )
1766+ . collect ( )
1767+ . compat ( )
1768+ . await ?;
1769+
1770+ Ok ( ( blocks, to) )
1771+ }
1772+
1773+ fn create_mock_subgraph_trigger (
1774+ filter : & SubgraphFilter ,
1775+ block : & LightEthereumBlock ,
1776+ ) -> subgraph:: TriggerData {
1777+ let mock_entity = create_mock_entity ( block) ;
1778+ subgraph:: TriggerData {
1779+ source : filter. subgraph . clone ( ) ,
1780+ entity : mock_entity,
1781+ entity_type : filter. entities . first ( ) . unwrap ( ) . clone ( ) ,
1782+ }
1783+ }
1784+
1785+ fn create_mock_entity ( block : & LightEthereumBlock ) -> Entity {
1786+ let id = DeploymentHash :: new ( "test" ) . unwrap ( ) ;
1787+ let data_schema = InputSchema :: parse_latest (
1788+ "type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }" ,
1789+ id. clone ( ) ,
1790+ )
1791+ . unwrap ( ) ;
1792+ let hash = Value :: Bytes ( scalar:: Bytes :: from ( block. hash . unwrap ( ) . as_bytes ( ) . to_vec ( ) ) ) ;
1793+ let data = data_schema
1794+ . make_entity ( vec ! [
1795+ ( "id" . into( ) , hash. clone( ) ) ,
1796+ (
1797+ "number" . into( ) ,
1798+ Value :: BigInt ( scalar:: BigInt :: from( block. number( ) ) ) ,
1799+ ) ,
1800+ ( "hash" . into( ) , hash) ,
1801+ ] )
1802+ . unwrap ( ) ;
1803+
1804+ data
1805+ }
1806+
17251807/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
17261808/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
17271809/// If a block contains no triggers, there may be no corresponding item in the stream.
@@ -1743,13 +1825,33 @@ pub(crate) async fn blocks_with_triggers(
17431825 subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
17441826 from : BlockNumber ,
17451827 to : BlockNumber ,
1746- filter : & TriggerFilter ,
1828+ filter : & Arc < TriggerFilterWrapper < Chain > > ,
17471829 unified_api_version : UnifiedMappingApiVersion ,
17481830) -> Result < ( Vec < BlockWithTriggers < crate :: Chain > > , BlockNumber ) , Error > {
17491831 // Each trigger filter needs to be queried for the same block range
17501832 // and the blocks yielded need to be deduped. If any error occurs
17511833 // while searching for a trigger type, the entire operation fails.
17521834 let eth = adapter. clone ( ) ;
1835+ let subgraph_filter = filter. subgraph_filter . clone ( ) ;
1836+
1837+ // TODO(krishna): In the initial implementation we do not allow any other datasource type
1838+ // When using subgraph data sources, there if subgraph_filter is not empty, we can return
1839+ // by just processing the subgraph triggers.
1840+ if !subgraph_filter. is_empty ( ) {
1841+ return subgraph_triggers (
1842+ adapter. clone ( ) ,
1843+ logger. clone ( ) ,
1844+ chain_store. clone ( ) ,
1845+ subgraph_metrics. clone ( ) ,
1846+ from,
1847+ to,
1848+ filter,
1849+ unified_api_version,
1850+ )
1851+ . await ;
1852+ }
1853+
1854+ let filter = filter. filter . clone ( ) ;
17531855 let call_filter = EthereumCallFilter :: from ( & filter. block ) ;
17541856
17551857 // Scan the block range to find relevant triggers
0 commit comments