Commit 33ea2de

fix/Kafka destination doesn't send messages (#298)
- the credentials weren't decoded before use (which resulted in an OK precheck and a silent failure when uploading)
- the uploader precheck now checks for the existence of the topic
- when pushing messages, iterate while the producer queue is not empty
- fixed not reacting to errors when pushing messages
- the kafka lib now logs through the unstructured logger (should fix Elasticsearch not showing the correct log level)
- refactored the kafka tests into separate cloud and local files
- some code cleanup
1 parent f4284e9 commit 33ea2de
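The core of the fix described above is to surface produce errors through a delivery callback and to keep flushing while the producer's internal queue is still non-empty. A minimal sketch of that pattern with confluent_kafka follows; the broker address, topic, and payloads are assumptions for illustration, and this is not the connector's actual upload code:

```python
from confluent_kafka import Producer


def on_delivery(err, msg):
    # Raising here surfaces delivery failures instead of failing silently;
    # the exception is re-raised from poll()/flush().
    if err is not None:
        raise RuntimeError(f"Delivery to {msg.topic()} failed: {err}")


producer = Producer({"bootstrap.servers": "localhost:29092"})  # assumed broker

for i in range(10):  # assumed payloads
    producer.produce(topic="fake-topic", value=f"message {i}", on_delivery=on_delivery)
    producer.poll(0)  # serve delivery reports as messages are queued

# len(producer) is the number of messages (and requests) still awaiting delivery;
# keep flushing until the queue is empty rather than flushing once and hoping.
while len(producer) > 0:
    producer.flush(timeout=5)
```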

File tree: 9 files changed (+449, -310 lines)


.github/workflows/e2e.yml

Lines changed: 3 additions & 0 deletions
@@ -160,6 +160,9 @@ jobs:
           MONGODB_DATABASE: ${{ secrets.MONGODB_DATABASE_NAME }}
           QDRANT_API_KEY: ${{ secrets.QDRANT_API_KEY }}
           QDRANT_SERVER_URL: ${{ secrets.QDRANT_SERVER_URL }}
+          KAFKA_API_KEY: ${{ secrets.KAFKA_API_KEY }}
+          KAFKA_SECRET: ${{ secrets.KAFKA_SECRET }}
+          KAFKA_BOOTSTRAP_SERVER: ${{ secrets.KAFKA_BOOTSTRAP_SERVER }}
         run : |
           source .venv/bin/activate
           make install-test

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
@@ -1,3 +1,13 @@
+## 0.3.12-dev0
+
+### Fixes
+
+* **Fix Kafka destination connection problems**
+
+### Enhancements
+
+* **Kafka destination connector checks for existence of topic**
+
 ## 0.3.11
 
 ### Enhancements
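The "checks for existence of topic" enhancement noted in the changelog happens in the uploader precheck. A minimal sketch of such a check using confluent_kafka's AdminClient; the helper name and configuration are assumptions for illustration, not the connector's actual precheck code:

```python
from confluent_kafka.admin import AdminClient


def precheck_topic_exists(conf: dict, topic: str) -> None:
    # list_topics() returns cluster metadata whose .topics dict is keyed by topic name.
    existing = AdminClient(conf).list_topics(timeout=10).topics
    if topic not in existing:
        raise ValueError(f"Topic '{topic}' does not exist on the cluster")


# Example against a local broker (assumed address):
precheck_topic_exists({"bootstrap.servers": "localhost:29092"}, "fake-topic")
```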
test/integration/connectors/kafka/conftest.py

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
import json
import time

from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient

from unstructured_ingest.v2.logger import logger

TOPIC = "fake-topic"


def get_admin_client() -> AdminClient:
    conf = {
        "bootstrap.servers": "localhost:29092",
    }
    return AdminClient(conf)


def wait_for_topic(
    topic: str,
    retries: int = 10,
    interval: int = 2,
    exists: bool = True,
    admin_client=None,
):
    if admin_client is None:
        admin_client = get_admin_client()
    current_topics = admin_client.list_topics().topics
    attempts = 0
    while (topic not in current_topics) == exists and attempts < retries:
        attempts += 1
        logger.info(
            "Attempt {}: Waiting for topic {} to {} exist. Current topics: [{}]".format(
                attempts, topic, "" if exists else "not", ", ".join(current_topics)
            )
        )
        time.sleep(interval)
        current_topics = admin_client.list_topics().topics
    logger.info(
        "Attempt {} succeeded: Waiting for topic {} to {} exist. Current topics: [{}]".format(
            attempts, topic, "" if exists else "not", ", ".join(current_topics)
        )
    )

    if (topic not in current_topics) == exists:
        raise TimeoutError(f"Timeout out waiting for topic {topic} to exist")


def get_all_messages(conf: dict, topic: str, max_empty_messages: int = 3) -> list[dict]:
    consumer = Consumer(conf)
    consumer.subscribe([topic])
    messages = []
    try:
        empty_count = 0
        while empty_count < max_empty_messages:
            msg = consumer.poll(timeout=5)
            if msg is None:
                empty_count += 1
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    break
                else:
                    raise KafkaException(msg.error())
            try:
                message = json.loads(msg.value().decode("utf8"))
                messages.append(message)
            finally:
                consumer.commit(asynchronous=False)
    finally:
        consumer.close()
    return messages
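For reference, a sketch of how these helpers might be exercised against the local broker configured above; the consumer settings are assumptions, mirroring the consumer config used in the cloud destination test below:

```python
conf = {
    "bootstrap.servers": "localhost:29092",
    "group.id": "example_group",          # assumed group id
    "enable.auto.commit": "false",
    "auto.offset.reset": "earliest",
}
wait_for_topic(TOPIC)                      # poll until "fake-topic" is visible
messages = get_all_messages(conf, TOPIC)   # drain the JSON messages on the topic
print(f"read {len(messages)} messages")
```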
Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
import json
import os
import tempfile
from pathlib import Path

import pytest
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic

from test.integration.connectors.kafka.conftest import TOPIC, get_all_messages, wait_for_topic
from test.integration.connectors.utils.constants import DESTINATION_TAG, SOURCE_TAG
from test.integration.connectors.utils.validation.source import (
    SourceValidationConfigs,
    source_connector_validation,
)
from test.integration.utils import requires_env
from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers
from unstructured_ingest.v2.logger import logger
from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
    CloudKafkaAccessConfig,
    CloudKafkaConnectionConfig,
    CloudKafkaDownloader,
    CloudKafkaDownloaderConfig,
    CloudKafkaIndexer,
    CloudKafkaIndexerConfig,
    CloudKafkaUploader,
    CloudKafkaUploaderConfig,
)
from unstructured_ingest.v2.processes.connectors.kafka.local import CONNECTOR_TYPE


@pytest.fixture
def kafka_seed_topic_cloud(request) -> int:
    expected_messages: int = request.param
    conf = {
        "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVER"],
        "sasl.username": os.environ["KAFKA_API_KEY"],
        "sasl.password": os.environ["KAFKA_SECRET"],
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_SSL",
    }
    admin_client = AdminClient(conf)
    try:
        res = admin_client.delete_topics([TOPIC], operation_timeout=10)
        for topic, f in res.items():
            f.result()
            logger.info(f"Topic {topic} removed")
        wait_for_topic(TOPIC, 5, 1, False, admin_client)
    except Exception:
        pass

    cluster_meta = admin_client.list_topics()
    current_topics = [topic for topic in cluster_meta.topics if topic != "__consumer_offsets"]

    assert TOPIC not in current_topics, f"Topic {TOPIC} shouldn't exist"

    # Kafka Cloud allows to use replication_factor=1 only for Dedicated clusters.
    topic_obj = NewTopic(TOPIC, num_partitions=1, replication_factor=3)

    res = admin_client.create_topics([topic_obj], operation_timeout=10, validate_only=False)
    for topic, f in res.items():
        f.result()

    producer = Producer(conf)
    for i in range(expected_messages):
        message = f"This is some text for message {i}"
        producer.produce(topic=TOPIC, value=message)
    producer.flush(timeout=10)
    return expected_messages


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG)
@requires_env("KAFKA_API_KEY", "KAFKA_SECRET", "KAFKA_BOOTSTRAP_SERVER")
@pytest.mark.parametrize("kafka_seed_topic_cloud", [5], indirect=True)
async def test_kafka_source_cloud(kafka_seed_topic_cloud: int):
    """
    Creates topic in cloud, sends 5 simple messages. Downloader should download them.
    In order to have this test succeed, you need to create cluster on Confluent Cloud,
    and create the API key with admin privileges. By default, user account keys have it.
    """

    expected_messages = kafka_seed_topic_cloud

    connection_config = CloudKafkaConnectionConfig(
        bootstrap_server=os.environ["KAFKA_BOOTSTRAP_SERVER"],
        port=9092,
        access_config=CloudKafkaAccessConfig(
            kafka_api_key=os.environ["KAFKA_API_KEY"],
            secret=os.environ["KAFKA_SECRET"],
        ),
    )

    with tempfile.TemporaryDirectory() as tempdir:
        tempdir_path = Path(tempdir)
        download_config = CloudKafkaDownloaderConfig(download_dir=tempdir_path)
        indexer = CloudKafkaIndexer(
            connection_config=connection_config,
            index_config=CloudKafkaIndexerConfig(
                topic=TOPIC,
                num_messages_to_consume=expected_messages,
            ),
        )
        downloader = CloudKafkaDownloader(
            connection_config=connection_config, download_config=download_config
        )
        indexer.precheck()
        await source_connector_validation(
            indexer=indexer,
            downloader=downloader,
            configs=SourceValidationConfigs(
                test_id="kafka-cloud",
                exclude_fields_extend=["connector_type"],
                expected_num_files=expected_messages,
                validate_downloaded_files=True,
                validate_file_data=True,
            ),
        )


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
@requires_env("KAFKA_API_KEY", "KAFKA_SECRET", "KAFKA_BOOTSTRAP_SERVER")
@pytest.mark.parametrize(
    "kafka_seed_topic_cloud", [0], indirect=True
)  # make it just create topic, without messages
async def test_kafka_destination_cloud(upload_file: Path, kafka_seed_topic_cloud: int):
    """
    Creates empty topic in cloud, sends 1 partitioned file using Uploader.
    In order to have this test succeed, you need to create cluster on Confluent Cloud.
    """

    connection_config = CloudKafkaConnectionConfig(
        bootstrap_server=os.environ["KAFKA_BOOTSTRAP_SERVER"],
        port=9092,
        access_config=CloudKafkaAccessConfig(
            kafka_api_key=os.environ["KAFKA_API_KEY"],
            secret=os.environ["KAFKA_SECRET"],
        ),
    )

    uploader = CloudKafkaUploader(
        connection_config=connection_config,
        upload_config=CloudKafkaUploaderConfig(topic=TOPIC, batch_size=10),
    )
    file_data = FileData(
        source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
        connector_type=CONNECTOR_TYPE,
        identifier="mock file data",
    )
    uploader.precheck()

    if uploader.is_async():
        await uploader.run_async(path=upload_file, file_data=file_data)
    else:
        uploader.run(path=upload_file, file_data=file_data)

    conf = {
        "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVER"],
        "sasl.username": os.environ["KAFKA_API_KEY"],
        "sasl.password": os.environ["KAFKA_SECRET"],
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_SSL",
        "group.id": "default_group_name",
        "enable.auto.commit": "false",
        "auto.offset.reset": "earliest",
    }

    all_messages = get_all_messages(conf=conf, topic=TOPIC)
    with upload_file.open("r") as upload_fs:
        content_to_upload = json.load(upload_fs)
    assert len(all_messages) == len(content_to_upload), (
        f"expected number of messages ({len(content_to_upload)}) doesn't "
        f"match how many messages read off of kafka topic {TOPIC}: {len(all_messages)}"
    )
    assert all_messages == content_to_upload
