Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions Albania/ALB_export_excel_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
TARGET_TABLE = 'prd_mega.boost.alb_publish'
TARGET_TABLE_REVENUE = 'prd_mega.boost_intermediate.alb_boost_rev_gold'
OUTPUT_MISSING_DESC_FILE_PATH = f"{OUTPUT_DIR}/Albania_missing_code_descriptions.xlsx"


TAG_MAPPING_PATH = "../quality/tag_code_mapping.csv"

# COMMAND ----------

Expand All @@ -47,35 +46,35 @@
.cache()
)

tag_code_mapping = pd.read_csv(TAG_MAPPING_URL)
tag_code_mapping = pd.read_csv(TAG_MAPPING_PATH)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bhupatiraju

Thank you so much for taking a look!
Could you try running this script here to confirm that the files can now be loaded using the relative path in the project?
Could you also make sure that you're in the "Repo/{your email address}/{repo_name}" folder when testing?

years = [str(year) for year in sorted(raw_data.select("year").distinct().rdd.flatMap(lambda x: x).collect())]

def create_pivot(df, parent, child, agg_col ):
filtered_mapping = tag_code_mapping[~tag_code_mapping['subnational']]

# Step 1: Get detailed level e.g. (econ + econ_sub + year)
detailed = (
df.groupBy(parent, child, "boost_year")
df.groupBy(parent, child, "year")
.agg(F.sum(agg_col).alias(agg_col))
.withColumnRenamed(parent, "parent")
.withColumnRenamed(child, "child")
)

# Step 2: Get subtotals at parent level e.g (econ + year), econ_sub = 'Subtotal'
subtotals = (
df.groupBy(parent, "boost_year")
df.groupBy(parent, "year")
.agg(F.sum(agg_col).alias(agg_col))
.withColumn("child", F.lit("Subtotal")) # Ensure same schema
.withColumnRenamed(parent, "parent")
)

# Step 3: Union both
combined = detailed.unionByName(subtotals).filter(F.col("boost_year").isNotNull())
combined = detailed.unionByName(subtotals).filter(F.col("year").isNotNull())

# Step 4: Pivot to wide format
pivoted = (
combined.groupBy("parent", "child")
.pivot("boost_year")
.pivot("year")
.agg(F.sum(agg_col))
.fillna(0) # Replace NaNs with 0
)
Expand All @@ -98,56 +97,66 @@ def create_pivot(df, parent, child, agg_col ):
def create_pivot_total(df, agg_col):
# Step 1: Calculate total expenditure by year
total = (
df.groupBy("boost_year")
df.groupBy("year")
.agg(F.sum(agg_col).alias(agg_col))
.withColumn("code", F.lit("EXP_ECON_TOT_EXP_EXE"))
)

# Step 2: Calculate foreign expenditure by year
foreign_total = (
df.filter(F.col("boost_is_foreign") == True)
.groupBy("boost_year")
df.filter(F.col("is_foreign") == True)
.groupBy("year")
.agg(F.sum(agg_col).alias(agg_col))
.withColumn("code", F.lit("EXP_ECON_TOT_EXP_FOR_EXE"))
)

# Step 3: Combine total and foreign expenditure
combined = total.unionByName(foreign_total).filter(F.col("boost_year").isNotNull())
combined = total.unionByName(foreign_total).filter(F.col("year").isNotNull())

# Step 4: Pivot to wide format
pivoted = (
combined.groupBy("code")
.pivot("boost_year")
.pivot("year")
.agg(F.first(agg_col))
.fillna(0)
)
return pivoted

# COMMAND ----------

pairs = [('boost_econ', 'boost_econ_sub'), ('boost_func', 'boost_econ_sub'), ('boost_func', 'boost_econ'), ('boost_func', 'boost_func_sub'), ('boost_func_sub', 'boost_econ'), ('boost_func_sub', 'boost_econ_sub')]
pairs = [
("boost_econ", "boost_econ_sub"),
("boost_func", "boost_econ_sub"),
("boost_func", "boost_econ"),
("boost_func", "boost_func_sub"),
("boost_func_sub", "boost_econ"),
("boost_func_sub", "boost_econ_sub"),
]

filtered_raw_data = raw_data.filter(col('transfer') == 'Excluding transfers')

def generate_combined_pivots(pairs, agg_col):
# Initialize an empty DataFrame for combining results
combined = None

for parent, child in pairs:
# Create pivot for central and regional levels
pivoted = create_pivot(raw_data, parent, child,agg_col)
pivoted = create_pivot(filtered_raw_data, parent, child, agg_col)

# Combine the results
if combined is None:
combined = pivoted
else:
combined = combined.unionByName(pivoted)

# Add totals to the combined DataFrame
totals = create_pivot_total(raw_data, agg_col)
totals = create_pivot_total(filtered_raw_data, agg_col)
combined = combined.unionByName(totals)
return combined

executed = generate_combined_pivots(pairs, "boost_executed")
approved = generate_combined_pivots(pairs, "boost_approved")

