Skip to content

feat: Add crypto-news-aggregator example #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
7 changes: 7 additions & 0 deletions crypto-news-aggregator/embedding_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from sentence_transformers import SentenceTransformer
import numpy as np

# Loaded once at import time so every call to generate_embedding() reuses
# the same model instance instead of re-downloading/re-initializing it.
model = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L3-v2")


def generate_embedding(text):
    """Encode *text* into a dense float32 numpy vector."""
    vector = model.encode(text, convert_to_numpy=True)
    return vector.astype(np.float32)
247 changes: 247 additions & 0 deletions crypto-news-aggregator/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import asyncio
import json
from datetime import datetime, timedelta, timezone

import numpy as np
from redis import Redis

from embedding_generator import generate_embedding
from features_dataset_initializer import initialize_features_dataset
from features_predictor import predict_growth
from news_fetcher import fetch_and_normalize_news_data
from weight_calculator import calculate_weight


def get_key(article):
    """Build the Redis key for *article* from its ``id`` field."""
    article_id = article["id"]
    return "article::{}".format(article_id)


def get_article(r, key):
    """Fetch the JSON document stored at *key* and parse it into a dict.

    NOTE(review): assumes the key exists — JSON.GET on a missing key
    returns nil, which would make json.loads raise; verify callers.
    """
    raw = r.execute_command("JSON.GET", key)
    return json.loads(raw)


def set_article_field(r, key, field, value):
    """Set a single top-level *field* of the JSON document at *key*."""
    json_path = f"$.{field}"
    return r.execute_command("JSON.SET", key, json_path, value)


async def update_article_weight(r, key):
    """Recompute and persist the weight of the article stored at *key*."""
    stored = get_article(r, key)
    set_article_field(r, key, "weight", calculate_weight(stored))


async def update_article_embedding(r, key):
    """Generate and persist a title embedding for the article at *key*.

    The vector is written as a JSON array of numbers so the VECTOR field
    of the vector index can read it.
    """
    article = get_article(r, key)
    content = article.get("title", "")
    # json.dumps guarantees valid JSON for JSON.SET; str() of a Python list
    # only happens to look like JSON for plain finite floats (and would
    # produce invalid JSON for values such as nan/inf).
    embedding = json.dumps(generate_embedding(content).tolist())
    set_article_field(r, key, "embedding", embedding)


def get_timestamp(days_ago=10):
    """Return the Unix timestamp (whole seconds) *days_ago* days before now.

    Uses a timezone-aware UTC datetime: the resulting epoch value is the
    same as with a naive local ``datetime.now()``, but avoids mixing naive
    and aware datetimes elsewhere.
    """
    return int((datetime.now(timezone.utc) - timedelta(days=days_ago)).timestamp())


def create_index(r):
    """Create the primary search index over ``article::`` JSON documents
    (numeric id/timestamps/votes/weight, currency codes as TAG)."""
    command = "FT.CREATE news_idx ON JSON PREFIX 1 article:: SCHEMA $.id AS id NUMERIC $.published_at AS published_at NUMERIC SORTABLE $.votes.negative AS votes_negative NUMERIC $.votes.positive AS votes_positive NUMERIC $.votes.important AS votes_important NUMERIC $.currencies[*].code AS currency_code TAG $.weight AS weight NUMERIC"
    return r.execute_command(command)


def create_news_vector_index(r):
    """Create the vector-search index over ``article::`` documents
    (384-dim float32 FLAT embedding, cosine distance)."""
    command = "FT.CREATE news_vector_idx ON JSON PREFIX 1 article:: SCHEMA $.id AS id NUMERIC $.published_at AS published_at NUMERIC SORTABLE $.title AS title TEXT $.embedding AS embedding VECTOR FLAT 6 TYPE FLOAT32 DIM 384 DISTANCE_METRIC COSINE"
    return r.execute_command(command)


def create_features_index(r):
    """Create the features index over ``week::`` documents
    (currency TAG, 5-dim float32 FLAT feature vector, numeric label)."""
    command = "FT.CREATE features_idx ON JSON PREFIX 1 week:: SCHEMA $.currency_code AS currency_code TAG $.features AS features VECTOR FLAT 6 TYPE FLOAT32 DIM 5 DISTANCE_METRIC COSINE $.label AS label NUMERIC"
    return r.execute_command(command)


