KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. This SDK provides a production-ready Python client supporting all KubeMQ messaging patterns: Events (pub/sub), Events Store (persistent pub/sub), Queues (with ack/reject), Commands, and Queries (RPC).
- Installation
- Quick Start
- Messaging Patterns
- Configuration
- Error Handling
- Troubleshooting
- Security
- Additional Resources
- Contributing
- License
Requires Python 3.11+.

```bash
pip install kubemq
```

For optional features:

```bash
# Quoting the extras keeps shells such as zsh from expanding the brackets.
pip install "kubemq[docs]"   # API reference generation
pip install "kubemq[otel]"   # OpenTelemetry integration
```

Prerequisites: Python 3.11+, KubeMQ server running at `localhost:50000` (install KubeMQ)
Send an event:

```python
from kubemq import PubSubClient, EventMessage

# Open a client for the duration of the with block.
with PubSubClient(address="localhost:50000") as client:
    client.publish_event(
        EventMessage(channel="quickstart", body=b"Hello KubeMQ!")
    )
    print("Event sent!")
```

Subscribe to events:

```python
import time
from kubemq import PubSubClient, EventsSubscription, CancellationToken


def on_event(event):
    # The event body is bytes, so decode it before printing.
    print(f"Received: {event.body.decode('utf-8')}")


with PubSubClient(address="localhost:50000") as client:
    client.subscribe_to_events(
        subscription=EventsSubscription(
            channel="quickstart",
            on_receive_event_callback=on_event,
            on_error_callback=lambda e: print(f"Error: {e}"),
        ),
        cancel=CancellationToken(),
    )
    time.sleep(60)  # Keep listening
```

See also: Queues Quick Start | RPC Quick Start
| Pattern | Delivery Guarantee | Use When | Example Use Case |
|---|---|---|---|
| Events | At-most-once | Fire-and-forget broadcasting to multiple subscribers | Real-time notifications, log streaming |
| Events Store | At-least-once (persistent) | Subscribers must not miss messages, even if offline | Audit trails, event sourcing, replay |
| Queues | At-least-once (with ack) | Each message must be processed by exactly one consumer, with acknowledgment | Job processing, task distribution |
| Commands | At-most-once (request/reply) | You need a response confirming the action was executed | Device control, configuration changes |
| Queries | At-most-once (request/reply) | You need to retrieve data from a responder | Data lookups, service-to-service reads |
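To make the delivery guarantees in the table concrete, here is a toy in-memory model. It is not the KubeMQ API: `ToyEvents`, `ToyQueue`, and all of their methods are hypothetical names chosen for illustration. It contrasts at-most-once broadcast, where a message published while nobody is subscribed is lost, with an at-least-once queue, where a message that was received but not acknowledged is delivered again.

```python
from collections import deque


class ToyEvents:
    """At-most-once broadcast: only current subscribers see a message."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, body):
        # Nothing is stored: with zero subscribers the message is dropped.
        for callback in self._subscribers:
            callback(body)
        return len(self._subscribers)


class ToyQueue:
    """At-least-once queue: a message stays tracked until it is acked."""

    def __init__(self):
        self._ready = deque()
        self._in_flight = {}
        self._next_id = 0

    def send(self, body):
        self._ready.append((self._next_id, body))
        self._next_id += 1

    def receive(self):
        # Hand out the oldest ready message and track it as in flight.
        if not self._ready:
            return None
        msg_id, body = self._ready.popleft()
        self._in_flight[msg_id] = body
        return msg_id, body

    def ack(self, msg_id):
        # Acknowledged messages are removed for good.
        del self._in_flight[msg_id]

    def reject(self, msg_id):
        # Rejected messages return to the front of the queue for redelivery.
        self._ready.appendleft((msg_id, self._in_flight.pop(msg_id)))


events = ToyEvents()
delivered_early = events.publish(b"too early")  # nobody is listening yet
received = []
events.subscribe(received.append)
events.publish(b"hello")

queue = ToyQueue()
queue.send(b"job-1")
first_id, _ = queue.receive()
queue.reject(first_id)              # simulate a worker that failed
second_id, body = queue.receive()   # the same job is delivered again
queue.ack(second_id)
```

In this model the early event never reaches the subscriber, while the rejected job comes back until a consumer acknowledges it. That trade-off is what the "Use When" column describes.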
- Events: Fire-and-forget pub/sub. Use `PubSubClient` (sync) or `AsyncPubSubClient` (async). View examples →
- Events Store: Persistent pub/sub with replay. Subscribers can start from a sequence number, start from a timestamp, or receive only new messages. View examples →
- Queues: Pull-based message queues with acknowledgment, reject, requeue, dead-letter queues, and delayed delivery. View examples →
- Commands: Request/reply where the sender expects confirmation of execution (no response payload). View examples →
- Queries: Request/reply where the sender expects a data response. View examples →
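The replay options described for Events Store can be pictured with a small in-memory log. This is an illustration only, under the assumption that stored events carry an increasing sequence number (starting at 1 here) and a timestamp. `StoredEvent`, `ToyEventStore`, and its methods are hypothetical and are not part of the SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StoredEvent:
    sequence: int      # position in the log, starting at 1 in this sketch
    timestamp: float   # seconds since the epoch
    body: bytes


class ToyEventStore:
    """Append-only log that can be read from different start positions."""

    def __init__(self):
        self._log = []

    def append(self, body, timestamp):
        event = StoredEvent(len(self._log) + 1, timestamp, body)
        self._log.append(event)
        return event

    def from_sequence(self, sequence):
        # Replay every event at or after the given sequence number.
        return [e for e in self._log if e.sequence >= sequence]

    def from_time(self, timestamp):
        # Replay every event stored at or after the given time.
        return [e for e in self._log if e.timestamp >= timestamp]

    def new_only(self, last_seen_sequence):
        # Skip history: only events after the subscriber's join point.
        return [e for e in self._log if e.sequence > last_seen_sequence]


store = ToyEventStore()
store.append(b"created", timestamp=100.0)
store.append(b"updated", timestamp=200.0)
join_point = 2  # a subscriber joins after two events were stored
store.append(b"deleted", timestamp=300.0)

replayed = [e.body for e in store.from_sequence(2)]
since = [e.body for e in store.from_time(150.0)]
fresh = [e.body for e in store.new_only(join_point)]
```

A subscriber that was offline can catch up with `from_sequence` or `from_time`, while `new_only` models a subscriber that does not care about history.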
| Option | Type | Default | Description |
|---|---|---|---|
| `address` | `str` | `"localhost:50000"` | KubeMQ server gRPC address |
| `client_id` | `str` | hostname | Unique client identifier |
| `auth_token` | `str` | `""` | Authentication token |
| `tls` | `TLSConfig` | disabled | TLS configuration (cert, key, CA files) |
| `max_send_size` | `int` | `104857600` | Maximum send message size in bytes |
| `max_receive_size` | `int` | `104857600` | Maximum receive message size in bytes |
| `auto_reconnect` | `bool` | `True` | Auto-reconnect on connection loss |
| `reconnect_interval_seconds` | `int` | `1` | Seconds between reconnect attempts |
| `log_level` | `int \| None` | `None` (no logging) | Python logging level |
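One way to keep these options out of source code is to read them from the environment and pass the result as keyword arguments. The sketch below uses only the option names and defaults from the table. The `KUBEMQ_*` environment variable names and the `client_options_from_env` helper are assumptions made for illustration; the SDK does not define them.

```python
import os

# 100 MiB, the default shown in the table for both size limits.
DEFAULT_MAX_SIZE = 100 * 1024 * 1024


def client_options_from_env(env=None):
    """Build constructor keyword arguments from KUBEMQ_* variables.

    The variable names are hypothetical; adapt them to your deployment.
    """
    env = os.environ if env is None else env
    address = env.get("KUBEMQ_ADDRESS", "localhost:50000")
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {address!r}")

    options = {
        "address": address,
        "auth_token": env.get("KUBEMQ_AUTH_TOKEN", ""),
        "max_send_size": int(env.get("KUBEMQ_MAX_SEND_SIZE", DEFAULT_MAX_SIZE)),
        "max_receive_size": int(env.get("KUBEMQ_MAX_RECEIVE_SIZE", DEFAULT_MAX_SIZE)),
        "auto_reconnect": env.get("KUBEMQ_AUTO_RECONNECT", "true").lower() == "true",
        "reconnect_interval_seconds": int(env.get("KUBEMQ_RECONNECT_INTERVAL", "1")),
    }
    # Only set client_id when given, so the hostname default still applies.
    if "KUBEMQ_CLIENT_ID" in env:
        options["client_id"] = env["KUBEMQ_CLIENT_ID"]
    return options


options = client_options_from_env({"KUBEMQ_ADDRESS": "kubemq-server:50000"})
```

The resulting dictionary could then be unpacked into a client constructor, for example `PubSubClient(**options)`. Validating the address up front turns a malformed value into an immediate `ValueError` instead of a later connection failure.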
All clients accept these options as constructor arguments:

```python
from kubemq import PubSubClient, TLSConfig

client = PubSubClient(
    address="kubemq-server:50000",
    client_id="my-service",
    auth_token="your-token",
    tls=TLSConfig(
        enabled=True,
        cert_file="/path/to/cert.pem",
        key_file="/path/to/key.pem",
        ca_file="/path/to/ca.pem",
    ),
)
```

All SDK errors extend `KubeMQError`:
```python
from kubemq import (
    PubSubClient, EventMessage,
    KubeMQError, KubeMQConnectionError, KubeMQTimeoutError,
)

try:
    with PubSubClient(address="localhost:50000") as client:
        client.publish_event(
            EventMessage(channel="ch1", body=b"hello")
        )
# Catch the specific errors first and the KubeMQError base class last.
except KubeMQConnectionError as e:
    print(f"Connection failed: {e}")
except KubeMQTimeoutError as e:
    print(f"Operation timed out: {e}")
except KubeMQError as e:
    print(f"KubeMQ error: {e}")
```

See the Troubleshooting Guide for solutions to common errors.
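Connection and timeout failures are often transient, so callers sometimes wrap an operation in a retry loop with exponential backoff. The following is a generic sketch rather than an SDK feature: `retry` is a hypothetical helper, and `TransientError` stands in for whichever exceptions you choose to treat as retryable (for example `KubeMQConnectionError` or `KubeMQTimeoutError`) so that the snippet runs without a server.

```python
import time


class TransientError(Exception):
    """Stand-in for a retryable error; not a KubeMQ class."""


def retry(operation, retryable=(TransientError,), attempts=4,
          base_delay=0.5, sleep=time.sleep):
    """Call operation() until it succeeds or attempts run out.

    Waits base_delay, then 2x, 4x, ... between failed attempts and
    re-raises the last error once the attempt budget is exhausted.
    """
    for attempt in range(attempts):
        try:
            return operation()
        except retryable:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Demo: an operation that fails twice before succeeding.
calls = {"count": 0}
delays = []


def flaky_publish():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("connection lost")
    return "sent"


# Record the delays instead of sleeping so the demo finishes instantly.
result = retry(flaky_publish, sleep=delays.append)
```

In real use, `operation` would be a small function that performs the publish, and `retryable` would list the SDK exceptions you consider safe to retry. Errors outside that tuple propagate immediately.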
| Problem | Solution |
|---|---|
| Connection refused | Ensure KubeMQ is running: `docker run -p 50000:50000 kubemq/kubemq-community` |
| Authentication failed | Verify `auth_token` matches server configuration |
| Channel not found | Create the channel first or enable auto-create on the server |
| Timeout / deadline exceeded | Increase `timeout_in_seconds` or check network latency |
| No messages received | Verify subscriber is connected before sender publishes |
→ Full Troubleshooting Guide (11+ entries)
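For the "Connection refused" case, a quick way to tell a network problem from an SDK problem is to check whether anything is listening on the broker port. The sketch below uses only the Python standard library. `is_port_open` and `describe` are hypothetical helpers, and a successful TCP connect only shows that the port is reachable, not that a healthy KubeMQ server is behind it.

```python
import socket


def is_port_open(address, timeout=2.0):
    """Return True if a TCP connection to 'host:port' succeeds."""
    host, _, port = address.rpartition(":")
    try:
        # create_connection resolves the host and attempts a TCP connect.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


def describe(address):
    """Summarize the reachability of an address as a one-line message."""
    state = "reachable" if is_port_open(address) else "not reachable"
    return f"{address} is {state}"
```

Calling `print(describe("localhost:50000"))` before starting a client gives a first hint: if the port is not reachable, the server or the port mapping is the place to look.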
See SECURITY.md for vulnerability reporting. The SDK supports TLS and mTLS connections — for configuration details, see How to Connect with TLS.
- KubeMQ Documentation — Official KubeMQ documentation and guides
- API Reference — Auto-generated API documentation
- Full Documentation Index — Complete SDK documentation index
- KubeMQ Concepts — Core KubeMQ messaging concepts
- SDK Feature Parity Matrix — Cross-SDK feature comparison
- CHANGELOG.md — Release history
- TROUBLESHOOTING.md — Common issues and solutions
- Examples — Runnable code examples for all patterns
Contributions are welcome! See CONTRIBUTING.md for development setup, coding standards, and pull request guidelines.
This project is licensed under the MIT License — see the LICENSE file for details.