Skip to content

Commit 924117d

Browse files
authored
Merge pull request #42 from dataforgoodfr/feat/scraping
Add code for downloading PDFs, extracting text from PDFs, cleaning text and extracting sections from it.
2 parents e26ff28 + c756245 commit 924117d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+4340
-513
lines changed

library/.env.example

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Scaleway API key ID and secret key obtained on the platform
2+
# used to upload files to object storage
3+
S3_ACCESS_KEY=XXX # pretty short and starts with SCW
4+
S3_SECRET_KEY=XXX # looks like a uuid
5+
6+
DATABASE_URL=postgresql://username:password@host-postgresql.services.clever-cloud.com:port/dbname
7+
8+
# for pymupdf with OCR
9+
# after installing tesseract-ocr, you can find this with `which tesseract` and then browsing the install directory
10+
TESSDATA_PREFIX=/usr/share/tesseract-ocr/5/tessdata/

library/README.md

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,8 @@
44
[Voir la doc dédiée](prescreening/README.md).
55

66

7-
### 2. Extraction full-text
8-
Cette étape regroupe à nouveau deux sous-étapes :
9-
1. Obtention quand disponible (open access) d'un lien pour le texte complet, généralement en PDF.
10-
2. Téléchargement et lecture du PDF pour obtenir le texte converti en format markdown.
11-
12-
Les PDF téléchargés doivent être stockés pour affichage aux utilisateurs finaux quand ils sont cités.
13-
14-
Le code pour l'étape 1 (à perfectionner car il ne gère pas les cas où il faut cliquer sur une popup avant d'accéder au PDF, la branche `scraping` contient de légères améliorations) est dans `scraping/extract_openalex.py` et celui de l'étape 2 dans `pdfextraction/pdf/`.
7+
### 2. Obtention des PDF et extraction des textes complets
8+
[Voir la doc dédiée](scraping/README.md).
159

1610

