
Commit a0bfda2

caseydm and claude committed
Add taxicab rescrape queue for generic URL rescraping
Adds a rescrape_queue table and a rescrape_queue_only parameter to the taxicab notebook. When enabled, the notebook reads native_id/namespace pairs from the queue, resolves their URLs (DOIs are constructed directly; everything else is looked up from taxicab_results), scrapes them, and truncates the queue. A new TaxiCab_Rescrape job (manual trigger only) runs in this mode.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 70533a6 commit a0bfda2
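
For context, a minimal sketch of how the queue might be populated ahead of a manual run, assuming a Databricks notebook where `spark` is in scope. The table and column names come from this commit; the two IDs below are made-up illustrations, and created_date is left to the column default declared in the diff.

# Hypothetical enqueue step; the IDs are illustrative, not real records.
# 'doi' rows get their URL built as https://doi.org/<native_id>; other
# namespaces are resolved against openalex.taxicab.taxicab_results.
spark.sql("""
    INSERT INTO openalex.taxicab.rescrape_queue (native_id, native_id_namespace)
    VALUES
        ('10.1234/example-doi', 'doi'),
        ('oai:repo.example.org:record/42', 'pmh')
""")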

File tree

3 files changed (+63, -22 lines)


databricks.yml

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ include:
   - jobs/pubmed.yaml
   - jobs/repo.yaml
   - jobs/taxicab.yaml
+  - jobs/taxicab_rescrape.yaml
   - jobs/parse_pdfs.yaml
   - jobs/walden_end2end.yaml
   - jobs/wunpaywall_weekly_data_feed.yaml

jobs/taxicab_rescrape.yaml

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+resources:
+  jobs:
+    TaxiCab_Rescrape:
+      name: TaxiCab_Rescrape
+      email_notifications:
+        on_failure:
+          - casey@ourresearch.org
+      max_concurrent_runs: 1
+      tasks:
+        - task_key: run_taxicab_rescrape
+          notebook_task:
+            notebook_path: notebooks/scraping/taxicab
+            source: GIT
+            base_parameters:
+              lookback_days: "3"
+              url_limit: "1000000"
+              rescrape_queue_only: "true"
+          job_cluster_key: Taxicab_Rescrape_job_cluster
+      job_clusters:
+        - job_cluster_key: Taxicab_Rescrape_job_cluster
+          new_cluster:
+            cluster_name: ""
+            spark_version: 16.4.x-scala2.13
+            spark_conf:
+              spark.master: local[*, 4]
+              spark.databricks.cluster.profile: singleNode
+            aws_attributes:
+              first_on_demand: 1
+              availability: SPOT_WITH_FALLBACK
+              zone_id: us-east-1f
+              spot_bid_price_percent: 100
+            node_type_id: m5d.4xlarge
+            driver_node_type_id: m5d.4xlarge
+            custom_tags:
+              ResourceClass: SingleNode
+            enable_elastic_disk: true
+            data_security_mode: SINGLE_USER
+            runtime_engine: STANDARD
+            num_workers: 0
+      git_source:
+        git_url: https://github.com/ourresearch/openalex-walden.git
+        git_provider: gitHub
+        git_branch: main
+      parameters:
+        - name: lookback_days
+          default: "3"
+        - name: url_limit
+          default: "1000000"
+        - name: rescrape_queue_only
+          default: "true"

notebooks/scraping/taxicab.ipynb

Lines changed: 12 additions & 22 deletions
@@ -64,7 +64,7 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 0,
+  "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
@@ -80,24 +80,14 @@
     }
    },
   "outputs": [],
-  "source": [
-   "%sql\n",
-   "CREATE TABLE IF NOT EXISTS openalex.taxicab.taxicab_results (\n",
-   " taxicab_id STRING,\n",
-   " url STRING,\n",
-   " resolved_url STRING,\n",
-   " status_code INT,\n",
-   " content_type STRING,\n",
-   " native_id STRING,\n",
-   " native_id_namespace STRING,\n",
-   " s3_path STRING,\n",
-   " is_soft_block BOOLEAN,\n",
-   " created_date TIMESTAMP,\n",
-   " processed_date TIMESTAMP,\n",
-   " error STRING\n",
-   ")\n",
-   "USING DELTA;"
-  ]
+  "source": "%sql\nCREATE TABLE IF NOT EXISTS openalex.taxicab.taxicab_results (\n taxicab_id STRING,\n url STRING,\n resolved_url STRING,\n status_code INT,\n content_type STRING,\n native_id STRING,\n native_id_namespace STRING,\n s3_path STRING,\n is_soft_block BOOLEAN,\n created_date TIMESTAMP,\n processed_date TIMESTAMP,\n error STRING\n)\nUSING DELTA;"
+ },
+ {
+  "cell_type": "code",
+  "source": "%sql\nCREATE TABLE IF NOT EXISTS openalex.taxicab.rescrape_queue (\n native_id STRING,\n native_id_namespace STRING,\n created_date TIMESTAMP DEFAULT current_timestamp()\n)\nUSING DELTA;",
+  "metadata": {},
+  "execution_count": null,
+  "outputs": []
  },
  {
   "cell_type": "code",
@@ -123,7 +113,7 @@
    }
   },
   "outputs": [],
-  "source": "dbutils.widgets.text(\"lookback_days\", \"3\", \"Lookback window (days)\")\nlookback_days = int(dbutils.widgets.get(\"lookback_days\"))\n\nlast_processed_date = datetime.datetime.now(timezone.utc) - datetime.timedelta(days=lookback_days)\nprint(f\"Looking back {lookback_days} days from: {last_processed_date}\")"
+  "source": "dbutils.widgets.text(\"lookback_days\", \"3\", \"Lookback window (days)\")\ndbutils.widgets.text(\"rescrape_queue_only\", \"false\", \"Rescrape queue only (true/false)\")\n\nrescrape_queue_only = dbutils.widgets.get(\"rescrape_queue_only\").strip().lower() == \"true\"\nlookback_days = int(dbutils.widgets.get(\"lookback_days\"))\n\nif rescrape_queue_only:\n print(\"RESCRAPE MODE: processing queue table\")\nelse:\n last_processed_date = datetime.datetime.now(timezone.utc) - datetime.timedelta(days=lookback_days)\n print(f\"Looking back {lookback_days} days from: {last_processed_date}\")"
  },
  {
   "cell_type": "code",
@@ -142,7 +132,7 @@
    }
   },
   "outputs": [],
-  "source": "# urls to scrape\n\ndbutils.widgets.text(\"url_limit\", \"250000\", \"Max URLs to process\")\nurl_limit = int(dbutils.widgets.get(\"url_limit\"))\n\n# Source 1: Crossref works\nrecent_crossref_works = (\n spark.read\n .table(\"openalex.crossref.crossref_works\")\n .filter(F.col(\"created_date\") >= F.lit(last_processed_date))\n .select(\n \"native_id\",\n \"native_id_namespace\",\n F.expr(\"get(filter(urls, x -> x.url like '%doi.org%'), 0).url\").alias(\"url\"),\n F.to_timestamp(\"created_date\").alias(\"source_created_date\"),\n )\n)\n\n# Source 2: Repo works\nrecent_repo_works = (\n spark.read.table(\"openalex.repo.repo_works\")\n .filter(F.col(\"created_date\") >= F.lit(last_processed_date))\n .select(\n \"native_id\",\n \"native_id_namespace\",\n F.slice(\"urls\", 1, 3).alias(\"urls\"),\n F.to_timestamp(\"created_date\").alias(\"source_created_date\"),\n )\n .filter(F.col(\"urls\").isNotNull())\n .select(\"*\", F.explode(\"urls\").alias(\"url_struct\"))\n .select(\n \"native_id\",\n \"native_id_namespace\",\n \"source_created_date\",\n F.col(\"url_struct.url\").alias(\"url\")\n )\n .filter(~F.col(\"url\").contains(\"doi.org\"))\n)\n\n# Source 3: Landing page PDF URLs\nrecent_pdf_works = (\n spark.read\n .table(\"openalex.landing_page.landing_page_works\")\n .filter(F.col(\"created_date\") >= F.lit(last_processed_date))\n .select(\n \"ids\",\n \"native_id\",\n \"native_id_namespace\",\n F.expr(\"get(filter(urls, x -> x.content_type = 'pdf'), 0).url\").alias(\"url\"),\n F.to_timestamp(\"created_date\").alias(\"source_created_date\"),\n )\n .withColumn(\"pmh_id\", F.expr(\"get(filter(ids, x -> x.namespace = 'pmh'), 0).id\"))\n .withColumn(\"doi_id\", F.expr(\"get(filter(ids, x -> x.namespace = 'doi'), 0).id\"))\n # Set priority: PMH first, then DOI, then original\n .withColumn(\"final_native_id\", \n F.when(F.col(\"pmh_id\").isNotNull(), F.col(\"pmh_id\"))\n .when(F.col(\"doi_id\").isNotNull(), F.col(\"doi_id\"))\n .otherwise(F.col(\"native_id\")))\n .withColumn(\"final_namespace\", \n F.when(F.col(\"pmh_id\").isNotNull(), F.lit(\"pmh\"))\n .when(F.col(\"doi_id\").isNotNull(), F.lit(\"doi\"))\n .otherwise(F.col(\"native_id_namespace\")))\n .select(\n F.col(\"final_native_id\").alias(\"native_id\"),\n F.col(\"final_namespace\").alias(\"native_id_namespace\"),\n \"url\",\n \"source_created_date\",\n )\n .filter(F.col(\"url\").isNotNull())\n)\n\n# Union all sources, clean native_id, dedup, order newest first, then drop the ordering column\ntaxicab_results = spark.table(\"openalex.taxicab.taxicab_results\").select(\"url\")\n\nall_urls = (\n recent_crossref_works\n .unionByName(recent_repo_works)\n .unionByName(recent_pdf_works)\n .withColumn(\"native_id\", F.regexp_replace(\"native_id\", \"^https://doi\\\\.org/\", \"\"))\n .join(taxicab_results, [\"url\"], \"left_anti\")\n .orderBy(F.col(\"source_created_date\").desc())\n .limit(url_limit)\n .drop(\"source_created_date\")\n)\n\nall_urls_pd = all_urls.toPandas()\n\njsonUrls = [\n {\n \"url\": row[\"url\"],\n \"native_id\": row.get(\"native_id\", \"\"),\n \"native_id_namespace\": row.get(\"native_id_namespace\", \"\")\n }\n for row in all_urls_pd.to_dict('records')\n if row[\"url\"] is not None\n]\n\ntotal_urls = len(jsonUrls)\npdf_urls = sum(1 for url in jsonUrls if '.pdf' in url['url'].lower())\ndoi_urls = sum(1 for url in jsonUrls if 'doi.org' in url['url'].lower())\nother_urls = total_urls - pdf_urls - doi_urls\n\nprint(f\"Harvesting {total_urls} URLs ({pdf_urls} PDFs, {doi_urls} DOIs, {other_urls} other URLs)\")"
+  "source": "# urls to scrape\n\ndbutils.widgets.text(\"url_limit\", \"250000\", \"Max URLs to process\")\nurl_limit = int(dbutils.widgets.get(\"url_limit\"))\n\nif rescrape_queue_only:\n queue_df = spark.read.table(\"openalex.taxicab.rescrape_queue\")\n queue_count = queue_df.count()\n if queue_count == 0:\n dbutils.notebook.exit(\"Queue empty — nothing to rescrape\")\n\n print(f\"Rescrape queue has {queue_count} records\")\n\n # DOIs: construct URL directly\n doi_urls = (\n queue_df.filter(F.col(\"native_id_namespace\") == \"doi\")\n .withColumn(\"url\", F.concat(F.lit(\"https://doi.org/\"), F.col(\"native_id\")))\n .select(\"native_id\", \"native_id_namespace\", \"url\")\n )\n\n # Non-DOIs: look up most recent URL from taxicab_results\n non_doi_urls = (\n queue_df.filter(F.col(\"native_id_namespace\") != \"doi\")\n .join(\n spark.read.table(\"openalex.taxicab.taxicab_results\")\n .select(\"native_id\", \"native_id_namespace\", \"url\")\n .dropDuplicates([\"native_id\", \"native_id_namespace\", \"url\"]),\n [\"native_id\", \"native_id_namespace\"], \"inner\"\n )\n .select(\"native_id\", \"native_id_namespace\", \"url\")\n )\n\n all_urls = doi_urls.unionByName(non_doi_urls).limit(url_limit)\n\n all_urls_pd = all_urls.toPandas()\n\n jsonUrls = [\n {\n \"url\": row[\"url\"],\n \"native_id\": row.get(\"native_id\", \"\"),\n \"native_id_namespace\": row.get(\"native_id_namespace\", \"\")\n }\n for row in all_urls_pd.to_dict('records')\n if row[\"url\"] is not None\n ]\n\nelse:\n # Source 1: Crossref works\n recent_crossref_works = (\n spark.read\n .table(\"openalex.crossref.crossref_works\")\n .filter(F.col(\"created_date\") >= F.lit(last_processed_date))\n .select(\n \"native_id\",\n \"native_id_namespace\",\n F.expr(\"get(filter(urls, x -> x.url like '%doi.org%'), 0).url\").alias(\"url\"),\n F.to_timestamp(\"created_date\").alias(\"source_created_date\"),\n )\n )\n\n # Source 2: Repo works\n recent_repo_works = (\n spark.read.table(\"openalex.repo.repo_works\")\n .filter(F.col(\"created_date\") >= F.lit(last_processed_date))\n .select(\n \"native_id\",\n \"native_id_namespace\",\n F.slice(\"urls\", 1, 3).alias(\"urls\"),\n F.to_timestamp(\"created_date\").alias(\"source_created_date\"),\n )\n .filter(F.col(\"urls\").isNotNull())\n .select(\"*\", F.explode(\"urls\").alias(\"url_struct\"))\n .select(\n \"native_id\",\n \"native_id_namespace\",\n \"source_created_date\",\n F.col(\"url_struct.url\").alias(\"url\")\n )\n .filter(~F.col(\"url\").contains(\"doi.org\"))\n )\n\n # Source 3: Landing page PDF URLs\n recent_pdf_works = (\n spark.read\n .table(\"openalex.landing_page.landing_page_works\")\n .filter(F.col(\"created_date\") >= F.lit(last_processed_date))\n .select(\n \"ids\",\n \"native_id\",\n \"native_id_namespace\",\n F.expr(\"get(filter(urls, x -> x.content_type = 'pdf'), 0).url\").alias(\"url\"),\n F.to_timestamp(\"created_date\").alias(\"source_created_date\"),\n )\n .withColumn(\"pmh_id\", F.expr(\"get(filter(ids, x -> x.namespace = 'pmh'), 0).id\"))\n .withColumn(\"doi_id\", F.expr(\"get(filter(ids, x -> x.namespace = 'doi'), 0).id\"))\n # Set priority: PMH first, then DOI, then original\n .withColumn(\"final_native_id\", \n F.when(F.col(\"pmh_id\").isNotNull(), F.col(\"pmh_id\"))\n .when(F.col(\"doi_id\").isNotNull(), F.col(\"doi_id\"))\n .otherwise(F.col(\"native_id\")))\n .withColumn(\"final_namespace\", \n F.when(F.col(\"pmh_id\").isNotNull(), F.lit(\"pmh\"))\n .when(F.col(\"doi_id\").isNotNull(), F.lit(\"doi\"))\n .otherwise(F.col(\"native_id_namespace\")))\n .select(\n F.col(\"final_native_id\").alias(\"native_id\"),\n F.col(\"final_namespace\").alias(\"native_id_namespace\"),\n \"url\",\n \"source_created_date\",\n )\n .filter(F.col(\"url\").isNotNull())\n )\n\n # Union all sources, clean native_id, dedup, order newest first, then drop the ordering column\n taxicab_results = spark.table(\"openalex.taxicab.taxicab_results\").select(\"url\")\n\n all_urls = (\n recent_crossref_works\n .unionByName(recent_repo_works)\n .unionByName(recent_pdf_works)\n .withColumn(\"native_id\", F.regexp_replace(\"native_id\", \"^https://doi\\\\.org/\", \"\"))\n .join(taxicab_results, [\"url\"], \"left_anti\")\n .orderBy(F.col(\"source_created_date\").desc())\n .limit(url_limit)\n .drop(\"source_created_date\")\n )\n\n all_urls_pd = all_urls.toPandas()\n\n jsonUrls = [\n {\n \"url\": row[\"url\"],\n \"native_id\": row.get(\"native_id\", \"\"),\n \"native_id_namespace\": row.get(\"native_id_namespace\", \"\")\n }\n for row in all_urls_pd.to_dict('records')\n if row[\"url\"] is not None\n ]\n\ntotal_urls = len(jsonUrls)\npdf_urls = sum(1 for url in jsonUrls if '.pdf' in url['url'].lower())\ndoi_urls_count = sum(1 for url in jsonUrls if 'doi.org' in url['url'].lower())\nother_urls = total_urls - pdf_urls - doi_urls_count\n\nprint(f\"Harvesting {total_urls} URLs ({pdf_urls} PDFs, {doi_urls_count} DOIs, {other_urls} other URLs)\")"
  },
  {
   "cell_type": "code",
@@ -199,7 +189,7 @@
    }
   },
   "outputs": [],
-  "source": "# run it all\nresults = process_urls_with_threadpool(jsonUrls, max_workers=120)\n\nnow = datetime.datetime.now(timezone.utc)\n\nfor result in results:\n result[\"created_date\"] = now\n result[\"processed_date\"] = now\n\n# create DataFrame directly from results and save to table\nresults_df = spark.createDataFrame(results, schema=results_schema)\nresults_df.write.mode(\"append\").format(\"delta\").saveAsTable(\"openalex.taxicab.taxicab_results\")\n\nprint(f\"Updated {results_df.count()} records in the results table\")"
+  "source": "# run it all\nresults = process_urls_with_threadpool(jsonUrls, max_workers=120)\n\nnow = datetime.datetime.now(timezone.utc)\n\nfor result in results:\n result[\"created_date\"] = now\n result[\"processed_date\"] = now\n\n# create DataFrame directly from results and save to table\nresults_df = spark.createDataFrame(results, schema=results_schema)\nresults_df.write.mode(\"append\").format(\"delta\").saveAsTable(\"openalex.taxicab.taxicab_results\")\n\nprint(f\"Updated {results_df.count()} records in the results table\")\n\nif rescrape_queue_only:\n spark.sql(\"TRUNCATE TABLE openalex.taxicab.rescrape_queue\")\n print(\"Rescrape queue cleared\")"
  },
 ],
 "metadata": {
