48 changes: 48 additions & 0 deletions python/destinations/s3-file/README.md
@@ -0,0 +1,48 @@
# AWS S3 File Destination

[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/s3-file) demonstrates how to consume data from a Kafka topic and write it to an AWS S3 bucket.

## How to run

Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in, then visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.

* or click `Customise connector` to inspect or alter the code before deployment.

## Environment Variables

The connector uses the following environment variables (which generally correspond to the
[`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) parameter names):

### Required
- `input`: The input Kafka topic
- `S3_BUCKET`: The S3 bucket to use.
- `AWS_ENDPOINT_URL`: The URL to your S3 instance.
- `AWS_REGION_NAME`: The region of your S3 bucket.
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret.
- `AWS_ACCESS_KEY_ID`: Your AWS Access Key.

### Optional
These variables are optional; when unset, they fall back to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults (a sample `.env` file follows this list).

- `S3_BUCKET_DIRECTORY`: An optional path within the S3 bucket to use.
**Default**: "" (root)
- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
**Default**: "parquet"


## Requirements / Prerequisites

You will need an AWS account with an S3 bucket, plus credentials (an access key ID and secret access key) that allow writing to that bucket.
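
As a rough sketch only (the bucket name is a placeholder, and the exact actions depend on how the sink manages files; `s3:ListBucket` is included defensively), an IAM policy granting the sink write access might look like:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::my-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::my-bucket"
    }
  ]
}
```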

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open Source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
23 changes: 23 additions & 0 deletions python/destinations/s3-file/dockerfile
@@ -0,0 +1,23 @@
FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file first to leverage the Docker layer cache
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]
Binary file added python/destinations/s3-file/icon.png
77 changes: 77 additions & 0 deletions python/destinations/s3-file/library.json
@@ -0,0 +1,77 @@
{
"libraryItemId": "s3-file",
"name": "AWS S3 File Sink",
"language": "Python",
"tags": {
"Pipeline Stage": ["Destination"],
"Type": ["Connectors"],
"Category": ["File Store"]
},
"shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.",
"DefaultFile": "main.py",
"EntryPoint": "dockerfile",
"RunEntryPoint": "main.py",
"IconFile": "icon.png",
"Variables": [
{
"Name": "input",
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "The input Kafka topic.",
"DefaultValue": "input",
"Required": true
},
{
"Name": "S3_BUCKET",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The S3 bucket to use.",
"Required": true
},
{
"Name": "S3_BUCKET_DIRECTORY",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "An optional path within the S3 bucket to use, else uses root.",
"DefaultValue": "",
"Required": false
},
{
"Name": "AWS_REGION_NAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The region of your S3 bucket.",
"Required": true
},
{
"Name": "AWS_SECRET_ACCESS_KEY",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Your AWS secret.",
"Required": true
},
{
"Name": "AWS_ACCESS_KEY_ID",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Your AWS Access Key.",
"Required": true
},
{
"Name": "FILE_FORMAT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The file format to publish data as; options: [parquet, json].",
"DefaultValue": "parquet",
"Required": false
}
],
"DeploySettings": {
"DeploymentType": "Service",
"CpuMillicores": 200,
"MemoryInMb": 500,
"Replicas": 1,
"PublicAccess": false,
"ValidateConnection": false
}
}
37 changes: 37 additions & 0 deletions python/destinations/s3-file/main.py
@@ -0,0 +1,37 @@
from typing import get_args
import os

from quixstreams import Application
from quixstreams.sinks.community.file.s3 import S3FileSink
from quixstreams.sinks.community.file.formats import FormatName


def get_file_format() -> FormatName:
    """Validate the FILE_FORMAT environment variable against the supported formats."""
valid_formats = get_args(FormatName)
if (file_format := os.getenv("FILE_FORMAT", "parquet")) not in valid_formats:
raise ValueError(
f"`FILE_FORMAT` must be one of {valid_formats}; got {file_format}"
)
return file_format


# Configure the Quix Streams application; offsets are committed every 5 seconds
app = Application(
    consumer_group="s3-file-destination",
    auto_offset_reset="earliest",
    commit_interval=5
)

# Build the S3 sink from the environment variables described in the README
s3_file_sink = S3FileSink(
bucket=os.environ["S3_BUCKET"],
directory=os.getenv("S3_BUCKET_DIRECTORY", ""),
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
region_name=os.environ["AWS_REGION_NAME"],
format=get_file_format(),
)

# Stream everything from the input topic straight into the S3 sink
sdf = app.dataframe(topic=app.topic(os.environ["input"])).sink(s3_file_sink)


if __name__ == "__main__":
app.run()
3 changes: 3 additions & 0 deletions python/destinations/s3-file/requirements.txt
@@ -0,0 +1,3 @@
# TODO: finalize version
_Author comment on this line: Make sure to finalize this version once the related Quix Streams PR is merged and released._

quixstreams[aws]==3.22.0
python-dotenv