Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ the same on-disk cache.
As with the other two caches, this also takes `SUBSTRATE_CACHE_METHOD_SIZE` and `SUBSTRATE_RUNTIME_CACHE_SIZE` env vars.


### ENV VARS
The following environment variables are used within `async-substrate-interface`:
- NO_CACHE (default 0): if set to 1, the DiskCachedAsyncSubstrateInterface class will not store a persistent on-disk cache, and will instead use only the in-memory cache.
- CACHE_LOCATION (default `~/.cache/async-substrate-interface`): this determines the location for the cache file, if using DiskCachedAsyncSubstrateInterface
- SUBSTRATE_CACHE_METHOD_SIZE (default 512): the cache size (either in-memory or on-disk) of the smaller return-size methods (see the Caching section for more info)
- SUBSTRATE_RUNTIME_CACHE_SIZE (default 16): the cache size (either in-memory or on-disk) of the larger return-size methods (see the Caching section for more info)


## Contributing

Contributions are welcome! Please open an issue or submit a pull request to the `staging` branch.
Expand Down
13 changes: 13 additions & 0 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from async_substrate_interface.utils.cache import (
async_sql_lru_cache,
cached_fetcher,
AsyncSqliteDB,
)
from async_substrate_interface.utils.decoding import (
_determine_if_old_runtime_call,
Expand Down Expand Up @@ -4026,6 +4027,18 @@ class DiskCachedAsyncSubstrateInterface(AsyncSubstrateInterface):
Experimental new class that uses disk-caching in addition to memory-caching for the cached methods
"""

async def close(self):
    """
    Closes the substrate connection and the websocket connection, then shuts
    down the shared on-disk cache connection for this endpoint, if one was
    ever opened.
    """
    try:
        await self.ws.shutdown()
    except AttributeError:
        # No websocket was ever created for this instance; nothing to shut down.
        pass
    # AsyncSqliteDB is a per-endpoint singleton, so this retrieves (or lazily
    # registers) the cache wrapper associated with this connection's URL.
    db_conn = AsyncSqliteDB(self.url)
    if db_conn._db is not None:
        await db_conn._db.close()
        # Reset the handle so a later cached call on this endpoint reopens a
        # fresh connection instead of using a closed one — the singleton in
        # AsyncSqliteDB._instances outlives this interface object.
        db_conn._db = None

@async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE)
async def get_parent_block_hash(self, block_hash):
    """
    Return the parent block hash for ``block_hash``.

    Results are memoized (in memory and on disk) by ``async_sql_lru_cache``;
    the actual lookup is delegated to the private implementation.
    """
    parent_hash = await self._get_parent_block_hash(block_hash)
    return parent_hash
Expand Down
89 changes: 79 additions & 10 deletions async_substrate_interface/utils/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from pathlib import Path
from typing import Callable, Any, Awaitable, Hashable, Optional

import aiosqlite


USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False
CACHE_LOCATION = (
os.path.expanduser(
Expand All @@ -21,6 +24,78 @@
logger = logging.getLogger("async_substrate_interface")


class AsyncSqliteDB:
    """
    Per-chain-endpoint singleton wrapping an ``aiosqlite`` connection used as a
    persistent on-disk cache of substrate method results.

    Calling an instance with ``(chain, other_self, func, args, kwargs)`` returns
    the cached result for ``(args, kwargs)`` when present; otherwise it awaits
    ``func`` and stores the pickled result in a per-function table.
    """

    # One shared instance per chain endpoint string (see __new__).
    _instances: dict[str, "AsyncSqliteDB"] = {}
    # Lazily-opened aiosqlite connection; stays None until the first __call__.
    _db: Optional[aiosqlite.Connection] = None
    # Serializes all DB access for this endpoint's instance.
    _lock: Optional[asyncio.Lock] = None

    def __new__(cls, chain_endpoint: str):
        # EAFP singleton lookup: reuse the registered instance for this
        # endpoint; otherwise create one with its own lock and register it.
        try:
            return cls._instances[chain_endpoint]
        except KeyError:
            instance = super().__new__(cls)
            instance._lock = asyncio.Lock()
            cls._instances[chain_endpoint] = instance
            return instance

    async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]:
        """
        Look up ``func(*args, **kwargs)`` in the on-disk cache for ``chain``,
        falling back to calling ``func`` and caching its result.

        Args:
            chain: chain endpoint string; stored alongside each row and used to
                decide (via ``_check_if_local``) whether caching applies.
            other_self: the object ``func`` is bound to (passed as its first arg).
            func: the coroutine function whose results are being cached.
            args: positional arguments used both for the call and the cache key.
            kwargs: keyword arguments used both for the call and the cache key.

        Returns:
            The cached (unpickled) result, or the freshly-computed result.
        """
        async with self._lock:
            if not self._db:
                _ensure_dir()
                # Single DB file shared by all endpoints/tables.
                self._db = await aiosqlite.connect(CACHE_LOCATION)
            table_name = _get_table_name(func)
            key = None
            # NOTE(review): ``or not USE_CACHE`` means the cache is read/written
            # precisely when NO_CACHE=1 disables it (and for non-local chains).
            # This looks inverted vs. the README ("no persistent on-disk cache
            # will be stored"); presumably ``and USE_CACHE`` was intended —
            # TODO confirm, as the same condition appears in the sync cache.
            if not (local_chain := _check_if_local(chain)) or not USE_CACHE:
                await self._db.execute(
                    f"""
                    CREATE TABLE IF NOT EXISTS {table_name}
                    (
                        rowid INTEGER PRIMARY KEY AUTOINCREMENT,
                        key BLOB,
                        value BLOB,
                        chain TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    );
                    """
                )
                # Prune trigger: after every insert, keep only the 500 newest
                # rows in this table.
                # NOTE(review): the 500 limit is hardcoded and ignores the
                # SUBSTRATE_CACHE_METHOD_SIZE / SUBSTRATE_RUNTIME_CACHE_SIZE
                # env vars documented in the README — TODO confirm intended.
                await self._db.execute(
                    f"""
                    CREATE TRIGGER IF NOT EXISTS prune_rows_trigger_{table_name} AFTER INSERT ON {table_name}
                    BEGIN
                        DELETE FROM {table_name}
                        WHERE rowid IN (
                            SELECT rowid FROM {table_name}
                            ORDER BY created_at DESC
                            LIMIT -1 OFFSET 500
                        );
                    END;
                    """
                )
                await self._db.commit()
                # Cache key is the pickled call signature; ``kwargs or None``
                # collapses an empty dict to None so {} and None hash the same.
                key = pickle.dumps((args, kwargs or None))
                try:
                    cursor: aiosqlite.Cursor = await self._db.execute(
                        f"SELECT value FROM {table_name} WHERE key=? AND chain=?",
                        (key, chain),
                    )
                    result = await cursor.fetchone()
                    await cursor.close()
                    if result is not None:
                        # Cache hit: unpickle and return the stored value.
                        # NOTE(review): a cached result of None is
                        # indistinguishable from a miss here, so functions that
                        # legitimately return None are re-executed every call.
                        return pickle.loads(result[0])
                except (pickle.PickleError, sqlite3.Error) as e:
                    # Best-effort cache: log and fall through to the real call.
                    logger.exception("Cache error", exc_info=e)
                    pass
            # Miss (or caching skipped): perform the real call.
            result = await func(other_self, *args, **kwargs)
            if not local_chain or not USE_CACHE:
                # TODO use a task here
                await self._db.execute(
                    f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)",
                    (key, pickle.dumps(result), chain),
                )
                await self._db.commit()
            return result


def _ensure_dir():
path = Path(CACHE_LOCATION).parent
if not path.exists():
Expand Down Expand Up @@ -115,7 +190,8 @@ def inner(self, *args, **kwargs):
)

# If not in DB, call func and store in DB
result = func(self, *args, **kwargs)
if result is None:
result = func(self, *args, **kwargs)

if not local_chain or not USE_CACHE:
_insert_into_cache(c, conn, table_name, key, result, chain)
Expand All @@ -131,15 +207,8 @@ def async_sql_lru_cache(maxsize: Optional[int] = None):
def decorator(func):
@cached_fetcher(max_size=maxsize)
async def inner(self, *args, **kwargs):
c, conn, table_name, key, result, chain, local_chain = (
_shared_inner_fn_logic(func, self, args, kwargs)
)

# If not in DB, call func and store in DB
result = await func(self, *args, **kwargs)
if not local_chain or not USE_CACHE:
_insert_into_cache(c, conn, table_name, key, result, chain)

async_sql_db = AsyncSqliteDB(self.url)
result = await async_sql_db(self.url, self, func, args, kwargs)
return result

return inner
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ dependencies = [
"bt-decode==v0.6.0",
"scalecodec~=1.2.11",
"websockets>=14.1",
"xxhash"
"xxhash",
"aiosqlite>=0.21.0,<1.0.0"
]

requires-python = ">=3.9,<3.14"
Expand Down
Loading