Skip to content

Commit 8e8bb03

Browse files
authored
cli tooling for debugging (#62)
1 parent 156120e commit 8e8bb03

File tree

5 files changed

+569
-1
lines changed

5 files changed

+569
-1
lines changed

README.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,23 @@ async with Consumer(stream_name="my-stream") as consumer:
2323
print(message)
2424
```
2525

26+
**CLI:**
27+
```bash
28+
pip install async-kinesis[cli]
29+
30+
async-kinesis put my-stream '{"hello": "world"}'
31+
async-kinesis tail my-stream -n 10
32+
async-kinesis list
33+
async-kinesis describe my-stream
34+
```
35+
2636
📚 **New to async-kinesis?** Check out our [comprehensive Getting Started guide](./docs/getting-started.md) for step-by-step tutorials and examples.
2737

2838
## Table of Contents
2939

3040
- [Key Features](#key-features)
3141
- [Installation](#installation)
42+
- [CLI](#cli)
3243
- [Basic Usage](#basic-usage)
3344
- [Producer](#producer)
3445
- [Consumer](#consumer)
@@ -77,11 +88,61 @@ Unlike basic Kinesis libraries, async-kinesis provides enterprise-grade reshardi
7788
pip install async-kinesis
7889

7990
# With optional dependencies
91+
pip install async-kinesis[cli] # CLI tool (async-kinesis command)
8092
pip install async-kinesis[prometheus] # For Prometheus metrics
8193
pip install async-kinesis[redis] # For Redis checkpointing
8294
pip install async-kinesis[dynamodb] # For DynamoDB checkpointing
8395
```
8496

97+
## CLI
98+
99+
The optional CLI provides commands for interacting with Kinesis streams directly from the terminal. It uses the same Consumer/Producer classes under the hood, so you get the same deaggregation, rate limiting, and reconnection logic.
100+
101+
```bash
102+
pip install async-kinesis[cli]
103+
```
104+
105+
### Commands
106+
107+
```bash
108+
# List streams
109+
async-kinesis list
110+
111+
# Describe a stream (metadata + shard table)
112+
async-kinesis describe my-stream
113+
114+
# Put a JSON record
115+
async-kinesis put my-stream '{"event": "click", "user": 123}'
116+
117+
# Put from stdin (JSONL)
118+
cat events.jsonl | async-kinesis put my-stream
119+
120+
# Put with explicit partition key
121+
async-kinesis put my-stream '{"user": 123}' -k user-123
122+
123+
# Create stream on first put
124+
async-kinesis put --create my-stream '{"first": "record"}'
125+
126+
# Tail (live, latest records)
127+
async-kinesis tail my-stream
128+
129+
# Tail from beginning, stop after 10 records
130+
async-kinesis tail my-stream -i TRIM_HORIZON -n 10
131+
132+
# Tail with compact JSON output
133+
async-kinesis tail my-stream -i TRIM_HORIZON -f json -n 5
134+
```
135+
136+
### Global Options
137+
138+
| Option | Env Var | Description |
139+
| --- | --- | --- |
140+
| `--endpoint-url` | `ENDPOINT_URL` | Kinesis endpoint (for LocalStack/kinesalite) |
141+
| `--region` | `AWS_DEFAULT_REGION` | AWS region |
142+
| `-v, --verbose` | | Enable debug logging |
143+
144+
📖 **[Full CLI Reference](./docs/cli.md)**
145+
85146
## Basic Usage
86147

87148
### Environment Variables
@@ -616,6 +677,7 @@ python tests/resharding/resharding_test.py --scenario scale-up-small
616677
## Documentation
617678

618679
- **[Getting Started Guide](./docs/getting-started.md)** - Step-by-step tutorials for beginners
680+
- **[CLI Reference](./docs/cli.md)** - Command-line tool for stream operations
619681
- **[Testing Guide](./docs/testing.md)** - In-memory mocks, pytest fixtures, and test helpers
620682
- **[Common Patterns](./docs/common-patterns.md)** - Real-world use cases and examples
621683
- **[Metrics & Observability](./docs/metrics.md)** - Prometheus integration and monitoring

docs/cli.md

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# CLI Reference
2+
3+
The `async-kinesis` CLI provides commands for interacting with Kinesis streams from the terminal. It uses the library's Consumer and Producer classes directly, giving you the same deaggregation, serialization, rate limiting, and reconnection logic as the Python API.
4+
5+
## Installation
6+
7+
```bash
8+
pip install async-kinesis[cli]
9+
```
10+
11+
This installs Click as a dependency and registers the `async-kinesis` console script.
12+
13+
## Global Options
14+
15+
```
16+
async-kinesis [OPTIONS] COMMAND [ARGS]...
17+
```
18+
19+
| Option | Env Var | Description |
20+
| --- | --- | --- |
21+
| `--endpoint-url` | `ENDPOINT_URL` | Kinesis endpoint URL (for LocalStack, kinesalite, etc.) |
22+
| `--region` | `AWS_DEFAULT_REGION` | AWS region name |
23+
| `-v, --verbose` | | Enable debug logging (shows library internals) |
24+
| `--version` | | Show version and exit |
25+
26+
AWS credentials are read from the standard boto3 chain (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `~/.aws/credentials`, etc.).
27+
28+
## Commands
29+
30+
### `list`
31+
32+
List Kinesis streams in the account/region.
33+
34+
```bash
35+
async-kinesis list [--limit N]
36+
```
37+
38+
| Option | Default | Description |
39+
| --- | --- | --- |
40+
| `--limit` | 100 | Maximum number of streams to return |
41+
42+
Handles both AWS (StreamSummaries with status/shard count/mode) and kinesalite (StreamNames-only) response formats.
43+
44+
**Example:**
45+
```
46+
$ async-kinesis list
47+
Name Status Shards
48+
----------- ------ ------
49+
my-stream ACTIVE 2
50+
events ACTIVE 4
51+
```
52+
53+
### `describe`
54+
55+
Show details about a specific stream including shard information.
56+
57+
```bash
58+
async-kinesis describe STREAM
59+
```
60+
61+
**Example:**
62+
```
63+
$ async-kinesis describe my-stream
64+
Name my-stream
65+
ARN arn:aws:kinesis:ap-southeast-2:123456789:stream/my-stream
66+
Status ACTIVE
67+
Retention 24 hours
68+
Encryption NONE
69+
Shards 1
70+
71+
Shard ID Status Start Hash End Hash Start Seq End Seq
72+
-------------------- ------ ---------- --------------------------------------- ---------- -------
73+
shardId-000000000000 OPEN 0 340282366920938463463374607431768211455 49635...
74+
```
75+
76+
### `tail`
77+
78+
Tail records from a stream. Uses a real Consumer with MemoryCheckPointer for stateless tailing.
79+
80+
```bash
81+
async-kinesis tail STREAM [OPTIONS]
82+
```
83+
84+
| Option | Default | Description |
85+
| --- | --- | --- |
86+
| `-i, --iterator-type` | `LATEST` | Where to start: `LATEST`, `TRIM_HORIZON`, `AT_TIMESTAMP` |
87+
| `-f, --format` | `json-pretty` | Output format: `json-pretty`, `json`, `raw`, `raw-short` |
88+
| `-p, --processor` | `json` | Record processor: `json`, `string` |
89+
| `-n, --max-records` | *(unlimited)* | Stop after N records |
90+
91+
**Output formats:**
92+
93+
| Format | Description |
94+
| --- | --- |
95+
| `json-pretty` | Indented JSON (default) |
96+
| `json` | Compact single-line JSON (good for piping to `jq`) |
97+
| `raw` | Python `repr()` of the deserialized record |
98+
| `raw-short` | Python `repr()` truncated to 120 characters |
99+
100+
**Examples:**
101+
102+
```bash
103+
# Live tail (latest records, Ctrl+C to stop)
104+
async-kinesis tail my-stream
105+
106+
# Read from beginning, stop after 10
107+
async-kinesis tail my-stream -i TRIM_HORIZON -n 10
108+
109+
# Compact JSON for piping
110+
async-kinesis tail my-stream -i TRIM_HORIZON -f json -n 100 | jq '.user_id'
111+
112+
# Read raw strings (for StringProcessor data)
113+
async-kinesis tail my-stream -p string -f raw -n 5
114+
```
115+
116+
### `put`
117+
118+
Put records into a stream. Uses a real Producer with full batching and flushing.
119+
120+
```bash
121+
async-kinesis put STREAM [DATA] [OPTIONS]
122+
```
123+
124+
| Option | Default | Description |
125+
| --- | --- | --- |
126+
| `-k, --partition-key` | *(auto-generated)* | Explicit partition key |
127+
| `-p, --processor` | `json` | Record processor: `json`, `string` |
128+
| `--create` | off | Create the stream if it does not exist |
129+
130+
**Input modes:**
131+
- **Argument**: `async-kinesis put my-stream '{"key": "value"}'` — single record
132+
- **Stdin**: `cat data.jsonl | async-kinesis put my-stream` — one record per line
133+
134+
When reading from a TTY without a pipe, a hint is printed to stderr.
135+
136+
**Examples:**
137+
138+
```bash
139+
# Single JSON record
140+
async-kinesis put my-stream '{"event": "page_view", "url": "/home"}'
141+
142+
# With partition key
143+
async-kinesis put my-stream '{"user": 123}' -k user-123
144+
145+
# Create stream if needed
146+
async-kinesis put --create new-stream '{"first": "record"}'
147+
148+
# Pipe JSONL file
149+
cat events.jsonl | async-kinesis put my-stream
150+
151+
# Generate records
152+
seq 5 | jq -c '{n: .}' | async-kinesis put my-stream
153+
154+
# String processor (no JSON parsing)
155+
async-kinesis put my-stream "plain text message" -p string
156+
```
157+
158+
## LocalStack / Kinesalite Usage
159+
160+
```bash
161+
export ENDPOINT_URL=http://localhost:4566 # LocalStack
162+
export AWS_DEFAULT_REGION=ap-southeast-2
163+
export AWS_ACCESS_KEY_ID=test
164+
export AWS_SECRET_ACCESS_KEY=test
165+
166+
async-kinesis list
167+
async-kinesis put --create test-stream '{"hello": "world"}'
168+
async-kinesis tail test-stream -i TRIM_HORIZON -n 1
169+
```
170+
171+
Or pass the endpoint directly:
172+
173+
```bash
174+
async-kinesis --endpoint-url http://localhost:4567 list
175+
```

kinesis/cli/__init__.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import asyncio
2+
import functools
3+
import json
4+
import logging
5+
import sys
6+
7+
import click
8+
9+
from kinesis import exceptions
10+
11+
12+
def async_command(f):
13+
"""Decorator that wraps an async click command with asyncio.run() and common error handling."""
14+
15+
@functools.wraps(f)
16+
def wrapper(*args, **kwargs):
17+
try:
18+
return asyncio.run(f(*args, **kwargs))
19+
except KeyboardInterrupt:
20+
sys.exit(130)
21+
except exceptions.StreamDoesNotExist as e:
22+
click.echo(f"Error: {e}", err=True)
23+
sys.exit(1)
24+
except json.JSONDecodeError as e:
25+
click.echo(f"Error: invalid JSON: {e}", err=True)
26+
sys.exit(1)
27+
28+
return wrapper
29+
30+
31+
def format_table(headers, rows):
32+
"""Format a list of rows as a padded text table."""
33+
if not rows:
34+
return ""
35+
36+
# Calculate column widths
37+
widths = [len(h) for h in headers]
38+
for row in rows:
39+
for i, cell in enumerate(row):
40+
widths[i] = max(widths[i], len(str(cell)))
41+
42+
# Build format string
43+
fmt = " ".join(f"{{:<{w}}}" for w in widths)
44+
lines = [fmt.format(*headers)]
45+
lines.append(" ".join("-" * w for w in widths))
46+
for row in rows:
47+
lines.append(fmt.format(*[str(c) for c in row]))
48+
49+
return "\n".join(lines)
50+
51+
52+
def format_kv(pairs):
53+
"""Format key-value pairs with aligned values."""
54+
if not pairs:
55+
return ""
56+
max_key = max(len(k) for k, _ in pairs)
57+
lines = []
58+
for key, value in pairs:
59+
lines.append(f" {key:<{max_key}} {value}")
60+
return "\n".join(lines)
61+
62+
63+
@click.group()
64+
@click.option("--endpoint-url", envvar="ENDPOINT_URL", default=None, help="Kinesis endpoint URL.")
65+
@click.option("--region", envvar="AWS_DEFAULT_REGION", default=None, help="AWS region name.")
66+
@click.option("-v", "--verbose", is_flag=True, help="Enable verbose (debug) logging.")
67+
@click.version_option(package_name="async-kinesis")
68+
@click.pass_context
69+
def main(ctx, endpoint_url, region, verbose):
70+
"""async-kinesis CLI — interact with Amazon Kinesis streams."""
71+
ctx.ensure_object(dict)
72+
ctx.obj["endpoint_url"] = endpoint_url
73+
ctx.obj["region"] = region
74+
75+
level = logging.DEBUG if verbose else logging.WARNING
76+
logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
77+
78+
79+
# Import commands to register them with the group
80+
from kinesis.cli.stream import describe, list_streams, put, tail # noqa: E402
81+
82+
main.add_command(describe)
83+
main.add_command(list_streams, "list")
84+
main.add_command(tail)
85+
main.add_command(put)

0 commit comments

Comments
 (0)