65 changes: 65 additions & 0 deletions scrap/medium_scraping.py
@@ -0,0 +1,65 @@
import feedparser
from datetime import datetime
from typing import List, Dict
import time

SOURCE_SITE = "medium"

RSS_FEEDS = [
"https://medium.com/feed/tag/artificial-intelligence",
"https://medium.com/feed/tag/machine-learning",
"https://medium.com/feed/tag/deep-learning",
"https://medium.com/feed/tag/ai",
]

def normalize_medium_entry(entry: feedparser.FeedParserDict) -> Dict:
"""Normalise une entrée RSS Medium dans le format unifié."""
entry_id = entry.get('link', '')

published_date = datetime.utcnow().isoformat()
if getattr(entry, "published_parsed", None):
published_date = datetime.fromtimestamp(time.mktime(entry.published_parsed)).isoformat()

    keywords = [tag.term for tag in entry.get('tags', [])]

return {
"id": entry_id,
"source_site": SOURCE_SITE,
"title": entry.get('title', 'N/A'),
"description": entry.get('summary', 'N/A'),
"author_info": entry.get('author', 'N/A'),
"keywords": ", ".join(keywords),
"content_url": entry_id,
"published_date": published_date,
"item_type": "article",
}

def scrape_medium(max_articles_per_feed: int = 10) -> List[Dict]:
"""Scrape les flux RSS Medium et retourne les éléments unifiés."""
all_items = []
unique_links = set()

for feed_url in RSS_FEEDS:
print(f"📡 Fetching RSS: {feed_url}")
try:
feed = feedparser.parse(feed_url)

for entry in feed.entries[:max_articles_per_feed]:
link = entry.get('link')
if link and link not in unique_links:
all_items.append(normalize_medium_entry(entry))
unique_links.add(link)

except Exception as e:
print(f"❌ Error fetching {feed_url}: {e}")
time.sleep(1)

return all_items

if __name__ == "__main__":
results = scrape_medium(max_articles_per_feed=2)
print(f"Total Medium items scraped: {len(results)}")
if results:
print("\nExemple d'élément unifié:")
import json
print(json.dumps(results[0], indent=2))
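
For reference, a minimal sketch of the unified item schema that each normalize_* function targets. The keys are taken from the dicts returned above; the TypedDict itself is hypothetical and not part of this diff.

from typing import TypedDict

class UnifiedItem(TypedDict):
    id: str               # stable identifier (feed link, arXiv entry_id, repo full_name)
    source_site: str      # "medium", "arxiv", "le_monde", "github"
    title: str
    description: str
    author_info: str
    keywords: str         # comma-separated keywords
    content_url: str
    published_date: str   # ISO-8601 timestamp
    item_type: str        # "article", "paper", "repository"
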
59 changes: 59 additions & 0 deletions scrap/scrap_arxiv.py
@@ -0,0 +1,59 @@
import arxiv
from datetime import datetime
from typing import List, Dict

# Constants for the monitoring tool
SOURCE_SITE = "arxiv"
CATEGORY = "cs.LG"

def normalize_arxiv_result(paper: arxiv.Result) -> Dict:
"""Normalise un résultat arXiv dans le format unifié."""

authors = ", ".join([a.name for a in paper.authors])

link = paper.entry_id

keywords_list = [paper.primary_category]
if paper.categories:
keywords_list.extend(paper.categories)

return {
"id": link,
"source_site": SOURCE_SITE,
"title": paper.title.replace('\n', ' '),
"description": paper.summary.replace('\n', ' '),
"author_info": authors,
"keywords": ", ".join(keywords_list),
"content_url": link,
"published_date": paper.published.isoformat(),
"item_type": "paper",
}

def scrape_arxiv(category: str = CATEGORY, max_results: int = 10) -> List[Dict]:
"""Scrape arXiv pour une catégorie et retourne les éléments unifiés."""

try:
search = arxiv.Search(
query=f"cat:{category}",
max_results=max_results,
sort_by=arxiv.SortCriterion.SubmittedDate,
sort_order=arxiv.SortOrder.Descending
)

normalized_results = []
for result in search.results():
normalized_results.append(normalize_arxiv_result(result))

return normalized_results

except Exception as e:
print(f"[ERREUR] arXiv Search: {e}")
return []

if __name__ == "__main__":
results = scrape_arxiv(max_results=5)
print(f"Total arXiv items scraped: {len(results)}")
if results:
print("\nExemple d'élément unifié:")
import json
print(json.dumps(results[0], indent=2))
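
Note that Search.results(), used above, is deprecated in recent releases of the arxiv package. A minimal sketch of the same query through arxiv.Client, assuming arxiv >= 2.0 is installed (sketch only, not part of this diff):

client = arxiv.Client(page_size=100, delay_seconds=3, num_retries=3)
search = arxiv.Search(
    query=f"cat:{CATEGORY}",
    max_results=10,
    sort_by=arxiv.SortCriterion.SubmittedDate,
    sort_order=arxiv.SortOrder.Descending,
)
papers = [normalize_arxiv_result(r) for r in client.results(search)]
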
71 changes: 71 additions & 0 deletions scrap/scrap_le_monde.py
@@ -0,0 +1,71 @@
import feedparser
import time
from datetime import datetime
from typing import List, Dict

SOURCE_SITE = "le_monde"

FEEDS = [
"https://www.lemonde.fr/international/rss_full.xml",
"https://www.lemonde.fr/actualite-medias/rss_full.xml",
"https://www.lemonde.fr/en_continu/rss_full.xml"
]

def normalize_lemonde_entry(entry: feedparser.FeedParserDict, feed_url: str) -> Dict:
"""Normalise une entrée RSS Le Monde dans le format unifié."""
entry_id = getattr(entry, "id", None) or getattr(entry, "link", None)

published_date = datetime.utcnow().isoformat()
if getattr(entry, "published_parsed", None):
published_date = datetime.fromtimestamp(time.mktime(entry.published_parsed)).isoformat()
elif getattr(entry, "updated_parsed", None):
published_date = datetime.fromtimestamp(time.mktime(entry.updated_parsed)).isoformat()

category = "actualité générale"
if "international" in feed_url:
category = "international"
elif "medias" in feed_url:
category = "actualité médias"
elif "continu" in feed_url:
category = "en continu"

return {
"id": entry_id,
"source_site": SOURCE_SITE,
"title": getattr(entry, "title", ""),
"description": getattr(entry, "summary", ""),
"author_info": getattr(entry, "author", "Le Monde"),
"keywords": category,
"content_url": getattr(entry, "link", ""),
"published_date": published_date,
"item_type": "article",
}

def scrape_lemonde(feeds: List[str] = FEEDS) -> List[Dict]:
"""Scrape les flux RSS Le Monde et retourne les éléments unifiés."""
all_items = []
unique_ids = set()

for feed_url in feeds:
try:
d = feedparser.parse(feed_url)

for entry in d.entries:
entry_id = getattr(entry, "id", None) or getattr(entry, "link", None)
if entry_id and entry_id not in unique_ids:
all_items.append(normalize_lemonde_entry(entry, feed_url))
unique_ids.add(entry_id)

except Exception as e:
print(f"[ERREUR] du fetch du feed {feed_url}: {e}")
time.sleep(1)

return all_items

if __name__ == "__main__":
results = scrape_lemonde(feeds=FEEDS[:1])
print(f"Total Le Monde items scraped: {len(results)}")
if results:
print("\nExemple d'élément unifié:")
import json
print(json.dumps(results[0], indent=2))
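
The RSS scrapers above build naive ISO timestamps from published_parsed. If timezone-aware UTC timestamps are preferred downstream, a possible variant uses calendar.timegm (feedparser normalizes parsed dates to UTC struct_time values); this is a sketch, not part of this diff:

import calendar
from datetime import datetime, timezone

def parse_feed_date(parsed) -> str:
    """Convert a feedparser UTC struct_time into an ISO-8601 string with offset."""
    return datetime.fromtimestamp(calendar.timegm(parsed), tz=timezone.utc).isoformat()
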
130 changes: 130 additions & 0 deletions scrap/scrape_github.py
@@ -0,0 +1,130 @@
import os
import requests
from datetime import datetime, UTC
from typing import List, Dict

SOURCE_SITE = "github"
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")


THEMES = [
"large-language-model", "llm", "transformer", "text-generation", "retrieval-augmented-generation",
"rag", "agents", "chatbot", "fine-tuning", "quantization", "lora", "peft",
"diffusion", "stable-diffusion", "image-generation", "multimodal",
"speech-to-text", "speech-synthesis", "audio", "reinforcement-learning",
"computer-vision",
]

HEADERS = {
"Accept": "application/vnd.github+json",
"User-Agent": "github-ai-theme-watcher/1.0"
}
if GITHUB_TOKEN:
HEADERS["Authorization"] = f"Bearer {GITHUB_TOKEN}"

class RateLimitError(Exception):
def __init__(self, retry_after=None):
self.retry_after = retry_after
super().__init__("Rate limit hit on GitHub API. Retry after: {}".format(retry_after))

def sanitize_text(s):
return str(s) if s is not None else ""

def normalize_github_repo(repo: Dict, theme: str) -> Dict:
full_name = repo.get("full_name")
keywords_list = [theme, repo.get("language") or ""]
if repo.get("topics"):
keywords_list.extend(repo.get("topics"))
updated_at = repo.get("updated_at") or repo.get("pushed_at") or datetime.now(UTC).isoformat()
return {
"id": full_name, "source_site": SOURCE_SITE, "title": repo.get("name"),
"description": sanitize_text(repo.get("description")), "author_info": repo.get("owner", {}).get("login", ""),
"keywords": ", ".join(filter(None, keywords_list)), "content_url": repo.get("html_url") or f"https://github.com/{full_name}",
"published_date": updated_at, "item_type": "repository",
}

def build_query_for_theme(theme: str) -> str:
theme_token = theme.replace(" ", "+")
q = f"{theme_token} in:name,description,readme stars:>50"
return q
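
# Example of the query this helper produces, for a hypothetical two-word theme:
#   build_query_for_theme("stable diffusion")
#   -> "stable+diffusion in:name,description,readme stars:>50"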


def search_github_repos(query: str, per_page: int = 20) -> List[Dict]:
"""
    Search GitHub repositories.
    Raises RateLimitError, otherwise returns a List[Dict] (possibly empty).
"""
url = "https://api.github.com/search/repositories"
params = {
"q": query,
"sort": "stars",
"order": "desc",
"per_page": per_page
}

try:
resp = requests.get(url, headers=HEADERS, params=params, timeout=20)

if resp.status_code == 403:
retry_after = resp.headers.get("Retry-After")
raise RateLimitError(retry_after=int(retry_after) if retry_after and retry_after.isdigit() else None)

if resp.status_code != 200:
print(f"[WARN] HTTP Status {resp.status_code} for query: {query}")
return []

data = resp.json()
return data.get("items", [])

except RateLimitError:
raise
except requests.exceptions.RequestException as e:
print(f"[ERREUR CONNEXION/HTTP] GitHub Search: {e}")
return []
except Exception as e:
print(f"[ERREUR INCONNUE/JSON] GitHub Search: {e}")
return []
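
# Optional retry helper (sketch only, not wired into scrape_github below):
# waits out one rate-limit window before giving up.
def search_with_retry(query: str, per_page: int = 20) -> List[Dict]:
    try:
        return search_github_repos(query, per_page)
    except RateLimitError as e:
        import time  # local import; this module does not import time at the top
        time.sleep(e.retry_after or 60)
        try:
            return search_github_repos(query, per_page)
        except RateLimitError:
            return []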


def scrape_github(themes: List[str] = THEMES, limit_per_theme: int = 20) -> List[Dict]:
"""Scrape GitHub pour les thèmes donnés et retourne les éléments unifiés."""

all_items = []
stop_scraping = False

    for theme in themes:
        if stop_scraping:
            break

        q = build_query_for_theme(theme)
        print(f"-> Searching theme '{theme}' (q={q})")

        try:
            items = search_github_repos(q, limit_per_theme)

            if not isinstance(items, list):
                print(f"[FATAL WARN] search_github_repos returned {type(items)} instead of a list. Stopping.")
                stop_scraping = True
                continue

            normalized_items = [normalize_github_repo(repo, theme) for repo in items]
            all_items.extend(normalized_items)

        except RateLimitError:
            print("[RATE LIMIT] GitHub rate limit reached. Stopping the GitHub watch for this run.")
            stop_scraping = True
        except Exception as e:
            print(f"[THEME ERROR] '{theme}': {e}")
            continue

    return all_items

if __name__ == "__main__":
results = scrape_github(themes=["llm"], limit_per_theme=5)
print(f"Total GitHub items scraped: {len(results)}")
if results:
import json
print("\nExemple d'élément unifié:")
print(json.dumps(results[0], indent=2))
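
Taken together, the four scrapers return the same unified shape, so they can be driven from one place. A minimal sketch of such a driver, assuming scrap/ is importable as a package (hypothetical file, not part of this diff):

import json

from scrap.medium_scraping import scrape_medium
from scrap.scrap_arxiv import scrape_arxiv
from scrap.scrap_le_monde import scrape_lemonde
from scrap.scrape_github import scrape_github

def run_all() -> list:
    """Run every scraper and return one deduplicated, date-sorted list."""
    items = []
    items += scrape_medium(max_articles_per_feed=10)
    items += scrape_arxiv(max_results=10)
    items += scrape_lemonde()
    items += scrape_github(limit_per_theme=10)

    # Deduplicate on the unified "id" field, keeping the first occurrence.
    seen, unique = set(), []
    for item in items:
        if item["id"] and item["id"] not in seen:
            seen.add(item["id"])
            unique.append(item)

    # ISO-8601 timestamps sort roughly chronologically as plain strings.
    unique.sort(key=lambda item: item["published_date"], reverse=True)
    return unique

if __name__ == "__main__":
    results = run_all()
    print(f"Total items across all sources: {len(results)}")
    print(json.dumps(results[:3], indent=2, ensure_ascii=False))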