
Commit e7c7f13

Add helper function to compute concurrency params
1 parent d87f8b6 commit e7c7f13

2 files changed: +65 -0 lines


tests/test_helper.py

Lines changed: 26 additions & 0 deletions
@@ -1,4 +1,5 @@
 import tokenomics_decentralization.helper as hlp
+from collections import namedtuple
 import pathlib
 import os
 import datetime
@@ -485,3 +486,28 @@ def test_get_clusters(mocker):
     assert clusters['entity1'] == clusters['entity3']
     assert clusters['entity4'] == clusters['entity5']
     assert 'entity7' not in clusters.keys()
+
+
+def test_get_concurrency_per_ledger(mocker):
+    psutil_memory_mock = mocker.patch('psutil.virtual_memory')
+    psutil_memory_mock.return_value = namedtuple('VM', 'total')(10*10**9)
+
+    get_input_directories_mock = mocker.patch('tokenomics_decentralization.helper.get_input_directories')
+    get_input_directories_mock.return_value = [pathlib.Path('/').resolve()]
+
+    get_ledgers_mock = mocker.patch('tokenomics_decentralization.helper.get_ledgers')
+    get_ledgers_mock.return_value = ['bitcoin', 'ethereum']
+
+    os_walk_mock = mocker.patch('os.walk')
+    os_walk_mock.return_value = [('/', 'foo', ['bitcoin_2010-01-01_raw_data.csv'])]
+
+    os_stat_mock = mocker.patch('os.stat')
+    os_stat_mock.return_value = namedtuple('ST', 'st_size')(10*10**8)
+
+    concurrency = hlp.get_concurrency_per_ledger()
+    assert concurrency == {'bitcoin': 3, 'ethereum': 1}
+
+    os_stat_mock.return_value = namedtuple('ST', 'st_size')(5*10**9)
+
+    with pytest.raises(ValueError):
+        hlp.get_concurrency_per_ledger()
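
For reference, the asserted values follow from the mocked sizes; a quick sketch of the arithmetic (illustration only, not part of the commit):

# 10 GB of mocked RAM, of which the helper reserves 1 GB for other processes.
available = 10 * 10**9 - 10**9                 # 9 GB usable

# bitcoin: the largest mocked file is 10*10**8 B (1 GB) and is assumed to occupy
# ~2.5x its on-disk size once loaded, so three copies fit concurrently.
assert int(available / (2.5 * 10 * 10**8)) == 3

# ethereum: no input file matches the ledger name, so the helper defaults to 1.

# Second scenario: a 5 GB file leaves room for zero processes, hence the ValueError.
assert int(available / (2.5 * 5 * 10**9)) == 0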

tokenomics_decentralization/helper.py

Lines changed: 39 additions & 0 deletions
@@ -6,6 +6,7 @@
 import os
 import datetime
 import calendar
+import psutil
 import json
 from collections import defaultdict
 import logging
@@ -606,3 +607,41 @@ def get_clusters(ledger):
         cluster_mapping[item[0]] = cluster_name
 
     return cluster_mapping
+
+
+def get_concurrency_per_ledger():
+    """
+    Computes the maximum number of parallel processes that can run per ledger,
+    based on the system's available memory.
+    :returns: a dictionary where the keys are ledger names and values are integers
+    """
+    system_memory_total = psutil.virtual_memory().total  # Get the system's total memory
+    system_memory_total -= 10**9  # Leave 1GB of memory to be used by other processes
+
+    concurrency = {}
+    too_large_ledgers = set()
+    input_dirs = get_input_directories()
+    for ledger in get_ledgers():
+        # Find the size of the largest input file per ledger
+        max_file_size = 0
+        for input_dir in input_dirs:
+            for folder, _, files in os.walk(input_dir):
+                for file in files:
+                    if file.startswith(ledger):
+                        max_file_size = max(max_file_size, os.stat(os.path.join(folder, file)).st_size)
+        # Compute the max number of processes that can open the largest ledger file
+        # and run in parallel without exhausting the system's memory.
+        if max_file_size > 0:
+            # When loaded in (a dict in) memory, each file consumes approx. 2.5x space compared to storage.
+            concurrency[ledger] = int(system_memory_total / (2.5 * max_file_size))
+            # Find if some ledger files are too large to fit in the system's available memory.
+            if concurrency[ledger] == 0:
+                too_large_ledgers.add(ledger)
+        else:
+            concurrency[ledger] = 1
+
+    if too_large_ledgers:
+        raise ValueError('The max input files of the following ledgers are too '
+                         'large to load in memory: ' + ','.join(too_large_ledgers))
+
+    return concurrency
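
For context, a minimal sketch of how the per-ledger values returned here might be consumed; the worker function and snapshot lists below are hypothetical placeholders, not part of this commit:

import multiprocessing

import tokenomics_decentralization.helper as hlp


def analyze_snapshot(args):
    # Hypothetical worker: stands in for whatever per-snapshot analysis the project runs.
    ledger, snapshot = args
    ...


def run_all(snapshots_per_ledger):
    concurrency = hlp.get_concurrency_per_ledger()
    for ledger, snapshots in snapshots_per_ledger.items():
        # Cap the pool size at the memory-derived limit computed for this ledger.
        with multiprocessing.Pool(processes=concurrency[ledger]) as pool:
            pool.map(analyze_snapshot, [(ledger, s) for s in snapshots])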