executed = generate_combined_pivots(pairs, "executed")
approved = generate_combined_pivots(pairs, "approved")

# COMMAND ----------

Expand Down
12 changes: 12 additions & 0 deletions Albania/ALB_extract_raw_microdata_excel_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
raw_microdata_csv_dir = prepare_raw_microdata_csv_dir(COUNTRY)
ADMIN2_PAD_LENGTH = 3

# admin2 to admin2_new mapping
mapping = pd.read_csv('./mapping.csv')
mapping = mapping[['admin2', 'admin2_new', 'county']].rename(columns={'admin2': 'admin2_tmp'}).astype({'admin2_new': 'str'})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this mapping going to change in the future? I assume this is a one-time adjustment to get the new admin regions to conform to the old ones, but if not, could we check whether the admin2 is a 2-digit code or a 3-digit code? In some cases I noticed that we have the correct length padding, but in other cases the leading zeros are not present. This may not be relevant here though.

Copy link
Contributor Author

@yukinko-iwasaki yukinko-iwasaki Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we check if the admin2 is a 2 digit code or a 3 digit code?

When merging the mapping df with the main dataframe, I temporarily converted the admin2 code into an integer, so that we could ignore the padding-length inconsistencies. So I think that in our case, we don't have to worry about the padding.


col_format_map_7 = {
"admin2": r"\d{3}",
"admin3": r"\d{2}",
Expand Down Expand Up @@ -189,6 +193,8 @@ def format_float(x):

df = pd.concat([df_7, df_3], ignore_index=True)
df['counties'] = df.admin2.map(lambda x: map_to_region(pad_left(str(x).split('.')[0], length=ADMIN2_PAD_LENGTH)))
df['admin2_tmp'] = df.admin2.astype(int)
df = df.merge(mapping, on='admin2_tmp', how='left').drop(columns=['admin2_tmp'])
outfile = f'{raw_microdata_csv_dir}/{year}.csv'
df.to_csv(outfile, index=False)

Expand Down Expand Up @@ -217,5 +223,11 @@ def format_float(x):

df = pd.concat([df_7_rev, df_46655], ignore_index=True)
df['year'] = year
df['admin2_tmp'] = df.admin2.astype(int)
df = df.merge(mapping, on='admin2_tmp', how='left').drop(columns=['admin2_tmp'])
outfile = f'{raw_microdata_csv_dir}/{year}_rev.csv'
df.to_csv(outfile, index=False)

# COMMAND ----------

df
2 changes: 2 additions & 0 deletions Albania/ALB_rev_transform_load_raw_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def alb_2022_and_before_boost_rev_gold():
'admin0',
'admin1',
'admin2',
'admin2_new',
'admin3',
'admin4',
'econ1',
Expand All @@ -189,6 +190,7 @@ def alb_2023_onward_boost_rev_gold():
'admin0',
'admin1',
'admin2',
'admin2_new',
'admin3',
'admin4',
'econ1',
Expand Down
51 changes: 23 additions & 28 deletions Albania/ALB_transform_load_raw_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"escape": '"',
}

with open(f"{RAW_INPUT_DIR}/{COUNTRY}/labels_en_v01_overall.json", 'r') as json_file:
with open(f"./labels_en_v01_overall.json", 'r') as json_file:
labels = json.load(json_file)

with open(f"{RAW_INPUT_DIR}/{COUNTRY}/project_labels.json", 'r') as json_file:
Expand All @@ -36,13 +36,14 @@ def replace_value(value):
if value is None:
return value
value_str = str(value).split('.')[0]
return labels.get(column_name, {}).get(value_str, value_str)
column_labels = labels.get(column_name, {})
return column_labels.get(value_str, column_labels.get("__default__", value_str))
return udf(replace_value, StringType())

