Skip to content

Commit be888df

Browse files
feat: Add crypto-news-aggregator example
Signed-off-by: Stepan Bagritsevich <[email protected]>
1 parent 5ab48c7 commit be888df

10 files changed

+510
-0
lines changed

crypto-news-aggregator/README.md

Whitespace-only changes.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from sentence_transformers import SentenceTransformer
2+
import numpy as np
3+
4+
# The sentence-embedding model is instantiated once, when this module is
# imported, and reused by every call to generate_embedding().
model = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L3-v2")


def generate_embedding(text):
    """Encode ``text`` with the module-level model and return a float32 array.

    Args:
        text: Input handed unchanged to ``model.encode``.

    Returns:
        The encoder output converted to a NumPy array of dtype ``float32``.
    """
    vector = model.encode(text, convert_to_numpy=True)
    return vector.astype(np.float32)

crypto-news-aggregator/example.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
import json
2+
import asyncio
3+
import numpy as np
4+
from redis import Redis
5+
from datetime import datetime, timedelta
6+
from news_fetcher import fetch_and_normalize_news_data
7+
from weight_calculator import calculate_weight
8+
from embedding_generator import generate_embedding
9+
from features_dataset_initializer import initialize_features_dataset
10+
from features_predictor import predict_growth
11+
12+
13+
def get_key(article):
    """Return the datastore key for ``article``: ``article::`` plus its ``id``."""
    article_id = article["id"]
    return f"article::{article_id}"
15+
16+
17+
def get_article(r, key):
    """Read the JSON document stored at ``key`` and return it decoded.

    Args:
        r: Client exposing ``execute_command``.
        key: Key of the document to fetch with ``JSON.GET``.

    Returns:
        The result of ``json.loads`` applied to the server reply.
    """
    raw_document = r.execute_command("JSON.GET", key)
    return json.loads(raw_document)
19+
20+
21+
def set_article_field(r, key, field, value):
    """Write ``value`` at the top-level JSON path ``$.<field>`` of ``key``.

    ``value`` is forwarded to ``JSON.SET`` as given; no serialization is
    applied here.

    Returns:
        Whatever ``r.execute_command`` returns for the ``JSON.SET`` call.
    """
    json_path = f"$.{field}"
    return r.execute_command("JSON.SET", key, json_path, value)
23+
24+
25+
async def update_article_weight(r, key):
    """Recompute the weight of the article at ``key`` and store it.

    The article is loaded, passed to ``calculate_weight`` and the result is
    written back to the document's ``weight`` field.
    """
    stored_article = get_article(r, key)
    set_article_field(r, key, "weight", calculate_weight(stored_article))
29+
30+
31+
async def update_article_embedding(r, key):
    """Embed the title of the article at ``key`` and store the vector.

    A missing ``title`` is treated as an empty string. The embedding is
    written to the ``embedding`` field as the string form of a Python list.
    """
    title = get_article(r, key).get("title", "")
    vector = generate_embedding(title)
    serialized_vector = str(vector.tolist())
    set_article_field(r, key, "embedding", serialized_vector)
36+
37+
38+
def get_timestamp(days_ago=10):
    """Return the Unix timestamp (whole seconds) of the moment ``days_ago`` days before now."""
    moment = datetime.now() - timedelta(days=days_ago)
    return int(moment.timestamp())
40+
41+
42+
def create_index(r):
    """Create the ``news_idx`` search index over ``article::`` JSON documents.

    The schema indexes id, publication time (sortable), the three vote
    counters, the currency codes (as tags) and the article weight.

    Returns:
        Whatever ``r.execute_command`` returns for the ``FT.CREATE`` call.
    """
    # Adjacent literals are joined into the exact single command string.
    command = (
        "FT.CREATE news_idx ON JSON PREFIX 1 article:: SCHEMA"
        " $.id AS id NUMERIC"
        " $.published_at AS published_at NUMERIC SORTABLE"
        " $.votes.negative AS votes_negative NUMERIC"
        " $.votes.positive AS votes_positive NUMERIC"
        " $.votes.important AS votes_important NUMERIC"
        " $.currencies[*].code AS currency_code TAG"
        " $.weight AS weight NUMERIC"
    )
    return r.execute_command(command)
