|
2 | 2 |
|
3 | 3 | [](https://github.com/python/black) [](https://badge.fury.io/py/async-kinesis) [](https://www.python.org/downloads/release/python-3100/) [](https://www.python.org/downloads/release/python-3110/) [](https://www.python.org/downloads/release/python-3120/) |
4 | 4 |
|
5 | | -``` |
| 5 | +High-performance async Python library for AWS Kinesis with production-ready resharding support. |
| 6 | + |
| 7 | +```bash |
6 | 8 | pip install async-kinesis |
7 | 9 | ``` |
8 | 10 |
|
9 | | -## Features |
10 | | - |
11 | | -- uses queues for both producer and consumer |
12 | | - - producer flushes with put_records() if has enough to flush or after "buffer_time" reached |
13 | | - - consumer iterates over msg queue independent of shard readers |
14 | | -- Configurable to handle Sharding limits but will throttle/retry if required |
15 | | - - ie multiple independent clients are saturating the Shards |
16 | | -- **Dynamic shard handling and resharding support** |
17 | | - - automatically discovers new shards during Kinesis stream resharding |
18 | | - - graceful handling of closed shards when `NextShardIterator` is null |
19 | | - - robust error recovery for expired shard iterators |
20 | | - - shard status monitoring and operational visibility |
21 | | -- Checkpointing with heartbeats |
22 | | - - deadlock + reallocation of shards if checkpoint fails to heartbeat within "session_timeout" |
23 | | -- processors (aggregator + serializer) |
24 | | - - json line delimited, msgpack |
25 | | -- Address Kinesis streams by name or [ARN] |
26 | | - |
27 | | -See [docs/design](./docs/DESIGN.md) for more details. |
28 | | -See [docs/yetanother](docs/YETANOTHER.md) as to why reinvent the wheel. |
| 11 | +## Quick Start |
| 12 | + |
| 13 | +**Producer:** |
| 14 | +```python |
| 15 | +from kinesis import Producer |
| 16 | + |
| 17 | +async with Producer(stream_name="my-stream") as producer: |
| 18 | + await producer.put({"message": "hello world"}) |
| 19 | +``` |
| 20 | + |
| 21 | +**Consumer:** |
| 22 | +```python |
| 23 | +from kinesis import Consumer |
| 24 | + |
| 25 | +async with Consumer(stream_name="my-stream") as consumer: |
| 26 | + async for message in consumer: |
| 27 | + print(message) |
| 28 | +``` |
| 29 | + |
| 30 | +## Key Features |
| 31 | + |
| 32 | +✅ **Production-Ready Resharding**: Automatic shard discovery and topology management |
| 33 | +✅ **Async/Await Native**: Built for modern Python async patterns |
| 34 | +✅ **High Performance**: Queue-based architecture with configurable batching |
| 35 | +✅ **AWS Best Practices**: Parent-child shard ordering and proper error handling |
| 36 | +✅ **Stream Addressing**: Support for both stream names and ARNs |
| 37 | +✅ **Multi-Consumer Support**: Redis-based checkpointing with heartbeats |
| 38 | +✅ **Flexible Processing**: Pluggable serialization (JSON, MessagePack, KPL) |
| 39 | +✅ **Operational Visibility**: Rich monitoring APIs for production debugging |
| 40 | + |
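As a rough illustration of the "queue-based architecture with configurable batching" item above (the removed Features list describes it as flushing once enough records are buffered or once `buffer_time` has elapsed), here is a minimal, self-contained sketch. `BatchingBuffer`, `batch_size`, and the `flush` callback are hypothetical names chosen for this example; this is not the library's implementation, and in the real producer the flush step would presumably call `put_records()`.

```python
import asyncio


class BatchingBuffer:
    """Hypothetical stand-in for a producer-side buffer (not the library's class)."""

    def __init__(self, flush, batch_size=3, buffer_time=0.05):
        self._flush = flush              # async callable that receives a list of records
        self._batch_size = batch_size    # flush as soon as this many records are buffered
        self._buffer_time = buffer_time  # ...or this many seconds after the first record
        self._queue = asyncio.Queue()

    async def put(self, record):
        await self._queue.put(record)

    async def run(self):
        """Drain the queue until cancelled, flushing on size or on timeout."""
        loop = asyncio.get_running_loop()
        batch, deadline = [], None
        while True:
            # Wait indefinitely while the batch is empty, else only until the deadline.
            timeout = None if deadline is None else max(0.0, deadline - loop.time())
            timed_out = False
            try:
                record = await asyncio.wait_for(self._queue.get(), timeout)
            except asyncio.TimeoutError:
                timed_out = True
            else:
                if not batch:
                    # The first record of a new batch starts the buffer_time clock.
                    deadline = loop.time() + self._buffer_time
                batch.append(record)
            if batch and (timed_out or len(batch) >= self._batch_size):
                await self._flush(batch)
                batch, deadline = [], None


async def demo():
    flushed = []

    async def flush(batch):
        flushed.append(list(batch))

    buffer = BatchingBuffer(flush, batch_size=3, buffer_time=0.05)
    worker = asyncio.create_task(buffer.run())
    for i in range(4):
        await buffer.put(i)
    await asyncio.sleep(0.3)  # long enough for the time-based flush to fire
    worker.cancel()
    return flushed
```

With four records and `batch_size=3`, the first three are flushed together on size and the fourth is flushed on its own once `buffer_time` expires.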
| 41 | +### Resharding Support Highlights |
| 42 | + |
| 43 | +Unlike basic Kinesis libraries, async-kinesis provides enterprise-grade resharding capabilities: |
| 44 | + |
| 45 | +- **Automatic discovery** of new shards during resharding operations |
| 46 | +- **Parent-child ordering** enforcement following AWS best practices |
| 47 | +- **Graceful handling** of closed shards and iterator expiration |
| 48 | +- **Real-time monitoring** with detailed topology and status reporting |
| 49 | +- **Seamless coordination** between multiple consumer instances |
| 50 | + |
| 51 | +📖 **[Architecture Details](./docs/DESIGN.md)** | **[Why Another Library?](docs/YETANOTHER.md)** |
29 | 52 |
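To make the "parent-child ordering" bullet concrete: after a split or merge, the records in a parent shard should be read before those in its children. The sketch below orders shard descriptions so that every parent precedes its children. It assumes dictionaries shaped like the entries returned by the Kinesis `ListShards` API (`ShardId`, `ParentShardId`, `AdjacentParentShardId`); the `order_shards` helper is hypothetical and is not taken from async-kinesis.

```python
def order_shards(shards):
    """Return the shards ordered so that every parent precedes its children.

    Parents that no longer appear in the listing (for example, because they
    have aged out of the stream) are simply ignored.
    """
    by_id = {shard["ShardId"]: shard for shard in shards}
    ordered, seen = [], set()

    def visit(shard):
        if shard["ShardId"] in seen:
            return
        seen.add(shard["ShardId"])
        # A split child has one parent; a merged child also has an adjacent parent.
        for key in ("ParentShardId", "AdjacentParentShardId"):
            parent = by_id.get(shard.get(key))
            if parent is not None:
                visit(parent)
        ordered.append(shard)

    for shard in shards:
        visit(shard)
    return ordered


# Shard 000 was split into 002 and 003; then 002 and 001 were merged into 004.
example_shards = [
    {"ShardId": "shardId-004", "ParentShardId": "shardId-002",
     "AdjacentParentShardId": "shardId-001"},
    {"ShardId": "shardId-003", "ParentShardId": "shardId-000"},
    {"ShardId": "shardId-000"},
    {"ShardId": "shardId-001"},
    {"ShardId": "shardId-002", "ParentShardId": "shardId-000"},
]
ordered_ids = [shard["ShardId"] for shard in order_shards(example_shards)]
```

This only establishes a safe reading order. A real consumer would additionally have to wait until a parent shard is fully drained before starting on its children.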
|
30 | 53 | ## Environment Variables |
31 | 54 |
|
@@ -241,7 +264,7 @@ Refer https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable- |
241 | 264 | | JsonListProcessor | ListAggregator | JsonSerializer | Multiple JSON record returned by list | |
242 | 265 | | MsgpackProcessor | NetstringAggregator | MsgpackSerializer | Multiple Msgpack record framed with Netstring Protocol (https://en.wikipedia.org/wiki/Netstring) | |
243 | 266 | | KPLJsonProcessor | KPLAggregator | JsonSerializer | Multiple JSON record in a KPL Aggregated Record (https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) | |
244 | | -| KPLStringProcessor | KPLAggregator | StringSerializer | Multiple String record in a KPL Aggregated Record (https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) | |
| 267 | +| KPLStringProcessor | KPLAggregator | StringSerializer | Multiple String record in a KPL Aggregated Record (https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) | |
245 | 268 |
|
246 | 269 | Note that you can easily define your own processor: it is simply a class that inherits from an Aggregator and a Serializer. |
247 | 270 |
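The composition idea in the note above can be sketched with plain Python mixins. Everything below is a self-contained stand-in: `DemoLineAggregator`, `DemoJsonSerializer`, `DemoProcessor`, and the method names `aggregate`, `split`, `serialize`, and `deserialize` are assumptions made for illustration, so check the library's own base classes for the real interface before subclassing them.

```python
import json


class DemoLineAggregator:
    """Stand-in aggregator: frames each serialized record with a trailing newline."""

    def aggregate(self, items):
        # Relies on serialize() being supplied by the serializer mixin.
        return b"".join(self.serialize(item) + b"\n" for item in items)

    def split(self, payload):
        return [self.deserialize(line) for line in payload.splitlines() if line]


class DemoJsonSerializer:
    """Stand-in serializer: encodes a single record as UTF-8 JSON."""

    def serialize(self, item):
        return json.dumps(item).encode("utf-8")

    def deserialize(self, data):
        return json.loads(data.decode("utf-8"))


class DemoProcessor(DemoLineAggregator, DemoJsonSerializer):
    """The processor adds no code of its own; it only combines the two mixins."""


processor = DemoProcessor()
payload = processor.aggregate([{"a": 1}, {"b": 2}])
records = processor.split(payload)
```

Keeping framing (the aggregator) separate from encoding (the serializer) is what lets the table above pair one aggregator with several serializers, and vice versa.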
|
@@ -315,41 +338,62 @@ Choose the optimal processor based on your use case: |
315 | 338 | **Performance Testing:** Use the benchmark tool with different `--record-size-kb` and `--processors` options to determine the best processor for your specific data patterns. |
316 | 339 |
|
317 | 340 |
|
318 | | -## Unit Testing |
| 341 | +## Development & Testing |
319 | 342 |
|
320 | | -Uses https://github.com/mhart/kinesalite for local testing. |
| 343 | +### Local Testing |
321 | 344 |
|
322 | | -Run tests via docker |
| 345 | +The test suite uses LocalStack for integration testing: |
323 | 346 |
|
324 | | -``` |
| 347 | +```bash |
| 348 | +# Run full test suite via Docker |
325 | 349 | docker-compose up --abort-on-container-exit --exit-code-from test |
326 | | -``` |
327 | 350 |
|
328 | | -For local testing use |
329 | | - |
330 | | -``` |
| 351 | +# Local development setup |
331 | 352 | docker-compose up kinesis redis |
| 353 | +pip install -r test-requirements.txt |
| 354 | +pytest |
332 | 355 | ``` |
333 | 356 |
|
334 | | -then within your virtualenv |
| 357 | +### Code Quality |
335 | 358 |
|
336 | | -``` |
337 | | -nosetests |
| 359 | +This project uses automated code formatting and linting: |
| 360 | + |
| 361 | +```bash |
| 362 | +# Install development tools |
| 363 | +pip install -r test-requirements.txt |
| 364 | + |
| 365 | +# Run formatting and linting |
| 366 | +black . |
| 367 | +isort . |
| 368 | +flake8 . |
338 | 369 |
|
339 | | -# or run individual test |
340 | | -nosetests tests.py:KinesisTests.test_create_stream_shard_limit_exceeded |
| 370 | +# Or use pre-commit hooks |
| 371 | +pre-commit install |
| 372 | +pre-commit run --all-files |
341 | 373 | ``` |
342 | 374 |
|
343 | | -Note there are a few test cases using the *actual* AWS Kinesis (AWSKinesisTests) |
344 | | -These require setting an env in order to run |
| 375 | +### AWS Integration Tests |
345 | 376 |
|
346 | | -Create an ".env" file with |
| 377 | +Some tests run against the actual AWS Kinesis service. To enable them, create a `.env` file containing: |
347 | 378 |
|
348 | 379 | ``` |
349 | 380 | TESTING_USE_AWS_KINESIS=1 |
350 | 381 | ``` |
351 | 382 |
|
352 | | -Note you can ignore these tests if submitting PR unless core batching/processing behaviour is being changed. |
| 383 | +### Resharding Tests |
| 384 | + |
| 385 | +A comprehensive resharding test suite is available: |
| 386 | + |
| 387 | +```bash |
| 388 | +# Unit tests (no AWS required) |
| 389 | +python tests/resharding/test_resharding_simple.py |
| 390 | + |
| 391 | +# Integration tests (requires LocalStack) |
| 392 | +python tests/resharding/test_resharding_integration.py |
| 393 | + |
| 394 | +# Production testing (requires AWS) |
| 395 | +python tests/resharding/resharding_test.py --scenario scale-up-small |
| 396 | +``` |
353 | 397 |
|
354 | 398 |
|
355 | 399 | [ARN]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html |
|