5
5
import logging
6
6
import time
7
7
from datetime import datetime , timezone
8
- from typing import Callable , Dict , Optional , Sequence , Tuple
8
+ from typing import Any , Callable , Dict , List , Optional , Sequence , Tuple
9
9
10
10
import clickhouse_driver
11
11
import numpy
@@ -80,13 +80,13 @@ def create_chain_table(client: clickhouse_driver.Client, meta: ChainMeta, rmeta:
80
80
)
81
81
columns .append (column_spec_for (var , is_stat = True ))
82
82
assert len (set (columns )) == len (columns ), columns
83
- columns = ",\n " .join (columns )
83
+ cols = ",\n " .join (columns )
84
84
85
85
query = f"""
86
86
CREATE TABLE { cid }
87
87
(
88
88
`_draw_idx` UInt64,
89
- { columns }
89
+ { cols }
90
90
)
91
91
ENGINE TinyLog();
92
92
"""
@@ -110,8 +110,8 @@ def __init__(
110
110
self ._client = client
111
111
# The following attributes belong to the batched insert mechanism.
112
112
# Inserting in batches is much faster than inserting single rows.
113
- self ._insert_query = None
114
- self ._insert_queue = []
113
+ self ._insert_query : str = ""
114
+ self ._insert_queue : List [ Dict [ str , Any ]] = []
115
115
self ._last_insert = time .time ()
116
116
self ._insert_interval = insert_interval
117
117
self ._insert_every = insert_every
@@ -121,7 +121,7 @@ def append(
121
121
self , draw : Dict [str , numpy .ndarray ], stats : Optional [Dict [str , numpy .ndarray ]] = None
122
122
):
123
123
stat = {f"__stat_{ sname } " : svals for sname , svals in (stats or {}).items ()}
124
- params = {"_draw_idx" : self ._draw_idx , ** draw , ** stat }
124
+ params : Dict [ str , Any ] = {"_draw_idx" : self ._draw_idx , ** draw , ** stat }
125
125
self ._draw_idx += 1
126
126
if not self ._insert_query :
127
127
names = ", " .join (params .keys ())
@@ -186,9 +186,10 @@ def _get_rows( # pylint: disable=W0221
186
186
187
187
# The unpacking must also account for non-rigid shapes
188
188
if is_rigid (nshape ):
189
+ assert nshape is not None
189
190
buffer = numpy .empty ((draws , * nshape ), dtype )
190
191
else :
191
- buffer = numpy .repeat ( None , draws )
192
+ buffer = numpy .array ([ None ] * draws )
192
193
for d , (vals ,) in enumerate (data ):
193
194
buffer [d ] = numpy .asarray (vals , dtype )
194
195
return buffer
@@ -228,23 +229,21 @@ def __init__(
228
229
self .created_at = created_at
229
230
# We need handles on the chains to commit their batched inserts
230
231
# before returning them to callers of `.get_chains()`.
231
- self ._chains = None
232
+ self ._chains : List [ ClickHouseChain ] = []
232
233
super ().__init__ (meta )
233
234
234
235
def init_chain (self , chain_number : int ) -> ClickHouseChain :
235
236
cmeta = ChainMeta (self .meta .rid , chain_number )
236
237
create_chain_table (self ._client , cmeta , self .meta )
237
238
chain = ClickHouseChain (cmeta , self .meta , client = self ._client_fn ())
238
- if self ._chains is None :
239
- self ._chains = []
240
239
self ._chains .append (chain )
241
240
return chain
242
241
243
- def get_chains (self ) -> Tuple [ClickHouseChain ]:
242
+ def get_chains (self ) -> Tuple [ClickHouseChain , ... ]:
244
243
# Preferably return existing handles on chains that might have
245
244
# uncommitted inserts pending.
246
245
if self ._chains :
247
- return self ._chains
246
+ return tuple ( self ._chains )
248
247
249
248
# Otherwise fetch existing chains from the DB.
250
249
chains = []
@@ -274,13 +273,14 @@ def __init__(
274
273
"""
275
274
if client is None and client_fn is None :
276
275
raise ValueError ("Either a `client` or a `client_fn` must be provided." )
277
- self ._client_fn = client_fn
278
- self ._client = client
279
276
280
277
if client_fn is None :
281
- self . _client_fn = lambda : client
278
+ client_fn = lambda : client
282
279
if client is None :
283
- self ._client = self ._client_fn ()
280
+ client = client_fn ()
281
+
282
+ self ._client_fn : Callable [[], clickhouse_driver .Client ] = client_fn
283
+ self ._client : clickhouse_driver .Client = client
284
284
285
285
create_runs_table (self ._client )
286
286
super ().__init__ ()
0 commit comments