46+
47+
48+
def create_news_vector_index(r):
    """Create the ``news_vector_idx`` index used for title similarity search.

    The schema indexes id, publication time (sortable), the title as text and
    the ``embedding`` field as a FLAT FLOAT32 vector of dimension 384 compared
    by cosine distance.

    Returns:
        Whatever ``r.execute_command`` returns for the ``FT.CREATE`` call.
    """
    # Adjacent literals are joined into the exact single command string.
    command = (
        "FT.CREATE news_vector_idx ON JSON PREFIX 1 article:: SCHEMA"
        " $.id AS id NUMERIC"
        " $.published_at AS published_at NUMERIC SORTABLE"
        " $.title AS title TEXT"
        " $.embedding AS embedding VECTOR FLAT 6 TYPE FLOAT32 DIM 384 DISTANCE_METRIC COSINE"
    )
    return r.execute_command(command)
52+
53+
54+
def create_features_index(r):
    """Create the ``features_idx`` index over ``week::`` JSON documents.

    The schema indexes the currency code as a tag, the ``features`` field as
    a FLAT FLOAT32 vector of dimension 5 compared by cosine distance, and the
    numeric ``label``.

    Returns:
        Whatever ``r.execute_command`` returns for the ``FT.CREATE`` call.
    """
    # Adjacent literals are joined into the exact single command string.
    command = (
        "FT.CREATE features_idx ON JSON PREFIX 1 week:: SCHEMA"
        " $.currency_code AS currency_code TAG"
        " $.features AS features VECTOR FLAT 6 TYPE FLOAT32 DIM 5 DISTANCE_METRIC COSINE"
        " $.label AS label NUMERIC"
    )
    return r.execute_command(command)
58+
59+
60+
async def get_most_mentioned_cryptocurrencies(r, from_timestamp, top=10):
    """Count article mentions per currency code and return the top entries.

    Args:
        r: Client exposing ``execute_command``.
        from_timestamp: Lower bound for ``published_at`` (upper bound is +inf).
        top: Maximum number of groups to return.

    Returns:
        The raw ``FT.AGGREGATE`` reply: groups by ``currency_code`` with a
        ``mentions_count``, sorted by that count in descending order.
    """
    time_filter = f"@published_at:[{from_timestamp} +inf]"
    command = [
        "FT.AGGREGATE", "news_idx", time_filter,
        "GROUPBY", 1, "@currency_code",
        "REDUCE", "COUNT", 0, "AS", "mentions_count",
        "SORTBY", 2, "@mentions_count", "DESC",
        "LIMIT", 0, top,
    ]
    return r.execute_command(*command)
81+
82+
83+
async def get_cryptocurrency_recent_mentions(r, currency, from_timestamp, limit=10):
    """Search ``news_idx`` for articles tagged with ``currency``, newest first.

    Args:
        r: Client exposing ``execute_command``.
        currency: Currency code matched against the ``currency_code`` tag.
        from_timestamp: Lower bound for ``published_at`` (upper bound is +inf).
        limit: Maximum number of documents to return.

    Returns:
        The raw ``FT.SEARCH`` reply.
    """
    query = f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]"
    command = [
        "FT.SEARCH", "news_idx", query,
        "SORTBY", "published_at", "DESC",
        "LIMIT", 0, limit,
    ]
    return r.execute_command(*command)
95+
96+
97+
async def get_cryptocurrency_votes(r, currency, from_timestamp):
    """Sum positive, negative and important votes over a currency's articles.

    Args:
        r: Client exposing ``execute_command``.
        currency: Currency code matched against the ``currency_code`` tag.
        from_timestamp: Lower bound for ``published_at`` (upper bound is +inf).

    Returns:
        The raw ``FT.AGGREGATE`` reply, grouped by ``currency_code`` with the
        totals ``total_positive_votes``, ``total_negative_votes`` and
        ``total_important_votes``.
    """
    # (source field, output alias) for each SUM reducer, in emission order.
    reducers = (
        ("@votes_positive", "total_positive_votes"),
        ("@votes_negative", "total_negative_votes"),
        ("@votes_important", "total_important_votes"),
    )

    command = [
        "FT.AGGREGATE",
        "news_idx",
        f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]",
        "GROUPBY",
        1,
        "@currency_code",
    ]
    for source_field, alias in reducers:
        command.extend(["REDUCE", "SUM", 1, source_field, "AS", alias])

    return r.execute_command(*command)
