
Commit 6283736

initial commit
0 parents  commit 6283736

File tree

16 files changed: +1130 −0 lines

.gitignore

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
venv/
__pycache__/
*.py[cod]
*$py.class

.vscode/launch.json

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Main",
            "type": "debugpy",
            "request": "launch",
            "program": "main.py",
            "console": "integratedTerminal",
            "justMyCode": true
        }
    ]
}

.vscode/settings.json

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
{
    "python.defaultInterpreterPath": "${workspaceFolder}/venv/bin/python",
    "python.terminal.activateEnvironment": true
}

LICENSE

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/

TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION

MANIFEST.in

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
include README.md
include LICENSE

README.md

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
# Superstream Client for Python (`superclient`)

Superclient is a zero-code optimisation agent for Python applications that use Apache Kafka.
It transparently intercepts producer creation in the popular client libraries and tunes
configuration parameters (compression, batching, etc.) based on recommendations
provided by the Superstream platform.

---

## Why use Superclient?

- **No code changes** – simply install the package and run your program.
- **Dynamic configuration** – adapts to cluster-specific and topic-specific insights coming from `superstream.metadata_v1`.
- **Safe by design** – any internal failure falls back to your original configuration; the application never crashes because of the agent.
- **Minimal overhead** – uses a single lightweight background thread (or an async coroutine when running with `aiokafka`).

---

## Supported Kafka libraries

| Library | Producer class | Status |
|---------|----------------|--------|
| kafka-python | `kafka.KafkaProducer` | ✓ implemented |
| aiokafka | `aiokafka.AIOKafkaProducer` | ✓ implemented |
| confluent-kafka | `confluent_kafka.Producer` | ✓ implemented |

Other libraries/frameworks that wrap these producers (e.g. Faust, FastAPI event publishers, Celery Kafka back-ends) inherit the benefits automatically.
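
For illustration only (broker address and topic below are placeholders, not part of this commit): a plain kafka-python producer like the following needs no Superstream-specific calls – with `superclient` installed, its creation is intercepted and its configuration tuned before the constructor runs.

```python
from kafka import KafkaProducer  # kafka-python

# Ordinary producer code, unchanged by the application author.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",            # placeholder broker
    value_serializer=lambda v: v.encode("utf-8"),
)
producer.send("test-topic", "hello from an unmodified producer")
producer.flush()
producer.close()
```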

---

## Installation

```bash
pip install superclient
```

The package ships with a `sitecustomize.py` entry point, so Python imports the agent automatically **before your application's code starts**.
If `sitecustomize` is disabled in your environment, you can initialise manually:

```python
import superclient  # importing has the side effect of enabling the agent
```

---

## Environment variables

| Variable | Description |
|----------|-------------|
| `SUPERSTREAM_DISABLED` | `true` disables all functionality |
| `SUPERSTREAM_DEBUG` | `true` prints verbose debug logs |
| `SUPERSTREAM_TOPICS_LIST` | Comma-separated list of topics your application *may* write to |
| `SUPERSTREAM_LATENCY_SENSITIVE` | `true` prevents the agent from lowering `linger.ms` |

At start-up the agent logs the set of environment variables it detected:

```
[superstream] INFO agent: Superstream Agent initialized with environment variables: {'SUPERSTREAM_DEBUG': 'true', ...}
```
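
If exporting shell variables is inconvenient, the same settings can also be applied from Python before the agent loads – a sketch that assumes `sitecustomize` is disabled and `superclient` is imported manually:

```python
import os

# These must be set before superclient is imported, since the agent
# reads them at initialisation time.
os.environ["SUPERSTREAM_DEBUG"] = "true"
os.environ["SUPERSTREAM_TOPICS_LIST"] = "test-topic,test-topic-1"

import superclient  # noqa: E402 – importing enables the agent
```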

---

## How it works

1. An **import hook** patches the producer classes once their modules are imported.
2. When your code creates a producer the agent (a minimal sketch follows this list):
   a. Skips internal Superstream clients (their `client_id` starts with `superstreamlib-`).
   b. Fetches the latest optimisation metadata from `superstream.metadata_v1`.
   c. Computes an optimal configuration for the most impactful topic (or falls back to sensible defaults) while respecting the latency-sensitive flag.
   d. Overrides producer kwargs/in-dict values before the original constructor executes.
   e. Sends a *client_info* message to `superstream.clients` that contains both original and optimised configurations.
3. A single background heartbeat thread (or async task for `aiokafka`) emits *client_stats* messages every `report_interval_ms` (default 5 minutes).
4. When the application closes the producer the agent stops tracking it and ceases heart-beats.
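
For illustration only, a conceptual sketch of step 2 – not the agent's actual code: a producer class's `__init__` is wrapped so optimised settings are merged into the caller's kwargs before the original constructor runs. The `fetch_optimised_config` helper is hypothetical.

```python
import functools

def patch_producer(cls, fetch_optimised_config):
    """Wrap cls.__init__ so optimised kwargs are merged in before it runs."""
    original_init = cls.__init__

    @functools.wraps(original_init)
    def wrapped_init(self, *args, **kwargs):
        client_id = str(kwargs.get("client_id", ""))
        # Skip Superstream's own internal clients (step 2a).
        if not client_id.startswith("superstreamlib-"):
            try:
                # Hypothetical helper returning e.g. {'compression_type': 'zstd', 'linger_ms': 50}.
                optimised = fetch_optimised_config(kwargs)
                # Recommendations override the caller-supplied values (step 2d).
                kwargs.update(optimised)
            except Exception:
                # Safe by design: on any internal failure, keep the original configuration.
                pass
        original_init(self, *args, **kwargs)

    cls.__init__ = wrapped_init
```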

---

## Logging

Log lines are printed to `stdout`/`stderr` and start with the `[superstream]` prefix, so they integrate with existing log pipelines. Set `SUPERSTREAM_DEBUG=true` for additional diagnostic messages.

---

## Security & compatibility

• Authentication/SSL/SASL/DNS settings are **copied from your original configuration** to every short-lived internal client.
• The agent only relies on the Kafka library already present in your environment, therefore **no dependency conflicts** are introduced.
• All exceptions are caught internally; your application will **never crash or hang** because of Superclient.

---

## License

Apache 2.0

examples/asynciokafka.py

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
"""
Kafka Producer using aiokafka library
This is an asyncio-based client, great for async Python applications
"""
import json
import asyncio
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from json_generator import generate_random_json

async def create_producer(client_id):
    """Create, configure and start a Kafka producer"""
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        client_id=client_id,
        compression_type=None,   # No compression
        max_batch_size=150,      # Batch size in bytes
        linger_ms=10,            # Linger time in milliseconds
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    await producer.start()
    return producer

async def send_messages_to_topics(producer, topics, producer_name, num_messages=50):
    """Send random JSON messages to the specified Kafka topics"""
    successful = 0
    failed = 0

    for i in range(num_messages):
        try:
            # Generate random JSON of at least 1 KB
            message = generate_random_json(min_size_kb=1)
            message['message_number'] = i + 1
            message['producer'] = producer_name

            # Send the message to each topic
            for topic in topics:
                await producer.send_and_wait(topic, message)

            successful += 1

        except KafkaError as e:
            failed += 1
            print(f"Failed to send message {i+1}: {e}")

        # Small delay between messages (optional)
        await asyncio.sleep(0.01)

    print(f"\n{producer_name} Summary: {successful} successful, {failed} failed")

async def main():
    producer1 = None
    producer2 = None
    try:
        # Create two separate producers
        producer1 = await create_producer('aiokafka-producer-1')
        producer2 = await create_producer('aiokafka-producer-2')

        # First producer sends to test-topic and test-topic-1
        topics1 = ['test-topic', 'test-topic-1']
        await send_messages_to_topics(producer1, topics1, 'aiokafka-producer-1')

        # Second producer sends to test-topic-2 and test-topic-3
        topics2 = ['test-topic-2', 'test-topic-3']
        await send_messages_to_topics(producer2, topics2, 'aiokafka-producer-2')

    except Exception as e:
        print(f"Error: {e}")
    finally:
        if producer1:
            await producer1.stop()
            print("Producer 1 closed")
        if producer2:
            await producer2.stop()
            print("Producer 2 closed")

    # Sleep for 10 minutes at the end
    print("Sleeping for 10 minutes...")
    await asyncio.sleep(600)
    print("Sleep completed")

if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())

examples/confluent.py

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
"""
Kafka Producer using confluent-kafka library
This is the Python wrapper around librdkafka (C library), offering high performance
"""
import json
import time
from confluent_kafka import Producer
from json_generator import generate_random_json

def delivery_report(err, msg):
    """Callback for message delivery reports"""
    if err is not None:
        print(f'Message delivery failed: {err}')

def create_producer(client_id):
    """Create and configure a Kafka producer"""
    config = {
        'bootstrap.servers': 'localhost:9092',
        'client.id': client_id,
        'compression.type': 'none',
        'batch.size': 150,   # Batch size in bytes
        'linger.ms': 10,     # Linger time in milliseconds
    }
    return Producer(config)

def send_messages_to_topics(producer, topics, producer_name, num_messages=50):
    """Send random JSON messages to the specified Kafka topics"""
    successful = 0
    failed = 0

    for i in range(num_messages):
        try:
            # Generate random JSON of at least 1 KB
            message = generate_random_json(min_size_kb=1)
            message['message_number'] = i + 1
            message['producer'] = producer_name

            # Serialize to JSON
            message_json = json.dumps(message)

            # Send the message to each topic
            for topic in topics:
                producer.produce(
                    topic=topic,
                    value=message_json.encode('utf-8'),
                    callback=delivery_report
                )
                # Trigger any available delivery report callbacks
                producer.poll(0)

            successful += 1

        except Exception as e:
            failed += 1
            print(f"Failed to send message {i+1}: {e}")

        # Small delay between messages (optional)
        time.sleep(0.01)

    producer.flush(timeout=30)
    print(f"\n{producer_name} Summary: {successful} successful, {failed} failed")

def main():
    producer1 = None
    producer2 = None
    try:
        # Create two separate producers
        producer1 = create_producer('confluent-kafka-producer-1')
        producer2 = create_producer('confluent-kafka-producer-2')

        # First producer sends to test-topic and test-topic-1
        topics1 = ['test-topic', 'test-topic-1']
        send_messages_to_topics(producer1, topics1, 'confluent-kafka-producer-1')

        # Second producer sends to test-topic-2 and test-topic-3
        topics2 = ['test-topic-2', 'test-topic-3']
        send_messages_to_topics(producer2, topics2, 'confluent-kafka-producer-2')

    except Exception as e:
        print(f"Error: {e}")

    # Sleep for 10 minutes at the end
    print("Sleeping for 10 minutes...")
    time.sleep(600)
    print("Sleep completed")

if __name__ == "__main__":
    main()

examples/json_generator.py

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
import random
import string
import json
from datetime import datetime

def generate_random_json(min_size_kb=1):
    """Generate a random JSON-serialisable dict of at least min_size_kb kilobytes"""
    base_data = {
        "timestamp": datetime.now().isoformat(),
        "event_id": f"evt_{random.randint(100000, 999999)}",
        "user_id": f"user_{random.randint(1000, 9999)}",
        "session_id": f"session_{random.randint(10000, 99999)}",
        "event_type": random.choice(["click", "view", "purchase", "login", "logout"]),
        "device_type": random.choice(["mobile", "desktop", "tablet"]),
        "os": random.choice(["Windows", "macOS", "Linux", "iOS", "Android"]),
        "browser": random.choice(["Chrome", "Firefox", "Safari", "Edge"]),
        "country": random.choice(["US", "UK", "DE", "FR", "JP", "BR", "IN"]),
        "metrics": {
            "load_time": round(random.uniform(0.1, 5.0), 3),
            "response_time": round(random.uniform(0.01, 1.0), 3),
            "cpu_usage": round(random.uniform(0, 100), 2),
            "memory_usage": round(random.uniform(0, 100), 2)
        }
    }

    # Calculate the current serialized size
    current_json = json.dumps(base_data)
    current_size = len(current_json.encode('utf-8'))
    target_size = min_size_kb * 1024

    # Add padding data if needed to reach the target size
    if current_size < target_size:
        padding_size = target_size - current_size
        # Generate random string data for padding
        padding_data = {
            "additional_data": {
                f"field_{i}": ''.join(random.choices(string.ascii_letters + string.digits, k=50))
                for i in range(padding_size // 50)
            }
        }
        base_data.update(padding_data)

    return base_data
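
As a quick, purely illustrative check of the generator above (not part of this commit), the padding loop targets roughly `min_size_kb` kilobytes of serialised JSON:

```python
import json
from json_generator import generate_random_json

payload = generate_random_json(min_size_kb=2)
size_bytes = len(json.dumps(payload).encode('utf-8'))
# Expect a value near 2 * 1024 bytes: padding is added in 50-character chunks,
# and the JSON key overhead ("field_N") typically pushes it slightly past the target.
print(f"{size_bytes} bytes across {len(payload)} top-level keys")
```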
