Skip to content

Commit 97c52af

Browse files
authored
add implementation (#4)
1 parent 0cc163f commit 97c52af

File tree

11 files changed

+6951
-0
lines changed

11 files changed

+6951
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
OPENAI_API_KEY=
2+
3+
BOTFATHER_API_TOKEN=
4+
BITQUERY_OAUTH_TOKEN=
5+
LOG_LEVEL=INFO # Options: DEBUG, INFO, WARNING, ERROR
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
.venv
2+
.env
3+
swarmzero-data
4+
**/*/__pycache__/
5+
__pycache__/
6+
.DS_Store

agents/crypto_trader_bot/README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Crypto Trader Telegram Bot
2+
3+
An AI-powered cryptocurrency trading bot that leverages BitQuery for market data and provides trading insights through a Telegram interface.
4+
5+
Inspired by [How to build your own Add Liquidity Signal Telegram Bot for Solana DEX Pools | Bitquery API Tutorial
6+
](https://youtu.be/s5GTjKhUmEo?si=DI61viBplqKIYwXG).
7+
8+
## Features
9+
10+
- Real-time cryptocurrency market data monitoring via BitQuery
11+
- Telegram bot interface for user interaction
12+
- Configurable logging levels
13+
- AI-powered trading analysis and recommendations
14+
15+
## Prerequisites
16+
17+
- Python 3.11 or higher
18+
- Poetry for dependency management
19+
- Telegram Bot Token (from BotFather)
20+
- BitQuery OAuth Token
21+
- OpenAI API Key (for AI features)
22+
23+
## Installation
24+
25+
1. Clone the repository:
26+
```bash
27+
git clone https://github.com/swarmzero/examples.git
28+
cd examples/agents/crypto-trader-bot
29+
```
30+
31+
2. Install dependencies using Poetry:
32+
```bash
33+
poetry install --no-root
34+
```
35+
36+
3. Set up environment variables:
37+
- Copy `.env.example` to `.env`
38+
- Fill in the required tokens and API keys:
39+
- `OPENAI_API_KEY`: Your OpenAI API key
40+
- `BOTFATHER_API_TOKEN`: Your Telegram bot token
41+
- `BITQUERY_OAUTH_TOKEN`: Your BitQuery OAuth token
42+
- `LOG_LEVEL`: Desired logging level (DEBUG, INFO, WARNING, ERROR)
43+
44+
## Usage
45+
46+
1. Activate the Poetry environment:
47+
```bash
48+
poetry shell
49+
```
50+
51+
2. Run the bot:
52+
```bash
53+
poetry run python main.py
54+
```
55+
56+
The bot will start and listen for commands through Telegram.
57+
58+
## Project Structure
59+
60+
- `main.py`: Entry point and bot initialization
61+
- `bitquery_service.py`: BitQuery API integration
62+
- `dex_agent.py`: Trading logic and analysis
63+
- `dex_rabbit_bot.py`: Telegram bot implementation
64+
- `config.py`: Configuration management
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import logging
2+
from datetime import datetime, timedelta, timezone
3+
from typing import Dict
4+
5+
import aiohttp
6+
7+
from config import get_chain_id
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class BitQueryService:
13+
def __init__(self, oauth_token: str):
14+
if not oauth_token:
15+
raise ValueError("BitQuery OAuth token is required")
16+
17+
logger.debug("Initializing BitQueryService")
18+
self.oauth_token = oauth_token
19+
self.url = "https://streaming.bitquery.io/graphql"
20+
self.headers = {
21+
"Content-Type": "application/json",
22+
"Authorization": f"Bearer {oauth_token}",
23+
}
24+
25+
def _get_base_trade_fields(self, address_field: str = "SmartContract") -> str:
26+
"""Get common trade fields structure for all chains"""
27+
return f"""
28+
Block {{
29+
Number
30+
Time
31+
}}
32+
Transaction {{
33+
Hash
34+
}}
35+
Trade {{
36+
Buy {{
37+
Amount
38+
Currency {{
39+
Name
40+
Symbol
41+
{address_field}
42+
}}
43+
Price
44+
}}
45+
Sell {{
46+
Amount
47+
Currency {{
48+
Name
49+
Symbol
50+
{address_field}
51+
}}
52+
Price
53+
}}
54+
Dex {{
55+
ProtocolName
56+
}}
57+
}}
58+
"""
59+
60+
def _get_chain_query(self, chain: str) -> tuple[str, str]:
61+
"""Get chain-specific query structure and namespace"""
62+
if chain == "solana":
63+
return "Solana", "MintAddress"
64+
elif chain == "tron":
65+
return "Tron", "Address"
66+
elif chain == "ton":
67+
return "TON", "Address"
68+
else: # EVM chains
69+
return "EVM", "SmartContract"
70+
71+
async def get_chain_activity(self, chain: str, time_window: int = 60) -> Dict:
72+
"""
73+
Fetch trading activity for specified chain
74+
time_window: minutes to look back
75+
"""
76+
try:
77+
logger.debug(f"Fetching chain activity for {chain}, time window: {time_window}min")
78+
now = datetime.now(timezone.utc)
79+
time_ago = now - timedelta(minutes=time_window)
80+
81+
# Normalize chain name
82+
chain = get_chain_id(chain)
83+
namespace, address_field = self._get_chain_query(chain)
84+
trade_fields = self._get_base_trade_fields(address_field)
85+
86+
# Build query based on chain type
87+
if namespace == "EVM":
88+
# Query for EVM chains
89+
query = f"""
90+
query ($network: evm_network!, $since: DateTime) {{
91+
{namespace}(network: $network) {{
92+
DEXTrades(
93+
orderBy: {{descending: Block_Time}}
94+
where: {{Block: {{Time: {{since: $since}}}}}}
95+
) {{
96+
{trade_fields}
97+
}}
98+
}}
99+
}}
100+
"""
101+
variables = {"network": chain.lower(), "since": time_ago.isoformat()}
102+
else:
103+
# Query for non-EVM chains (Solana, Tron, TON)
104+
query = f"""
105+
query ($since: DateTime) {{
106+
{namespace} {{
107+
DEXTrades(
108+
orderBy: {{descending: Block_Time}}
109+
where: {{Block: {{Time: {{since: $since}}}}}}
110+
) {{
111+
{trade_fields}
112+
}}
113+
}}
114+
}}
115+
"""
116+
variables = {"since": time_ago.isoformat()}
117+
118+
# Log the query and variables
119+
logger.debug(f"BitQuery request for {chain}:")
120+
logger.debug(f"Query: {query}")
121+
logger.debug(f"Variables: {variables}")
122+
123+
async with aiohttp.ClientSession() as session:
124+
async with session.post(
125+
self.url,
126+
headers=self.headers,
127+
json={"query": query, "variables": variables},
128+
) as response:
129+
if response.status != 200:
130+
error_text = await response.text()
131+
logger.error(f"BitQuery API error: Status {response.status}, Response: {error_text}")
132+
raise aiohttp.ClientError(f"BitQuery API returned status {response.status}")
133+
134+
data = await response.json()
135+
136+
if "errors" in data:
137+
logger.error(f"GraphQL errors: {data['errors']}")
138+
raise ValueError(f"GraphQL query failed: {data['errors']}")
139+
140+
# Log the response data
141+
trades = data.get("data", {}).get(namespace, {}).get("DEXTrades", [])
142+
logger.info(f"Received {len(trades)} trades from BitQuery for {chain}")
143+
144+
# Log sample of trades for debugging
145+
if trades:
146+
logger.debug("Sample trade data (first trade):")
147+
logger.debug(f"Block: {trades[0].get('Block', {})}")
148+
logger.debug(f"Transaction: {trades[0].get('Transaction', {})}")
149+
logger.debug(f"Trade details: {trades[0].get('Trade', {})}")
150+
151+
logger.debug(f"Successfully fetched data for {chain}")
152+
return data
153+
154+
except aiohttp.ClientError as e:
155+
logger.error(f"Network error while fetching chain activity: {str(e)}")
156+
raise
157+
except Exception as e:
158+
logger.error(f"Error fetching chain activity for {chain}: {str(e)}")
159+
raise

agents/crypto_trader_bot/config.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import logging
2+
from typing import Dict, NamedTuple
3+
4+
logger = logging.getLogger(__name__)
5+
6+
7+
class ChainConfig(NamedTuple):
8+
name: str
9+
url_path: str
10+
native_token: str
11+
explorer: str
12+
13+
14+
SUPPORTED_CHAINS: Dict[str, ChainConfig] = {
15+
"solana": ChainConfig("Solana", "solana", "SOL", "https://solscan.io"),
16+
"eth": ChainConfig("Ethereum", "eth", "ETH", "https://etherscan.io"),
17+
"tron": ChainConfig("Tron", "tron", "TRX", "https://tronscan.org"),
18+
"ton": ChainConfig("TON", "ton", "TON", "https://tonscan.org"),
19+
"base": ChainConfig("Base", "base", "ETH", "https://basescan.org"),
20+
"matic": ChainConfig("Polygon", "matic", "MATIC", "https://polygonscan.com"),
21+
"bsc": ChainConfig("BSC", "bsc", "BNB", "https://bscscan.com"),
22+
"opbnb": ChainConfig("opBNB", "opbnb", "BNB", "https://opbnbscan.com"),
23+
"optimism": ChainConfig("Optimism", "optimism", "ETH", "https://optimistic.etherscan.io"),
24+
"arbitrum": ChainConfig("Arbitrum", "arbitrum", "ETH", "https://arbiscan.io"),
25+
}
26+
27+
# Chain name variations mapping
28+
CHAIN_ALIASES = {
29+
# Ethereum variations
30+
"ethereum": "eth",
31+
"ether": "eth",
32+
# BSC variations
33+
"binance": "bsc",
34+
"binance smart chain": "bsc",
35+
# Polygon variations
36+
"polygon": "matic",
37+
# Common variations for other chains
38+
"sol": "solana",
39+
"opt": "optimism",
40+
"arb": "arbitrum",
41+
}
42+
43+
# Log supported chains on module load
44+
logger.info(f"Loaded {len(SUPPORTED_CHAINS)} supported chains:")
45+
for chain_id, config in SUPPORTED_CHAINS.items():
46+
logger.debug(f" {chain_id}: {config.name} ({config.native_token})")
47+
48+
49+
def normalize_chain_name(chain: str) -> str:
50+
"""Convert various chain name formats to our standard chain ID"""
51+
chain = chain.lower().strip()
52+
return CHAIN_ALIASES.get(chain, chain)
53+
54+
55+
def validate_chain(chain: str) -> bool:
56+
"""Validate if a chain is supported"""
57+
normalized_chain = normalize_chain_name(chain)
58+
is_valid = normalized_chain in SUPPORTED_CHAINS
59+
if not is_valid:
60+
logger.warning(f"Attempted to use unsupported chain: {chain} (normalized: {normalized_chain})")
61+
return is_valid
62+
63+
64+
def get_chain_id(chain: str) -> str:
65+
"""Get the standardized chain ID from any valid chain name variation"""
66+
normalized_chain = normalize_chain_name(chain)
67+
if normalized_chain in SUPPORTED_CHAINS:
68+
return normalized_chain
69+
raise ValueError(f"Unsupported chain: {chain}")

0 commit comments

Comments
 (0)