Skip to content

Commit 05ffa8b

Browse files
Fix issue with determining the last level when syncing with node (#856)
* DIPDUP_DEBUG env * Fixed issue with determining the last level when syncing with node * Remove outdated check * Log realtime messages
1 parent 34453e3 commit 05ffa8b

File tree

9 files changed

+80
-23
lines changed

9 files changed

+80
-23
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic
66

77
## [Unreleased]
88

9+
### Added
10+
11+
- env: Added `DIPDUP_DEBUG` environment variable to enable debug logging.
12+
913
### Fixed
1014

1115
- demos: Fixed decimal overflow in `demo_uniswap` project.
1216
- evm.node: Fixed incorrect log request parameters.
17+
- evm.subsquid.events: Fixed issue with determining the last level when syncing with node.
1318

1419
## [7.0.0] - 2023-09-25
1520

src/dipdup/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import asyncclick as click
1919

2020
from dipdup import __version__
21+
from dipdup import env
2122
from dipdup.install import EPILOG
2223
from dipdup.install import WELCOME_ASCII
2324
from dipdup.performance import metrics
@@ -216,6 +217,8 @@ async def cli(ctx: click.Context, config: list[str], env_file: list[str]) -> Non
216217
from dipdup.sys import set_up_logging
217218

218219
set_up_logging()
220+
if env.DEBUG:
221+
logging.getLogger('dipdup').setLevel(logging.DEBUG)
219222

220223
env_file_paths = [Path(file) for file in env_file]
221224
config_paths = [Path(file) for file in config]
@@ -232,7 +235,6 @@ async def cli(ctx: click.Context, config: list[str], env_file: list[str]) -> Non
232235
logging.getLogger('dipdup').setLevel(logging.INFO)
233236
return
234237

235-
from dipdup import env
236238
from dipdup.config import DipDupConfig
237239
from dipdup.exceptions import InitializationRequiredError
238240
from dipdup.package import DipDupPackage

src/dipdup/config/evm_subsquid_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class SubsquidEventsIndexConfig(IndexConfig):
5858
:param datasource: Subsquid datasource
5959
:param handlers: Event handlers
6060
:param abi: One or more `evm.abi` datasource(s) for the same network
61+
:param node_only: Don't use Subsquid Archives API (dev only)
6162
:param first_level: Level to start indexing from
6263
:param last_level: Level to stop indexing and disable this index
6364
"""
@@ -66,6 +67,7 @@ class SubsquidEventsIndexConfig(IndexConfig):
6667
datasource: SubsquidDatasourceConfig
6768
handlers: tuple[SubsquidEventsHandlerConfig, ...] = field(default_factory=tuple)
6869
abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None
70+
node_only: bool = False
6971

7072
first_level: int = 0
7173
last_level: int = 0

src/dipdup/datasources/evm_node.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ async def _on_message(self, message: Message) -> None:
267267
if subscription_id not in self._subscription_ids:
268268
raise FrameworkException(f'{self.name}: Unknown subscription ID: {subscription_id}')
269269
subscription = self._subscription_ids[subscription_id]
270-
self._logger.debug('Received subscription for channel %s', subscription_id)
270+
self._logger.info('Received a message from channel %s', subscription_id)
271271
await self._handle_subscription(subscription, data['params']['result'])
272272
else:
273273
raise DatasourceError(f'Unknown method: {data["method"]}', self.name)

src/dipdup/env.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,30 @@ def get_path(key: str) -> Path | None:
6262
def set_test() -> None:
6363
global TEST, REPLAY_PATH
6464
TEST = True
65-
REPLAY_PATH = str(Path(__file__).parent.parent.parent / 'tests' / 'replays')
66-
env['DIPDUP_REPLAY_PATH'] = REPLAY_PATH
65+
REPLAY_PATH = Path(__file__).parent.parent.parent / 'tests' / 'replays'
6766

6867

69-
if get('CI') == 'true':
70-
env['DIPDUP_CI'] = '1'
71-
if platform.system() == 'Linux' and Path('/.dockerenv').exists():
72-
env['DIPDUP_DOCKER'] = '1'
68+
CI: bool
69+
DEBUG: bool
70+
DOCKER: bool
71+
NEXT: bool
72+
REPLAY_PATH: Path | None
73+
TEST: bool
7374

74-
CI = get_bool('DIPDUP_CI')
75-
DOCKER = get_bool('DIPDUP_DOCKER')
76-
NEXT = get_bool('DIPDUP_NEXT')
77-
REPLAY_PATH = get_path('DIPDUP_REPLAY_PATH')
78-
TEST = get_bool('DIPDUP_TEST')
75+
76+
def read() -> None:
77+
global CI, DEBUG, DOCKER, NEXT, REPLAY_PATH, TEST
78+
CI = get_bool('DIPDUP_CI')
79+
DEBUG = get_bool('DIPDUP_DEBUG')
80+
DOCKER = get_bool('DIPDUP_DOCKER')
81+
NEXT = get_bool('DIPDUP_NEXT')
82+
REPLAY_PATH = get_path('DIPDUP_REPLAY_PATH')
83+
TEST = get_bool('DIPDUP_TEST')
84+
85+
if get('CI') == 'true':
86+
CI = True
87+
if platform.system() == 'Linux' and Path('/.dockerenv').exists():
88+
DOCKER = True
89+
90+
91+
read()

src/dipdup/http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from aiolimiter import AsyncLimiter
2424

2525
from dipdup import __version__
26+
from dipdup import env
2627
from dipdup.config import ResolvedHttpConfig
2728
from dipdup.exceptions import FrameworkException
2829
from dipdup.exceptions import InvalidRequestError
@@ -143,7 +144,7 @@ async def _retry_request(
143144
"""Retry a request in case of failure sleeping according to config"""
144145
attempt = 1
145146
retry_sleep = self._config.retry_sleep
146-
retry_count = self._config.retry_count
147+
retry_count = 0 if env.TEST else self._config.retry_count
147148
retry_count_str = 'inf' if retry_count is sys.maxsize else str(retry_count)
148149

149150
while True:

src/dipdup/indexes/evm_subsquid_events/index.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,6 @@ async def _process_queue(self) -> None:
9999
break
100100

101101
for message_level, level_logs in logs_by_level.items():
102-
# NOTE: If it's not a next block - resync with Subsquid
103-
if message_level != self.state.level + 1:
104-
self._logger.info('Not enough messages in queue; resyncing to %s', message_level)
105-
self._queue.clear()
106-
self.datasource.set_sync_level(None, message_level)
107-
return
108-
109102
await self._process_level_events(tuple(level_logs), self.topics, message_level)
110103

111104
def get_sync_level(self) -> int:
@@ -147,16 +140,20 @@ async def _synchronize(self, sync_level: int) -> None:
147140
self._logger.info('Subsquid is %s levels behind; %s available', subsquid_lag, subsquid_available)
148141
if subsquid_available < NODE_SYNC_LIMIT:
149142
use_node = True
143+
elif self._config.node_only:
144+
self._logger.debug('Using node anyway')
145+
use_node = True
150146

151147
# NOTE: Fetch last blocks from node if there are not enough realtime messages in queue
152148
if use_node and self.node_datasources:
153-
sync_level = node_sync_level
149+
sync_level = min(sync_level, node_sync_level)
150+
self._logger.debug('Using node datasource; sync level: %s', sync_level)
154151
topics = set()
155152
for handler in self._config.handlers:
156153
typename = handler.contract.module_name
157154
topics.add(self.topics[typename][handler.name])
158155
# FIXME: This is terribly inefficient (but okay for the last mile); see advanced example in web3.py docs.
159-
for level in range(first_level, sync_level):
156+
for level in range(first_level, sync_level + 1):
160157
# NOTE: Get random one every time
161158
level_logs = await self.random_node.get_logs(
162159
{
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
spec_version: 2.0
2+
package: demo_evm_events
3+
4+
datasources:
5+
ethscan:
6+
kind: abi.etherscan
7+
8+
mainnet_node:
9+
kind: evm.node
10+
url: https://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_KEY:-''}
11+
ws_url: wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_KEY:-''}
12+
13+
mainnet_subsquid:
14+
kind: evm.subsquid
15+
url: ${ARCHIVE_URL:-https://v2.archive.subsquid.io/network/ethereum-mainnet}
16+
node: mainnet_node
17+
http:
18+
replay_path: ${DIPDUP_REPLAY_PATH:-}
19+
20+
contracts:
21+
eth_usdt:
22+
kind: evm
23+
address: 0xdac17f958d2ee523a2206206994597c13d831ec7
24+
typename: eth_usdt
25+
26+
indexes:
27+
eth_usdt_events:
28+
kind: evm.subsquid.events
29+
datasource: mainnet_subsquid
30+
handlers:
31+
- callback: on_transfer
32+
contract: eth_usdt
33+
name: Transfer
34+
first_level: 18077421
35+
last_level: 18077421
36+
node_only: true

tests/test_demos.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ async def assert_run_dao() -> None:
241241
('demo_raw.yml', 'demo_raw', 'init', partial(assert_init, 'demo_raw')),
242242
('demo_evm_events.yml', 'demo_evm_events', 'run', assert_run_evm_events),
243243
('demo_evm_events.yml', 'demo_evm_events', 'init', partial(assert_init, 'demo_evm_events')),
244+
('demo_evm_events_node.yml', 'demo_evm_events', 'run', assert_run_evm_events),
244245
)
245246

246247

0 commit comments

Comments
 (0)