async def get_most_mentioned_cryptocurrencies(r, from_timestamp, top=10):
    """Count mentions per currency since *from_timestamp*, most-mentioned
    first, limited to *top* rows."""
    query = f"@published_at:[{from_timestamp} +inf]"
    args = [
        "FT.AGGREGATE", "news_idx", query,
        "GROUPBY", 1, "@currency_code",
        "REDUCE", "COUNT", 0, "AS", "mentions_count",
        "SORTBY", 2, "@mentions_count", "DESC",
        "LIMIT", 0, top,
    ]
    return r.execute_command(*args)


async def get_cryptocurrency_recent_mentions(r, currency, from_timestamp, limit=10):
    """Return the newest articles tagged with *currency* published at or
    after *from_timestamp* (at most *limit* documents)."""
    query = f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]"
    return r.execute_command(
        "FT.SEARCH", "news_idx", query,
        "SORTBY", "published_at", "DESC",
        "LIMIT", 0, limit,
    )


async def get_cryptocurrency_votes(r, currency, from_timestamp):
    """Sum positive/negative/important votes for *currency* across all
    articles published at or after *from_timestamp*."""
    query = f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]"
    args = [
        "FT.AGGREGATE", "news_idx", query,
        "GROUPBY", 1, "@currency_code",
        "REDUCE", "SUM", 1, "@votes_positive", "AS", "total_positive_votes",
        "REDUCE", "SUM", 1, "@votes_negative", "AS", "total_negative_votes",
        "REDUCE", "SUM", 1, "@votes_important", "AS", "total_important_votes",
    ]
    return r.execute_command(*args)


async def get_similar_articles(r, query_text, from_timestamp, top=10):
    """KNN-search the *top* articles most similar to *query_text*,
    restricted to articles published at or after *from_timestamp*."""
    vector_bytes = generate_embedding(query_text).tobytes()
    knn_query = f"@published_at:[{from_timestamp} +inf] =>[KNN {top} @embedding $query AS score]"
    return r.execute_command(
        "FT.SEARCH", "news_vector_idx", knn_query,
        "PARAMS", 2, "query", vector_bytes,
        "SORTBY", "score", "ASC",
        "LIMIT", 0, top,
        "RETURN", 3, "title", "published_at", "score",
    )


def to_dict(result):
    """Convert the first row of an FT.AGGREGATE reply into a dict.

    The row is a flat [field, value, field, value, ...] list of bytes;
    an empty reply (no rows) maps to {}.
    """
    if len(result) < 2:
        return {}
    pairs = iter(result[1])
    return {field.decode(): value.decode() for field, value in zip(pairs, pairs)}


async def compute_features(r, currency, from_timestamp):
    """Build the feature vector for *currency* from news activity since
    *from_timestamp*: [count, sum_pos, sum_neg, mean_pos, mean_neg],
    returned as a float32 numpy array."""
    mentions_reply = await get_cryptocurrency_recent_mentions(r, currency, from_timestamp, 1000)
    # The first element of an FT.SEARCH reply is the total match count.
    count_news = float(mentions_reply[0])

    votes_reply = await get_cryptocurrency_votes(r, currency, from_timestamp)
    votes = to_dict(votes_reply)

    sum_positive = float(votes.get("total_positive_votes", 0))
    sum_negative = float(votes.get("total_negative_votes", 0))

    if count_news > 0.0:
        mean_positive = sum_positive / count_news
        mean_negative = sum_negative / count_news
    else:
        mean_positive = mean_negative = 0.0

    # Additional features (importance votes, weights, ...) could go here.
    return np.array(
        [count_news, sum_positive, sum_negative, mean_positive, mean_negative],
        dtype=np.float32,
    )


async def cryptocurrency_will_grow(r, currency, from_timestamp):
    """Predict growth (1/0) for *currency* using the feature vector
    computed from activity since *from_timestamp*."""
    feature_vector = await compute_features(r, currency, from_timestamp)
    prediction = await predict_growth(r, currency, feature_vector)
    return prediction


