Skip to content

Commit a98a1be

Browse files
caseydm and claude committed
Add changed_fwci_sync parameter to sync_works for FWCI-eligible resync
Adds a changed_fwci_sync param that filters to works with eligible types and topics, for resyncing FWCI values to Elasticsearch after the inline calculation change. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0e6c278 commit a98a1be

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

jobs/sync_all_works_to_elasticsearch.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,5 @@ resources:
5252
parameters:
5353
- name: is_full_sync
5454
default: "true"
55+
- name: changed_fwci_sync
56+
default: "false"

notebooks/elastic/sync_works.ipynb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
}
4040
},
4141
"outputs": [],
42-
"source": "import uuid\nfrom datetime import datetime\nfrom pyspark.sql import functions as F\nfrom pyspark.sql.types import *\nfrom dataclasses import dataclass\n\nfrom elasticsearch import Elasticsearch, helpers\nimport logging\nimport json\n\nlogging.basicConfig(level=logging.WARNING, format='[%(asctime)s]: %(message)s')\nlog = logging.getLogger(__name__)\n\nELASTIC_INDEX = \"works-v33\"\nELASTIC_URL = dbutils.secrets.get(scope=\"elastic\", key=\"elastic_url\")\nMAX_LENGTH = 32000 # Slightly below the 32766 limit\n\nIS_FULL_SYNC = dbutils.widgets.get(\"is_full_sync\").lower() == \"true\" # default is incremental\n\nprint(f\"IS_FULL_SYNC: {IS_FULL_SYNC}\")"
42+
"source": "import uuid\nfrom datetime import datetime\nfrom pyspark.sql import functions as F\nfrom pyspark.sql.types import *\nfrom dataclasses import dataclass\n\nfrom elasticsearch import Elasticsearch, helpers\nimport logging\nimport json\n\nlogging.basicConfig(level=logging.WARNING, format='[%(asctime)s]: %(message)s')\nlog = logging.getLogger(__name__)\n\nELASTIC_INDEX = \"works-v33\"\nELASTIC_URL = dbutils.secrets.get(scope=\"elastic\", key=\"elastic_url\")\nMAX_LENGTH = 32000 # Slightly below the 32766 limit\n\nIS_FULL_SYNC = dbutils.widgets.get(\"is_full_sync\").lower() == \"true\" # default is incremental\nIS_FWCI_SYNC = dbutils.widgets.get(\"changed_fwci_sync\").lower() == \"true\" if dbutils.widgets.get(\"changed_fwci_sync\") else False\n\nprint(f\"IS_FULL_SYNC: {IS_FULL_SYNC}\")\nif IS_FWCI_SYNC:\n print(\"FWCI sync mode: syncing all FWCI-eligible works\")"
4343
},
4444
{
4545
"cell_type": "code",
@@ -100,7 +100,7 @@
100100
}
101101
},
102102
"outputs": [],
103-
"source": "SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\nCOUNT_QUERY = \"\"\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n\nif (IS_FULL_SYNC):\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\"\"\"\n COUNT_QUERY = None # Full sync doesn't need count-based optimization\n\n# Get record count BEFORE loading data (lightweight SQL count, no transformations)\nrecord_count = None\nif not IS_FULL_SYNC and COUNT_QUERY:\n record_count = spark.sql(COUNT_QUERY).collect()[0].cnt\n print(f\"Record count for daily sync: {record_count:,}\")\n\ndf = (\n spark.sql(SQL_QUERY)\n .withColumn(\"display_name\", F.col(\"title\"))\n # First cast to date/timestamp\n .withColumn(\"created_date\", F.to_timestamp(\"created_date\"))\n .withColumn(\"updated_date\", F.to_timestamp(\"updated_date\"))\n .withColumn(\"publication_date\", F.to_date(\"publication_date\"))\n .withColumn(\n \"concepts\",\n F.transform(\n F.col(\"concepts\"),\n lambda c: F.struct(\n F.concat(F.lit(\"https://openalex.org/C\"), c.id).alias(\"id\"),\n c.wikidata.alias(\"wikidata\"),\n c.display_name.alias(\"display_name\"),\n c.level.alias(\"level\"),\n c.score.alias(\"score\")\n )\n )\n )\n # Apply range checks using BETWEEN\n .withColumn(\n \"created_date\",\n F.when(\n F.col(\"created_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"created_date\")\n ).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"updated_date\",\n F.when(\n F.col(\"updated_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"updated_date\")\n ).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"publication_date\",\n F.when(\n F.col(\"publication_date\").between(F.lit(\"1000-01-01\"), F.lit(\"2050-12-31\")),\n F.col(\"publication_date\")\n ).otherwise(F.lit(None).cast(\"date\"))\n )\n 
.filter(F.col(\"id\").isNotNull())\n)\n\n# Dynamic partitioning based on record volume\n# Only apply partition optimization for non-full syncs\nif not IS_FULL_SYNC and record_count is not None:\n # Calculate optimal partition count:\n # - Small updates (<500k): use fewer partitions for efficiency\n # - Medium updates (500k-5M): moderate partitions \n # - Large updates (5M-20M): many partitions like full sync\n # - Very large updates (>20M): use repartitionByRange for even distribution\n RECORDS_PER_PARTITION = 10000 # Target ~10k records per partition\n \n if record_count < 2_000_000:\n # Small daily update - coalesce to reduce overhead\n optimal_partitions = max(64, record_count // RECORDS_PER_PARTITION)\n df = df.coalesce(optimal_partitions)\n print(f\"Small update: coalesced to {optimal_partitions} partitions\")\n elif record_count < 10_000_000:\n # Medium daily update - use more partitions\n optimal_partitions = max(1024, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Medium update: repartitioned to {optimal_partitions} partitions\")\n elif record_count < 20_000_000:\n # Large daily update - repartition for better distribution\n optimal_partitions = min(4096, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Large update: repartitioned to {optimal_partitions} partitions\")\n else:\n # Very large update - use repartitionByRange like full sync\n df = df.repartitionByRange(8096, \"id\")\n print(f\"Very large update: using repartitionByRange with 8096 partitions\")\n\nprint(f\"SQL query:\\n{SQL_QUERY}\")"
103+
"source": "if IS_FWCI_SYNC:\n # Sync all FWCI-eligible works (used after inline FWCI calculation change)\n FWCI_WHERE = \"\"\"primary_topic.subfield.id IS NOT NULL\n AND (type IN ('article', 'book', 'review', 'book-chapter')\n OR (type = 'article' AND primary_location.source.type = 'conference'))\"\"\"\n SQL_QUERY = f\"SELECT * FROM openalex.works.openalex_works WHERE {FWCI_WHERE}\"\n COUNT_QUERY = f\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works WHERE {FWCI_WHERE}\"\nelif IS_FULL_SYNC:\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\"\"\"\n COUNT_QUERY = None # Full sync doesn't need count-based optimization\nelse:\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n COUNT_QUERY = \"\"\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n\n# Get record count BEFORE loading data (lightweight SQL count, no transformations)\nrecord_count = None\nif not IS_FULL_SYNC and COUNT_QUERY:\n record_count = spark.sql(COUNT_QUERY).collect()[0].cnt\n print(f\"Record count for sync: {record_count:,}\")\n\ndf = (\n spark.sql(SQL_QUERY)\n .withColumn(\"display_name\", F.col(\"title\"))\n # First cast to date/timestamp\n .withColumn(\"created_date\", F.to_timestamp(\"created_date\"))\n .withColumn(\"updated_date\", F.to_timestamp(\"updated_date\"))\n .withColumn(\"publication_date\", F.to_date(\"publication_date\"))\n .withColumn(\n \"concepts\",\n F.transform(\n F.col(\"concepts\"),\n lambda c: F.struct(\n F.concat(F.lit(\"https://openalex.org/C\"), c.id).alias(\"id\"),\n c.wikidata.alias(\"wikidata\"),\n c.display_name.alias(\"display_name\"),\n c.level.alias(\"level\"),\n c.score.alias(\"score\")\n )\n )\n )\n # Apply range checks using BETWEEN\n .withColumn(\n \"created_date\",\n F.when(\n F.col(\"created_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"created_date\")\n 
).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"updated_date\",\n F.when(\n F.col(\"updated_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"updated_date\")\n ).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"publication_date\",\n F.when(\n F.col(\"publication_date\").between(F.lit(\"1000-01-01\"), F.lit(\"2050-12-31\")),\n F.col(\"publication_date\")\n ).otherwise(F.lit(None).cast(\"date\"))\n )\n .filter(F.col(\"id\").isNotNull())\n)\n\n# Dynamic partitioning based on record volume\n# Only apply partition optimization for non-full syncs\nif not IS_FULL_SYNC and record_count is not None:\n # Calculate optimal partition count:\n # - Small updates (<500k): use fewer partitions for efficiency\n # - Medium updates (500k-5M): moderate partitions \n # - Large updates (5M-20M): many partitions like full sync\n # - Very large updates (>20M): use repartitionByRange for even distribution\n RECORDS_PER_PARTITION = 10000 # Target ~10k records per partition\n \n if record_count < 2_000_000:\n # Small daily update - coalesce to reduce overhead\n optimal_partitions = max(64, record_count // RECORDS_PER_PARTITION)\n df = df.coalesce(optimal_partitions)\n print(f\"Small update: coalesced to {optimal_partitions} partitions\")\n elif record_count < 10_000_000:\n # Medium daily update - use more partitions\n optimal_partitions = max(1024, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Medium update: repartitioned to {optimal_partitions} partitions\")\n elif record_count < 20_000_000:\n # Large daily update - repartition for better distribution\n optimal_partitions = min(4096, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Large update: repartitioned to {optimal_partitions} partitions\")\n else:\n # Very large update - use repartitionByRange like full sync\n df = df.repartitionByRange(8096, \"id\")\n print(f\"Very large update: using 
repartitionByRange with 8096 partitions\")\n\nprint(f\"SQL query:\\n{SQL_QUERY}\")"
104104
},
105105
{
106106
"cell_type": "code",

0 commit comments

Comments (0)