From be888df29cea3f87ce488eb69e1d6184c97ec6dd Mon Sep 17 00:00:00 2001
From: Stepan Bagritsevich
Date: Thu, 30 Jan 2025 10:02:04 +0100
Subject: [PATCH] feat: Add crypto-news-aggregator example

Signed-off-by: Stepan Bagritsevich
---
 crypto-news-aggregator/README.md              |   0
 crypto-news-aggregator/embedding_generator.py |  11 +
 crypto-news-aggregator/example.py             | 247 ++++++++++++++++++
 crypto-news-aggregator/features_dataset.json  |  92 +++++++
 .../features_dataset_initializer.py           |  21 ++
 crypto-news-aggregator/features_predictor.py  |  22 ++
 crypto-news-aggregator/news_dataset.json      |  96 +++++++
 crypto-news-aggregator/news_fetcher.py        |  12 +
 crypto-news-aggregator/requirements.txt       |   5 +
 crypto-news-aggregator/weight_calculator.py   |   8 +
 10 files changed, 514 insertions(+)
 create mode 100644 crypto-news-aggregator/README.md
 create mode 100644 crypto-news-aggregator/embedding_generator.py
 create mode 100644 crypto-news-aggregator/example.py
 create mode 100644 crypto-news-aggregator/features_dataset.json
 create mode 100644 crypto-news-aggregator/features_dataset_initializer.py
 create mode 100644 crypto-news-aggregator/features_predictor.py
 create mode 100644 crypto-news-aggregator/news_dataset.json
 create mode 100644 crypto-news-aggregator/news_fetcher.py
 create mode 100644 crypto-news-aggregator/requirements.txt
 create mode 100644 crypto-news-aggregator/weight_calculator.py

