Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions hlink/linking/core/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,21 +515,14 @@ def apply_transform(
return column_select[transform["value"]]
elif transform_type == "mapping":
mapped_column = column_select
if transform.get("values", False):
print(
"DEPRECATION WARNING: The 'mapping' transform no longer takes the 'values' parameter with a list of mappings in dictionaries; instead each mapping should be its own transform. Please change your config for future releases."
)
for mapping in transform["values"]:
from_regexp = "|".join(f"^{from_val}$" for from_val in mapping["from"])
mapped_column = regexp_replace(
mapped_column, from_regexp, str(mapping["to"])
)
else:
for key, value in transform["mappings"].items():
from_regexp = f"^{key}$"
mapped_column = regexp_replace(mapped_column, from_regexp, str(value))

for key, value in transform["mappings"].items():
from_regexp = f"^{key}$"
mapped_column = regexp_replace(mapped_column, from_regexp, str(value))

if transform.get("output_type", False) == "int":
mapped_column = mapped_column.cast(LongType())

return mapped_column
elif transform_type == "swap_words":
mapped_column = column_select
Expand Down
14 changes: 0 additions & 14 deletions hlink/linking/matching/_helpers.py

This file was deleted.

3 changes: 1 addition & 2 deletions hlink/linking/matching/link_step_explode.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from pyspark.sql.functions import array, explode, col

import hlink.linking.core.comparison as comparison_core
from . import _helpers as matching_helpers
from hlink.linking.link_step import LinkStep


Expand Down Expand Up @@ -41,7 +40,7 @@ def _run(self):
)

# self.spark.sql("set spark.sql.shuffle.partitions=4000")
blocking = matching_helpers.get_blocking(config)
blocking = config["blocking"]

self.task.run_register_python(
name="exploded_df_a",
Expand Down
3 changes: 1 addition & 2 deletions hlink/linking/matching/link_step_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import hlink.linking.core.dist_table as dist_table_core
import hlink.linking.core.comparison as comparison_core
from hlink.linking.util import spark_shuffle_partitions_heuristic
from . import _helpers as matching_helpers

from hlink.linking.link_step import LinkStep

Expand Down Expand Up @@ -83,7 +82,7 @@ def _run(self):
f"Dataset sizes are A={dataset_size_a}, B={dataset_size_b}, so set Spark partitions to {num_partitions} for this step"
)

blocking = matching_helpers.get_blocking(config)
blocking = config["blocking"]

