Skip to content

Commit 8237724

Browse files
authored
Merge pull request #228 from ipums/spark-job-descriptions
Add Spark job descriptions and tweak debug logging
2 parents c378436 + 8fe811d commit 8237724

16 files changed

+254
-118
lines changed

hlink/linking/hh_matching/link_step_block_on_households.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pyspark.sql.functions import col
99

1010
from hlink.linking.link_step import LinkStep
11+
from hlink.linking.util import set_job_description
1112

1213

1314
logger = logging.getLogger(__name__)
@@ -112,11 +113,13 @@ def _run(self):
112113

113114
logger.debug("Blocking on household serial ID and generating potential matches")
114115
# Generate potential matches with those unmatched people who were in a household with a match, blocking only on household id
115-
self.task.run_register_python(
116-
"hh_blocked_matches",
117-
lambda: stm.join(uma, hhid_a).join(umb, hhid_b).distinct(),
118-
persist=True,
119-
)
116+
spark_context = self.task.spark.sparkContext
117+
with set_job_description("create table hh_blocked_matches", spark_context):
118+
self.task.run_register_python(
119+
"hh_blocked_matches",
120+
lambda: stm.join(uma, hhid_a).join(umb, hhid_b).distinct(),
121+
persist=True,
122+
)
120123

121124
hh_blocked_matches = self.task.spark.table("hh_blocked_matches")
122125
logger.debug(f"hh_blocked_matches has {hh_blocked_matches.count()} records")

hlink/linking/hh_matching/link_step_filter.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import hlink.linking.core.comparison as comparison_core
88

99
from hlink.linking.link_step import LinkStep
10+
from hlink.linking.util import set_job_description
1011

1112

1213
class LinkStepFilter(LinkStep):
@@ -21,6 +22,7 @@ def __init__(self, task):
2122
def _run(self):
2223
# self.task.spark.sql("set spark.sql.shuffle.partitions=4000")
2324
config = self.task.link_run.config
25+
spark_context = self.task.spark.sparkContext
2426

2527
# establish empty table context dict to pass to SQL template
2628
t_ctx = {}
@@ -44,19 +46,25 @@ def _run(self):
4446
if f["alias"] in comp_feature_names
4547
]
4648

47-
self.task.run_register_sql(
48-
"hh_potential_matches", t_ctx=t_ctx, persist=True
49-
)
49+
with set_job_description(
50+
"create table hh_potential_matches", spark_context
51+
):
52+
self.task.run_register_sql(
53+
"hh_potential_matches", t_ctx=t_ctx, persist=True
54+
)
5055

5156
else:
52-
self.task.run_register_python(
53-
"hh_potential_matches",
54-
lambda: self.task.spark.table("hh_blocked_matches"),
55-
persist=True,
56-
)
57+
with set_job_description(
58+
"create table hh_potential_matches", spark_context
59+
):
60+
self.task.run_register_python(
61+
"hh_potential_matches",
62+
lambda: self.task.spark.table("hh_blocked_matches"),
63+
persist=True,
64+
)
5765

5866
self.task.spark.sql("set spark.sql.shuffle.partitions=200")
5967

6068
print(
61-
"Potential matches from households which meet hh_comparsions thresholds have been saved to table 'hh_potential_matches'."
69+
"Potential matches from households which meet hh_comparisons thresholds have been saved to table 'hh_potential_matches'."
6270
)

hlink/linking/matching/link_step_explode.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
# in this project's top-level directory, and also on-line at:
44
# https://github.com/ipums/hlink
55

6+
import logging
67
from typing import Any
78

89
from pyspark.sql import Column, DataFrame
910
from pyspark.sql.functions import array, explode, col
1011

1112
import hlink.linking.core.comparison as comparison_core
1213
from hlink.linking.link_step import LinkStep
14+
from hlink.linking.util import set_job_description
15+
16+
logger = logging.getLogger(__name__)
1317

1418

1519
class LinkStepExplode(LinkStep):
@@ -23,6 +27,7 @@ def __init__(self, task):
2327