async def main():
    """End-to-end demo: load news into DragonflyDB, build search and
    vector indices, run aggregation/KNN queries, and predict BTC growth."""
    # Connect to DragonflyDB
    r = Redis(host="localhost", port=6379)

    # Start from a clean database so repeated runs are deterministic.
    r.execute_command("FLUSHALL")

    news_data = fetch_and_normalize_news_data()

    # Store the news data in DragonflyDB
    for article in news_data:
        key = get_key(article)
        value = json.dumps(article)
        r.execute_command("JSON.SET", key, "$", value)

    # Enrich each stored article with a computed weight and an embedding.
    for article in news_data:
        key = get_key(article)
        await update_article_weight(r, key)

    for article in news_data:
        key = get_key(article)
        await update_article_embedding(r, key)

    create_index(r)
    create_news_vector_index(r)

    # NOTE(review): fixed epoch (2023-11-14), presumably chosen to match the
    # bundled sample data rather than a real "ten days ago" — confirm.
    timestamp_ten_days_ago = 1700000000

    # Get the most mentioned cryptocurrencies
    print("Most mentioned cryptocurrencies:")
    print(await get_most_mentioned_cryptocurrencies(r, timestamp_ten_days_ago))
    print()

    # Get recent mentions of a cryptocurrency
    print("Recent mentions of BTC:")
    print(await get_cryptocurrency_recent_mentions(r, "BTC", timestamp_ten_days_ago, 2))
    print()

    # Get votes for a cryptocurrency
    print("Votes for BTC:")
    print(await get_cryptocurrency_votes(r, "BTC", timestamp_ten_days_ago))
    print()

    # Get similar articles
    print("Similar articles to 'Bitcoin crash':")
    print(await get_similar_articles(r, "Bitcoin crash", timestamp_ten_days_ago, 2))
    print()

    print("Similar articles to 'Ethereum rally':")
    print(await get_similar_articles(r, "Ethereum rally", timestamp_ten_days_ago, 2))
    print()

    print("Similar articles to 'Ripple lawsuit':")
    print(await get_similar_articles(r, "Ripple lawsuit", timestamp_ten_days_ago, 2))
    print()

    # Compute features for a cryptocurrency
    initialize_features_dataset(r)
    create_features_index(r)

    # NOTE(review): another fixed epoch (2024-10-27) — confirm it matches
    # the features dataset's date range.
    timestamp_seven_days_ago = 1730000000

    # Predict if a cryptocurrency will grow
    will_grow = await cryptocurrency_will_grow(r, "BTC", timestamp_seven_days_ago)
    # BUG FIX: nested double quotes inside a double-quoted f-string are a
    # SyntaxError before Python 3.12 (PEP 701); single quotes are portable.
    print(f"Will BTC grow in next seven days? {'Yes' if will_grow else 'No'}")


# Script entry point: run the async demo pipeline once.
if __name__ == "__main__":
    asyncio.run(main())