@dlt.expect_or_drop("year_not_null", "Year IS NOT NULL")
@dlt.table(name=f'alb_2023_onward_boost_bronze')
def boost_2023_onward_bronze():
file_paths = glob(f"{RAW_COUNTRY_MICRODATA_DIR}/*.csv")
file_paths = [file for file in glob(f"{RAW_COUNTRY_MICRODATA_DIR}/*.csv" ) if ('rev' not in file.lower())]
dfs = []
for f in file_paths:
df = (spark.read
Expand Down Expand Up @@ -110,7 +111,7 @@ def boost_silver():
).withColumn("program1", col("func3")
).withColumn("func3", col("func3").cast("double") # substituting with values from func3_n for those where the code is alphanumeric
).withColumn("func3_n",
when((col("year") == 2023) & col("func3").isNull() & (col("program1") != ""),
when((col("year") >= 2023) & col("func3").isNull() & (col("program1") != ""),
substring(col("project"), 2, 3))
.otherwise(lit(None))
).withColumn("func3_n", when(col("func3_n").isNotNull(), concat(col("func3_n"), lit("0")))
Expand All @@ -119,7 +120,7 @@ def boost_silver():
concat(lit("1"), col("func3_n")))
.otherwise(col("func3_n"))
).withColumn("func3_n", col("func3_n").cast("double")
).withColumn("func3", when((col("year") == 2023) & col("func3").isNull() & (col("program1") != ""),
).withColumn("func3", when((col("year") >= 2023) & col("func3").isNull() & (col("program1") != ""),
col("func3_n"))
.otherwise(col("func3"))
).withColumn("func1", (col("func3") / 1000).cast("int")
Expand Down Expand Up @@ -179,7 +180,7 @@ def boost_silver():
.otherwise(col('counties'))
).withColumn('admin2_tmp',
when(col('admin2').startswith('00'), 'Central')
.otherwise(col('admin2'))
.otherwise(col('admin2_new'))
).withColumn('geo1', col('admin1_tmp')
).withColumn('func_sub',
# spending in Judiciary
Expand Down Expand Up @@ -260,16 +261,14 @@ def boost_silver():
.when(col('econ2').startswith('65') | col('econ2').startswith('66'), 'Interest on debt')
# other expenses
.otherwise('Other expenses')
).withColumn('admin2_new', col('admin2')
)
return silver_df


@dlt.table(name=f'alb_2022_and_before_boost_silver')
def boost_silver():
return (dlt.read(f'alb_2022_and_before_boost_bronze')
.filter(col('transfer') == 'Excluding transfers'
).withColumn('is_foreign', col('fin_source').startswith('2')
.withColumn('is_foreign', col('fin_source').startswith('2')
).withColumn('admin0',
when(col('admin2').startswith('00') | col('admin2').startswith('999'), 'Central')
.otherwise('Regional')
Expand Down Expand Up @@ -380,14 +379,14 @@ def alb_2022_and_before_boost_gold():
'func',
'econ_sub',
'econ',
'transfer',
'id')
)


@dlt.table(name=f'alb_2023_onward_boost_gold')
def alb_2023_onward_boost_gold():
return (dlt.read(f'alb_2023_onward_boost_silver')
.filter(col('transfer')=='Excluding transfers')
.withColumn('country_name', lit(COUNTRY))
.select('country_name',
'year',
Expand All @@ -403,34 +402,30 @@ def alb_2023_onward_boost_gold():
'func',
'econ_sub',
'econ',
'transfer',
'id')
)

@dlt.table(name="alb_boost_gold")
def alb_boost_gold():
df_before_2023 = dlt.read("alb_2022_and_before_boost_gold")
df_from_2023 = dlt.read("alb_2023_onward_boost_gold")

return df_before_2023.unionByName(df_from_2023).drop("id")
return df_before_2023.unionByName(df_from_2023).drop("id").filter(col('transfer') == 'Excluding transfers')


@dlt.table(name='boost.alb_publish',
comment='The Ministry of Finance of Albania together with the World Bank developed and published a BOOST platform obtained from the National Treasury System in order to facilitate access to the detailed public finance data for comprehensive budget analysis. In this context, the Albania BOOST Public Finance Portal aims to strengthen the disclosure and demand for availability of public finance information at all level of government in the country from 2010 onward.Note that 2020 execution only covers 6 months.')
def alb_publish():
alb_bronze_before_2023 = dlt.read('alb_2022_and_before_boost_bronze')
alb_bronze_from_2023 = dlt.read('alb_2023_onward_boost_silver')
col_list = [col for col in alb_bronze_before_2023.columns if col in alb_bronze_from_2023.columns]
alb_bronze_from_2023 = alb_bronze_from_2023.select(col_list)
alb_bronze_before_2023 = alb_bronze_before_2023.select(col_list)
alb_bronze_union = alb_bronze_before_2023.unionByName(alb_bronze_from_2023)

alb_gold_from_2023 = dlt.read(f'alb_2023_onward_boost_gold')
alb_gold_before_2023 = dlt.read('alb_2022_and_before_boost_gold')
alb_gold_union = alb_gold_before_2023.unionByName(alb_gold_from_2023)

alb_silver_from_2023 = dlt.read(f'alb_2023_onward_boost_silver').drop('id','src','program1', 'func3_n', 'program_tmp')
alb_silver_before_2023 = dlt.read('alb_2022_and_before_boost_silver').drop('county')
alb_silver_union = alb_silver_before_2023.unionByName(alb_silver_from_2023, allowMissingColumns=True)
BOOST_COLS = ['func', 'func_sub', 'econ', 'econ_sub', 'admin0', 'admin1_tmp', 'admin2_tmp','geo1']
prefix = "boost_"
for column in alb_gold_union.columns:
alb_gold_union = alb_gold_union.withColumnRenamed(column, prefix + column)

return alb_bronze_union.join(alb_gold_union, on=[alb_gold_union['boost_id'] == alb_bronze_union['id']], how='left').drop("id", "boost_id")

for column in BOOST_COLS:
if column == 'admin2_tmp' or column == 'admin1_tmp':
new_column = prefix +column.split("_")[0]
else:
new_column = prefix + column
alb_silver_union = alb_silver_union.withColumnRenamed(column,new_column)
return alb_silver_union
Loading