2428
def _run(self):
2529
config = self.task.link_run.config
30+
spark_context = self.task.spark.sparkContext
2631
# filter the universe of potential matches before exploding
2732
t_ctx = {}
2833
universe_conf = config.get("potential_matches_universe", [])
@@ -42,28 +47,33 @@ def _run(self):
4247
# self.spark.sql("set spark.sql.shuffle.partitions=4000")
4348
blocking = config["blocking"]
4449

45-
self.task.run_register_python(
46-
name="exploded_df_a",
47-
func=lambda: self._explode(
48-
df=self.task.spark.table("match_universe_df_a"),
49-
comparisons=config["comparisons"],
50-
comparison_features=config["comparison_features"],
51-
blocking=blocking,
52-
id_column=config["id_column"],
53-
is_a=True,
54-
),
55-
)
56-
self.task.run_register_python(
57-
name="exploded_df_b",
58-
func=lambda: self._explode(
59-
df=self.task.spark.table("match_universe_df_b"),
60-
comparisons=config["comparisons"],
61-
comparison_features=config["comparison_features"],
62-
blocking=blocking,
63-
id_column=config["id_column"],
64-
is_a=False,
65-
),
66-
)
50+
logger.debug("Creating table exploded_df_a")
51+
with set_job_description("create table exploded_df_a", spark_context):
52+
self.task.run_register_python(
53+
name="exploded_df_a",
54+
func=lambda: self._explode(
55+
df=self.task.spark.table("match_universe_df_a"),
56+
comparisons=config["comparisons"],
57+
comparison_features=config["comparison_features"],
58+
blocking=blocking,
59+
id_column=config["id_column"],
60+
is_a=True,
61+
),
62+
)
63+
64+
logger.debug("Creating table exploded_df_b")
65+
with set_job_description("create table exploded_df_b", spark_context):
66+
self.task.run_register_python(
67+
name="exploded_df_b",
68+
func=lambda: self._explode(
69+
df=self.task.spark.table("match_universe_df_b"),
70+
comparisons=config["comparisons"],
71+
comparison_features=config["comparison_features"],
72+
blocking=blocking,
73+
id_column=config["id_column"],
74+
is_a=False,
75+
),
76+
)
6777

