Skip to content

Commit 4835dbe

Browse files
caseydm and claude committed
Make RAS ES sync incremental with hash-based change detection
Replace CREATE OR REPLACE with MERGE using content_hash (SHA256 of key fields) so only changed rows get a new refreshed_at timestamp. ES sync filters by refreshed_at in incremental mode, reducing ~1h40m full syncs to minutes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2581b47 commit 4835dbe

File tree

3 files changed

+56
-38
lines changed

3 files changed

+56
-38
lines changed

jobs/walden_end2end.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ resources:
234234
- task_key: Sync_to_Elasticsearch
235235
notebook_task:
236236
notebook_path: notebooks/elastic/sync_affiliation_strings_to_elastic_v2
237+
base_parameters:
238+
is_full_sync: "false"
237239
source: GIT
238240
job_cluster_key: es_sync_cluster
239241
- task_key: Wunpaywall

notebooks/elastic/sync_affiliation_strings_to_elastic_v2.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
log = logging.getLogger(__name__)
2626

2727
ELASTIC_URL = dbutils.secrets.get(scope="elastic", key="elastic_url")
28+
IS_FULL_SYNC = dbutils.widgets.get("is_full_sync").lower() == "true"
2829

2930
CONFIG = {
3031
"table_name": "openalex.institutions.affiliation_strings_lookup_with_counts",
3132
"index_name": "raw-affiliation-strings-v2"
3233
}
3334

35+
print(f"IS_FULL_SYNC: {IS_FULL_SYNC}")
36+
3437
def create_index_if_not_exists(client, index_name):
3538
"""Create index with accent-folding analyzer if it doesn't exist."""
3639
if client.indices.exists(index=index_name):
@@ -130,10 +133,17 @@ def format_institution_id(id_val):
130133

131134
format_id_udf = F.udf(format_institution_id, StringType())
132135

133-
print(f"\n=== Processing {CONFIG['table_name']} ===")
136+
print(f"\n=== Processing {CONFIG['table_name']} (full_sync={IS_FULL_SYNC}) ===")
134137