1711
### 3. Extraction de la taxonomie
@@ -24,8 +18,11 @@ Le traitement des chunks pour cette étape reste à clarifier (métadonnées en
2418

2519

2620
### Roadmap
27-
- [ ] Nettoyer la base de données Postgres et repartir d'une table propre de 250k articles avec a minima OpenAlex ID, DOI, titre et abstract
28-
- [ ] Récupérer le texte complet d'autant de ces articles que possible, le stocker en format texte dans Postgres et stocker les PDF dans un object storage sur CleverCloud
29-
- [ ] Traiter les textes complets par NLP pour extraire la taxonomie, la stocker en métadonnées sur Postgres
21+
- [x] Mettre au propre le jeu de mots-clés
22+
- [x] Etape 1 du pré-screening : obtenir les références des articles candidats par des recherches par mot-clé sur l'API OpenAlex
23+
- [x] Etape 2 du pré-screening : filtrer les résultats de l'étape 1 en faisant classifier l'abstract à un modèle BERT fine-tuné
24+
- [x] Récupérer quand c'est possible les PDF des articles et en extraire les textes complets -> textes bruts et non markdown, md serait mieux
25+
- [x] Extraire les sections Résultats et Conclusion
26+
- [ ] Extraire la taxonomie
3027
- [ ] Mettre en place un pipeline pour mettre à jour automatiquement la library de façon régulière
31-
- [ ] Intégrer d'autres sources qu'OpenAlex
28+
- [ ] Intégrer d'autres sources qu'OpenAlex

library/old/cli.py

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
"""
2+
Paper Processing Pipeline CLI
3+
Unified command-line interface for scraping, queue management, and testing
4+
"""
5+
6+
from typing import Optional
7+
import typer
8+
from rich.console import Console
9+
from pathlib import Path
10+
11+
app = typer.Typer(
12+
name="pipeline",
13+
help="Paper Processing Pipeline - 5x faster parallel scraping, queue management, and testing",
14+
no_args_is_help=True,
15+
rich_markup_mode="rich"
16+
)
17+
18+
console = Console()
19+
20+
@app.command()
21+
def test():
22+
"""
23+
🧪 Run scraping tests with small batch
24+
This command will:
25+
- Test scraping functionality with 10 papers
26+
- Create folder structure
27+
- Show you what to expect
28+
29+
Equivalent to: [cyan]python cli.py scrape --batch-size 10[/cyan]
30+
"""
31+
try:
32+
from ...tests.scraping.test_scraping import main as test_main
33+
console.print("🧪 Running scraping tests...", style="bold blue")
34+
test_main()
35+
except ImportError as e:
36+
console.print(f"❌ Failed to import test module: {e}", style="bold red")
37+
raise typer.Exit(1)
38+
except Exception as e:
39+
console.print(f"❌ Test error: {e}", style="bold red")
40+
raise typer.Exit(1)
41+
42+
43+
@app.command()
44+
def scrape(
45+
batch_size: int = typer.Option(100, "--batch-size", help="Number of papers to scrape per batch"),
46+
output_dir: str = typer.Option("./scraping_output", "--output-dir", help="Output directory"),
47+
max_wait_time: int = typer.Option(30, "--max-wait-time", help="Max wait time for downloads (seconds)"),
48+
workers: int = typer.Option(1, "--workers", help="Number of parallel Chrome workers (1=sequential, 5=recommended)"),
49+
test_paper: Optional[str] = typer.Option(None, "--test-paper", help="Test scraping a specific OpenAlex ID"),
50+
all_papers: bool = typer.Option(False, "--all", help="Process ALL papers in database continuously (resumable)"),
51+
quiet: bool = typer.Option(False, "--quiet", help="Reduce output for large batch processing"),
52+
stats: bool = typer.Option(False, "--stats", help="Show scraping statistics and progress"),
53+
clear: bool = typer.Option(False, "--clear", help="Clear the entire scraping queue")
54+
):
55+
"""
56+
🌐 Paper scraping with DOI retry logic and parallel processing
57+
Automatically retries failed downloads using DOI if OpenAlex ID scraping fails.
58+
Papers are distributed across 12 folders for parallel metadata extraction.
59+
Fully resumable - can be interrupted and restarted without issues.
60+
61+
🚀 PARALLEL SCRAPING: Use --workers 5 for ~5x faster scraping with multiple Chrome instances.
62+
63+
[bold blue]Examples:[/bold blue]
64+
- [cyan]python cli.py scrape --all --workers 5 --batch-size 100[/cyan] - Parallel processing (FAST!)
65+
- [cyan]python cli.py scrape --workers 5 --batch-size 50[/cyan] - Single parallel batch
66+
- [cyan]python cli.py scrape --batch-size 50[/cyan] - Single worker (sequential)
67+
- [cyan]python cli.py scrape --test-paper "https://openalex.org/W123"[/cyan] - Test single paper
68+
- [cyan]python cli.py scrape --stats[/cyan] - Show scraping progress and statistics
69+
- [cyan]python cli.py scrape --clear[/cyan] - Clear the entire scraping queue
70+
- [cyan]python cli.py scrape --all --workers 5 --quiet[/cyan] - Fast parallel processing, minimal output
71+
"""
72+
try:
73+
# Handle clear operation
74+
if clear:
75+
from database.models import clear_scraping_queue
76+
77+
console.print("⚠️ [bold red]This will DELETE ALL entries in the scraping queue![/bold red]")
78+
if not typer.confirm("Are you sure you want to continue?"):
79+
console.print("❌ Operation cancelled", style="yellow")
80+
return
81+
82+
cleared_count = clear_scraping_queue()
83+
if cleared_count > 0:
84+
console.print(f"🗑️ Cleared {cleared_count:,} entries from scraping queue", style="green")
85+
else:
86+
console.print("📭 Scraping queue was already empty", style="blue")
87+
88+
# If only clear was requested, we're done
89+
if not test_paper and not all_papers and not stats:
90+
return
91+
92+
# Handle stats display
93+
if stats:
94+
from database.models import get_scraping_stats
95+
96+
stats_data = get_scraping_stats()
97+
console.print("📊 [bold]SCRAPING STATISTICS[/bold]")
98+
console.print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
99+
console.print(f"📋 Total papers: {stats_data.get('total', 0):,}", style="blue")
100+
console.print(f"✅ Successfully scraped: {stats_data.get('scraped', 0):,}", style="green")
101+
console.print(f"❌ Failed: {stats_data.get('failed', 0):,}", style="red")
102+
console.print(f"⏳ Pending: {stats_data.get('pending', 0):,}", style="yellow")
103+
104+
if stats_data.get('total', 0) > 0:
105+
completion_rate = (stats_data.get('scraped', 0) / stats_data['total']) * 100
106+
failure_rate = (stats_data.get('failed', 0) / stats_data['total']) * 100
107+
console.print(f"📈 Completion rate: {completion_rate:.1f}%", style="blue")
108+
console.print(f"📉 Failure rate: {failure_rate:.1f}%", style="blue")
109+
110+
# If only stats was requested, we're done
111+
if not test_paper and not all_papers:
112+
return
113+
114+
# Handle scraping operations
115+
from scraping.targeted_scraper import TargetedPaperScraper, ParallelPaperScraper
116+
import logfire
117+
118+
# Validate workers parameter
119+
if workers < 1:
120+
console.print("❌ Error: Number of workers must be at least 1", style="bold red")
121+
raise typer.Exit(1)
122+
elif workers > 10:
123+
console.print("⚠️ Warning: Using more than 10 workers may overwhelm your system", style="yellow")
124+
if not typer.confirm("Continue anyway?"):
125+
console.print("❌ Operation cancelled", style="yellow")
126+
raise typer.Exit(0)
127+
128+
# Configure logfire
129+
LOGFIRE_TOKEN = "pylf_v1_us_qTtmbDFpkfhFwzTfZyZrTJcl4C4lC7FhmZ65BgJ7dLDV"
130+
logfire.configure(token=LOGFIRE_TOKEN)
131+
132+
# Choose scraper based on worker count
133+
if workers > 1:
134+
console.print(f"🚀 Using parallel scraper with {workers} Chrome workers", style="bold green")
135+
scraper = ParallelPaperScraper(
136+
base_output_dir=output_dir,
137+
max_wait_time=max_wait_time,
138+
num_workers=workers
139+
)
140+
is_parallel = True
141+
else:
142+
console.print("🔧 Using sequential scraper (single Chrome worker)", style="blue")
143+
scraper = TargetedPaperScraper(
144+
base_output_dir=output_dir,
145+
max_wait_time=max_wait_time
146+
)
147+
is_parallel = False
148+
149+
if test_paper:
150+
console.print(f"🧪 Testing single paper: {test_paper}", style="bold blue")
151+
# Test paper only works with sequential scraper for now
152+
if is_parallel:
153+
console.print("💡 Test mode uses sequential scraper for single paper testing", style="dim")
154+
test_scraper = TargetedPaperScraper(
155+
base_output_dir=output_dir,
156+
max_wait_time=max_wait_time
157+
)
158+
success = test_scraper.scrape_paper(test_paper)
159+
test_scraper.close_driver()
160+
else:
161+
success = scraper.scrape_paper(test_paper)
162+
163+
if success:
164+
console.print("✅ Test successful!", style="bold green")
165+
else:
166+
console.print("❌ Test failed", style="bold red")
167+
elif all_papers:
168+
if is_parallel:
169+
console.print(f"🚀 Starting parallel continuous scraping with {workers} workers...", style="bold blue")
170+
else:
171+
console.print("🚀 Starting continuous scraping of ALL papers in database...", style="bold blue")
172+
console.print("💡 [bold yellow]TIP:[/bold yellow] You can interrupt with Ctrl+C and resume later", style="dim")
173+
174+
if is_parallel:
175+
stats = scraper.scrape_all_continuous_parallel(
176+
batch_size=batch_size,
177+
show_progress=not quiet
178+
)
179+
else:
180+
stats = scraper.scrape_all_continuous(
181+
batch_size=batch_size,
182+
show_progress=not quiet
183+
)
184+
185+
# Final summary
186+
console.print("\n🎉 [bold]CONTINUOUS SCRAPING COMPLETED![/bold]")
187+
console.print(f" ✅ Successfully scraped: {stats['total_successful']:,} papers", style="green")
188+
console.print(f" 📊 Batches completed: {stats['batches_completed']}", style="blue")
189+
console.print(f" ⏱️ Total time: {stats['total_time']/60:.1f} minutes", style="blue")
190+
191+
if is_parallel:
192+
console.print(f" 🔧 Workers used: {stats.get('workers_used', workers)}", style="blue")
193+
194+
if stats['total_successful'] > 0:
195+
papers_per_hour = stats['total_successful'] / (stats['total_time'] / 3600)
196+
console.print(f" 📈 Average rate: {papers_per_hour:.0f} papers/hour", style="blue")
197+
198+
if is_parallel and workers > 1:
199+
estimated_single_rate = papers_per_hour / workers
200+
console.print(f" 🚀 Speedup: ~{papers_per_hour/estimated_single_rate:.1f}x vs single worker", style="green")
201+
202+
console.print(f"\n📁 Files distributed across 12 folders in: {output_dir}")
203+
console.print("💡 Next: Run metadata extraction with [cyan]./run_metadata_extraction.sh[/cyan]")
204+
else:
205+
if is_parallel:
206+
console.print(f"🚀 Starting parallel batch scraping of {batch_size} papers with {workers} workers...", style="bold blue")
207+
stats = scraper.scrape_batch_parallel(batch_size, show_progress=not quiet)
208+
else:
209+
console.print(f"🚀 Starting single batch scraping of {batch_size} papers...", style="bold blue")
210+
stats = scraper.scrape_batch(batch_size, show_progress=not quiet)
211+
console.print("\n📊 [bold]BATCH RESULTS:[/bold]")
212+
console.print(f" ✅ Successful: {stats['successful']}", style="green")
213+
console.print(f" ❌ Failed: {stats['failed']}", style="red")
214+
console.print(f" 📋 Total processed: {stats['processed']}", style="blue")
215+
216+
if is_parallel:
217+
console.print(f" 🔧 Workers used: {stats.get('workers_used', workers)}", style="blue")
218+
219+
if stats.get('total_time'):
220+
console.print(f" ⏱️ Batch time: {stats['total_time']/60:.1f} minutes", style="blue")
221+
console.print(f" 📈 Success rate: {stats.get('success_rate', 0):.1f}%", style="blue")
222+
223+
# Show speed metrics for parallel processing
224+
if is_parallel and stats.get('papers_per_second'):
225+
console.print(f" 🚀 Speed: {stats['papers_per_second']:.1f} papers/second", style="green")
226+
if workers > 1:
227+
console.print(f" 📈 Estimated speedup: ~{stats['papers_per_second']*5:.1f}x vs single worker", style="green")
228+
229+
if stats['successful'] > 0:
230+
if is_parallel:
231+
console.print(f"\n🎉 Successfully scraped {stats['successful']} papers with {workers} parallel workers!", style="bold green")
232+
else:
233+
console.print(f"\n🎉 Successfully scraped {stats['successful']} papers!", style="bold green")
234+
235+
console.print("💡 [bold yellow]TIP:[/bold yellow] Use [cyan]--all --workers 5[/cyan] for fast continuous processing")
236+
console.print(f"📁 Files distributed across folders in: {output_dir}")
237+
238+
except ImportError as e:
239+
console.print(f"❌ Failed to import scraping module: {e}", style="bold red")
240+
raise typer.Exit(1)
241+
except Exception as e:
242+
console.print(f"❌ Scraping error: {e}", style="bold red")
243+
raise typer.Exit(1)
244+
245+
@app.callback()
246+
def main(
247+
ctx: typer.Context,
248+
version: bool = typer.Option(False, "--version", help="Show version information")
249+
):
250+
"""
251+
📄 Paper Processing Pipeline
252+
253+
A production-ready system for scraping 250k academic papers and extracting metadata.
254+
Features DOI retry logic for improved scraping performance and resilience.
255+
256+
[bold blue]Available Commands:[/bold blue]
257+
258+
• [green]test[/green] - Run test scraping with 10 papers
259+
• [green]scrape[/green] - Paper scraping with progress reporting
260+
[bold blue]Quick Examples:[/bold blue]
261+
262+
• [cyan]python cli.py test[/cyan] - Test the system
263+
• [cyan]python cli.py scrape --stats[/cyan] - Check scraping progress
264+
• [cyan]python cli.py scrape --all --batch-size 100[/cyan] - Process all papers (resumable)
265+
• [cyan]python cli.py scrape --clear[/cyan] - Clear scraping queue
266+
"""
267+
if version:
268+
console.print("Paper Processing Pipeline CLI v2.0.0", style="bold blue")
269+
raise typer.Exit()
270+
271+
def cli_main():
272+
"""Entry point for CLI"""
273+
try:
274+
app()
275+
except KeyboardInterrupt:
276+
console.print("\n⏹️ Operation cancelled by user", style="bold yellow")
277+
raise typer.Exit(1)
278+
except Exception as e:
279+
console.print(f"❌ Unexpected error: {e}", style="bold red")
280+
raise typer.Exit(1)
281+
282+
if __name__ == "__main__":
283+
cli_main()
File renamed without changes.

library/old/extract_openalex.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""
2+
Old utils to query Open Alex API.
3+
Prefer using OpenAlexConnector.
4+
"""
5+
6+
7+
import requests
8+
9+
10+
# Request OpenAlex API
11+
def search_openalex(
12+
query: str, cursor="*", per_page: int = 50, from_dois: bool = False, dois: list = None
13+
) -> dict:
14+
if dois is None:
15+
dois = []
16+
17+
if from_dois:
18+
pipe_separated_dois = "|".join(dois)
19+
params = {
20+
"filter": f"open_access.is_oa:true,doi:{pipe_separated_dois}",
21+
"cursor": cursor,
22+
"per-page": per_page,
23+
}
24+
else:
25+
params = {
26+
"filter": "open_access.is_oa:true",
27+
"search": f"{query}",
28+
"cursor": cursor,
29+
"per-page": per_page,
30+
}
31+
32+
url = "https://api.openalex.org/works"
33+
response = requests.get(url, params=params)
34+
response.raise_for_status()
35+
query_data = response.json()
36+
return query_data
37+
38+
39+
# Retrieve PDF urls and OpenAlex IDs
40+
def get_urls_to_fetch(query_data: dict):
41+
urls_to_fetch = []
42+
filenames = []
43+
for i in range(len(query_data["results"])):
44+
file_title = query_data["results"][i]["id"]
45+
filenames.append(file_title.split("/")[-1])
46+
try:
47+
urls_to_fetch.append(query_data["results"][i]["best_oa_location"]["pdf_url"])
48+
except TypeError:
49+
urls_to_fetch.append(query_data["results"][i]["open_access"]["oa_url"])
50+
return urls_to_fetch, filenames

0 commit comments

Comments
 (0)