-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
136 lines (118 loc) · 5 KB
/
database.py
File metadata and controls
136 lines (118 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import sqlite3
import json
import time
from typing import List, Dict, Optional, Any
from config import config
class Database:
    """SQLite-backed store for fetched news items and their analysis results.

    Every method that touches the database opens its own short-lived
    connection and closes it before returning, including when the statement
    raises.

    Status values this class writes or filters on:
      * ``'pending'``   - set by :meth:`add_news`; read by
        :meth:`get_pending_news`.
      * ``'l1_done'``   - read by :meth:`get_high_score_pending_l2`; callers
        store it through :meth:`update_l1_result` for items that passed L1.
      * ``'processed'`` - set by :meth:`update_l2_result`; read by the
        ``get_*processed_news`` methods.
    Any other value (the schema comment mentions ``'filtered'``) is supplied
    by callers through :meth:`update_l1_result`.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file. When omitted,
                ``config.DB_PATH`` is looked up at call time instead of being
                frozen into the signature when the class is defined.
        """
        self.db_path = config.DB_PATH if db_path is None else db_path
        self._init_db()

    def _get_conn(self):
        """Open and return a new connection to ``self.db_path``."""
        return sqlite3.connect(self.db_path)

    def _run_write(self, sql: str, params: tuple = ()) -> None:
        """Execute one statement, commit it, and always close the connection."""
        conn = self._get_conn()
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    def _fetch_rows(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Run a query and return every row as a dict keyed by column name."""
        conn = self._get_conn()
        try:
            conn.row_factory = sqlite3.Row
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``news`` table if it does not exist yet."""
        self._run_write('''
            CREATE TABLE IF NOT EXISTS news (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                title TEXT,
                source_name TEXT,
                published_at REAL,
                fetched_at REAL,
                summary TEXT,
                -- L1 Analysis
                l1_score INTEGER DEFAULT 0,
                l1_reason TEXT,
                -- L2 Analysis
                l2_score INTEGER DEFAULT 0,
                l2_summary TEXT,
                l2_title_zh TEXT,
                category TEXT,
                -- Status
                status TEXT DEFAULT 'pending' -- pending, filtered, processed
            )
        ''')

    def add_news(self, url: str, title: str, source_name: str, published_at: float, summary: str = "") -> bool:
        """Insert a news item with status ``'pending'``.

        ``fetched_at`` is stamped with the current ``time.time()``.

        Returns:
            True if the row was inserted. False if the insert violated a
            table constraint: in practice a ``url`` that is already stored
            (the column is ``UNIQUE NOT NULL``, so a NULL url is rejected the
            same way).
        """
        conn = self._get_conn()
        try:
            conn.execute('''
                INSERT INTO news (url, title, source_name, published_at, fetched_at, summary, status)
                VALUES (?, ?, ?, ?, ?, ?, 'pending')
            ''', (url, title, source_name, published_at, time.time(), summary))
            conn.commit()
            return True
        except sqlite3.IntegrityError:
            return False
        finally:
            conn.close()

    def get_pending_news(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Return up to ``limit`` rows whose status is ``'pending'``.

        Rows are ordered by ``id`` so that successive batches are
        deterministic; without an ORDER BY, SQLite leaves the order undefined.
        """
        return self._fetch_rows(
            "SELECT * FROM news WHERE status = 'pending' ORDER BY id LIMIT ?",
            (limit,),
        )

    def update_l1_result(self, news_id: int, score: int, reason: str, status: str):
        """Store the L1 score and reason for ``news_id`` and set its status.

        The caller chooses ``status``; :meth:`get_high_score_pending_l2` only
        picks up rows that were stored with ``'l1_done'``.
        """
        self._run_write('''
            UPDATE news
            SET l1_score = ?, l1_reason = ?, status = ?
            WHERE id = ?
        ''', (score, reason, status, news_id))

    def get_high_score_pending_l2(self, min_score: int = 70, limit: int = 20) -> List[Dict[str, Any]]:
        """Get news that passed L1 but has not been processed by L2 yet.

        Selects rows with status ``'l1_done'`` and ``l1_score >= min_score``,
        ordered by ``id``, at most ``limit`` of them. Rows leave this set once
        :meth:`update_l2_result` marks them ``'processed'``.
        """
        return self._fetch_rows(
            "SELECT * FROM news WHERE status = 'l1_done' AND l1_score >= ? ORDER BY id LIMIT ?",
            (min_score, limit),
        )

    def update_l2_result(self, news_id: int, score: int, summary: str, title_zh: str, category: str):
        """Store the L2 analysis fields for ``news_id`` and mark it ``'processed'``."""
        self._run_write('''
            UPDATE news
            SET l2_score = ?, l2_summary = ?, l2_title_zh = ?, category = ?, status = 'processed'
            WHERE id = ?
        ''', (score, summary, title_zh, category, news_id))

    def get_processed_news(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to ``limit`` processed rows, newest ``published_at`` first."""
        return self._fetch_rows(
            "SELECT * FROM news WHERE status = 'processed' ORDER BY published_at DESC LIMIT ?",
            (limit,),
        )

    def get_conn(self):
        """Return a new raw connection; the caller is responsible for closing it."""
        return self._get_conn()

    def get_recent_processed_news(self, hours: Optional[int] = None) -> List[Dict[str, Any]]:
        """Get all processed news from the last N hours.

        Args:
            hours: Size of the look-back window. When omitted,
                ``config.RANKING_WINDOW_HOURS`` is read at call time.

        Returns:
            Processed rows whose ``published_at`` is later than
            ``time.time() - hours * 3600``, newest first.
        """
        # Compare against None so an explicit window of 0 hours is respected.
        if hours is None:
            hours = config.RANKING_WINDOW_HOURS
        cutoff = time.time() - (hours * 3600)
        return self._fetch_rows(
            "SELECT * FROM news WHERE status = 'processed' AND published_at > ? ORDER BY published_at DESC",
            (cutoff,),
        )
# Shared module-level instance built with the default database path. Creating it
# runs the constructor's schema setup, so importing this module opens the
# database and creates the `news` table when it does not exist yet.
db = Database()