135138
try:
136-
df = (spark.table(CONFIG['table_name'])
139+
source_df = spark.table(CONFIG['table_name'])
140+
141+
if not IS_FULL_SYNC:
142+
source_df = source_df.filter(
143+
F.col("refreshed_at") >= F.expr("current_date() - INTERVAL 3 DAYS")
144+
)
145+
146+
df = (source_df
137147
# Create a hashed ID from raw_affiliation_string
138148
.withColumn("id", F.sha2(F.col("raw_affiliation_string"), 256))
139149
# Format institution IDs with I prefix
@@ -159,9 +169,21 @@ def format_institution_id(id_val):
159169
.filter(F.col("id").isNotNull())
160170
)
161171

162-
df = df.repartition(8) # Reduced from 32 to limit ES load
163172
record_count = df.count()
164-
print(f"Total records to process: {record_count}")
173+
print(f"Total records to process: {record_count:,}")
174+
175+
# Dynamic partitioning based on record volume
176+
if IS_FULL_SYNC:
177+
num_partitions = 8
178+
elif record_count < 100_000:
179+
num_partitions = 2
180+
elif record_count < 1_000_000:
181+
num_partitions = 4
182+
else:
183+
num_partitions = 8
184+
185+
df = df.repartition(num_partitions)
186+
print(f"Using {num_partitions} partitions")
165187

166188
def send_partition_wrapper(partition):
167189
return send_partition_to_elastic(

notebooks/end2end/RefreshRasWorksCounts.ipynb

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
{
44
"cell_type": "markdown",
55
"metadata": {},
6-
"source": "# Refresh RAS Works Counts\n\nRebuilds the `affiliation_strings_lookup_with_counts` table with fresh works counts\nfrom `work_authorships` and institution IDs from the MV (which includes curations).\n\nThis keeps the affiliations dashboard in sync with actual works data.\n\n**Runs after**: Guardrails (needs finalized works data)\n**Feeds**: `sync_affiliation_strings_to_elastic_v2` (ES sync for dashboard)"
6+
"source": "# Refresh RAS Works Counts\n\nRebuilds the `affiliation_strings_lookup_with_counts` table with fresh works counts\nfrom `OpenAlex_works` and institution IDs from the MV (which includes curations).\n\nUses a MERGE with content hashing to detect changes — only rows with changed data\nget a new `refreshed_at` timestamp, enabling incremental ES sync downstream.\n\n**Runs after**: Guardrails (needs finalized works data)\n**Feeds**: `sync_affiliation_strings_to_elastic_v2` (ES sync for dashboard)"
77
},
88
{
99
"cell_type": "markdown",
@@ -15,7 +15,7 @@
1515
{
1616
"cell_type": "code",
1717
"metadata": {},
18-
"source": "-- Rebuild works counts by exploding authorships from work_authorships.\n-- Uses work_authorships instead of OpenAlex_works for a much faster scan (narrow table).\n-- This replaces the entire table with fresh counts.\nCREATE OR REPLACE TABLE openalex.institutions.affiliation_string_works_counts AS\nSELECT \n raw_aff_string,\n COUNT(DISTINCT w.work_id) as works_count\nFROM openalex.works.work_authorships w\nLATERAL VIEW EXPLODE(authorships) AS authorship\nLATERAL VIEW EXPLODE(authorship.raw_affiliation_strings) AS raw_aff_string\nGROUP BY raw_aff_string",
18+
"source": "-- Rebuild works counts by exploding authorships from OpenAlex_works.\n-- This replaces the entire counts table with fresh data.\nCREATE OR REPLACE TABLE openalex.institutions.affiliation_string_works_counts AS\nSELECT\n raw_aff_string,\n COUNT(DISTINCT w.id) as works_count\nFROM openalex.works.OpenAlex_works w\nLATERAL VIEW EXPLODE(authorships) AS authorship\nLATERAL VIEW EXPLODE(authorship.raw_affiliation_strings) AS raw_aff_string\nGROUP BY raw_aff_string",
1919
"outputs": [],
2020
"execution_count": null
2121
},
@@ -37,47 +37,41 @@
3737
{
3838
"cell_type": "markdown",
3939
"metadata": {},
40-
"source": [
41-
"## Step 2: Rebuild lookup with counts\n",
42-
"\n",
43-
"Joins the MV (which has curations applied via 3-layer priority) with fresh counts.\n",
44-
"Only keeps RAS that appear in at least one work."
45-
]
40+
"source": "## Step 2: MERGE lookup with counts (hash-based change detection)\n\nBuilds a staging table with a `content_hash` of key fields, then MERGEs into the\ntarget. Only rows where the hash changed get `refreshed_at` updated, enabling\nincremental ES sync. New rows are inserted, removed rows are deleted."
4641
},
4742
{
4843
"cell_type": "code",
44+
"source": "-- Enable schema auto-merge so the MERGE can add content_hash and refreshed_at\n-- columns to the existing table on first run (they'll start as NULLs).\nSET spark.databricks.delta.schema.autoMerge.enabled = true",
4945
"metadata": {},
50-
"source": [
51-
"%sql\n",
52-
"CREATE OR REPLACE TABLE openalex.institutions.affiliation_strings_lookup_with_counts AS\n",
53-
"SELECT \n",
54-
" mv.raw_affiliation_string,\n",
55-
" mv.institution_ids AS institution_ids_final,\n",
56-
" mv.model_institution_ids AS institution_ids_from_model,\n",
57-
" mv.institution_ids_override,\n",
58-
" mv.countries,\n",
59-
" mv.source,\n",
60-
" mv.created_datetime,\n",
61-
" mv.updated_datetime,\n",
62-
" c.works_count\n",
63-
"FROM openalex.institutions.raw_affiliation_strings_institutions_mv mv\n",
64-
"INNER JOIN openalex.institutions.affiliation_string_works_counts c\n",
65-
" ON mv.raw_affiliation_string = c.raw_aff_string"
66-
],
46+
"execution_count": null,
6747
"outputs": []
6848
},
6949
{
7050
"cell_type": "code",
7151
"metadata": {},
72-
"source": [
73-
"%sql\n",
74-
"-- Verify rebuild\n",
75-
"SELECT\n",
76-
" COUNT(*) AS total_rows,\n",
77-
" COUNT(CASE WHEN SIZE(institution_ids_final) > 0 THEN 1 END) AS rows_with_institutions,\n",
78-
" ROUND(COUNT(CASE WHEN SIZE(institution_ids_final) > 0 THEN 1 END) * 100.0 / COUNT(*), 1) AS pct_with_institutions\n",
79-
"FROM openalex.institutions.affiliation_strings_lookup_with_counts"
80-
],
52+
"source": "-- Build staging table with content hash for change detection\nCREATE OR REPLACE TABLE openalex.institutions._ras_lookup_staging AS\nSELECT\n mv.raw_affiliation_string,\n mv.institution_ids AS institution_ids_final,\n mv.model_institution_ids AS institution_ids_from_model,\n mv.institution_ids_override,\n mv.countries,\n mv.source,\n mv.created_datetime,\n mv.updated_datetime,\n c.works_count,\n SHA2(TO_JSON(NAMED_STRUCT(\n 'iif', mv.institution_ids,\n 'iim', mv.model_institution_ids,\n 'iio', mv.institution_ids_override,\n 'c', mv.countries,\n 'wc', c.works_count\n )), 256) AS content_hash\nFROM openalex.institutions.raw_affiliation_strings_institutions_mv mv\nINNER JOIN openalex.institutions.affiliation_string_works_counts c\n ON mv.raw_affiliation_string = c.raw_aff_string",
53+
"outputs": [],
54+
"execution_count": null
55+
},
56+
{
57+
"cell_type": "code",
58+
"source": "-- MERGE with hash-based change detection.\n-- Only updates rows where content actually changed (new refreshed_at).\n-- Inserts new rows, deletes rows no longer in source.\n-- On first run, COALESCE(target.content_hash, '') handles NULLs from schema migration.\nMERGE INTO openalex.institutions.affiliation_strings_lookup_with_counts AS target\nUSING openalex.institutions._ras_lookup_staging AS source\nON target.raw_affiliation_string = source.raw_affiliation_string\nWHEN MATCHED AND COALESCE(target.content_hash, '') <> source.content_hash THEN\n UPDATE SET\n institution_ids_final = source.institution_ids_final,\n institution_ids_from_model = source.institution_ids_from_model,\n institution_ids_override = source.institution_ids_override,\n countries = source.countries,\n source = source.source,\n created_datetime = source.created_datetime,\n updated_datetime = source.updated_datetime,\n works_count = source.works_count,\n content_hash = source.content_hash,\n refreshed_at = CURRENT_TIMESTAMP()\nWHEN NOT MATCHED THEN\n INSERT (raw_affiliation_string, institution_ids_final, institution_ids_from_model,\n institution_ids_override, countries, source, created_datetime, updated_datetime,\n works_count, content_hash, refreshed_at)\n VALUES (source.raw_affiliation_string, source.institution_ids_final, source.institution_ids_from_model,\n source.institution_ids_override, source.countries, source.source, source.created_datetime,\n source.updated_datetime, source.works_count, source.content_hash, CURRENT_TIMESTAMP())\nWHEN NOT MATCHED BY SOURCE THEN DELETE",
59+
"metadata": {},
60+
"execution_count": null,
61+
"outputs": []
62+
},
63+
{
64+
"cell_type": "code",
65+
"source": "DROP TABLE IF EXISTS openalex.institutions._ras_lookup_staging",
66+
"metadata": {},
67+
"execution_count": null,
68+
"outputs": []
69+
},
70+
{
71+
"cell_type": "code",
72+
"source": "-- Verify rebuild + change detection stats\nSELECT\n COUNT(*) AS total_rows,\n COUNT(CASE WHEN SIZE(institution_ids_final) > 0 THEN 1 END) AS rows_with_institutions,\n ROUND(COUNT(CASE WHEN SIZE(institution_ids_final) > 0 THEN 1 END) * 100.0 / COUNT(*), 1) AS pct_with_institutions,\n COUNT(CASE WHEN refreshed_at >= CURRENT_DATE() THEN 1 END) AS rows_refreshed_today,\n MIN(refreshed_at) AS oldest_refresh,\n MAX(refreshed_at) AS newest_refresh\nFROM openalex.institutions.affiliation_strings_lookup_with_counts",
73+
"metadata": {},
74+
"execution_count": null,
8175
"outputs": []
8276
}
8377
],

0 commit comments

Comments (0)