Skip to content

Commit e40ee3d

Browse files
caseydm authored and claude committed
Reduce ES sync workers to 1 and add FWCI batch parameters
Halves concurrent ES pressure by dropping from 2 to 1 worker nodes. Adds fwci_batch/fwci_total_batches params to process FWCI sync in ~50M-record chunks via id modulo, allowing incremental runs with progress preserved by ES upsert.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 65e16f3 commit e40ee3d

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

jobs/sync_all_works_to_elasticsearch.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ resources:
2929
enable_elastic_disk: true
3030
data_security_mode: SINGLE_USER
3131
runtime_engine: STANDARD
32-
num_workers: 2
32+
num_workers: 1
3333
- job_cluster_key: es_sync_daily
3434
new_cluster:
3535
cluster_name: ""
@@ -54,3 +54,7 @@ resources:
5454
default: "true"
5555
- name: changed_fwci_sync
5656
default: "false"
57+
- name: fwci_batch
58+
default: "0"
59+
- name: fwci_total_batches
60+
default: "6"

notebooks/elastic/sync_works.ipynb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
}
4040
},
4141
"outputs": [],
42-
"source": "import uuid\nfrom datetime import datetime\nfrom pyspark.sql import functions as F\nfrom pyspark.sql.types import *\nfrom dataclasses import dataclass\n\nfrom elasticsearch import Elasticsearch, helpers\nimport logging\nimport json\n\nlogging.basicConfig(level=logging.WARNING, format='[%(asctime)s]: %(message)s')\nlog = logging.getLogger(__name__)\n\nELASTIC_INDEX = \"works-v33\"\nELASTIC_URL = dbutils.secrets.get(scope=\"elastic\", key=\"elastic_url\")\nMAX_LENGTH = 32000 # Slightly below the 32766 limit\n\ndbutils.widgets.text(\"is_full_sync\", \"false\")\ndbutils.widgets.text(\"changed_fwci_sync\", \"false\")\n\nIS_FULL_SYNC = dbutils.widgets.get(\"is_full_sync\").lower() == \"true\" # default is incremental\nIS_FWCI_SYNC = dbutils.widgets.get(\"changed_fwci_sync\").lower() == \"true\"\n\nprint(f\"IS_FULL_SYNC: {IS_FULL_SYNC}\")\nif IS_FWCI_SYNC:\n print(\"FWCI sync mode: syncing all FWCI-eligible works\")"
42+
"source": "import uuid\nfrom datetime import datetime\nfrom pyspark.sql import functions as F\nfrom pyspark.sql.types import *\nfrom dataclasses import dataclass\n\nfrom elasticsearch import Elasticsearch, helpers\nimport logging\nimport json\n\nlogging.basicConfig(level=logging.WARNING, format='[%(asctime)s]: %(message)s')\nlog = logging.getLogger(__name__)\n\nELASTIC_INDEX = \"works-v33\"\nELASTIC_URL = dbutils.secrets.get(scope=\"elastic\", key=\"elastic_url\")\nMAX_LENGTH = 32000 # Slightly below the 32766 limit\n\ndbutils.widgets.text(\"is_full_sync\", \"false\")\ndbutils.widgets.text(\"changed_fwci_sync\", \"false\")\ndbutils.widgets.text(\"fwci_batch\", \"0\")\ndbutils.widgets.text(\"fwci_total_batches\", \"6\")\n\nIS_FULL_SYNC = dbutils.widgets.get(\"is_full_sync\").lower() == \"true\" # default is incremental\nIS_FWCI_SYNC = dbutils.widgets.get(\"changed_fwci_sync\").lower() == \"true\"\nFWCI_BATCH = int(dbutils.widgets.get(\"fwci_batch\")) # 0 = all, 1-N = specific batch\nFWCI_TOTAL_BATCHES = int(dbutils.widgets.get(\"fwci_total_batches\"))\n\nprint(f\"IS_FULL_SYNC: {IS_FULL_SYNC}\")\nif IS_FWCI_SYNC:\n print(\"FWCI sync mode: syncing all FWCI-eligible works\")\n if FWCI_BATCH > 0:\n print(f\" Batch {FWCI_BATCH} of {FWCI_TOTAL_BATCHES} (id % {FWCI_TOTAL_BATCHES} = {FWCI_BATCH - 1})\")\n else:\n print(\" Processing ALL batches (no batching)\")"
4343
},
4444
{
4545
"cell_type": "code",
@@ -81,7 +81,7 @@
8181
}
8282
},
8383
"outputs": [],
84-
"source": "if IS_FWCI_SYNC:\n # Sync all FWCI-eligible works (used after inline FWCI calculation change)\n FWCI_WHERE = \"\"\"primary_topic.subfield.id IS NOT NULL\n AND (type IN ('article', 'book', 'review', 'book-chapter')\n OR (type = 'article' AND primary_location.source.type = 'conference'))\"\"\"\n SQL_QUERY = f\"SELECT * FROM openalex.works.openalex_works WHERE {FWCI_WHERE}\"\n COUNT_QUERY = f\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works WHERE {FWCI_WHERE}\"\nelif IS_FULL_SYNC:\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\"\"\"\n COUNT_QUERY = None # Full sync doesn't need count-based optimization\nelse:\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n COUNT_QUERY = \"\"\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n\n# Get record count BEFORE loading data (lightweight SQL count, no transformations)\nrecord_count = None\nif not IS_FULL_SYNC and COUNT_QUERY:\n record_count = spark.sql(COUNT_QUERY).collect()[0].cnt\n print(f\"Record count for sync: {record_count:,}\")\n\ndf = (\n spark.sql(SQL_QUERY)\n .withColumn(\"display_name\", F.col(\"title\"))\n # First cast to date/timestamp\n .withColumn(\"created_date\", F.to_timestamp(\"created_date\"))\n .withColumn(\"updated_date\", F.to_timestamp(\"updated_date\"))\n .withColumn(\"publication_date\", F.to_date(\"publication_date\"))\n .withColumn(\n \"concepts\",\n F.transform(\n F.col(\"concepts\"),\n lambda c: F.struct(\n F.concat(F.lit(\"https://openalex.org/C\"), c.id).alias(\"id\"),\n c.wikidata.alias(\"wikidata\"),\n c.display_name.alias(\"display_name\"),\n c.level.alias(\"level\"),\n c.score.alias(\"score\")\n )\n )\n )\n # Apply range checks using BETWEEN\n .withColumn(\n \"created_date\",\n F.when(\n F.col(\"created_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"created_date\")\n 
).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"updated_date\",\n F.when(\n F.col(\"updated_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"updated_date\")\n ).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"publication_date\",\n F.when(\n F.col(\"publication_date\").between(F.lit(\"1000-01-01\"), F.lit(\"2050-12-31\")),\n F.col(\"publication_date\")\n ).otherwise(F.lit(None).cast(\"date\"))\n )\n .filter(F.col(\"id\").isNotNull())\n)\n\n# Dynamic partitioning based on record volume\n# Only apply partition optimization for non-full syncs\nif not IS_FULL_SYNC and record_count is not None:\n # Calculate optimal partition count:\n # - Small updates (<500k): use fewer partitions for efficiency\n # - Medium updates (500k-5M): moderate partitions \n # - Large updates (5M-20M): many partitions like full sync\n # - Very large updates (>20M): use repartitionByRange for even distribution\n RECORDS_PER_PARTITION = 10000 # Target ~10k records per partition\n \n if record_count < 2_000_000:\n # Small daily update - coalesce to reduce overhead\n optimal_partitions = max(64, record_count // RECORDS_PER_PARTITION)\n df = df.coalesce(optimal_partitions)\n print(f\"Small update: coalesced to {optimal_partitions} partitions\")\n elif record_count < 10_000_000:\n # Medium daily update - use more partitions\n optimal_partitions = max(1024, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Medium update: repartitioned to {optimal_partitions} partitions\")\n elif record_count < 20_000_000:\n # Large daily update - repartition for better distribution\n optimal_partitions = min(4096, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Large update: repartitioned to {optimal_partitions} partitions\")\n else:\n # Very large update - use repartitionByRange like full sync\n df = df.repartitionByRange(8096, \"id\")\n print(f\"Very large update: using 
repartitionByRange with 8096 partitions\")\n\nprint(f\"SQL query:\\n{SQL_QUERY}\")"
84+
"source": "if IS_FWCI_SYNC:\n # Sync all FWCI-eligible works (used after inline FWCI calculation change)\n FWCI_WHERE = \"\"\"primary_topic.subfield.id IS NOT NULL\n AND (type IN ('article', 'book', 'review', 'book-chapter')\n OR (type = 'article' AND primary_location.source.type = 'conference'))\"\"\"\n if FWCI_BATCH > 0:\n FWCI_WHERE += f\"\\n AND id % {FWCI_TOTAL_BATCHES} = {FWCI_BATCH - 1}\"\n SQL_QUERY = f\"SELECT * FROM openalex.works.openalex_works WHERE {FWCI_WHERE}\"\n COUNT_QUERY = f\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works WHERE {FWCI_WHERE}\"\nelif IS_FULL_SYNC:\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\"\"\"\n COUNT_QUERY = None # Full sync doesn't need count-based optimization\nelse:\n SQL_QUERY = \"\"\"SELECT * FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n COUNT_QUERY = \"\"\"SELECT COUNT(*) as cnt FROM openalex.works.openalex_works\nWHERE updated_date >= current_date() - INTERVAL 2 days\n\"\"\"\n\n# Get record count BEFORE loading data (lightweight SQL count, no transformations)\nrecord_count = None\nif not IS_FULL_SYNC and COUNT_QUERY:\n record_count = spark.sql(COUNT_QUERY).collect()[0].cnt\n print(f\"Record count for sync: {record_count:,}\")\n\ndf = (\n spark.sql(SQL_QUERY)\n .withColumn(\"display_name\", F.col(\"title\"))\n # First cast to date/timestamp\n .withColumn(\"created_date\", F.to_timestamp(\"created_date\"))\n .withColumn(\"updated_date\", F.to_timestamp(\"updated_date\"))\n .withColumn(\"publication_date\", F.to_date(\"publication_date\"))\n .withColumn(\n \"concepts\",\n F.transform(\n F.col(\"concepts\"),\n lambda c: F.struct(\n F.concat(F.lit(\"https://openalex.org/C\"), c.id).alias(\"id\"),\n c.wikidata.alias(\"wikidata\"),\n c.display_name.alias(\"display_name\"),\n c.level.alias(\"level\"),\n c.score.alias(\"score\")\n )\n )\n )\n # Apply range checks using BETWEEN\n .withColumn(\n \"created_date\",\n F.when(\n 
F.col(\"created_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"created_date\")\n ).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"updated_date\",\n F.when(\n F.col(\"updated_date\").between(F.lit(\"1000-01-01\"), F.lit(\"9999-12-31\")),\n F.col(\"updated_date\")\n ).otherwise(F.lit(None).cast(\"timestamp\"))\n )\n .withColumn(\n \"publication_date\",\n F.when(\n F.col(\"publication_date\").between(F.lit(\"1000-01-01\"), F.lit(\"2050-12-31\")),\n F.col(\"publication_date\")\n ).otherwise(F.lit(None).cast(\"date\"))\n )\n .filter(F.col(\"id\").isNotNull())\n)\n\n# Dynamic partitioning based on record volume\n# Only apply partition optimization for non-full syncs\nif not IS_FULL_SYNC and record_count is not None:\n # Calculate optimal partition count:\n # - Small updates (<500k): use fewer partitions for efficiency\n # - Medium updates (500k-5M): moderate partitions \n # - Large updates (5M-20M): many partitions like full sync\n # - Very large updates (>20M): use repartitionByRange for even distribution\n RECORDS_PER_PARTITION = 10000 # Target ~10k records per partition\n \n if record_count < 2_000_000:\n # Small daily update - coalesce to reduce overhead\n optimal_partitions = max(64, record_count // RECORDS_PER_PARTITION)\n df = df.coalesce(optimal_partitions)\n print(f\"Small update: coalesced to {optimal_partitions} partitions\")\n elif record_count < 10_000_000:\n # Medium daily update - use more partitions\n optimal_partitions = max(1024, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Medium update: repartitioned to {optimal_partitions} partitions\")\n elif record_count < 20_000_000:\n # Large daily update - repartition for better distribution\n optimal_partitions = min(4096, record_count // RECORDS_PER_PARTITION)\n df = df.repartition(optimal_partitions)\n print(f\"Large update: repartitioned to {optimal_partitions} partitions\")\n else:\n # Very large update - use 
repartitionByRange like full sync\n df = df.repartitionByRange(8096, \"id\")\n print(f\"Very large update: using repartitionByRange with 8096 partitions\")\n\nprint(f\"SQL query:\\n{SQL_QUERY}\")"
8585
},
8686
{
8787
"cell_type": "code",

0 commit comments

Comments (0)