Commit bc0f1fa
feat: Add crypto-news-aggregator example
Signed-off-by: Stepan Bagritsevich <[email protected]>
1 parent 5ab48c7 commit bc0f1fa

11 files changed, +510 -0 lines changed

crypto-news-aggregator/README.md

Whitespace-only changes.
crypto-news-aggregator/embedding_generator.py

Lines changed: 7 additions & 0 deletions

from sentence_transformers import SentenceTransformer
import numpy as np

# paraphrase-MiniLM-L3-v2 produces 384-dimensional embeddings, matching the
# DIM 384 declared for the vector index in example.py.
model = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L3-v2")


def generate_embedding(text):
    return model.encode(text, convert_to_numpy=True).astype(np.float32)
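
A quick sanity check (an editorial sketch, not a file from this commit): the embedding width produced by this model must line up with the DIM 384 declared for news_vector_idx in example.py.

import numpy as np
from embedding_generator import generate_embedding

# The query string is invented; any text works for checking dtype and shape.
vec = generate_embedding("Bitcoin breaks new all-time high")
assert vec.dtype == np.float32 and vec.shape == (384,)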

crypto-news-aggregator/example.py

Lines changed: 242 additions & 0 deletions
import json
import asyncio
import numpy as np
from redis import Redis
from datetime import datetime, timedelta
from news_fetcher import fetch_and_normalize_news_data
from weight_calculator import calculate_weight
from embedding_generator import generate_embedding
from features_dataset_initializer import initialize_features_dataset
from features_predictor import predict_growth
from search_result_parser import to_dict


def get_key(article):
    return f"article::{article['id']}"


def get_article(r, key):
    return json.loads(r.execute_command("JSON.GET", key))


def set_article_field(r, key, field, value):
    return r.execute_command("JSON.SET", key, f"$.{field}", value)


async def update_article_weight(r, key):
    article = get_article(r, key)
    weight = calculate_weight(article)
    set_article_field(r, key, "weight", weight)


async def update_article_embedding(r, key):
    article = get_article(r, key)
    content = article.get("title", "")
    # Stored as a JSON array of floats so the vector index can consume it.
    embedding = str(generate_embedding(content).tolist())
    set_article_field(r, key, "embedding", embedding)


def get_timestamp(days_ago=10):
    return int((datetime.now() - timedelta(days=days_ago)).timestamp())


def create_index(r):
    return r.execute_command(
        "FT.CREATE news_idx ON JSON PREFIX 1 article:: SCHEMA $.id AS id NUMERIC $.published_at AS published_at NUMERIC SORTABLE $.votes.negative AS votes_negative NUMERIC $.votes.positive AS votes_positive NUMERIC $.votes.important AS votes_important NUMERIC $.currencies[*].code AS currency_code TAG $.weight AS weight NUMERIC"
    )


def create_news_vector_index(r):
    return r.execute_command(
        "FT.CREATE news_vector_idx ON JSON PREFIX 1 article:: SCHEMA $.id AS id NUMERIC $.published_at AS published_at NUMERIC SORTABLE $.title AS title TEXT $.embedding AS embedding VECTOR FLAT 6 TYPE FLOAT32 DIM 384 DISTANCE_METRIC COSINE"
    )


def create_features_index(r):
    return r.execute_command(
        "FT.CREATE features_idx ON JSON PREFIX 1 week:: SCHEMA $.currency_code AS currency_code TAG $.features AS features VECTOR FLAT 6 TYPE FLOAT32 DIM 5 DISTANCE_METRIC COSINE $.label AS label NUMERIC"
    )


async def get_most_mentioned_cryptocurrencies(r, from_timestamp, top=10):
    return r.execute_command(
        "FT.AGGREGATE",
        "news_idx",
        f"@published_at:[{from_timestamp} +inf]",
        "GROUPBY",
        1,
        "@currency_code",
        "REDUCE",
        "COUNT",
        0,
        "AS",
        "mentions_count",
        "SORTBY",
        2,
        "@mentions_count",
        "DESC",
        "LIMIT",
        0,
        top,
    )


async def get_cryptocurrency_recent_mentions(r, currency, from_timestamp, limit=10):
    return r.execute_command(
        "FT.SEARCH",
        "news_idx",
        f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]",
        "SORTBY",
        "published_at",
        "DESC",
        "LIMIT",
        0,
        limit,
    )


async def get_cryptocurrency_votes(r, currency, from_timestamp):
    return r.execute_command(
        "FT.AGGREGATE",
        "news_idx",
        f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]",
        "GROUPBY",
        1,
        "@currency_code",
        "REDUCE",
        "SUM",
        1,
        "@votes_positive",
        "AS",
        "total_positive_votes",
        "REDUCE",
        "SUM",
        1,
        "@votes_negative",
        "AS",
        "total_negative_votes",
        "REDUCE",
        "SUM",
        1,
        "@votes_important",
        "AS",
        "total_important_votes",
    )


async def get_similar_articles(r, query_text, from_timestamp, top=10):
    query_embedding = generate_embedding(query_text).tobytes()
    return r.execute_command(
        "FT.SEARCH",
        "news_vector_idx",
        f"@published_at:[{from_timestamp} +inf] =>[KNN {top} @embedding $query AS score]",
        "PARAMS",
        2,
        "query",
        query_embedding,
        "SORTBY",
        "score",
        "ASC",
        "LIMIT",
        0,
        top,
        "RETURN",
        3,
        "title",
        "published_at",
        "score",
    )


async def compute_features(r, currency, from_timestamp):
    recent_mentions = await get_cryptocurrency_recent_mentions(r, currency, from_timestamp, 1000)

    # The first element of an FT.SEARCH reply is the number of matching documents.
    count_news = float(recent_mentions[0])

    votes = await get_cryptocurrency_votes(r, currency, from_timestamp)

    votes_dict = to_dict(votes)

    sum_positive = float(votes_dict.get("total_positive_votes", 0))
    sum_negative = float(votes_dict.get("total_negative_votes", 0))

    mean_positive = sum_positive / count_news if count_news > 0.0 else 0.0
    mean_negative = sum_negative / count_news if count_news > 0.0 else 0.0

    features = [count_news, sum_positive, sum_negative, mean_positive, mean_negative]  # etc.
    return np.array(features, dtype=np.float32)


async def cryptocurrency_will_grow(r, currency, from_timestamp):
    features = await compute_features(r, currency, from_timestamp)
    return await predict_growth(r, currency, features)


async def main():
    # Connect to DragonflyDB
    r = Redis(host="localhost", port=6379)

    r.execute_command("FLUSHALL")

    news_data = fetch_and_normalize_news_data()

    # Store the news data in DragonflyDB
    for article in news_data:
        key = get_key(article)
        value = json.dumps(article)
        r.execute_command("JSON.SET", key, "$", value)

    for article in news_data:
        key = get_key(article)
        await update_article_weight(r, key)

    for article in news_data:
        key = get_key(article)
        await update_article_embedding(r, key)

    create_index(r)
    create_news_vector_index(r)

    # Hard-coded cutoff timestamp; get_timestamp(days_ago=10) computes an
    # equivalent value relative to the current time.
    timestamp_ten_days_ago = 1700000000

    # Get the most mentioned cryptocurrencies
    print("Most mentioned cryptocurrencies:")
    print(await get_most_mentioned_cryptocurrencies(r, timestamp_ten_days_ago))
    print()

    # Get recent mentions of a cryptocurrency
    print("Recent mentions of BTC:")
    print(await get_cryptocurrency_recent_mentions(r, "BTC", timestamp_ten_days_ago, 2))
    print()

    # Get votes for a cryptocurrency
    print("Votes for BTC:")
    print(await get_cryptocurrency_votes(r, "BTC", timestamp_ten_days_ago))
    print()

    # Get similar articles
    print("Similar articles to 'Bitcoin crash':")
    print(await get_similar_articles(r, "Bitcoin crash", timestamp_ten_days_ago, 2))
    print()

    print("Similar articles to 'Ethereum rally':")
    print(await get_similar_articles(r, "Ethereum rally", timestamp_ten_days_ago, 2))
    print()

    print("Similar articles to 'Ripple lawsuit':")
    print(await get_similar_articles(r, "Ripple lawsuit", timestamp_ten_days_ago, 2))
    print()

    # Compute features for a cryptocurrency
    initialize_features_dataset(r)
    create_features_index(r)

    timestamp_seven_days_ago = 1730000000

    # Predict if a cryptocurrency will grow
    will_grow = await cryptocurrency_will_grow(r, "BTC", timestamp_seven_days_ago)
    print(f"Will BTC grow in next seven days? {'Yes' if will_grow else 'No'}")


if __name__ == "__main__":
    asyncio.run(main())
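
For reference, the index schemas and the get_key/update_article_* helpers above imply article documents shaped roughly as follows. This is a hypothetical illustration, not a file from this commit, and every concrete value is invented.

# Hypothetical article document, inferred from the news_idx and news_vector_idx
# schemas; values are invented for illustration only.
article = {
    "id": 12345,
    "title": "Bitcoin breaks new all-time high",
    "published_at": 1700000100,  # Unix timestamp in seconds
    "votes": {"positive": 12, "negative": 3, "important": 5},
    "currencies": [{"code": "BTC"}],
    "weight": 0.87,            # filled in later by update_article_weight
    "embedding": [0.0] * 384,  # filled in later by update_article_embedding
}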
crypto-news-aggregator/features_dataset.json

Lines changed: 92 additions & 0 deletions
[
  {
    "week": "2024-10-27",
    "currency_code": "BTC",
    "features": [10.0, 5.0, 2.0, 0.5, 0.2],
    "label": 1
  },
  {
    "week": "2024-11-03",
    "currency_code": "BTC",
    "features": [12.0, 3.0, 4.0, 0.25, 0.333],
    "label": 1
  },
  {
    "week": "2024-11-10",
    "currency_code": "BTC",
    "features": [8.0, 7.0, 1.5, 0.875, 0.1875],
    "label": 0
  },
  {
    "week": "2024-11-17",
    "currency_code": "BTC",
    "features": [9.0, 2.0, 2.0, 0.222, 0.222],
    "label": 0
  },
  {
    "week": "2024-11-24",
    "currency_code": "BTC",
    "features": [14.0, 4.0, 3.5, 0.286, 0.25],
    "label": 1
  },
  {
    "week": "2024-12-01",
    "currency_code": "BTC",
    "features": [16.0, 5.0, 4.0, 0.3125, 0.25],
    "label": 1
  },
  {
    "week": "2024-12-08",
    "currency_code": "BTC",
    "features": [11.0, 6.0, 2.0, 0.545, 0.181],
    "label": 1
  },
  {
    "week": "2024-12-15",
    "currency_code": "BTC",
    "features": [10.5, 2.0, 3.0, 0.190, 0.285],
    "label": 0
  },
  {
    "week": "2024-12-22",
    "currency_code": "BTC",
    "features": [13.0, 8.0, 2.5, 0.615, 0.192],
    "label": 0
  },
  {
    "week": "2024-12-29",
    "currency_code": "BTC",
    "features": [15.0, 5.0, 2.5, 0.333, 0.167],
    "label": 1
  },
  {
    "week": "2025-01-05",
    "currency_code": "BTC",
    "features": [10.0, 7.0, 3.0, 0.7, 0.3],
    "label": 0
  },
  {
    "week": "2025-01-12",
    "currency_code": "BTC",
    "features": [12.0, 4.0, 5.0, 0.333, 0.417],
    "label": 1
  },
  {
    "week": "2024-10-27",
    "currency_code": "ETH",
    "features": [8.0, 3.0, 6.0, 0.375, 0.75],
    "label": 0
  },
  {
    "week": "2025-01-05",
    "currency_code": "ETH",
    "features": [14.0, 6.0, 1.0, 0.428, 0.071],
    "label": 1
  },
  {
    "week": "2025-01-12",
    "currency_code": "ETH",
    "features": [11.0, 7.5, 2.0, 0.682, 0.182],
    "label": 1
  }
]
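
The data file itself does not name the five feature positions, but they appear to follow the order produced by compute_features in example.py; the first record is consistent with that reading (5/10 = 0.5, 2/10 = 0.2). A small sketch under that assumption (editorial, not part of the commit):

# Assumed feature order, matching compute_features in example.py:
FEATURE_NAMES = ["count_news", "sum_positive", "sum_negative",
                 "mean_positive", "mean_negative"]

record = {"week": "2024-10-27", "currency_code": "BTC",
          "features": [10.0, 5.0, 2.0, 0.5, 0.2], "label": 1}
print(dict(zip(FEATURE_NAMES, record["features"])))
# {'count_news': 10.0, 'sum_positive': 5.0, 'sum_negative': 2.0,
#  'mean_positive': 0.5, 'mean_negative': 0.2}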
crypto-news-aggregator/features_dataset_initializer.py

Lines changed: 21 additions & 0 deletions
import json
import os


def get_dataset():
    json_file_path = os.getenv("FEATURES_DATASET_PATH", "features_dataset.json")

    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error while reading features dataset from ({json_file_path}): {e}")
        return []


def initialize_features_dataset(r):
    dataset = get_dataset()

    for doc in dataset:
        key = f"week::{doc['week']}::{doc['currency_code']}"
        value = json.dumps(doc)
        r.execute_command("JSON.SET", key, "$", value)
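
A minimal usage sketch (editorial, not part of the commit), assuming a local DragonflyDB instance on the default port as in example.py and the dataset file above in the working directory:

from redis import Redis
from features_dataset_initializer import initialize_features_dataset

r = Redis(host="localhost", port=6379)
initialize_features_dataset(r)

# Each record is stored under week::<week>::<currency_code>, e.g.:
print(r.execute_command("JSON.GET", "week::2024-10-27::BTC"))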
crypto-news-aggregator/features_predictor.py

Lines changed: 22 additions & 0 deletions
import json


def parse_search_result(result):
    # An FT.SEARCH reply looks like [count, key1, [path, json], key2, [path, json], ...];
    # keep only the [path, json] pairs and decode each JSON payload.
    docs = result[2::2]
    return [json.loads(doc[1].decode()) for doc in docs]


# Simple KNN-based predictor
async def predict_growth(r, currency, new_features, k=5):
    query_vector = new_features.tobytes()

    search_result = r.execute_command(
        "FT.SEARCH", "features_idx",
        f"@currency_code:{{{currency}}} =>[KNN {k} @features $query_vec AS score]",
        "PARAMS", "2", "query_vec", query_vector
    )

    results = parse_search_result(search_result)

    # Majority vote over the labels of the k nearest historical weeks.
    knn_labels = [doc["label"] for doc in results]
    return 1 if sum(knn_labels) > len(knn_labels) / 2 else 0
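
example.py also imports to_dict from search_result_parser, a file not included in this excerpt. A hypothetical stand-in is sketched below, assuming the usual RESP2 FT.AGGREGATE reply shape of [count, [field1, value1, field2, value2, ...], ...]; it is an editorial illustration, not the committed implementation.

# Hypothetical stand-in for search_result_parser.to_dict: flattens the first
# FT.AGGREGATE result row (assumed to be a flat list of alternating field
# names and values) into a plain dict.
def to_dict(aggregate_result):
    if len(aggregate_result) < 2:
        return {}
    row = [v.decode() if isinstance(v, bytes) else v for v in aggregate_result[1]]
    return dict(zip(row[0::2], row[1::2]))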
