Skip to content

Commit 420b537

Browse files
committed
Make parsing more efficient
1 parent 7c6b3e8 commit 420b537

File tree

5 files changed

+33
-13
lines changed

5 files changed

+33
-13
lines changed

consensus_decentralization/aggregate.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ class Aggregator:
1212
blocks they produced
1313
"""
1414

15-
def __init__(self, project, io_dir):
15+
def __init__(self, project, io_dir, mapped_data=None):
1616
"""
1717
:param project: str. Name of the project
1818
:param io_dir: Path. Path to the project's output directory
1919
"""
2020
self.project = project
21-
self.data_to_aggregate = hlp.read_mapped_project_data(io_dir)
21+
self.data_to_aggregate = hlp.read_mapped_project_data(io_dir) if mapped_data is None else mapped_data
2222
self.data_start_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[0]))
2323
self.data_end_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[-1]))
24-
self.aggregated_data_dir = io_dir / 'blocks_per_entity'
24+
self.aggregated_data_dir = io_dir / hlp.get_aggregated_data_dir_name(hlp.get_clustering_flag())
2525
self.aggregated_data_dir.mkdir(parents=True, exist_ok=True)
2626

2727
self.monthly_data_breaking_points = [(self.data_start_date.strftime('%Y-%m'), 0)]
@@ -89,7 +89,7 @@ def divide_timeframe(timeframe, estimation_window, frequency):
8989
return time_chunks
9090

9191

92-
def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate):
92+
def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate, mapped_data=None):
9393
"""
9494
Aggregates the results of the mapping process for the given project and timeframe. The results are saved in a csv
9595
file in the project's output directory. Note that the output file is created (just with the headers) even if there
@@ -113,7 +113,7 @@ def aggregate(project, output_dir, timeframe, estimation_window, frequency, forc
113113
raise ValueError('The estimation window is too large for the given timeframe')
114114

115115
project_io_dir = output_dir / project
116-
aggregator = Aggregator(project, project_io_dir)
116+
aggregator = Aggregator(project, project_io_dir, mapped_data=mapped_data)
117117

118118
filename = hlp.get_blocks_per_entity_filename(timeframe=timeframe, estimation_window=estimation_window, frequency=frequency)
119119
output_file = aggregator.aggregated_data_dir / filename

consensus_decentralization/mappings/default_mapping.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
23
import consensus_decentralization.helper as hlp
34

45

@@ -9,8 +10,6 @@ class DefaultMapping:
910
1011
:ivar project_name: the name of the project associated with a specific mapping instance
1112
:ivar output_dir: the directory that includes the parsed data related to the project
12-
:ivar mapped_data_dir: the directory to save the mapped data files in
13-
:ivar multi_pool_dir: the directory to save the multi pool data files in
1413
:ivar data_to_map: a list with the parsed data of the project (list of dictionaries with block information)
1514
:ivar special_addresses: a set with the special addresses of the project (addresses that don't count in the
1615
context of our analysis)
@@ -45,7 +44,7 @@ def perform_mapping(self):
4544
project.
4645
:returns: a list of dictionaries (mapped block data)
4746
"""
48-
clustering_flag = hlp.get_config_data()['analyze_flags']['clustering']
47+
clustering_flag = hlp.get_clustering_flag()
4948
for block in self.data_to_map:
5049
if not clustering_flag:
5150
entity = self.fallback_mapping(block)
@@ -83,7 +82,7 @@ def perform_mapping(self):
8382
})
8483

8584
if len(self.mapped_data) > 0:
86-
self.write_mapped_data()
85+
self.write_mapped_data(clustering_flag)
8786
self.write_multi_pool_files()
8887

8988
return self.mapped_data
@@ -187,11 +186,12 @@ def write_multi_pool_files(self):
187186
with open(self.output_dir / 'multi_pool_blocks.csv', 'w') as f:
188187
f.write('Block No,Timestamp,Entities\n' + '\n'.join(self.multi_pool_blocks))
189188

190-
def write_mapped_data(self):
189+
def write_mapped_data(self, clustering_flag):
191190
"""
192191
Writes the mapped data into a file in a directory associated with the mapping instance. Specifically,
193192
into a folder named after the project, inside the general output directory
193+
:param clustering_flag: boolean, indicating whether clustering was used in the mapping process
194194
"""
195-
filename = 'mapped_data.json'
195+
filename = hlp.get_mapped_data_filename(clustering_flag)
196196
with open(self.output_dir / filename, 'w') as f:
197197
json.dump(self.mapped_data, f, indent=4)

consensus_decentralization/mappings/dummy_mapping.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from consensus_decentralization.mappings.default_mapping import DefaultMapping
2+
import consensus_decentralization.helper as hlp
23

34

45
class DummyMapping(DefaultMapping):
@@ -28,6 +29,6 @@ def perform_mapping(self):
2829
})
2930

3031
if len(self.mapped_data) > 0:
31-
self.write_mapped_data()
32+
self.write_mapped_data(hlp.get_clustering_flag())
3233

3334
return self.mapped_data

consensus_decentralization/parsers/dummy_parser.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ def parse(self):
2424
directory associated with the parser instance (specifically in <general output directory>/<project_name>)
2525
"""
2626
data = self.read_and_sort_data()
27+
2728
for block in data:
2829
if 'identifiers' not in block.keys():
2930
block['identifiers'] = None
3031
else:
3132
block['identifiers'] = self.parse_identifiers(block['identifiers'])
3233
if 'reward_addresses' not in block.keys():
3334
block['reward_addresses'] = None
34-
return data
35+
yield block

consensus_decentralization/parsers/ethereum_parser.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from consensus_decentralization.parsers.dummy_parser import DummyParser
2+
import json
23

34

45
class EthereumParser(DummyParser):
@@ -20,3 +21,20 @@ def parse_identifiers(block_identifiers):
2021
return bytes.fromhex(block_identifiers[2:]).decode('utf-8')
2122
except (UnicodeDecodeError, ValueError):
2223
return block_identifiers
24+
25+
def read_and_sort_data(self):
26+
"""
27+
Reads the "raw" block data associated with the project
28+
:returns: a generator of dictionaries (block data), assumed sorted by timestamp
29+
Note that the current version does not sort the data (because it is too memory-intensive) but assumes that the
30+
data are already sorted (which is generally the case given the suggested queries).
31+
"""
32+
filename = f'{self.project_name}_raw_data.json'
33+
filepath = self.input_dir / filename
34+
35+
def generate_data():
36+
with open(filepath) as f:
37+
for line in f:
38+
yield json.loads(line.strip())
39+
40+
return generate_data()

0 commit comments

Comments
 (0)