t_ctx = {}
if config.get("comparisons", False):
Expand Down
11 changes: 1 addition & 10 deletions hlink/linking/preprocessing/link_step_prep_dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,8 @@ def _prep_dataframe(
df_selected = df
spark = self.task.spark
column_selects = [col(id_column)]
if column_definitions and isinstance(column_definitions[0], list):
print(
"DEPRECATION WARNING: The config value 'column_mappings' is no longer a nested (double) array and is now an array of objects. Please change your config for future releases."
)
flat_column_mappings = [
item for sublist in column_definitions for item in sublist
]
else:
flat_column_mappings = column_definitions

for column_mapping in flat_column_mappings:
for column_mapping in column_definitions:
df_selected, column_selects = column_mapping_core.select_column_mapping(
column_mapping, df_selected, is_a, column_selects
)
Expand Down
72 changes: 0 additions & 72 deletions hlink/linking/transformers/interaction_transformer.py

This file was deleted.

68 changes: 68 additions & 0 deletions hlink/tests/core/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,71 @@ def test_apply_transform_error_when_unrecognized_transform_type(is_a: bool) -> N
transform = {"type": "not_supported"}
with pytest.raises(ValueError, match="Invalid transform type"):
apply_transform(column_select, transform, is_a)


@pytest.mark.parametrize("is_a", [True, False])
def test_apply_transform_mapping(spark: SparkSession, is_a: bool) -> None:
    """The "mapping" transform recodes each value listed in its "mappings"
    dict and leaves every other value untouched."""
    mapping_transform = {
        "type": "mapping",
        "mappings": {"first": "abcd", "second": "efg"},
    }
    transformed_col = apply_transform(col("input"), mapping_transform, is_a)

    input_df = spark.createDataFrame(
        [["first"], ["second"], ["third"], ["secondagain"]],
        "input:string",
    )

    result = input_df.select(transformed_col.alias("output")).collect()

    # A mapping key must match the entire value, so the partial match
    # "secondagain" passes through unchanged, as does the unmapped "third".
    expected = [
        Row(output="abcd"),
        Row(output="efg"),
        Row(output="third"),
        Row(output="secondagain"),
    ]
    assert result == expected


@pytest.mark.parametrize("is_a", [True, False])
def test_apply_transform_mapping_integer_column(
    spark: SparkSession, is_a: bool
) -> None:
    """
    The mapping transform works over integer columns, and you can cast the output
    to an integer by passing output_type = "int".
    """
    mapping_transform = {
        "type": "mapping",
        "mappings": {"1": "10", "2": "30", "3": ""},
        "output_type": "int",
    }
    transformed_col = apply_transform(col("input"), mapping_transform, is_a)

    input_df = spark.createDataFrame(
        [[5], [4], [3], [2], [1]],
        "input:integer",
    )

    result = input_df.select(transformed_col.alias("output")).collect()

    # 5 and 4 have no mapping and pass through; 3 maps to the empty string,
    # which casts to null; 2 and 1 map to 30 and 10 respectively.
    expected = [
        Row(output=5),
        Row(output=4),
        Row(output=None),
        Row(output=30),
        Row(output=10),
    ]
    assert result == expected
11 changes: 2 additions & 9 deletions hlink/tests/matching_comparison_features_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,9 @@ def test_step_2_jaro_winkler_rate(
)["neighbor_namelast_jw_rate_threshold"].iloc[0]


def test_step_2_JW_double_array_blocking_conf(spark, matching_conf, matching, capsys):
def test_step_2_JW_with_blocking(spark, matching_conf, matching):
"""Test matching step 2 to ensure that comparison features are generated (can a regular comparison (as represented by J/W) still run if there's NOT a distance lookup feature)"""
matching_conf["blocking_steps"] = [[{"column_name": "sex"}]]
matching_conf.pop("blocking")
matching_conf["blocking"] = [{"column_name": "sex"}]

matching_conf["comparison_features"] = [
{
Expand Down Expand Up @@ -685,12 +684,6 @@ def test_step_2_JW_double_array_blocking_conf(spark, matching_conf, matching, ca
> 0.87
)

captured = capsys.readouterr()
assert (
"DEPRECATION WARNING: The config value 'blocking_steps' has been renamed to 'blocking' and is now just a single array of objects."
in captured.out
)


def test_step_2_comparison_features_comp_c_and_caution(
spark, matching_comparison_conf, matching
Expand Down
24 changes: 13 additions & 11 deletions sphinx-docs/column_mappings.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,27 @@ transforms = [

### mapping

Map single or multiple values to a single output value, otherwise known as a "recoding."
Explicitly map from input values to output values. This is also known as a "recoding".
Input values which do not appear in the mapping are unchanged. By default, the output
column is of type string, but you can set `output_type = "int"` to cast the output
column to type integer instead.

Maps T → U.

```
```toml
[[column_mappings]]
column_name = "birthyr"
alias = "clean_birthyr"
transforms = [
{
type = "mapping",
values = [
{"from"=[9999,1999], "to" = ""},
{"from" = -9998, "to" = 9999}
]
}
]

[[column_mappings.transforms]]
type = "mapping"
mappings = {9999 = "", 1999 = "", "-9998" = "9999"}
output_type = "int"
```

*Changed in version 4.0.0: The deprecated `values` key is no longer supported.
Please use the `mappings` key documented above instead.*

### substring

Replace a column with a substring of the data in the column.
Expand Down