Skip to content

Commit c0366eb

Browse files
authored
API: Kafka v2 source and destination connectors (#339)
1 parent ca9d4af commit c0366eb

File tree

8 files changed

+123
-7
lines changed

8 files changed

+123
-7
lines changed

api-reference/ingest/destination-connector/kafka.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
1515
Now call the Unstructured CLI or Python SDK. The source connector can be any of the ones supported. This example uses the local source connector:
1616

1717
import KafkaAPISh from '/snippets/destination_connectors/kafka.sh.mdx';
18+
import KafkaAPIPyV2 from '/snippets/destination_connectors/kafka.v2.py.mdx';
1819
import KafkaAPIPyV1 from '/snippets/destination_connectors/kafka.v1.py.mdx';
1920

2021
<CodeGroup>
2122
<KafkaAPISh />
23+
<KafkaAPIPyV2 />
2224
<KafkaAPIPyV1 />
2325
</CodeGroup>

api-reference/ingest/source-connectors/kafka.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
1515
Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector:
1616

1717
import KafkaAPISh from '/snippets/source_connectors/kafka.sh.mdx';
18+
import KafkaAPIPyV2 from '/snippets/source_connectors/kafka.v2.py.mdx';
1819
import KafkaAPIPyV1 from '/snippets/source_connectors/kafka.v1.py.mdx';
1920

2021
<CodeGroup>
2122
<KafkaAPISh />
23+
<KafkaAPIPyV2 />
2224
<KafkaAPIPyV1 />
2325
</CodeGroup>

open-source/ingest/destination-connectors/kafka.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ Now call the Unstructured CLI or Python. The source connector can be any of the
1515
This example sends files to Unstructured API services for processing by default. To process files locally instead, see the instructions at the end of this page.
1616

1717
import KafkaAPISh from '/snippets/destination_connectors/kafka.sh.mdx';
18+
import KafkaAPIPyV2 from '/snippets/destination_connectors/kafka.v2.py.mdx';
1819
import KafkaAPIPyV1 from '/snippets/destination_connectors/kafka.v1.py.mdx';
1920

2021
<CodeGroup>
2122
<KafkaAPISh />
23+
<KafkaAPIPyV2 />
2224
<KafkaAPIPyV1 />
2325
</CodeGroup>
2426

open-source/ingest/source-connectors/kafka.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ Now call the Unstructured CLI or Python. The destination connector can be any of
1515
This example sends data to Unstructured API services for processing by default. To process data locally instead, see the instructions at the end of this page.
1616

1717
import KafkaSh from '/snippets/source_connectors/kafka.sh.mdx';
18+
import KafkaPyV2 from '/snippets/source_connectors/kafka.v2.py.mdx';
1819
import KafkaPyV1 from '/snippets/source_connectors/kafka.v1.py.mdx';
1920

2021
<CodeGroup>
2122
<KafkaSh />
23+
<KafkaPyV2 />
2224
<KafkaPyV1 />
2325
</CodeGroup>
2426

snippets/destination_connectors/kafka.sh.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ unstructured-ingest \
2222
--topic $KAFKA_TOPIC \
2323
--kafka-api-key $KAFKA_API_KEY \
2424
--secret $KAFKA_API_KEY \
25-
--confluent false \
25+
--confluent true \
2626
--num-messages-to-consume 1 \
2727
--timeout 1.0
2828
```
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
```python Python Ingest v2
2+
import os
3+
4+
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
5+
from unstructured_ingest.v2.interfaces import ProcessorConfig
6+
7+
from unstructured_ingest.v2.processes.connectors.local import (
8+
LocalIndexerConfig,
9+
LocalDownloaderConfig,
10+
LocalConnectionConfig
11+
)
12+
13+
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
14+
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
15+
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
16+
17+
from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
18+
CloudKafkaConnectionConfig,
19+
CloudKafkaAccessConfig,
20+
CloudKafkaUploaderConfig
21+
)
22+
23+
# Chunking and embedding are optional.
24+
25+
if __name__ == "__main__":
26+
Pipeline.from_configs(
27+
context=ProcessorConfig(),
28+
indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
29+
downloader_config=LocalDownloaderConfig(),
30+
source_connection_config=LocalConnectionConfig(),
31+
partitioner_config=PartitionerConfig(
32+
partition_by_api=True,
33+
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
34+
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
35+
additional_partition_args={
36+
"split_pdf_page": True,
37+
"split_pdf_allow_failed": True,
38+
"split_pdf_concurrency_level": 15
39+
}
40+
),
41+
chunker_config=ChunkerConfig(chunking_strategy="by_title"),
42+
embedder_config=EmbedderConfig(embedding_provider="huggingface"),
43+
destination_connection_config=CloudKafkaConnectionConfig(
44+
access_config=CloudKafkaAccessConfig(
45+
kafka_api_key=os.getenv("KAFKA_API_KEY"),
46+
secret=os.getenv("KAFKA_SECRET")
47+
),
48+
bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
49+
port=os.getenv("KAFKA_PORT")
50+
),
51+
uploader_config=CloudKafkaUploaderConfig(
52+
batch_size=100,
53+
topic=os.getenv("KAFKA_TOPIC"),
54+
timeout=10
55+
)
56+
).run()
57+
```

snippets/source_connectors/kafka.sh.mdx

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ unstructured-ingest \
1010
--topic $KAFKA_TOPIC \
1111
--kafka-api-key $KAFKA_API_KEY \
1212
--secret $KAFKA_API_KEY \
13-
--confluent false \
13+
--confluent true \
14+
--batch-size 100 \
1415
--num-messages-to-consume 1 \
15-
--timeout 1.0
16+
--timeout 1.0 \
1617
--output-dir $LOCAL_FILE_OUTPUT_DIR \
17-
--chunk-elements \
18+
--chunking-strategy by_title \
1819
--embedding-provider huggingface \
19-
--num-processes 2 \
20-
--verbose \
2120
--partition-by-api \
22-
--api-key $UNSTRUCTURED_API_KEY\
21+
--api-key $UNSTRUCTURED_API_KEY \
2322
--partition-endpoint $UNSTRUCTURED_API_URL \
2423
--strategy hi_res \
2524
--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
```python Python Ingest v2
2+
import os
3+
4+
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
5+
from unstructured_ingest.v2.interfaces import ProcessorConfig
6+
7+
from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
8+
CloudKafkaIndexerConfig,
9+
CloudKafkaDownloaderConfig,
10+
CloudKafkaConnectionConfig,
11+
CloudKafkaAccessConfig
12+
)
13+
14+
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
15+
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
16+
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
17+
from unstructured_ingest.v2.processes.connectors.local import LocalUploaderConfig
18+
19+
# Chunking and embedding are optional.
20+
21+
if __name__ == "__main__":
22+
Pipeline.from_configs(
23+
context=ProcessorConfig(),
24+
indexer_config=CloudKafkaIndexerConfig(
25+
topic=os.getenv("KAFKA_TOPIC"),
26+
num_messages_to_consume=100,
27+
timeout=1
28+
),
29+
downloader_config=CloudKafkaDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
30+
source_connection_config=CloudKafkaConnectionConfig(
31+
access_config=CloudKafkaAccessConfig(
32+
kafka_api_key=os.getenv("KAFKA_API_KEY"),
33+
secret=os.getenv("KAFKA_SECRET")
34+
),
35+
bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
36+
port=os.getenv("KAFKA_PORT")
37+
),
38+
partitioner_config=PartitionerConfig(
39+
partition_by_api=True,
40+
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
41+
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
42+
additional_partition_args={
43+
"split_pdf_page": True,
44+
"split_pdf_allow_failed": True,
45+
"split_pdf_concurrency_level": 15
46+
}
47+
),
48+
chunker_config=ChunkerConfig(chunking_strategy="by_title"),
49+
embedder_config=EmbedderConfig(embedding_provider="huggingface"),
50+
uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
51+
).run()
52+
```

0 commit comments

Comments
 (0)