Skip to content

Commit a662e87

Browse files
authored
Merge pull request #3 from shafa-dev/kafka-rest-proxy-fix
add Confluent REST Proxy api
2 parents 18003c7 + 4cb08d4 commit a662e87

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

kafka_bridge_client/consumer.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,15 @@ def __init__(
5757
sleep_interval_seconds: int = 2,
5858
client_timeout_seconds: int = 15,
5959
headers: t.Dict[str, t.Any] = None,
60-
proxy: t.Literal['strimzi', 'confluent'] = 'strimzi'
60+
proxy: t.Literal['strimzi', 'confluent'] = 'strimzi',
61+
content_type: str = 'application/vnd.kafka.v2+json'
6162
) -> None:
63+
self._content_type = content_type
6264
self._group_id = group_id
6365
self._consumer_name = consumer_name
6466
self._topics = topics
6567
self._offsets: t.Dict[str, t.Dict[str, t.Any]] = {}
68+
self._proxy = proxy
6669
if proxy == 'strimzi':
6770
self._config = {
6871
'auto.offset.reset': auto_offset_reset,
@@ -85,6 +88,10 @@ def __init__(
8588
self._client_timeout_seconds = client_timeout_seconds
8689
self._headers = headers or {}
8790

91+
@property
92+
def is_strimzi_proxy(self) -> bool:
93+
return self._proxy == 'strimzi'
94+
8895
async def _request(
8996
self,
9097
method: str,
@@ -98,6 +105,7 @@ async def _request(
98105
data = data or {}
99106
_headers = headers or {}
100107
_headers.update(self._headers)
108+
_headers['Content-Type'] = self._content_type
101109
url = urljoin(self._bootstrap_server, path)
102110

103111
async with aiohttp.ClientSession(
@@ -146,7 +154,6 @@ async def _create_consumer_instance(self) -> None:
146154
'POST',
147155
self.CONSUMER_PATH.format(group_id=self._group_id),
148156
data=self._config,
149-
headers={'Content-Type': 'application/vnd.kafka.v2+json'},
150157
)
151158
if response.status != 200:
152159
raise exceptions.KafkaBridgeError(
@@ -180,7 +187,6 @@ async def _subscribe(self) -> None:
180187
name=self._consumer_name,
181188
),
182189
data={'topics': self._topics},
183-
headers={'Content-Type': 'application/vnd.kafka.v2+json'},
184190
)
185191

186192
if response.status != 204:
@@ -227,9 +233,9 @@ async def commit(self) -> None:
227233
name=self._consumer_name,
228234
),
229235
data={'offsets': list(self._offsets.values())},
230-
headers={'Content-Type': 'application/vnd.kafka.v2+json'},
231236
)
232-
if response.status != 204:
237+
success_status = 204 if self.is_strimzi_proxy else 200
238+
if response.status != success_status:
233239
raise exceptions.KafkaBridgeError(
234240
f'status: {response.status}, text: {response.content!r}',
235241
)

0 commit comments

Comments
 (0)