6878
def _explode(
6979
self,
@@ -118,6 +128,7 @@ def _explode(
118128

119129
all_exploding_columns = [bc for bc in blocking if bc.get("explode", False)]
120130

131+
logger.debug(f"Exploding {len(all_exploding_columns)} column(s)")
121132
for exploding_column in all_exploding_columns:
122133
exploding_column_name = exploding_column["column_name"]
123134
if exploding_column.get("expand_length", False):

hlink/linking/matching/link_step_match.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import hlink.linking.core.comparison_feature as comparison_feature_core
1111
import hlink.linking.core.dist_table as dist_table_core
1212
import hlink.linking.core.comparison as comparison_core
13-
from hlink.linking.util import spark_shuffle_partitions_heuristic
13+
from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic
1414

1515
from hlink.linking.link_step import LinkStep
1616

@@ -149,7 +149,13 @@ def _run(self):
149149

150150
if config.get("streamline_potential_match_generation", False):
151151
t_ctx["dataset_columns"] = [config["id_column"]]
152+
153+
spark_context = self.task.spark.sparkContext
154+
logger.debug("Creating table potential_matches via potential_matches.sql")
152155
try:
153-
self.task.run_register_sql("potential_matches", t_ctx=t_ctx, persist=True)
156+
with set_job_description("create table potential_matches", spark_context):
157+
self.task.run_register_sql(
158+
"potential_matches", t_ctx=t_ctx, persist=True
159+
)
154160
finally:
155161
self.task.spark.sql("set spark.sql.shuffle.partitions=200")

hlink/linking/matching/link_step_score.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import hlink.linking.core.comparison_feature as comparison_feature_core
1111
import hlink.linking.core.threshold as threshold_core
1212
import hlink.linking.core.dist_table as dist_table_core
13-
from hlink.linking.util import spark_shuffle_partitions_heuristic
13+
from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic
1414

1515
from hlink.linking.link_step import LinkStep
1616

@@ -35,6 +35,7 @@ def _run(self):
3535
training_conf = str(self.task.training_conf)
3636
table_prefix = self.task.table_prefix
3737
config = self.task.link_run.config
38+
spark_context = self.task.spark.sparkContext
3839

3940
if training_conf not in config or "chosen_model" not in config[training_conf]:
4041
print(
@@ -52,7 +53,8 @@ def _run(self):
5253
id_a = config["id_column"] + "_a"
5354
id_b = config["id_column"] + "_b"
5455
chosen_model_params = config[training_conf]["chosen_model"].copy()
55-
self._create_features(config)
56+
with set_job_description("create comparison features", spark_context):
57+
self._create_features(config)
5658
pm = self.task.spark.table(f"{table_prefix}potential_matches_prepped")
5759

5860
ind_var_columns = config[training_conf]["independent_vars"]
@@ -80,11 +82,13 @@ def _run(self):
8082
"Missing a temporary table from the training task. This table will not be persisted between sessions of hlink for technical reasons. Please run training before running this step."
8183
)
8284

83-
self.task.run_register_python(
84-
f"{table_prefix}potential_matches_pipeline",
85-
lambda: pre_pipeline.transform(pm.select(*required_columns)),
86-
persist=True,
87-
)
85+
logger.debug(f"Creating table {table_prefix}potential_matches_pipeline")
86+
with set_job_description("prepare the data for the model", spark_context):
87+
self.task.run_register_python(
88+
f"{table_prefix}potential_matches_pipeline",
89+
lambda: pre_pipeline.transform(pm.select(*required_columns)),
90+
persist=True,
91+
)
8892
plm = self.task.link_run.trained_models[f"{table_prefix}trained_model"]
8993
pp_required_cols = set(plm.stages[0].getInputCols() + [id_a, id_b])
9094
pre_pipeline = self.task.spark.table(
@@ -97,17 +101,34 @@ def _run(self):
97101
config[training_conf], chosen_model_params, default=1.3
98102
)
99103
decision = config[training_conf].get("decision")
104+
logger.debug("Predicting with thresholds")
100105
predictions = threshold_core.predict_using_thresholds(
101106
score_tmp,
102107
alpha_threshold,
103108
threshold_ratio,
104109
config["id_column"],
105110
decision,
106111
)
107-
predictions.write.mode("overwrite").saveAsTable(f"{table_prefix}predictions")
112+
113+
with set_job_description(
114+
f"create table {table_prefix}predictions", spark_context
115+
):
116+
predictions.write.mode("overwrite").saveAsTable(
117+
f"{table_prefix}predictions"
118+
)
108119
pmp = self.task.spark.table(f"{table_prefix}potential_matches_pipeline")
109-
self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b)
110-
self._save_predicted_matches(config, id_a, id_b)
120+
logger.debug(f"Creating table {table_prefix}scored_potential_matches")
121+
with set_job_description(
122+
f"create table {table_prefix}scored_potential_matches", spark_context
123+
):
124+
self._save_table_with_requested_columns(pm, pmp, predictions, id_a, id_b)
125+
logger.debug(
126+
f"Creating table {table_prefix}predicted_matches and removing records with duplicated id_b"
127+
)
128+
with set_job_description(
129+
f"create table {table_prefix}predicted_matches", spark_context
130+
):
131+
self._save_predicted_matches(config, id_a, id_b)
111132
self.task.spark.sql("set spark.sql.shuffle.partitions=200")
112133

113134
def _save_table_with_requested_columns(self, pm, pmp, predictions, id_a, id_b):
@@ -174,6 +195,7 @@ def _create_features(self, conf):
174195
potential_matches = f"{table_prefix}potential_matches"
175196
table_name = f"{table_prefix}potential_matches_prepped"
176197
pm_columns = self.task.spark.table(potential_matches).columns
198+
logger.debug("Getting comparison features")
177199
(
178200
comp_features,
179201
advanced_comp_features,
@@ -200,6 +222,7 @@ def _create_features(self, conf):
200222
dist_tables
201223
)
202224

225+
logger.debug("Creating all of the comparison features")
203226
comparison_feature_core.create_feature_tables(
204227
self.task,
205228
t_ctx_def,

hlink/linking/preprocessing/link_step_prep_dataframes.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import hlink.linking.core.column_mapping as column_mapping_core
1010
import hlink.linking.core.substitutions as substitutions_core
1111
import hlink.linking.core.transforms as transforms_core
12-
from hlink.linking.util import spark_shuffle_partitions_heuristic
12+
from hlink.linking.util import set_job_description, spark_shuffle_partitions_heuristic
1313

1414
from hlink.linking.link_step import LinkStep
1515

@@ -27,6 +27,7 @@ def __init__(self, task):
2727

2828
def _run(self):
2929
config = self.task.link_run.config
30+
spark_context = self.task.spark.sparkContext
3031

3132
dataset_size_a = self.task.spark.table("raw_df_a").count()
3233
dataset_size_b = self.task.spark.table("raw_df_b").count()
@@ -40,30 +41,35 @@ def _run(self):
4041
substitution_columns = config.get("substitution_columns", [])
4142
feature_selections = config.get("feature_selections", [])
4243

43-
self.task.run_register_python(
44-
name="prepped_df_a",
45-
func=lambda: self._prep_dataframe(
46-
self.task.spark.table("raw_df_a"),
47-
config["column_mappings"],
48-
substitution_columns,
49-
feature_selections,
50-
True,
51-
config["id_column"],
52-
),
53-
persist=True,
54-
)
55-
self.task.run_register_python(
56-
name="prepped_df_b",
57-
func=lambda: self._prep_dataframe(
58-
self.task.spark.table("raw_df_b"),
59-
config["column_mappings"],
60-
substitution_columns,
61-
feature_selections,
62-
False,
63-
config["id_column"],
64-
),
65-
persist=True,
66-
)
44+
logger.debug("Creating table prepped_df_a")
45+
with set_job_description("create table prepped_df_a", spark_context):
46+
self.task.run_register_python(
47+
name="prepped_df_a",
48+
func=lambda: self._prep_dataframe(
49+
self.task.spark.table("raw_df_a"),
50+
config["column_mappings"],
51+
substitution_columns,
52+
feature_selections,
53+
True,
54+
config["id_column"],
55+
),
56+
persist=True,
57+
)
58+
59+
logger.debug("Creating table prepped_df_b")
60+
with set_job_description("create table prepped_df_b", spark_context):
61+
self.task.run_register_python(
62+
name="prepped_df_b",
63+
func=lambda: self._prep_dataframe(
64+
self.task.spark.table("raw_df_b"),
65+
config["column_mappings"],
66+
substitution_columns,
67+
feature_selections,
68+
False,
69+
config["id_column"],
70+
),
71+
persist=True,
72+
)
6773

6874
self.task.spark.sql("set spark.sql.shuffle.partitions=200")
6975

@@ -97,6 +103,7 @@ def _prep_dataframe(
97103
column_selects = [col(id_column)]
98104
custom_transforms = self.task.link_run.custom_column_mapping_transforms
99105

106+
logger.debug("Selecting column mappings")
100107
for column_mapping in column_definitions:
101108
df_selected, column_selects = column_mapping_core.select_column_mapping(
102109
column_mapping,
@@ -108,10 +115,12 @@ def _prep_dataframe(
108115

109116
df_selected = df_selected.select(column_selects)
110117

118+
logger.debug("Generating substitutions")
111119
df_selected = substitutions_core.generate_substitutions(
112120
spark, df_selected, substitution_columns
113121
)
114122

123+
logger.debug("Generating transforms")
115124
df_selected = transforms_core.generate_transforms(
116125
spark, df_selected, feature_selections, self.task, is_a, id_column
117126
)

0 commit comments

Comments
 (0)