Skip to content
88 changes: 65 additions & 23 deletions scripts/world_bank/datasets/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# Staging directory under the GCS output root, and the single combined CSV
# into which every transformed World Bank file is appended.
output = os.path.join(_GCS_OUTPUT_DIR, 'output')
output_file_path = os.path.join(output, 'transformed_data_for_all_final.csv')

# Module-local config CSVs: place-code -> dcid remapping, and place codes to
# drop entirely (both are loaded at import time below).
places_csv = os.path.join(_MODULE_DIR, 'places.csv')
skip_places_csv = os.path.join(_MODULE_DIR, 'skip_places.csv')

# Make the repo-level util/ directory importable so the shared file_util
# helpers (GCS-aware copy, etc.) can be reused by this script.
_UTIL_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(_UTIL_DIR, '../../../util/'))
import file_util  # noqa: E402 -- must follow the sys.path append above
Expand All @@ -27,6 +30,24 @@
# Pre-existing historical results file (copied from FLAGS.gs_path into the
# output directory in main(); also part of the expected-output check there).
flags.DEFINE_string("historical_file", "bq-results-20250423.csv",
                    "historical file name")

# Remapping of place identifiers: keys are taken verbatim from the
# 'country code' column of places.csv (presumably already in the
# "country/<code>" form used for lookups downstream -- confirm against the
# file), values are the dcids to emit instead.
DCID_MAP = {}
# Place identifiers (same verbatim form) whose rows are dropped entirely.
IGNORE_DCIDS = set()

# Both config files are mandatory; abort the whole run if either is unreadable.
try:
    with open(places_csv, 'r', newline='', encoding='utf-8') as mapping_file:
        DCID_MAP.update(
            (entry['country code'].strip(), entry['dcid'].strip())
            for entry in csv.DictReader(mapping_file))
except Exception as e:
    logging.fatal(f"Failed to load dcid mapping csv: {e}")

try:
    with open(skip_places_csv, 'r', newline='', encoding='utf-8') as skip_file:
        IGNORE_DCIDS.update(entry['country code'].strip()
                            for entry in csv.DictReader(skip_file))
except Exception as e:
    logging.fatal(f"Failed to load ignore csv: {e}")

def transform_worldbank_csv(input_filename,
writer,
Expand Down Expand Up @@ -57,7 +78,6 @@ def transform_worldbank_csv(input_filename,
break
except ValueError:
pass
"""Ignoring ValueError: expecting integer for year column identification."""
except ValueError:
logging.info(
f"Error: Could not find required columns in header of '{input_filename}'."
Expand All @@ -72,27 +92,42 @@ def transform_worldbank_csv(input_filename,
elif i >= data_start_row:
if header and indicator_code_column_index is not None and \
country_code_column_index is not None and year_columns_start_index is not None:
indicator_code = row[indicator_code_column_index].strip(
)
country_code = "country/" + row[
indicator_code = row[indicator_code_column_index].strip()
raw_country_code = "country/" + row[
country_code_column_index].strip()
stat_var = "worldBank/" + indicator_code.replace(
'.', '_')

if raw_country_code in IGNORE_DCIDS:
continue

if raw_country_code in DCID_MAP:
country_code = DCID_MAP[raw_country_code]
else:
country_code = raw_country_code

stat_var = "worldBank/" + indicator_code.replace('.', '_')

unit_value = get_unit_by_indicator(indicator_code)
for j in range(year_columns_start_index, len(row)):
year = header[j]
value = row[j].strip()
if value:
"""Keeping the first occurrence and removing subsequent duplicates. Verified with source and production; the initial value from the source now is matching with the production data(checked for 4-5 samples) ."""
duplicate_key = (indicator_code, stat_var,
MEASUREMENT_METHOD,
country_code, year, unit_value)
duplicate_key = (
indicator_code,
stat_var,
MEASUREMENT_METHOD,
country_code,
year,
unit_value
)
if duplicate_key not in processed_rows:
writer.writerow([
indicator_code, stat_var,
MEASUREMENT_METHOD, country_code, year,
value, unit_value
indicator_code,
stat_var,
MEASUREMENT_METHOD,
country_code,
year,
value,
unit_value
])
processed_rows.add(duplicate_key)

Expand Down Expand Up @@ -124,7 +159,7 @@ def get_unit_by_indicator(target_indicator_code):
return ""
except FileNotFoundError:
return ""
except Exception as e:
except Exception:
return ""


Expand All @@ -136,26 +171,31 @@ def main(_):
]
try:
os.makedirs(output, exist_ok=True)
with open(output_file_path, 'w', newline='',
encoding='utf-8') as outfile:
with open(output_file_path, 'w', newline='', encoding='utf-8') as outfile:
writer = csv.writer(outfile)
first_file_processed = False

for input_file in input_files:
logging.info(f"Processing: {input_file}")
transform_worldbank_csv(input_file,
writer,
write_header=not first_file_processed)
transform_worldbank_csv(
input_file,
writer,
write_header=not first_file_processed
)
first_file_processed = True

logging.info(
f"\nSuccessfully processed {len(input_files)} files. Combined output written to '{output_file_path}'"
)
# historical_file = "bq-results-20250423.csv"
file_util.file_copy(f'{FLAGS.gs_path}{FLAGS.historical_file}',
f'{output}/{FLAGS.historical_file}')

file_util.file_copy(
f'{FLAGS.gs_path}{FLAGS.historical_file}',
f'{output}/{FLAGS.historical_file}'
)

expected_output_files = [
FLAGS.historical_file, 'transformed_data_for_all_final.csv'
FLAGS.historical_file,
'transformed_data_for_all_final.csv'
]
actual_files = os.listdir(output)
if Counter(expected_output_files) != Counter(actual_files):
Expand All @@ -168,3 +208,5 @@ def main(_):

if __name__ == "__main__":
app.run(main)


Loading