4 changes: 4 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/.gitignore
@@ -0,0 +1,4 @@
.DS_Store
.env
.venv/
.vscode/
23 changes: 23 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/cerebrium.toml
@@ -0,0 +1,23 @@
[cerebrium.deployment]
name = "kalshi-bet-predictor"
python_version = "3.11"
docker_base_image_url = "debian:bookworm-slim"
disable_auth = true
include = ['./*', 'main.py', 'cerebrium.toml']
exclude = ['.*']

[cerebrium.dependencies.paths]
pip = "requirements.txt"

[cerebrium.hardware]
cpu = 4
memory = 16
compute = "CPU"

[cerebrium.scaling]
min_replicas = 0
max_replicas = 100
cooldown = 30
replica_concurrency = 1
scaling_metric = "concurrency_utilization"
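
requirements.txt itself is not part of this diff, so the exact pins the deployment installs are unknown. As a rough sketch, the helper scripts in this folder would need something along the lines of:

aiohttp
requests
faiss-cpu
sentence-transformers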

127 changes: 127 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/compare.py
@@ -0,0 +1,127 @@
import csv
import json
from typing import Dict, List, Tuple
import asyncio
import aiohttp

def load_markets(csv_path: str) -> List[Tuple[str, str]]:
markets = []
with open(csv_path, 'r') as f:
reader = csv.reader(f)
next(reader) # skip header
for row in reader:
if len(row) >= 2:
markets.append((row[0], row[1]))
return markets

async def get_market_data(session: aiohttp.ClientSession, kalshi_ticker: str,
polymarket_slug: str, endpoint_url: str) -> Dict:

payload = json.dumps({
'kalshi_ticker': kalshi_ticker,
'poly_slug': polymarket_slug
})

headers = {
'Authorization': '<YOUR AUTHORIZATION>',
'Content-Type': 'application/json'
}

try:
async with session.post(endpoint_url, headers=headers, data=payload) as response:
Contributor: I think axios might be much cleaner than aiohttp. Is there a reason you used it?

Author: I'm not sure I understand. How would I use axios in a Python script? axios is used in Node.

response.raise_for_status()
data = await response.json()
print(data)
data = data['result']

kalshi_data = data['kalshi']
polymarket_data = data['polymarket']

return {
'kalshi_ticker': kalshi_ticker,
'polymarket_slug': polymarket_slug,
'kalshi_edge': kalshi_data['edge'],
'polymarket_edge': polymarket_data['edge'],
'kalshi_buy_yes': kalshi_data['buy_yes'],
'kalshi_buy_no': kalshi_data['buy_no'],
'polymarket_buy_yes': polymarket_data['buy_yes'],
'polymarket_buy_no': polymarket_data['buy_no'],
}
except Exception as e:
print(f"Error fetching data for {kalshi_ticker}/{polymarket_slug}: {e}")
return None

async def analyze_markets_async(csv_path: str, endpoint_url: str) -> List[Dict]:
markets = load_markets(csv_path)

print(f"Fetching data for {len(markets)} markets all at once...")

async with aiohttp.ClientSession() as session:
tasks = [get_market_data(session, kalshi_ticker, polymarket_slug, endpoint_url)
for kalshi_ticker, polymarket_slug in markets]

results = await asyncio.gather(*tasks)

return [r for r in results if r is not None]

def compute_statistics(results: List[Dict]) -> None:
print("\n" + "="*80)
print("STATISTICS")
print("="*80)

if not results:
print("No results to analyze")
return

total_markets = len(results)

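    # Edge values are in cents (see the review thread below).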
kalshi_edges = [r['kalshi_edge'] for r in results]
total_kalshi_edge = sum(kalshi_edges)
Contributor: It's tough to know if this is a count or money. Also, is money in dollars or cents?

Author (@SeaUrc, Oct 31, 2025): I made it more clear. Money is in cents.


polymarket_edges = [r['polymarket_edge'] for r in results]
total_polymarket_edge = sum(polymarket_edges)

kalshi_better_count = sum(1 for r in results if r['kalshi_edge'] > r['polymarket_edge'])
polymarket_better_count = sum(1 for r in results if r['polymarket_edge'] > r['kalshi_edge'])
equal_count = total_markets - kalshi_better_count - polymarket_better_count

edge_differences = [abs(r['kalshi_edge'] - r['polymarket_edge']) for r in results]
avg_edge_difference = sum(edge_differences) / total_markets
max_edge_difference = max(edge_differences)

print(f"\nTotal markets analyzed: {total_markets}")
print("\n" + "-"*80)
print("COMPARISON")
print("-"*80)
print(f"Markets with greater Kalshi edge: {kalshi_better_count} ({kalshi_better_count/total_markets*100:.1f}%)")
print(f"Markets with greater Polymarket edge: {polymarket_better_count} ({polymarket_better_count/total_markets*100:.1f}%)")
print(f"Markets with equal edge: {equal_count} ({equal_count/total_markets*100:.1f}%)")
print(f"\nAverage edge difference: {avg_edge_difference:.4f}")
print(f"Max edge difference: {max_edge_difference:.4f}")

print("\n" + "="*80)
if total_kalshi_edge > total_polymarket_edge:
advantage = total_kalshi_edge - total_polymarket_edge
print(f"OVERALL: Kalshi has greater total edge (+{advantage:.4f})")
print(f"OVERALL: Kalshi has an average edge of (+{advantage/total_markets:.4f}) per market")
elif total_polymarket_edge > total_kalshi_edge:
advantage = total_polymarket_edge - total_kalshi_edge
print(f"OVERALL: Polymarket has greater total edge (+{advantage:.4f})")
print(f"OVERALL: Polymarket has an average edge of (+{advantage/total_markets:.4f}) per market")
else:
print(f"OVERALL: Both platforms have equal total edge")
print("="*80)

def main():
CSV_PATH = '<PATH TO YOUR .csv FILE>'
ENDPOINT_URL = '<YOUR HOSTED ENDPOINT>'

print("Starting async market analysis...")
results = asyncio.run(analyze_markets_async(CSV_PATH, ENDPOINT_URL))

print(f"\nSuccessfully fetched {len(results)} markets")

compute_statistics(results)

if __name__ == "__main__":
main()
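
Picking up on the aiohttp discussion above: for a quick, one-off check of the hosted endpoint, a plain synchronous call is enough. The sketch below mirrors compare.py's payload and response handling; the endpoint URL and Authorization header are the same placeholders used above, and the ticker/slug values are purely illustrative.

import requests

ENDPOINT_URL = '<YOUR HOSTED ENDPOINT>'

response = requests.post(
    ENDPOINT_URL,
    headers={'Authorization': '<YOUR AUTHORIZATION>', 'Content-Type': 'application/json'},
    json={'kalshi_ticker': 'EXAMPLE-KALSHI-TICKER', 'poly_slug': 'example-polymarket-slug'},  # illustrative values
    timeout=30,
)
response.raise_for_status()
result = response.json()['result']
print('Kalshi edge (cents):', result['kalshi']['edge'])
print('Polymarket edge (cents):', result['polymarket']['edge'])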
202 changes: 202 additions & 0 deletions 8-application-demos/6-kalshi-bet-predictor/find_equiv_markets.py
@@ -0,0 +1,202 @@
import csv
import os
import requests
import faiss
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any

# --- Config ---
SIMILARITY_THRESHOLD = 0.70  # threshold for cosine similarity
MAX_MARKET_LIMIT = 40000 # max number of active & open markets to gather
TOP_K = 5 # number of top Polymarket markets to check for each Kalshi market
KALSHI_API_URL = "https://api.elections.kalshi.com/trade-api/v2/markets"
POLYMARKET_API_URL = "https://clob.polymarket.com/markets"
OUTPUT_FILE = "markets.csv"

# ---------------------- API Fetch Functions ----------------------

def get_kalshi_markets() -> List[Dict[str, Any]]:
print("Fetching Kalshi markets...")
markets_list = []
cursor = ""
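    # Kalshi paginates with a cursor; the loop below stops once the API returns an empty cursor or MAX_MARKET_LIMIT is exceeded.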
try:
while True:
params = {'limit': 1000}
if cursor:
params['cursor'] = cursor

response = requests.get(KALSHI_API_URL, params=params)
response.raise_for_status()
data = response.json()

if 'markets' not in data:
print("Error: 'markets' key not in Kalshi response.")
break

for market in data['markets']:
if market['status'] == 'active' and market['market_type'] == 'binary':

markets_list.append({
'platform': 'Kalshi',
'title': market['title'],
'ticker': market['ticker'],
'url': f"https://kalshi.com/markets/{market['ticker']}",
'event_url': f"https://kalshi.com/markets/{market['event_ticker']}",
'close_date': market['close_time']
})

cursor = data['cursor']
print(f"Found {len(markets_list)} active and open markets")

if len(markets_list) > MAX_MARKET_LIMIT or not cursor:
break

print(f"Found {len(markets_list)} open binary markets on Kalshi.")
return markets_list

except requests.exceptions.RequestException as e:
print(f"Error fetching Kalshi markets: {e}")
return []

def get_kalshi_market(ticker: str) -> str:
    # Fetch a single Kalshi market and return its title (used when displaying matches for review).
    response = requests.get(f"{KALSHI_API_URL}/{ticker}")
    response.raise_for_status()
    return response.json()['market']['title']

def get_polymarket_markets() -> List[Dict[str, Any]]:
print("Fetching Polymarket markets...")
markets_list = []
next_cursor = None

try:
while True:
params = {}
if next_cursor:
params['next_cursor'] = next_cursor

response = requests.get(POLYMARKET_API_URL, params=params)
response.raise_for_status()
data = response.json()

market_list_page = data['data']
if not market_list_page:
break

for market in market_list_page:
if market.get('active') and not market.get('closed'):
markets_list.append({
'platform': 'Polymarket',
'title': market.get('question'),
'id': market.get('condition_id'),
'url': f"https://polymarket.com/event/{market.get('market_slug')}",
'close_date': market.get('end_date_iso')
})

next_cursor = data.get('next_cursor')
print(f"Found {len(markets_list)} active and open markets")

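            # 'LTE=' is Polymarket's end-of-pagination cursor (base64 for "-1").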
if len(markets_list) > MAX_MARKET_LIMIT or not next_cursor or next_cursor == 'LTE=':
break

print(f"Found {len(markets_list)} open markets on Polymarket.")
return markets_list

except requests.exceptions.RequestException as e:
print(f"Error fetching Polymarket markets: {e}")
return []


# ---------------------- Matching ----------------------

def find_similar_markets(kalshi_markets, polymarket_markets, threshold=0.9, top_k=TOP_K):
print("\nLoading NLP model...")
model = SentenceTransformer('all-MiniLM-L6-v2')

kalshi_titles = [m['title'] for m in kalshi_markets]
poly_titles = [m['title'] for m in polymarket_markets]

if not kalshi_titles or not poly_titles:
print("Not enough market data to compare.")
return []

print("Encoding titles into embeddings...")
kalshi_embeddings = model.encode(kalshi_titles, convert_to_numpy=True, normalize_embeddings=True)
poly_embeddings = model.encode(poly_titles, convert_to_numpy=True, normalize_embeddings=True)

print(f"Building vector index for {len(poly_embeddings)} Polymarket markets...")
dim = poly_embeddings.shape[1]
index = faiss.IndexFlatIP(dim) # Inner product for cosine similarity
index.add(poly_embeddings)

print(f"Querying top {top_k} nearest Polymarket markets for each Kalshi market...")
scores, indices = index.search(kalshi_embeddings, top_k)

potential_matches = []
for i, kalshi_market in enumerate(kalshi_markets):
for j in range(top_k):
score = float(scores[i][j])
if score >= threshold:
poly_market = polymarket_markets[indices[i][j]]
potential_matches.append({
'score': score,
'kalshi_market': kalshi_market,
'polymarket_market': poly_market
})
if i % 100 == 0:
print(f"Processed {i}/{len(kalshi_markets)} Kalshi markets...")


return potential_matches

def interactive_save(matches: List[Dict[str, Any]]):
print("\n--- Review Mode ---")
print("Press 'y' to save a match, anything else to skip.\n")

file_exists = os.path.exists(OUTPUT_FILE)
with open(OUTPUT_FILE, "a", newline='', encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
if not file_exists:
writer.writerow(["kalshi_ticker", "polymarket_slug"])

for i, match in enumerate(matches):
kalshi_ticker = match['kalshi_market']['ticker']
poly_slug = match['polymarket_market']['url'].split("event/")[1]
kalshi_title = get_kalshi_market(kalshi_ticker)
Contributor: Do you need to use this function? Isn't the title already returned in the matches?

Author: matches only returns the Kalshi ticker. To get the title of a market from the Kalshi API, you have to query for the specific market (so you can't get it from /markets either).

poly_title = match['polymarket_market']['title']
score = match['score']

print(f"\nMatch #{i+1} (Score: {score:.4f})")
print(f"[KALSHI] {kalshi_title}")
print(f"[POLYMARKET] {poly_title}")
print(f" > Kalshi URL: {match['kalshi_market']['url']}")
print(f" > Polymarket URL:{match['polymarket_market']['url']}")

choice = input("Save this match? (y/n): ").strip().lower()
if choice == 'y':
writer.writerow([kalshi_ticker, poly_slug])
print("Saved.")
else:
print("Skipped.")

print(f"\nDone. Saved matches to '{OUTPUT_FILE}'.")

def main():
kalshi_markets = get_kalshi_markets()
polymarket_markets = get_polymarket_markets()

if not kalshi_markets or not polymarket_markets:
print("\nCould not fetch markets from one or both platforms. Exiting.")
return

matches = find_similar_markets(kalshi_markets, polymarket_markets, SIMILARITY_THRESHOLD)
print(f"\n--- Found {len(matches)} Potential Matches ---")

if not matches:
print("No strong matches found.")
return

matches.sort(key=lambda x: x['score'], reverse=True)
interactive_save(matches)

if __name__ == "__main__":
main()
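
For reference, the markets.csv written by interactive_save() is the file compare.py's load_markets() reads: a header row followed by one kalshi_ticker,polymarket_slug pair per line. The values below are purely illustrative.

kalshi_ticker,polymarket_slug
EXAMPLE-KALSHI-TICKER,example-polymarket-slug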