@@ -9,6 +9,9 @@
 from pathlib import Path
 from typing import Callable, Any, Awaitable, Hashable, Optional
 
+import aiosqlite
+
+
 USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False
 CACHE_LOCATION = (
     os.path.expanduser(
@@ -21,6 +24,70 @@
 logger = logging.getLogger("async_substrate_interface")
 
 
+class AsyncSqliteDB:
+    _instances: dict[str, "AsyncSqliteDB"] = {}
+    _db: Optional[aiosqlite.Connection] = None
+
+    def __new__(cls, chain_endpoint: str):
+        try:
+            return cls._instances[chain_endpoint]
+        except KeyError:
+            instance = super().__new__(cls)
+            cls._instances[chain_endpoint] = instance
+            return instance
+
+    async def __call__(self, chain, func, args, kwargs) -> Optional[Any]:
+        if not self._db:
+            _ensure_dir()
+            self._db = await aiosqlite.connect(CACHE_LOCATION)
+        table_name = _get_table_name(func)
+        key = None
+        if not (local_chain := _check_if_local(chain)) or not USE_CACHE:
+            await self._db.execute(
+                f"""CREATE TABLE IF NOT EXISTS {table_name}
+                    (
+                        rowid INTEGER PRIMARY KEY AUTOINCREMENT,
+                        key BLOB,
+                        value BLOB,
+                        chain TEXT,
+                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                    );
+                """
+            )
+            await self._db.execute(
+                f"""CREATE TRIGGER IF NOT EXISTS prune_rows_trigger AFTER INSERT ON {table_name}
+                    BEGIN
+                        DELETE FROM {table_name}
+                        WHERE rowid IN (
+                            SELECT rowid FROM {table_name}
+                            ORDER BY created_at DESC
+                            LIMIT -1 OFFSET 500
+                        );
+                    END;"""
+            )
+            key = pickle.dumps((args, kwargs))
+            try:
+                cursor: aiosqlite.Cursor = await self._db.execute(
+                    f"SELECT value FROM {table_name} WHERE key=? AND chain=?",
+                    (key, chain),
+                )
+                result = await cursor.fetchone()
+                if result is not None:
+                    return pickle.loads(result[0])
+            except (pickle.PickleError, sqlite3.Error) as e:
+                logger.exception("Cache error", exc_info=e)
+                pass
+
+        result = await func(*args, **kwargs)
+        if not local_chain or not USE_CACHE:
+            # TODO use a task here
+            await self._db.execute(
+                f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)",
+                (key, pickle.dumps(result), chain),
+            )
+        return result
+
+
 def _ensure_dir():
     path = Path(CACHE_LOCATION).parent
     if not path.exists():
@@ -115,7 +182,8 @@ def inner(self, *args, **kwargs):
             )
 
             # If not in DB, call func and store in DB
-            result = func(self, *args, **kwargs)
+            if result is None:
+                result = func(self, *args, **kwargs)
 
             if not local_chain or not USE_CACHE:
                 _insert_into_cache(c, conn, table_name, key, result, chain)
@@ -131,15 +199,8 @@ def async_sql_lru_cache(maxsize: Optional[int] = None):
     def decorator(func):
         @cached_fetcher(max_size=maxsize)
         async def inner(self, *args, **kwargs):
-            c, conn, table_name, key, result, chain, local_chain = (
-                _shared_inner_fn_logic(func, self, args, kwargs)
-            )
-
-            # If not in DB, call func and store in DB
-            result = await func(self, *args, **kwargs)
-            if not local_chain or not USE_CACHE:
-                _insert_into_cache(c, conn, table_name, key, result, chain)
-
+            async_sql_db = AsyncSqliteDB(self.url)
+            result = await async_sql_db(self.url, func, args, kwargs)
             return result
 
         return inner
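
The `__new__` override memoizes `AsyncSqliteDB` instances per chain endpoint, so every decorated call against the same `url` reuses one instance (and, once `__call__` opens it, one `aiosqlite` connection to `CACHE_LOCATION`). A minimal sketch of that behaviour, not part of this diff; the endpoint URLs below are placeholders:

```python
# Illustrative only: demonstrates the per-endpoint singleton in __new__.
a = AsyncSqliteDB("ws://127.0.0.1:9944")
b = AsyncSqliteDB("ws://127.0.0.1:9944")
c = AsyncSqliteDB("wss://chain.example:443")

assert a is b       # same endpoint -> same cached instance
assert a is not c   # different endpoint -> separate instance
```

Different endpoints still share the same on-disk database file (`CACHE_LOCATION`); rows are distinguished by the `chain` column in each per-function table.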