Skip to content

Commit 18003c7

Browse files
authored
Merge pull request #2 from shafa-dev/kafka-rest-proxy
add Confluent REST Proxy api
2 parents e6e638d + d232d5b commit 18003c7

File tree

3 files changed

+34
-10
lines changed

3 files changed

+34
-10
lines changed

README.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# kafka-bridge-client
2-
Python async client for [Strimzi Kafka Bridge](https://github.com/strimzi/strimzi-kafka-bridge). Package include consumer only.
2+
Python async client for [Strimzi Kafka Bridge](https://github.com/strimzi/strimzi-kafka-bridge) and [Confluent REST Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html) Package include consumer only.
33

44
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)
55
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-green.svg)](https://github.com/shafa-dev/kafka-bridge-client/issues)
@@ -11,11 +11,13 @@ pip install kafka-bridge-client
1111
```
1212

1313
## Usage
14+
By default client use [Strimzi Kafka Bridge](https://github.com/strimzi/strimzi-kafka-bridge) API
1415
```python
1516
from kafka_bridge_client import KafkaBridgeConsumer
1617

18+
# Strimzi Kafka Bridge
1719

18-
consumer = KafkaBridgeConsumer(
20+
consumer1 = KafkaBridgeConsumer(
1921
'topic1',
2022
'topic2',
2123
group_id='my-group,
@@ -25,7 +27,19 @@ consumer = KafkaBridgeConsumer(
2527
consumer_name='consumer-name',
2628
)
2729

28-
async for rec in consumer.get_records():
30+
# Confluent REST Proxy
31+
consumer2 = KafkaBridgeConsumer(
32+
'topic1',
33+
'topic2',
34+
group_id='my-group,
35+
auto_offset_reset='earliest',
36+
enable_auto_commit=False,
37+
bootstrap_server='your-kafka-bridge-url',
38+
consumer_name='consumer-name',
39+
proxy='confluent'
40+
)
41+
42+
async for rec in consumer1.get_records():
2943
print(rec['value'])
3044
await consumer.commit()
3145
```

kafka_bridge_client/consumer.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,27 @@ def __init__(
5757
sleep_interval_seconds: int = 2,
5858
client_timeout_seconds: int = 15,
5959
headers: t.Dict[str, t.Any] = None,
60+
proxy: t.Literal['strimzi', 'confluent'] = 'strimzi'
6061
) -> None:
6162
self._group_id = group_id
6263
self._consumer_name = consumer_name
6364
self._topics = topics
6465
self._offsets: t.Dict[str, t.Dict[str, t.Any]] = {}
65-
self._config = {
66-
'auto.offset.reset': auto_offset_reset,
67-
'enable.auto.commit': enable_auto_commit,
68-
'format': 'binary',
69-
'name': consumer_name,
70-
}
66+
if proxy == 'strimzi':
67+
self._config = {
68+
'auto.offset.reset': auto_offset_reset,
69+
'enable.auto.commit': enable_auto_commit,
70+
'format': 'binary',
71+
'name': consumer_name,
72+
}
73+
elif proxy == 'confluent':
74+
self._config = {
75+
'auto.offset.reset': auto_offset_reset,
76+
'auto.commit.enable': 'true' if enable_auto_commit else 'false',
77+
'format': 'binary',
78+
'name': consumer_name,
79+
}
80+
7181
# Delay between fetching of records if
7282
# the previous fetch return zero records
7383
self._sleep_interval_seconds = sleep_interval_seconds

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "kafka-bridge-client"
3-
version = "0.1.3"
3+
version = "0.1.4"
44
description = "Python client for Strimzi Kafka Bridge"
55
authors = ["Bogdan Zaseka <zaseka.bogdan@gmail.com>"]
66
license = "MIT"

0 commit comments

Comments
 (0)