99 data_collection_scripts directory of the project under the name 'google-service-account-key-0.json'. Any additional
1010 keys should be named 'google-service-account-key-1.json', 'google-service-account-key-2.json', etc.
1111"""
12+ import json
1213import google .cloud .bigquery as bq
1314import csv
1415from yaml import safe_load
1516import logging
1617import argparse
1718import tokenomics_decentralization .helper as hlp
19+ from datetime import datetime
1820
1921
20- def collect_data (ledgers , snapshot_dates , force_query ):
22+ def collect_data (ledger_snapshot_dates , force_query ):
2123 input_dir = hlp .get_input_directories ()[0 ]
2224 root_dir = hlp .ROOT_DIR
2325 if not input_dir .is_dir ():
@@ -28,15 +30,17 @@ def collect_data(ledgers, snapshot_dates, force_query):
2830
2931 i = 0
3032 all_quota_exceeded = False
33+ ledger_last_updates = dict .fromkeys (ledger_snapshot_dates .keys ())
3134
32- for ledger in ledgers :
35+ for ledger , snapshot_dates in ledger_snapshot_dates . items () :
3336 for date in snapshot_dates :
3437 if all_quota_exceeded :
35- break
38+ return ledger_last_updates
3639 file = input_dir / f'{ ledger } _{ date } _raw_data.csv'
3740 if not force_query and file .is_file ():
3841 logging .info (f'{ ledger } data for { date } already exists locally. '
3942 f'For querying { ledger } anyway please run the script using the flag --force-query' )
43+ ledger_last_updates [ledger ] = date
4044 continue
4145 logging .info (f"Querying { ledger } at snapshot { date } .." )
4246
@@ -49,7 +53,7 @@ def collect_data(ledgers, snapshot_dates, force_query):
4953 except FileNotFoundError :
5054 logging .info (f'Exhausted all { i } service account keys. Aborting..' )
5155 all_quota_exceeded = True
52- break
56+ return ledger_last_updates
5357 query_job = client .query (query )
5458 try :
5559 rows = query_job .result ()
@@ -72,24 +76,73 @@ def collect_data(ledgers, snapshot_dates, force_query):
7276 writer .writerow ([field .name for field in rows .schema ])
7377 writer .writerows (rows )
7478 logging .info (f'Done writing { ledger } data to file.\n ' )
79+ ledger_last_updates [ledger ] = date
80+ return ledger_last_updates
81+
82+
def get_from_dates(granularity):
    """
    Determine, per ledger, the first snapshot date that still needs querying.

    For each ledger this is its last updated date advanced by one unit of the
    given granularity (e.g. the month following the last update when the
    granularity is 'month').
    :param granularity: The granularity of the data collection. Can be 'day',
        'week', 'month', or 'year'.
    :return: A dictionary with ledgers as keys and the corresponding start
        dates (or None if no last-update date is recorded) as values.
    """
    # last_update.json is keyed by granularity first, then by ledger.
    with open(hlp.ROOT_DIR / "data_collection_scripts/last_update.json") as f:
        last_updated = json.load(f)[granularity]
    return {
        ledger: None if last_date is None
        else hlp.increment_date(date=hlp.get_date_beginning(last_date), by=granularity)
        for ledger, last_date in last_updated.items()
    }
101+
102+
def update_last_update(ledger_last_updates, granularity=None):
    """
    Update the last_update.json file with the last date for which data was
    collected for each ledger.

    The file is keyed by granularity first and ledger second (this is the
    layout that get_from_dates reads back), so the dates are recorded under
    the relevant granularity key rather than at the top level of the file.
    :param ledger_last_updates: A dictionary with the ledgers for which data
        was collected and the last date for which data was collected for each
        of them (None when nothing was collected for that ledger).
    :param granularity: The granularity key under which to record the dates
        ('day', 'week', 'month' or 'year'). Defaults to the project's
        configured granularity (hlp.get_granularity()).
    """
    if granularity is None:
        granularity = hlp.get_granularity()
    filepath = hlp.ROOT_DIR / "data_collection_scripts/last_update.json"
    with open(filepath) as f:
        last_update = json.load(f)
    if granularity is None:
        # No granularity configured: keep the flat {ledger: date} layout.
        target = last_update
    else:
        # Write under the granularity key so that get_from_dates (which reads
        # last_update[granularity]) finds the recorded dates on the next run.
        target = last_update.setdefault(granularity, {})
    for ledger, date in ledger_last_updates.items():
        if date is not None:  # skip ledgers for which nothing new was collected
            target[ledger] = date
    with open(filepath, 'w') as f:
        json.dump(last_update, f)
75116
76117
if __name__ == '__main__':
    logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)

    # Command-line interface: which ledgers to query, the end date of the
    # collection window, and a flag to re-query data that already exists.
    supported_ledgers = hlp.get_ledgers()
    parser = argparse.ArgumentParser()
    parser.add_argument('--ledgers', nargs="*", type=str.lower, default=supported_ledgers,
                        choices=list(supported_ledgers), help='The ledgers to collect data for.')
    parser.add_argument('--to_date', type=hlp.valid_date,
                        default=datetime.today().strftime('%Y-%m-%d'),
                        help='The date until which to get data for (YYYY-MM-DD format). Defaults to today.')
    parser.add_argument('--force-query', action='store_true',
                        help='Flag to specify whether to query for project data regardless if the relevant data '
                             'already exist.')
    args = parser.parse_args()

    end_date = hlp.get_date_beginning(args.to_date)
    granularity = hlp.get_granularity()
    if granularity is None:
        # Without a configured granularity, only the single requested
        # snapshot date is queried for each ledger.
        ledger_snapshot_dates = {ledger: [hlp.get_date_string_from_date(end_date)] for ledger in args.ledgers}
    else:
        # Resume each ledger from its recorded last update; ledgers with no
        # recorded date fall back to the project's first snapshot date.
        fallback_start = hlp.get_date_beginning(hlp.get_snapshot_dates()[0])
        starts_per_ledger = get_from_dates(granularity=granularity)
        ledger_snapshot_dates = {}
        for ledger in args.ledgers:
            start_date = starts_per_ledger.get(ledger) or fallback_start
            ledger_snapshot_dates[ledger] = hlp.get_dates_between(start_date, end_date, granularity)

    ledger_last_updates = collect_data(ledger_snapshot_dates=ledger_snapshot_dates, force_query=args.force_query)
    update_last_update(ledger_last_updates=ledger_last_updates)
0 commit comments