We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 5259955 commit 3e97eccCopy full SHA for 3e97ecc
src/silvimetric/commands/shatter.py
@@ -194,14 +194,19 @@ def kill_gracefully(signum, frame):
194
195
## If dask is distributed, use the futures feature
196
dc = get_client()
197
+ consolidate_count = 1000
198
+ count = 0
199
if dc is not None:
200
pc_futures = futures_of(processes.persist())
201
for batch in as_completed(pc_futures, with_results=True).batches():
- storage.consolidate_shatter(config.timestamp)
202
for _, pack in batch:
203
if isinstance(pack, CancelledError):
204
continue
205
for pc in pack:
206
+ count += 1
207
+ if count >= consolidate_count:
208
+ storage.consolidate_shatter(config.timestamp)
209
210
config.point_count = config.point_count + pc
211
del pc
212
0 commit comments