124+
125+
126+
async def get_similar_articles(r, query_text, from_timestamp, top=10):
    """Find the articles whose embedding is nearest to that of ``query_text``.

    Args:
        r: Client exposing ``execute_command``.
        query_text: Text embedded with ``generate_embedding``; the raw bytes
            of the resulting array are sent as the ``query`` parameter.
        from_timestamp: Lower bound for ``published_at`` (upper bound is +inf).
        top: Number of neighbours requested and maximum rows returned.

    Returns:
        The raw ``FT.SEARCH`` reply with ``title``, ``published_at`` and
        ``score`` per hit, sorted by ``score`` ascending.
    """
    query_embedding = generate_embedding(query_text).tobytes()
    knn_query = f"@published_at:[{from_timestamp} +inf] =>[KNN {top} @embedding $query AS score]"
    command = [
        "FT.SEARCH", "news_vector_idx", knn_query,
        "PARAMS", 2, "query", query_embedding,
        "SORTBY", "score", "ASC",
        "LIMIT", 0, top,
        "RETURN", 3, "title", "published_at", "score",
    ]
    return r.execute_command(*command)
148+
149+
150+
def to_dict(result):
    """Convert the first row of an aggregate reply into a ``dict``.

    ``result[1]`` is expected to be a flat sequence of alternating field
    names and values. Byte strings are decoded to ``str``; values that are
    already decoded (for example when the client was created with
    ``decode_responses=True``) are kept as they are instead of raising.

    Args:
        result: Reply sequence whose second element is the row to convert.

    Returns:
        A mapping of field name to value, or ``{}`` when ``result`` has
        fewer than two elements (no row present).
    """
    if len(result) < 2:
        return {}

    def as_text(item):
        # Only bytes-like replies need decoding.
        if isinstance(item, (bytes, bytearray)):
            return item.decode()
        return item

    row = result[1]
    return {as_text(name): as_text(value) for name, value in zip(row[::2], row[1::2])}
154+
155+
156+
async def compute_features(r, currency, from_timestamp):
    """Build the five-element feature vector for ``currency``.

    The features are, in order: article count, summed positive votes, summed
    negative votes, mean positive votes per article and mean negative votes
    per article. The first element of the recent-mentions search reply is
    used as the article count; both means are 0.0 when that count is zero.

    Returns:
        A one-dimensional ``float32`` NumPy array with the five features.
    """
    recent_mentions = await get_cryptocurrency_recent_mentions(r, currency, from_timestamp, 1000)
    count_news = float(recent_mentions[0])

    votes_dict = to_dict(await get_cryptocurrency_votes(r, currency, from_timestamp))
    sum_positive = float(votes_dict.get("total_positive_votes", 0))
    sum_negative = float(votes_dict.get("total_negative_votes", 0))

    if count_news > 0.0:
        mean_positive = sum_positive / count_news
        mean_negative = sum_negative / count_news
    else:
        mean_positive = 0.0
        mean_negative = 0.0

    return np.array(
        [count_news, sum_positive, sum_negative, mean_positive, mean_negative],
        dtype=np.float32,
    )
173+
174+
175+
async def cryptocurrency_will_grow(r, currency, from_timestamp):
    """Compute the features for ``currency`` and return ``predict_growth``'s result for them."""
    feature_vector = await compute_features(r, currency, from_timestamp)
    prediction = await predict_growth(r, currency, feature_vector)
    return prediction