92 changes: 92 additions & 0 deletions crypto-news-aggregator/features_dataset.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
[
{
"week": "2024-10-27",
"currency_code": "BTC",
"features": [10.0, 5.0, 2.0, 0.5, 0.2],
"label": 1
},
{
"week": "2024-11-03",
"currency_code": "BTC",
"features": [12.0, 3.0, 4.0, 0.25, 0.333],
"label": 1
},
{
"week": "2024-11-10",
"currency_code": "BTC",
"features": [8.0, 7.0, 1.5, 0.875, 0.1875],
"label": 0
},
{
"week": "2024-11-17",
"currency_code": "BTC",
"features": [9.0, 2.0, 2.0, 0.222, 0.222],
"label": 0
},
{
"week": "2024-11-24",
"currency_code": "BTC",
"features": [14.0, 4.0, 3.5, 0.286, 0.25],
"label": 1
},
{
"week": "2024-12-01",
"currency_code": "BTC",
"features": [16.0, 5.0, 4.0, 0.3125, 0.25],
"label": 1
},
{
"week": "2024-12-08",
"currency_code": "BTC",
"features": [11.0, 6.0, 2.0, 0.545, 0.181],
"label": 1
},
{
"week": "2024-12-15",
"currency_code": "BTC",
"features": [10.5, 2.0, 3.0, 0.190, 0.285],
"label": 0
},
{
"week": "2024-12-22",
"currency_code": "BTC",
"features": [13.0, 8.0, 2.5, 0.615, 0.192],
"label": 0
},
{
"week": "2024-12-29",
"currency_code": "BTC",
"features": [15.0, 5.0, 2.5, 0.333, 0.167],
"label": 1
},
{
"week": "2025-01-05",
"currency_code": "BTC",
"features": [10.0, 7.0, 3.0, 0.7, 0.3],
"label": 0
},
{
"week": "2025-01-12",
"currency_code": "BTC",
"features": [12.0, 4.0, 5.0, 0.333, 0.417],
"label": 1
},
{
"week": "2024-10-27",
"currency_code": "ETH",
"features": [8.0, 3.0, 6.0, 0.375, 0.75],
"label": 0
},
{
"week": "2025-01-05",
"currency_code": "ETH",
"features": [14.0, 6.0, 1.0, 0.428, 0.071],
"label": 1
},
{
"week": "2025-01-12",
"currency_code": "ETH",
"features": [11.0, 7.5, 2.0, 0.682, 0.182],
"label": 1
}
]
21 changes: 21 additions & 0 deletions crypto-news-aggregator/features_dataset_initializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import json
import os

def get_dataset():
    """Read the features dataset from the JSON file named by the
    FEATURES_DATASET_PATH env var (default: ``features_dataset.json``).

    Returns the parsed list of dicts, or ``[]`` when the file is missing
    or unparsable — a deliberate best-effort fallback for the demo.
    """
    json_file_path = os.getenv("FEATURES_DATASET_PATH", "features_dataset.json")

    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    # Narrowed from a broad `except Exception`: only I/O and parse errors
    # are expected here; anything else should surface as a real bug.
    except (OSError, json.JSONDecodeError) as e:
        print(f"Error while reading features dataset from ({json_file_path}): {e}")
        return []


def initialize_features_dataset(r):
    """Load the features dataset from disk and store each row as a JSON
    document keyed ``week::<week>::<currency_code>``."""
    dataset = get_dataset()

    for doc in dataset:
        # BUG FIX: nested double quotes inside a double-quoted f-string are
        # a SyntaxError before Python 3.12 (PEP 701); the produced key is
        # unchanged.
        key = f"week::{doc['week']}::{doc['currency_code']}"
        value = json.dumps(doc)
        r.execute_command("JSON.SET", key, "$", value)
22 changes: 22 additions & 0 deletions crypto-news-aggregator/features_predictor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import json


def parse_search_result(result):
    """Decode an FT.SEARCH reply into a list of document dicts.

    The reply layout is [total, key1, fields1, key2, fields2, ...]; each
    fields list carries the JSON payload at index 1.
    """
    payloads = (fields[1].decode() for fields in result[2::2])
    return [json.loads(payload) for payload in payloads]


# Simple KNN-based predictor: majority vote over the labels of the k
# historical feature vectors closest to the new one.
async def predict_growth(r, currency, new_features, k=5):
    """Predict growth (1/0) for *currency*.

    Runs a KNN vector search over the features index restricted to
    documents tagged with *currency*, then returns 1 when the majority of
    the k nearest neighbours have label 1, otherwise 0.
    """
    query_vector = new_features.tobytes()

    # BUG FIX: the tag filter and KNN clause must form the single query
    # argument. Previously "*" was sent as the query and the KNN string as
    # a stray extra argument, which FT.SEARCH rejects. DIALECT 2 is
    # required for the =>[KNN ...] syntax.
    search_result = r.execute_command(
        "FT.SEARCH", "features_idx",
        f"(@currency_code:{{{currency}}})=>[KNN {k} @features $query_vec AS score]",
        "PARAMS", "2", "query_vec", query_vector,
        "DIALECT", "2",
    )

    results = parse_search_result(search_result)

    knn_labels = [doc["label"] for doc in results]
    # Strict majority; an empty neighbour set predicts "no growth" (0).
    return 1 if sum(knn_labels) > len(knn_labels) / 2 else 0
Loading