diff --git a/crypto-news-aggregator/README.md b/crypto-news-aggregator/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/crypto-news-aggregator/embedding_generator.py b/crypto-news-aggregator/embedding_generator.py
new file mode 100644
index 0000000..9e92e2e
--- /dev/null
+++ b/crypto-news-aggregator/embedding_generator.py
@@ -0,0 +1,11 @@
+from sentence_transformers import SentenceTransformer
+import numpy as np
+
+# Loaded once at import time and shared by every generate_embedding() call.
+model = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L3-v2")
+
+
+def generate_embedding(text):
+    """Encode ``text`` with the module-level model as a float32 NumPy array."""
+    vector = model.encode(text, convert_to_numpy=True)
+    return vector.astype(np.float32)
diff
--git a/crypto-news-aggregator/example.py b/crypto-news-aggregator/example.py
new file mode 100644
index 0000000..2754d7d
--- /dev/null
+++ b/crypto-news-aggregator/example.py
@@ -0,0 +1,234 @@
+import json
+import asyncio
+import numpy as np
+from redis import Redis
+from datetime import datetime, timedelta
+from news_fetcher import fetch_and_normalize_news_data
+from weight_calculator import calculate_weight
+from embedding_generator import generate_embedding
+from features_dataset_initializer import initialize_features_dataset
+from features_predictor import predict_growth
+
+
+def get_key(article):
+    """Return the key an article is stored under: ``article::<id>``."""
+    return f"article::{article['id']}"
+
+
+def get_article(r, key):
+    """Read the JSON document stored at ``key`` and decode it."""
+    return json.loads(r.execute_command("JSON.GET", key))
+
+
+def set_article_field(r, key, field, value):
+    """Set the top-level ``field`` of the JSON document at ``key`` to ``value``."""
+    return r.execute_command("JSON.SET", key, f"$.{field}", value)
+
+
+async def update_article_weight(r, key):
+    """Compute the stored article's weight and save it in its ``weight`` field."""
+    article = get_article(r, key)
+    weight = calculate_weight(article)
+    set_article_field(r, key, "weight", weight)
+
+
+async def update_article_embedding(r, key):
+    """Embed the stored article's title and save it in its ``embedding`` field."""
+    article = get_article(r, key)
+    content = article.get("title", "")
+    # JSON.SET receives the vector as the text of a Python list of floats.
+    embedding = str(generate_embedding(content).tolist())
+    set_article_field(r, key, "embedding", embedding)
+
+
+def get_timestamp(days_ago=10):
+    """Return the Unix timestamp, in whole seconds, of ``days_ago`` days ago."""
+    return int((datetime.now() - timedelta(days=days_ago)).timestamp())
+
+
+def create_index(r):
+    """Create ``news_idx`` over the ``article::`` JSON documents."""
+    return r.execute_command(
+        "FT.CREATE news_idx ON JSON PREFIX 1 article:: SCHEMA $.id AS id NUMERIC $.published_at AS published_at NUMERIC SORTABLE $.votes.negative AS votes_negative NUMERIC $.votes.positive AS votes_positive NUMERIC $.votes.important AS votes_important NUMERIC $.currencies[*].code AS currency_code TAG $.weight AS weight NUMERIC"
+    )
+
+
+def create_news_vector_index(r):
+    """Create ``news_vector_idx`` with a 384-dimensional ``embedding`` field."""
+    return r.execute_command(
+        "FT.CREATE news_vector_idx ON JSON PREFIX 1 article:: SCHEMA $.id AS id NUMERIC $.published_at AS published_at NUMERIC SORTABLE $.title AS title TEXT $.embedding AS embedding VECTOR FLAT 6 TYPE FLOAT32 DIM 384 DISTANCE_METRIC COSINE"
+    )
+
+
+def create_features_index(r):
+    """Create ``features_idx`` over ``week::`` documents (5-dimensional vectors)."""
+    return r.execute_command(
+        "FT.CREATE features_idx ON JSON PREFIX 1 week:: SCHEMA $.currency_code AS currency_code TAG $.features AS features VECTOR FLAT 6 TYPE FLOAT32 DIM 5 DISTANCE_METRIC COSINE $.label AS label NUMERIC"
+    )
+
+
+async def get_most_mentioned_cryptocurrencies(r, from_timestamp, top=10):
+    """Return the raw reply ranking currency codes by article count, ``top`` rows."""
+    return r.execute_command(
+        "FT.AGGREGATE", "news_idx", f"@published_at:[{from_timestamp} +inf]",
+        "GROUPBY", 1, "@currency_code",
+        "REDUCE", "COUNT", 0, "AS", "mentions_count",
+        "SORTBY", 2, "@mentions_count", "DESC",
+        "LIMIT", 0, top,
+    )
+
+
+async def get_cryptocurrency_recent_mentions(r, currency, from_timestamp, limit=10):
+    """Return the raw reply with up to ``limit`` ``currency`` articles, newest first."""
+    return r.execute_command(
+        "FT.SEARCH", "news_idx",
+        f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]",
+        "SORTBY", "published_at", "DESC",
+        "LIMIT", 0, limit,
+    )
+
+
+async def get_cryptocurrency_votes(r, currency, from_timestamp):
+    """Return the raw reply with summed vote counts for ``currency`` articles."""
+    return r.execute_command(
+        "FT.AGGREGATE", "news_idx",
+        f"@currency_code:{{{currency}}} @published_at:[{from_timestamp} +inf]",
+        "GROUPBY", 1, "@currency_code",
+        "REDUCE", "SUM", 1, "@votes_positive", "AS", "total_positive_votes",
+        "REDUCE", "SUM", 1, "@votes_negative", "AS", "total_negative_votes",
+        "REDUCE", "SUM", 1, "@votes_important", "AS", "total_important_votes",
+    )
+
+
+async def get_similar_articles(r, query_text, from_timestamp, top=10):
+    """Return the raw reply with the ``top`` nearest articles to ``query_text``."""
+    query_embedding = generate_embedding(query_text).tobytes()
+    return r.execute_command(
+        "FT.SEARCH", "news_vector_idx",
+        f"@published_at:[{from_timestamp} +inf] =>[KNN {top} @embedding $query AS score]",
+        "PARAMS", 2, "query", query_embedding,
+        "SORTBY", "score", "ASC",
+        "LIMIT", 0, top,
+        "RETURN", 3, "title", "published_at", "score",
+    )
+
+
+def to_dict(result):
+    """Turn the first row of a reply into a ``str -> str`` dict.
+
+    ``result[1]`` is read as a flat ``[key, value, key, value, ...]`` list of
+    bytes; a reply with fewer than two elements yields an empty dict.
+    """
+    if len(result) < 2:
+        return {}
+    row = result[1]
+    return {k.decode(): v.decode() for k, v in zip(row[::2], row[1::2])}
+
+
+async def compute_features(r, currency, from_timestamp):
+    """Build the five-element float32 feature vector for ``currency``.
+
+    The elements are the article count, the summed positive and negative
+    votes, and the mean positive and negative votes per article, over
+    articles published at or after ``from_timestamp``.
+    """
+    recent_mentions = await get_cryptocurrency_recent_mentions(r, currency, from_timestamp, 1000)
+
+    # The first element of the FT.SEARCH reply is the number of matches.
+    count_news = float(recent_mentions[0])
+
+    votes = await get_cryptocurrency_votes(r, currency, from_timestamp)
+    votes_dict = to_dict(votes)
+
+    sum_positive = float(votes_dict.get("total_positive_votes", 0))
+    sum_negative = float(votes_dict.get("total_negative_votes", 0))
+
+    # Guard the means against a currency with no matching articles.
+    mean_positive = sum_positive / count_news if count_news > 0.0 else 0.0
+    mean_negative = sum_negative / count_news if count_news > 0.0 else 0.0
+
+    features = [count_news, sum_positive, sum_negative, mean_positive, mean_negative]
+    return np.array(features, dtype=np.float32)
+
+
+async def cryptocurrency_will_grow(r, currency, from_timestamp):
+    """Predict growth for ``currency`` from its recent news features."""
+    features = await compute_features(r, currency, from_timestamp)
+    return await predict_growth(r, currency, features)
+
+
+async def _run_example(r):
+    """Load the sample data through ``r`` and print each example query."""
+    # NOTE: wipes every key on the server the client is connected to.
+    r.execute_command("FLUSHALL")
+
+    news_data = fetch_and_normalize_news_data()
+
+    # Store the news data in DragonflyDB
+    for article in news_data:
+        key = get_key(article)
+        value = json.dumps(article)
+        r.execute_command("JSON.SET", key, "$", value)
+
+    for article in news_data:
+        key = get_key(article)
+        await update_article_weight(r, key)
+
+    for article in news_data:
+        key = get_key(article)
+        await update_article_embedding(r, key)
+
+    create_index(r)
+    create_news_vector_index(r)
+
+    # Fixed cut-off (Unix seconds): the bundled sample articles carry fixed
+    # publication times, so a clock-relative get_timestamp() is not used here.
+    news_from_timestamp = 1700000000
+
+    # Get the most mentioned cryptocurrencies
+    print("Most mentioned cryptocurrencies:")
+    print(await get_most_mentioned_cryptocurrencies(r, news_from_timestamp))
+    print()
+
+    # Get recent mentions of a cryptocurrency
+    print("Recent mentions of BTC:")
+    print(await get_cryptocurrency_recent_mentions(r, "BTC", news_from_timestamp, 2))
+    print()
+
+    # Get votes for a cryptocurrency
+    print("Votes for BTC:")
+    print(await get_cryptocurrency_votes(r, "BTC", news_from_timestamp))
+    print()
+
+    # Get similar articles
+    for query_text in ("Bitcoin crash", "Ethereum rally", "Ripple lawsuit"):
+        print(f"Similar articles to '{query_text}':")
+        print(await get_similar_articles(r, query_text, news_from_timestamp, 2))
+        print()
+
+    # Load the labelled feature vectors used by the predictor
+    initialize_features_dataset(r)
+    create_features_index(r)
+
+    features_from_timestamp = 1730000000
+
+    # Predict if a cryptocurrency will grow
+    will_grow = await cryptocurrency_will_grow(r, "BTC", features_from_timestamp)
+    # Computed outside the f-string: reusing the enclosing quote character
+    # inside a replacement field is a syntax error before Python 3.12.
+    answer = "Yes" if will_grow else "No"
+    print(f"Will BTC grow in next seven days? {answer}")
+
+
+async def main():
+    """Connect to DragonflyDB, run the example, and always close the client."""
+    # Connect to DragonflyDB
+    r = Redis(host="localhost", port=6379)
+    try:
+        await _run_example(r)
+    finally:
+        r.close()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/crypto-news-aggregator/features_dataset.json b/crypto-news-aggregator/features_dataset.json
new file mode 100644
index 0000000..507afbd
--- /dev/null
+++ b/crypto-news-aggregator/features_dataset.json
@@ -0,0 +1,92 @@
+[
+    {
+        "week": "2024-10-27",
+        "currency_code": "BTC",
+        "features": [10.0, 5.0, 2.0, 0.5, 0.2],
+        "label": 1
+    },
+    {
+        "week": "2024-11-03",
+        "currency_code": "BTC",
+        "features": [12.0, 3.0, 4.0, 0.25, 0.333],
+        "label": 1
+    },
+    {
+        "week": "2024-11-10",
+        "currency_code": "BTC",
+        "features": [8.0, 7.0, 1.5, 0.875, 0.1875],
+        "label": 0
+    },
+    {
+        "week": "2024-11-17",
+        "currency_code": "BTC",
+        "features": [9.0, 2.0, 2.0, 0.222, 0.222],
+        "label": 0
+    },
+    {
+        "week": "2024-11-24",
+        "currency_code": "BTC",
+        "features": [14.0, 4.0, 3.5, 0.286, 0.25],
+        "label": 1
+    },
+    {
+        "week": "2024-12-01",
+        "currency_code": "BTC",
+        "features": [16.0, 5.0, 4.0, 0.3125, 0.25],
+        "label": 1
+    },
+    {
+        "week": "2024-12-08",
+        "currency_code": "BTC",
+        "features": [11.0, 6.0, 2.0, 0.545, 0.181],
+        "label": 1
+    },
+    {
+        "week": "2024-12-15",
+        "currency_code": "BTC",
+        "features": [10.5, 2.0, 3.0, 0.190, 0.285],
+        "label": 0
+    },
+    {
+        "week": "2024-12-22",
+        "currency_code": "BTC",
+        "features": [13.0, 8.0, 2.5, 0.615, 0.192],
+        "label": 0
+    },
+    {
+        "week": "2024-12-29",
+        "currency_code": "BTC",
+        "features": [15.0, 5.0, 2.5, 0.333, 0.167],
+        "label": 1
+    },
+    {
+        "week": "2025-01-05",
+        "currency_code": "BTC",
+        "features": [10.0, 7.0, 3.0, 0.7, 0.3],
+        "label": 0
+    },
+    {
+        "week": "2025-01-12",
+        "currency_code": "BTC",
+        "features": [12.0, 4.0, 5.0, 0.333, 0.417],
+        "label": 1
+    },
+    {
+        "week": "2024-10-27",
+        "currency_code": "ETH",
+        "features": [8.0, 3.0, 6.0, 0.375, 0.75],
+        "label": 0
+    },
+    {
+        "week": "2025-01-05",
+        "currency_code": "ETH",
+        "features": [14.0, 6.0, 1.0, 0.428, 0.071],
+        "label": 1
+    },
+    {
+        "week": "2025-01-12",
+        "currency_code": "ETH",
+        "features": [11.0, 7.5, 2.0, 0.682, 0.182],
+        "label": 1
+    }
+]
diff --git a/crypto-news-aggregator/features_dataset_initializer.py b/crypto-news-aggregator/features_dataset_initializer.py
new file mode 100644
index 0000000..7bab913
--- /dev/null
+++ b/crypto-news-aggregator/features_dataset_initializer.py
@@ -0,0 +1,30 @@
+import json
+import os
+
+
+def get_dataset():
+    """Load the features dataset from the configured JSON file.
+
+    The path comes from ``FEATURES_DATASET_PATH`` and defaults to
+    ``features_dataset.json``. If the file cannot be read or parsed, the
+    error is printed and an empty list is returned.
+    """
+    json_file_path = os.getenv("FEATURES_DATASET_PATH", "features_dataset.json")
+
+    try:
+        with open(json_file_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except (OSError, ValueError) as e:
+        print(f"Error while reading features dataset from ({json_file_path}): {e}")
+        return []
+
+
+def initialize_features_dataset(r):
+    """Store each dataset entry as JSON under ``week::<week>::<currency_code>``."""
+    dataset = get_dataset()
+
+    for doc in dataset:
+        # Single-quoted subscripts keep this f-string valid before Python 3.12.
+        key = f"week::{doc['week']}::{doc['currency_code']}"
+        value = json.dumps(doc)
+        r.execute_command("JSON.SET", key, "$", value)
diff --git a/crypto-news-aggregator/features_predictor.py b/crypto-news-aggregator/features_predictor.py
new file mode 100644
index 0000000..d95f00e
--- /dev/null
+++ b/crypto-news-aggregator/features_predictor.py
@@ -0,0 +1,41 @@
+import json
+
+
+def parse_search_result(result):
+    """Decode the JSON document of every hit in an FT.SEARCH reply.
+
+    The reply is read as ``[total, key, fields, key, fields, ...]`` where each
+    ``fields`` is a flat ``[name, value, ...]`` list of bytes. The document is
+    taken from the ``$`` field when present (a KNN query may also return its
+    score field); otherwise the first value is used.
+    """
+    documents = []
+    for fields in result[2::2]:
+        by_name = dict(zip(fields[::2], fields[1::2]))
+        raw = by_name.get(b"$", fields[1])
+        documents.append(json.loads(raw.decode()))
+    return documents
+
+
+# Simple KNN-based predictor
+async def predict_growth(r, currency, new_features, k=5):
+    """Return 1 if the returned neighbours' labels sum to more than half their count.
+
+    Neighbours are searched among ``features_idx`` documents tagged with
+    ``currency``, using ``new_features`` as the KNN query vector; otherwise
+    (including when nothing is returned) the result is 0.
+    """
+    query_vector = new_features.tobytes()
+
+    # The filter-plus-KNN expression is the query itself. A separate "*"
+    # argument in the query position would be taken as the query instead.
+    search_result = r.execute_command(
+        "FT.SEARCH", "features_idx",
+        f"@currency_code:{{{currency}}} =>[KNN {k} @features $query_vec AS score]",
+        "PARAMS", "2", "query_vec", query_vector,
+    )
+
+    results = parse_search_result(search_result)
+
+    knn_labels = [doc["label"] for doc in results]
+    return 1 if sum(knn_labels) > len(knn_labels) / 2 else 0
diff --git a/crypto-news-aggregator/news_dataset.json b/crypto-news-aggregator/news_dataset.json
new file mode 100644
index 0000000..5430318
--- /dev/null
+++ b/crypto-news-aggregator/news_dataset.json
@@ -0,0 +1,96 @@
+[
+    {
+        "id": 1245186,
+        "title": "Bitfinex Decrease BTC Withdrawal Fees By 25%",
+        "published_at": 1737556205,
+        "url": "https://cryptopanic.com/news/1245186/Bitfinex-Decrease-BTC-Withdrawal-Fees-By-25",
+        "source": {
+            "domain": "reddit.com",
+            "title": "r/Bitcoin Reddit",
+            "path": "r/bitcoin"
+        },
+        "votes": {"negative": 0, "positive": 11, "important": 6},
+        "currencies": [{"code": "BTC", "title": "Bitcoin", "slug": "bitcoin"}]
+    },
+    {
+        "id": 1245187,
+        "title": "Ethereum Upgrade Brings New Scalability Features",
+        "published_at": 1737890000,
+        "url": "https://cryptopanic.com/news/1245187/Ethereum-Upgrade-Scalability",
+        "source": {
+            "domain": "coindesk.com",
+            "title": "CoinDesk",
+            "path": "/ethereum"
+        },
+        "votes": {"negative": 1, "positive": 20, "important": 10},
+        "currencies": [{"code": "ETH", "title": "Ethereum", "slug": "ethereum"}]
+    },
+    {
+        "id": 1245188,
+        "title": "Major Exchange Reports Security Breach",
+        "published_at": 1696155330,
+        "url": "https://cryptopanic.com/news/1245188/Major-Exchange-Security-Breach",
+        "source": {
+            "domain": "twitter.com",
+            "title": "Crypto Alerts",
+            "path": "/crypto-alerts"
+        },
+
"votes": {"negative": 10, "positive": 5, "important": 25}, + "currencies": [ + {"code": "BTC", "title": "Bitcoin", "slug": "bitcoin"}, + {"code": "ETH", "title": "Ethereum", "slug": "ethereum"} + ] + }, + { + "id": 1245194, + "title": "Bitcoin Plummets 20% After Market Sell-Off", + "published_at": 1737800000, + "url": "https://cryptopanic.com/news/1245194/Bitcoin-Plummets-20", + "source": { + "domain": "cryptoslate.com", + "title": "CryptoSlate", + "path": "/bitcoin" + }, + "votes": {"negative": 20, "positive": 3, "important": 15}, + "currencies": [{"code": "BTC", "title": "Bitcoin", "slug": "bitcoin"}] + }, + { + "id": 1245195, + "title": "Bitcoin Crash: Investors Lose Confidence", + "published_at": 1737557105, + "url": "https://cryptopanic.com/news/1245195/Bitcoin-Crash-Investors", + "source": { + "domain": "decrypt.co", + "title": "Decrypt", + "path": "/bitcoin" + }, + "votes": {"negative": 30, "positive": 2, "important": 20}, + "currencies": [{"code": "BTC", "title": "Bitcoin", "slug": "bitcoin"}] + }, + { + "id": 1245196, + "title": "Ripple's Legal Battle: What's Next?", + "published_at": 1737556125, + "url": "https://cryptopanic.com/news/1245196/Ripples-Legal-Battle", + "source": { + "domain": "newsbtc.com", + "title": "NewsBTC", + "path": "/ripple" + }, + "votes": {"negative": 3, "positive": 8, "important": 12}, + "currencies": [{"code": "XRP", "title": "Ripple", "slug": "ripple"}] + }, + { + "id": 1245197, + "title": "Ethereum 2.0 Launches Successfully", + "published_at": 1737800000, + "url": "https://cryptopanic.com/news/1245197/Ethereum-2-Launch", + "source": { + "domain": "cryptoslate.com", + "title": "CryptoSlate", + "path": "/ethereum" + }, + "votes": {"negative": 2, "positive": 30, "important": 20}, + "currencies": [{"code": "ETH", "title": "Ethereum", "slug": "ethereum"}] + } +] diff --git a/crypto-news-aggregator/news_fetcher.py b/crypto-news-aggregator/news_fetcher.py new file mode 100644 index 0000000..dc3f384 --- /dev/null +++ 
b/crypto-news-aggregator/news_fetcher.py
@@ -0,0 +1,19 @@
+import os
+import json
+
+
+def fetch_and_normalize_news_data():
+    """Load the news articles from the configured JSON file.
+
+    The path comes from ``NEWS_DATASET_PATH`` and defaults to
+    ``news_dataset.json``. If the file cannot be read or parsed, the error is
+    printed and an empty list is returned.
+    """
+    json_file_path = os.getenv("NEWS_DATASET_PATH", "news_dataset.json")
+
+    try:
+        with open(json_file_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except (OSError, ValueError) as e:
+        print(f"Error while reading news dataset from ({json_file_path}): {e}")
+        return []
diff --git a/crypto-news-aggregator/requirements.txt b/crypto-news-aggregator/requirements.txt
new file mode 100644
index 0000000..3ed3da1
--- /dev/null
+++ b/crypto-news-aggregator/requirements.txt
@@ -0,0 +1,5 @@
+setuptools
+redis
+numpy
+sentence-transformers
+scikit-learn
diff --git a/crypto-news-aggregator/weight_calculator.py b/crypto-news-aggregator/weight_calculator.py
new file mode 100644
index 0000000..8895bca
--- /dev/null
+++ b/crypto-news-aggregator/weight_calculator.py
@@ -0,0 +1,12 @@
+def calculate_weight(article):
+    """Score an article as ``positive + 2 * important - negative`` votes.
+
+    Missing vote counts, or a missing or empty ``votes`` entry, count as 0.
+    """
+    # `or {}` also covers an explicit null ``votes`` value.
+    votes = article.get("votes") or {}
+    return (
+        votes.get("positive", 0)
+        + votes.get("important", 0) * 2
+        - votes.get("negative", 0)
+    )