178+
179+
180+
async def main():
    """Run the end-to-end demo against a local datastore.

    Steps: flush the server, store the fetched articles as JSON documents,
    add a weight and a title embedding to each one, create the search
    indexes, print a handful of example queries, then load the features
    dataset and print a growth prediction for BTC.
    """
    # Connect to DragonflyDB
    r = Redis(host="localhost", port=6379)

    # Start from a clean slate. FLUSHALL removes every key on the server.
    r.execute_command("FLUSHALL")

    news_data = fetch_and_normalize_news_data()

    # Store the news data in DragonflyDB
    for article in news_data:
        key = get_key(article)
        value = json.dumps(article)
        r.execute_command("JSON.SET", key, "$", value)

    for article in news_data:
        key = get_key(article)
        await update_article_weight(r, key)

    for article in news_data:
        key = get_key(article)
        await update_article_embedding(r, key)

    create_index(r)
    create_news_vector_index(r)

    # NOTE(review): despite the name this is a fixed Unix timestamp
    # (2023-11-14 UTC), not a value relative to the current time.
    timestamp_ten_days_ago = 1700000000

    # Get the most mentioned cryptocurrencies
    print("Most mentioned cryptocurrencies:")
    print(await get_most_mentioned_cryptocurrencies(r, timestamp_ten_days_ago))
    print()

    # Get recent mentions of a cryptocurrency
    print("Recent mentions of BTC:")
    print(await get_cryptocurrency_recent_mentions(r, "BTC", timestamp_ten_days_ago, 2))
    print()

    # Get votes for a cryptocurrency
    print("Votes for BTC:")
    print(await get_cryptocurrency_votes(r, "BTC", timestamp_ten_days_ago))
    print()

    # Get similar articles
    print("Similar articles to 'Bitcoin crash':")
    print(await get_similar_articles(r, "Bitcoin crash", timestamp_ten_days_ago, 2))
    print()

    print("Similar articles to 'Ethereum rally':")
    print(await get_similar_articles(r, "Ethereum rally", timestamp_ten_days_ago, 2))
    print()

    print("Similar articles to 'Ripple lawsuit':")
    print(await get_similar_articles(r, "Ripple lawsuit", timestamp_ten_days_ago, 2))
    print()

    # Compute features for a cryptocurrency
    initialize_features_dataset(r)
    create_features_index(r)

    # NOTE(review): fixed Unix timestamp (2024-10-27 UTC), not relative to now.
    timestamp_seven_days_ago = 1730000000

    # Predict if a cryptocurrency will grow
    will_grow = await cryptocurrency_will_grow(r, "BTC", timestamp_seven_days_ago)
    # Single quotes inside the replacement field keep this line valid on
    # interpreters older than Python 3.12; the printed text is unchanged.
    print(f"Will BTC grow in next seven days? {'Yes' if will_grow else 'No'}")


if __name__ == "__main__":
    asyncio.run(main())
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
[
2+
{
3+
"week": "2024-10-27",
4+
"currency_code": "BTC",
5+
"features": [10.0, 5.0, 2.0, 0.5, 0.2],
6+
"label": 1
7+
},
8+
{
9+
"week": "2024-11-03",
10+
"currency_code": "BTC",
11+
"features": [12.0, 3.0, 4.0, 0.25, 0.333],
12+
"label": 1
13+
},
14+
{
15+
"week": "2024-11-10",
16+
"currency_code": "BTC",
17+
"features": [8.0, 7.0, 1.5, 0.875, 0.1875],
18+
"label": 0
19+
},
20+
{
21+
"week": "2024-11-17",
22+
"currency_code": "BTC",
23+
"features": [9.0, 2.0, 2.0, 0.222, 0.222],
24+
"label": 0
25+
},
26+
{
27+
"week": "2024-11-24",
28+
"currency_code": "BTC",
29+
"features": [14.0, 4.0, 3.5, 0.286, 0.25],
30+
"label": 1
31+
},
32+
{
33+
"week": "2024-12-01",
34+
"currency_code": "BTC",
35+
"features": [16.0, 5.0, 4.0, 0.3125, 0.25],
36+
"label": 1
37+
},
38+
{
39+
"week": "2024-12-08",
40+
"currency_code": "BTC",
41+
"features": [11.0, 6.0, 2.0, 0.545, 0.181],
42+
"label": 1
43+
},
44+
{
45+
"week": "2024-12-15",
46+
"currency_code": "BTC",
47+
"features": [10.5, 2.0, 3.0, 0.190, 0.285],
48+
"label": 0
49+
},
50+
{
51+
"week": "2024-12-22",
52+
"currency_code": "BTC",
53+
"features": [13.0, 8.0, 2.5, 0.615, 0.192],
54+
"label": 0
55+
},
56+
{
57+
"week": "2024-12-29",
58+
"currency_code": "BTC",
59+
"features": [15.0, 5.0, 2.5, 0.333, 0.167],
60+
"label": 1
61+
},
62+
{
63+
"week": "2025-01-05",
64+
"currency_code": "BTC",
65+
"features": [10.0, 7.0, 3.0, 0.7, 0.3],
66+
"label": 0
67+
},
68+
{
69+
"week": "2025-01-12",
70+
"currency_code": "BTC",
71+
"features": [12.0, 4.0, 5.0, 0.333, 0.417],
72+
"label": 1
73+
},
74+
{
75+
"week": "2024-10-27",
76+
"currency_code": "ETH",
77+
"features": [8.0, 3.0, 6.0, 0.375, 0.75],
78+
"label": 0
79+
},
80+
{
81+
"week": "2025-01-05",
82+
"currency_code": "ETH",
83+
"features": [14.0, 6.0, 1.0, 0.428, 0.071],
84+
"label": 1
85+
},
86+
{
87+
"week": "2025-01-12",
88+
"currency_code": "ETH",
89+
"features": [11.0, 7.5, 2.0, 0.682, 0.182],
90+
"label": 1
91+
}
92+
]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import json
2+
import os
3+
4+
def get_dataset():
    """Load the features dataset from a JSON file.

    The path is taken from the ``FEATURES_DATASET_PATH`` environment
    variable and defaults to ``features_dataset.json``.

    Returns:
        The parsed JSON content, or an empty list when the file cannot be
        opened or does not contain valid UTF-8 JSON. In that case the error
        is printed and not raised.
    """
    json_file_path = os.getenv("FEATURES_DATASET_PATH", "features_dataset.json")

    try:
        with open(json_file_path, encoding="utf-8") as f:
            return json.load(f)
    # OSError covers missing/unreadable files; ValueError covers both
    # json.JSONDecodeError and UnicodeDecodeError. Anything else is a
    # programming error and should surface.
    except (OSError, ValueError) as e:
        print(f"Error while reading features dataset from ({json_file_path}): {e}")
        return []
