Skip to content

Commit ec82546

Browse files
committed
Ensure closing of sqlite DB connection when closing DiskCachedAsyncSubstrateInterface
1 parent 136ca77 commit ec82546

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
from async_substrate_interface.utils.cache import (
6464
async_sql_lru_cache,
6565
cached_fetcher,
66+
AsyncSqliteDB,
6667
)
6768
from async_substrate_interface.utils.decoding import (
6869
_determine_if_old_runtime_call,
@@ -4026,6 +4027,18 @@ class DiskCachedAsyncSubstrateInterface(AsyncSubstrateInterface):
40264027
Experimental new class that uses disk-caching in addition to memory-caching for the cached methods
40274028
"""
40284029

4030+
async def close(self):
4031+
"""
4032+
Closes the substrate connection, and the websocket connection.
4033+
"""
4034+
try:
4035+
await self.ws.shutdown()
4036+
except AttributeError:
4037+
pass
4038+
db_conn = AsyncSqliteDB(self.url)
4039+
if db_conn._db is not None:
4040+
await db_conn._db.close()
4041+
40294042
@async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE)
40304043
async def get_parent_block_hash(self, block_hash):
40314044
return await self._get_parent_block_hash(block_hash)

async_substrate_interface/utils/cache.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __new__(cls, chain_endpoint: str):
3636
cls._instances[chain_endpoint] = instance
3737
return instance
3838

39-
async def __call__(self, chain, func, args, kwargs) -> Optional[Any]:
39+
async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]:
4040
if not self._db:
4141
_ensure_dir()
4242
self._db = await aiosqlite.connect(CACHE_LOCATION)
@@ -65,26 +65,28 @@ async def __call__(self, chain, func, args, kwargs) -> Optional[Any]:
6565
);
6666
END;"""
6767
)
68-
key = pickle.dumps((args, kwargs))
68+
await self._db.commit()
69+
key = pickle.dumps((args, kwargs or None))
6970
try:
7071
cursor: aiosqlite.Cursor = await self._db.execute(
7172
f"SELECT value FROM {table_name} WHERE key=? AND chain=?",
7273
(key, chain),
7374
)
7475
result = await cursor.fetchone()
76+
await cursor.close()
7577
if result is not None:
7678
return pickle.loads(result[0])
7779
except (pickle.PickleError, sqlite3.Error) as e:
7880
logger.exception("Cache error", exc_info=e)
7981
pass
80-
81-
result = await func(*args, **kwargs)
82+
result = await func(other_self, *args, **kwargs)
8283
if not local_chain or not USE_CACHE:
8384
# TODO use a task here
8485
await self._db.execute(
8586
f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)",
8687
(key, pickle.dumps(result), chain),
8788
)
89+
await self._db.commit()
8890
return result
8991

9092

@@ -200,7 +202,7 @@ def decorator(func):
200202
@cached_fetcher(max_size=maxsize)
201203
async def inner(self, *args, **kwargs):
202204
async_sql_db = AsyncSqliteDB(self.url)
203-
result = await async_sql_db(self.url, func, args, kwargs)
205+
result = await async_sql_db(self.url, self, func, args, kwargs)
204206
return result
205207

206208
return inner

0 commit comments

Comments
 (0)