Commit 4816892
Performance updates (#152)

* Pass sorted block distribution to metrics functions
* Release parsed/mapped data variables to reduce memory
* Fix tests
* Fix typo in max entropy computation

1 parent 4484086 commit 4816892

File tree

12 files changed: +109 −114 lines

consensus_decentralization/aggregate.py

Lines changed: 6 additions & 11 deletions

@@ -13,16 +13,15 @@ class Aggregator:
     blocks they produced
     """

-    def __init__(self, project, io_dir, data_to_aggregate):
+    def __init__(self, project, io_dir):
         """
         :param project: str. Name of the project
         :param io_dir: Path. Path to the project's output directory
-        :param data_to_aggregate: list of dictionaries, sorted by 'timestamp'; the data that will be aggregated
         """
         self.project = project
-        self.data_to_aggregate = data_to_aggregate
-        self.data_start_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(data_to_aggregate[0]))
-        self.data_end_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(data_to_aggregate[-1]))
+        self.data_to_aggregate = hlp.read_mapped_project_data(io_dir)
+        self.data_start_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[0]))
+        self.data_end_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[-1]))
         self.aggregated_data_dir = io_dir / 'blocks_per_entity'
         self.aggregated_data_dir.mkdir(parents=True, exist_ok=True)

@@ -91,7 +90,7 @@ def divide_timeframe(timeframe, granularity):
     return list(zip(start_dates, end_dates))


-def aggregate(project, output_dir, timeframe, aggregate_by, force_aggregate, mapped_data=None):
+def aggregate(project, output_dir, timeframe, aggregate_by, force_aggregate):
     """
     Aggregates the results of the mapping process for the given project and timeframe. The results are saved in a csv
     file in the project's output directory. Note that the output file is created (just with the headers) even if there
@@ -103,15 +102,11 @@ def aggregate(project, output_dir, timeframe, aggregate_by, force_aggregate, mapped_data=None):
        year, all
    :param force_aggregate: bool. If True, then the aggregation will be performed, regardless of whether aggregated
        data for the project and specified granularity already exist
-   :param mapped_data: list of dictionaries (the data that will be aggregated). If None, then the data will be read
-       from the project's output directory
    :returns: a list of strings that correspond to the time chunks of the aggregation or None if no aggregation took
        place (the corresponding output file already existed and force_aggregate was set to False)
    """
    project_io_dir = output_dir / project
-   if mapped_data is None:
-       mapped_data = hlp.read_mapped_project_data(project_io_dir)
-   aggregator = Aggregator(project, project_io_dir, mapped_data)
+   aggregator = Aggregator(project, project_io_dir)

    filename = hlp.get_blocks_per_entity_filename(aggregate_by=aggregate_by, timeframe=timeframe)
    output_file = aggregator.aggregated_data_dir / filename
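With this change the Aggregator loads the mapped data itself via hlp.read_mapped_project_data, so callers no longer need to keep the full mapped dataset alive. A minimal sketch of the new call site (argument values are illustrative, not taken from the repo's CLI):

    from pathlib import Path

    from consensus_decentralization.aggregate import aggregate

    # Illustrative values; the real pipeline supplies these from its CLI and helpers
    time_chunks = aggregate(
        project='bitcoin',
        output_dir=Path('output'),
        timeframe='2023',
        aggregate_by='month',
        force_aggregate=False,  # reuse existing aggregated data if present
    )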

consensus_decentralization/analyze.py

Lines changed: 16 additions & 13 deletions

@@ -39,6 +39,7 @@ def analyze(projects, aggregated_data_filename, output_dir):
        csv_contents[metric] = [['timeframe'] + projects]

    for column_index, project in enumerate(projects):
+       logging.info(f'Calculating {project} metrics')
        aggregate_output[project] = {}
        aggregated_data_dir = output_dir / project / 'blocks_per_entity'
        time_chunks, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir / aggregated_data_filename)
@@ -50,24 +51,26 @@ def analyze(projects, aggregated_data_filename, output_dir):
            for tchunk, nblocks in block_values.items():
                if nblocks > 0:
                    chunks_with_blocks.add(tchunk)
-       for metric_name, metric, param in metric_params:
-           logging.info(f'Calculating {metric_name}')

-           for row_index, time_chunk in enumerate(time_chunks):
-               time_chunk_blocks_per_entity = {}
-               if column_index == 0:
+       for row_index, time_chunk in enumerate(time_chunks):
+           time_chunk_blocks_per_entity = {}
+           if column_index == 0:
+               for metric_name, _, _ in metric_params:
                    csv_contents[metric_name].append([time_chunk])
-               if time_chunk in chunks_with_blocks:
-                   for entity, block_values in blocks_per_entity.items():
-                       try:
-                           time_chunk_blocks_per_entity[entity] = block_values[time_chunk]
-                       except KeyError:
-                           time_chunk_blocks_per_entity[entity] = 0
+           if time_chunk in chunks_with_blocks:
+               for entity, block_values in blocks_per_entity.items():
+                   try:
+                       time_chunk_blocks_per_entity[entity] = block_values[time_chunk]
+                   except KeyError:
+                       time_chunk_blocks_per_entity[entity] = 0
+           sorted_time_chunk_blocks = sorted(time_chunk_blocks_per_entity.values(), reverse=True)
+
+           for metric_name, metric, param in metric_params:
                func = eval(f'compute_{metric}')
                if param:
-                   result = func(time_chunk_blocks_per_entity, param)
+                   result = func(sorted_time_chunk_blocks, param)
                else:
-                   result = func(time_chunk_blocks_per_entity)
+                   result = func(sorted_time_chunk_blocks)
                csv_contents[metric_name][row_index + 1].append(result)
                aggregate_output[project][time_chunk][metric_name] = result
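The restructured loop sorts each time chunk's distribution once, in descending order, and passes the same list to every metric, which is what lets the metric modules below drop their own dict handling. A usage sketch with toy data (the two import paths appear in this commit; the numbers are made up):

    from consensus_decentralization.metrics.tau_index import compute_tau_index
    from consensus_decentralization.metrics.theil_index import compute_theil_index

    # Toy per-entity block counts for one time chunk
    time_chunk_blocks_per_entity = {'pool_a': 40, 'pool_b': 30, 'pool_c': 20, 'pool_d': 10}

    # Sort once, descending, then hand the same list to every metric
    sorted_blocks = sorted(time_chunk_blocks_per_entity.values(), reverse=True)

    print(compute_tau_index(sorted_blocks, 0.5))  # 2: the top two entities cover 70% >= 50%
    print(compute_theil_index(sorted_blocks))     # inequality of the same distribution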

consensus_decentralization/metrics/entropy.py

Lines changed: 6 additions & 7 deletions

@@ -1,18 +1,17 @@
 from math import log


-def compute_entropy(blocks_per_entity, alpha):
+def compute_entropy(block_distribution, alpha):
     """
     Calculates the entropy of a distribution of blocks to entities
     Pi is the relative frequency of each entity.
     Renyi entropy: 1/(1-alpha) * log2 (sum (Pi**alpha))
     Shannon entropy (alpha=1): −sum P(Si) log2 (Pi)
     Min entropy (alpha=-1): -log max Pi
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :param alpha: the entropy parameter (depending on its value the corresponding entropy measure is used)
     :returns: a float that represents the entropy of the data or None if the data is empty
     """
-    block_distribution = blocks_per_entity.values()
     all_blocks = sum(block_distribution)
     if all_blocks == 0:
         return None
@@ -35,13 +34,13 @@ def compute_entropy(blocks_per_entity, alpha):


 def compute_max_entropy(num_entities, alpha):
-    return compute_entropy({i: 1 for i in range(num_entities)}, alpha)
+    return compute_entropy([1 for i in range(num_entities)], alpha)


-def compute_entropy_percentage(blocks_per_entity, alpha):
-    if sum(blocks_per_entity.values()) == 0:
+def compute_entropy_percentage(block_distribution, alpha):
+    if sum(block_distribution) == 0:
        return None
    try:
-       return compute_entropy(blocks_per_entity, alpha) / compute_max_entropy(len(blocks_per_entity), alpha)
+       return compute_entropy(block_distribution, alpha) / compute_max_entropy(len(block_distribution), alpha)
    except ZeroDivisionError:
        return 0
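A quick sanity check of the new list-based interface (a sketch, assuming the package is importable; expected values computed by hand from the docstring formulas):

    from consensus_decentralization.metrics.entropy import compute_entropy, compute_entropy_percentage

    # Descending-sorted toy distribution: relative frequencies 0.75 and 0.25
    block_distribution = [3, 1]

    # Shannon entropy (alpha=1): -(0.75*log2(0.75) + 0.25*log2(0.25)) ~ 0.811
    print(compute_entropy(block_distribution, 1))

    # Ratio to the max entropy of 2 entities (log2(2) = 1), so also ~ 0.811
    print(compute_entropy_percentage(block_distribution, 1))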

consensus_decentralization/metrics/gini.py

Lines changed: 4 additions & 4 deletions

@@ -1,15 +1,15 @@
 import numpy as np


-def compute_gini(blocks_per_entity):
+def compute_gini(block_distribution):
     """
     Calculates the Gini coefficient of a distribution of blocks to entities
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :returns: a float that represents the Gini coefficient of the given distribution or None if the data is empty
     """
-    if sum(blocks_per_entity.values()) == 0:
+    if sum(block_distribution) == 0:
        return None
-    array = np.array(list(blocks_per_entity.values()))
+    array = np.array(block_distribution)
    return gini(array)
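Usage is unchanged apart from the argument type. A sketch (exact values depend on the repo's internal gini() helper, so only the qualitative behavior is asserted):

    from consensus_decentralization.metrics.gini import compute_gini

    print(compute_gini([5, 5, 5, 5]))    # equal split: coefficient at or near 0
    print(compute_gini([100, 0, 0, 0]))  # one producer has everything: close to 1
    print(compute_gini([0, 0]))          # no blocks at all: None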
consensus_decentralization/metrics/hhi.py

Lines changed: 5 additions & 5 deletions

@@ -1,20 +1,20 @@
-def compute_hhi(blocks_per_entity):
+def compute_hhi(block_distribution):
     """
     Calculates the Herfindahl-Hirschman index of a distribution of blocks to entities
     From investopedia: The HHI is calculated by squaring the market share of each firm competing in a market and then
     summing the resulting numbers. It can range from close to 0 to 10,000, with lower values indicating a less
     concentrated market. The U.S. Department of Justice considers a market with an HHI of less than 1,500 to be a
     competitive marketplace, an HHI of 1,500 to 2,500 to be a moderately concentrated marketplace,
     and an HHI of 2,500 or greater to be a highly concentrated marketplace.
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :return: float between 0 and 10,000 that represents the HHI of the given distribution or None if the data is empty
     """
-    total_blocks = sum(blocks_per_entity.values())
+    total_blocks = sum(block_distribution)
    if total_blocks == 0:
        return None

    hhi = 0
-   for num_blocks in blocks_per_entity.values():
-       hhi += pow(num_blocks / total_blocks * 100, 2)
+   for num_blocks in block_distribution:
+       hhi += pow(100 * num_blocks / total_blocks, 2)

    return hhi
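The reordered expression 100 * num_blocks / total_blocks computes the same percentage market share as before. A worked check, restated standalone so it runs without the package (illustration only):

    def hhi(block_distribution):
        # Standalone restatement of compute_hhi, for illustration only
        total_blocks = sum(block_distribution)
        if total_blocks == 0:
            return None
        return sum(pow(100 * n / total_blocks, 2) for n in block_distribution)

    print(hhi([50, 50]))          # 5000.0: an equal-share duopoly
    print(hhi([25, 25, 25, 25]))  # 2500.0: four equal producers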
consensus_decentralization/metrics/max_power_ratio.py

Lines changed: 4 additions & 7 deletions

@@ -1,11 +1,8 @@
-def compute_max_power_ratio(blocks_per_entity):
+def compute_max_power_ratio(block_distribution):
     """
     Calculates the maximum power ratio of a distribution of balances
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :returns: float that represents the maximum power ratio among all block producers (0 if there weren't any)
     """
-    if len(blocks_per_entity) == 0:
-        return 0
-    max_nblocks = max(blocks_per_entity.values())
-    total_blocks = sum(blocks_per_entity.values())
-    return max_nblocks / total_blocks if total_blocks > 0 else 0
+    total_blocks = sum(block_distribution)
+    return block_distribution[0] / total_blocks if total_blocks else 0
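The simplified version leans on the new contract that block_distribution is sorted in descending order, so the largest producer is simply index 0. The conditional expression also still covers the empty case, because block_distribution[0] is only evaluated when total_blocks is non-zero. A standalone check (restated for illustration):

    def max_power_ratio(block_distribution):
        # Restatement of compute_max_power_ratio; assumes descending order
        total_blocks = sum(block_distribution)
        return block_distribution[0] / total_blocks if total_blocks else 0

    print(max_power_ratio([40, 30, 20, 10]))  # 0.4
    print(max_power_ratio([]))                # 0, without an IndexError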
consensus_decentralization/metrics/nakamoto_coefficient.py

Lines changed: 3 additions & 3 deletions

@@ -1,10 +1,10 @@
 from consensus_decentralization.metrics.tau_index import compute_tau_index


-def compute_nakamoto_coefficient(blocks_per_entity):
+def compute_nakamoto_coefficient(block_distribution):
     """
     Calculates the Nakamoto coefficient of a distribution of blocks to entities
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :returns: int that represents the Nakamoto coefficient of the given distribution, or None if the data is empty
     """
-    return compute_tau_index(blocks_per_entity=blocks_per_entity, threshold=0.5)
+    return compute_tau_index(block_distribution, 0.5)
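Since the Nakamoto coefficient is just the tau index at a 50% threshold, a quick check can go straight through compute_tau_index (the import path is the one shown above; the distribution is a toy):

    from consensus_decentralization.metrics.tau_index import compute_tau_index

    # Top entity holds 40% (< 50%); top two hold 70% (>= 50%) -> coefficient 2
    print(compute_tau_index([40, 30, 20, 10], 0.5))  # 2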
consensus_decentralization/metrics/tau_index.py

Lines changed: 7 additions & 8 deletions

@@ -1,19 +1,18 @@
-def compute_tau_index(blocks_per_entity, threshold):
+def compute_tau_index(block_distribution, threshold):
     """
     Calculates the tau-decentralization index of a distribution of blocks
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :param threshold: float, the parameter of the tau-decentralization index, i.e. the threshold for the power
        ratio that is captured by the index (e.g. 0.66 for 66%)
     :returns: int that corresponds to the tau index of the given distribution, or None if there were no blocks
     """
-    total_blocks = sum(blocks_per_entity.values())
+    total_blocks = sum(block_distribution)
    if total_blocks == 0:
        return None
    tau_index, power_ratio_covered = 0, 0
-   blocks_per_entity_copy = blocks_per_entity.copy()
-   while power_ratio_covered < threshold:
-       current_max_entity = max(blocks_per_entity_copy, key=blocks_per_entity_copy.get)
+   for block_amount in block_distribution:
+       if power_ratio_covered >= threshold:
+           break
        tau_index += 1
-       power_ratio_covered += blocks_per_entity_copy[current_max_entity] / total_blocks
-       del blocks_per_entity_copy[current_max_entity]
+       power_ratio_covered += block_amount / total_blocks
    return tau_index
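The rewrite replaces repeated max()-and-delete over a dict copy (quadratic, plus an extra copy per call) with a single pass over the already-sorted list. A behavior check on a toy distribution (values verified by hand against the loop above):

    from consensus_decentralization.metrics.tau_index import compute_tau_index

    # 40% < 90%, 70% < 90%, 90% >= 90% after three entities -> tau = 3
    print(compute_tau_index([40, 30, 20, 10], 0.9))  # 3
    print(compute_tau_index([0, 0], 0.5))            # None: no blocks produced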

consensus_decentralization/metrics/theil_index.py

Lines changed: 5 additions & 5 deletions

@@ -1,19 +1,19 @@
 from math import log


-def compute_theil_index(blocks_per_entity):
+def compute_theil_index(block_distribution):
     """
     Calculates the Thiel index of a distribution of blocks to entities
-    :param blocks_per_entity: a dictionary with entities and the blocks they have produced
+    :param block_distribution: a list of integers, each being the blocks that an entity has produced, sorted in descending order
     :returns: float that represents the Thiel index of the given distribution
     """
-    n = len(blocks_per_entity)
+    n = len(block_distribution)
    if n == 0:
        return 0
-   total_blocks = sum(blocks_per_entity.values())
+   total_blocks = sum(block_distribution)
    mu = total_blocks / n
    theil = 0
-   for nblocks in blocks_per_entity.values():
+   for nblocks in block_distribution:
        x = nblocks / mu
        if x > 0:
            theil += x * log(x)
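A minimal check of the edge cases that the new list interface keeps intact (a sketch, assuming the package is importable):

    from consensus_decentralization.metrics.theil_index import compute_theil_index

    # Equal distribution: every x = nblocks/mu is 1 and log(1) = 0, so the index is 0
    print(compute_theil_index([10, 10, 10, 10]))  # 0.0
    print(compute_theil_index([]))                # 0, via the n == 0 early return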

run.py

Lines changed: 11 additions & 8 deletions

@@ -10,6 +10,13 @@
 logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)


+def process_data(force_map, project_dir, project, output_dir):
+    mapped_data_file = project_dir / 'mapped_data.json'
+    if force_map or not mapped_data_file.is_file():
+        parsed_data = parse(project=project, input_dir=hlp.RAW_DATA_DIR)
+        apply_mapping(project=project, parsed_data=parsed_data, output_dir=output_dir)
+
+
 def main(projects, timeframe, aggregate_by, force_map, make_plots, make_animated_plots, output_dir=hlp.OUTPUT_DIR):
     """
     Executes the entire pipeline (parsing, mapping, analyzing) for some projects and timeframes.
@@ -28,19 +35,15 @@ def main(projects, timeframe, aggregate_by, force_map, make_plots, make_animated_plots, output_dir=hlp.OUTPUT_DIR):
    for project in projects:
        project_dir = output_dir / project
        project_dir.mkdir(parents=True, exist_ok=True)  # create project output directory if it doesn't already exist
-       mapped_data_file = project_dir / 'mapped_data.json'
-       if force_map or not mapped_data_file.is_file():
-           parsed_data = parse(project=project, input_dir=hlp.RAW_DATA_DIR)
-           mapped_data = apply_mapping(project=project, parsed_data=parsed_data, output_dir=output_dir)
-       else:
-           mapped_data = None
+
+       process_data(force_map, project_dir, project, output_dir)
+
        aggregate(
            project=project,
            output_dir=output_dir,
            timeframe=timeframe,
            aggregate_by=aggregate_by,
-           force_aggregate=force_map,
-           mapped_data=mapped_data
+           force_aggregate=force_map
        )

        used_metrics = analyze(
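Extracting process_data means the parsed and mapped structures are only referenced inside that function's frame, so CPython can reclaim them as soon as it returns, before aggregation allocates its own data; previously mapped_data stayed alive in main() for the rest of the loop iteration. A toy illustration of the scoping effect (not the project's code):

    import gc

    def build_and_write():
        big = list(range(10_000_000))  # stand-in for parsed/mapped data
        # ... persist `big` to disk here ...
        # `big` becomes unreachable once this frame is popped

    def main():
        build_and_write()
        gc.collect()  # the big list is collectable here; it would not be
                      # if main() still held a reference to it

    main()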