13+
14+
15+
def initialize_features_dataset(r):
    """Store every record of the features dataset as a JSON document.

    Each record is written with ``JSON.SET`` under the key
    ``week::<week>::<currency_code>``.

    Args:
        r: Client exposing ``execute_command``.
    """
    dataset = get_dataset()

    for doc in dataset:
        # Single-quoted subscripts keep this f-string valid on interpreters
        # older than Python 3.12; the resulting key is unchanged.
        key = f"week::{doc['week']}::{doc['currency_code']}"
        value = json.dumps(doc)
        r.execute_command("JSON.SET", key, "$", value)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import json
2+
3+
4+
def _json_payload(fields):
    """Return the serialized JSON document from one hit's flat field list."""
    # Prefer the field named "$" so extra fields returned before it (such as
    # an aliased KNN score) do not get mistaken for the document.
    for name, value in zip(fields[::2], fields[1::2]):
        if name in (b"$", "$"):
            return value
    # No "$" field: fall back to the second element of the field list.
    return fields[1]


def parse_search_result(result):
    """Decode the JSON documents contained in an ``FT.SEARCH`` reply.

    Args:
        result: Reply laid out as ``[total, key, fields, key, fields, ...]``
            where each ``fields`` is a flat name/value sequence.

    Returns:
        A list with one decoded JSON document per hit; empty when the reply
        contains no hits.
    """
    # json.loads accepts both bytes and str, so decoded replies work too.
    return [json.loads(_json_payload(fields)) for fields in result[2::2]]


# Simple KNN-based predictor
async def predict_growth(r, currency, new_features, k=5):
    """Predict growth by majority vote over the ``k`` nearest feature vectors.

    Args:
        r: Client exposing ``execute_command``.
        currency: Currency code used to filter ``features_idx``.
        new_features: Object exposing ``tobytes()``; its bytes are sent as
            the query vector.
        k: Number of neighbours to request.

    Returns:
        ``1`` when strictly more than half of the returned neighbours have a
        ``label`` sum above half their count, otherwise ``0`` (including
        ties and the case of no neighbours).
    """
    query_vector = new_features.tobytes()

    # The filter + KNN expression is the query itself; a preceding "*"
    # would take the query slot and push this expression out of it.
    search_result = r.execute_command(
        "FT.SEARCH", "features_idx",
        f"@currency_code:{{{currency}}} =>[KNN {k} @features $query_vec AS score]",
        "PARAMS", "2", "query_vec", query_vector,
        # Ask for k rows explicitly so a larger k is not cut by the default page size.
        "LIMIT", "0", str(k),
    )

    results = parse_search_result(search_result)

    knn_labels = [doc["label"] for doc in results]
    return 1 if sum(knn_labels) > len(knn_labels) / 2 else 0

0 commit comments

Comments
 (0)