diff --git a/python/destinations/s3-file/README.md b/python/destinations/s3-file/README.md
new file mode 100644
index 00000000..338a67ce
--- /dev/null
+++ b/python/destinations/s3-file/README.md
@@ -0,0 +1,48 @@
+# AWS S3 File Destination
+
+[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/s3-file) demonstrates how to consume data from a Kafka topic and write it to an AWS S3 bucket.
+
+## How to run
+
+Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.
+
+Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
+
+Then either:
+* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
+
+* or click `Customise connector` to inspect or alter the code before deployment.
+
+## Environment Variables
+
+The connector uses the following environment variables (which generally correspond to the
+[`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) parameter names):
+
+### Required
+- `input`: The input Kafka topic.
+- `S3_BUCKET`: The S3 bucket to use.
+- `AWS_REGION_NAME`: The region of your S3 bucket.
+- `AWS_SECRET_ACCESS_KEY`: Your AWS secret access key.
+- `AWS_ACCESS_KEY_ID`: Your AWS access key ID.
+
+### Optional
+Unless explicitly set, these fall back to the
+[`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults.
+
+- `S3_BUCKET_DIRECTORY`: An optional path within the S3 bucket to use.
+  **Default**: "" (root)
+- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
+  **Default**: "parquet"
+- `AWS_ENDPOINT_URL`: The endpoint URL of your S3 instance; only needed for S3-compatible storage. It is read from the environment by `boto3` itself rather than by `main.py`.
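+
+### Local development
+`requirements.txt` includes `python-dotenv`, so when running outside Quix you can keep these variables in a `.env` file next to `main.py`. A minimal sketch (every value below is a placeholder):
+
+```sh
+input=my-topic
+S3_BUCKET=my-bucket
+AWS_REGION_NAME=us-east-1
+AWS_ACCESS_KEY_ID=my-access-key-id
+AWS_SECRET_ACCESS_KEY=my-secret-access-key
+```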
+
+## Requirements / Prerequisites
+
+You will need an AWS account with an S3 bucket, plus an access key ID and secret access key that grant write access to that bucket.
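+
+Once the connector is running, you can confirm files are being written with the AWS CLI (bucket name and directory below are placeholders):
+
+```sh
+aws s3 ls s3://my-bucket/my-directory/ --recursive
+```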
+
+## Contribute
+
+Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
+
+## Open Source
+
+This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
diff --git a/python/destinations/s3-file/dockerfile b/python/destinations/s3-file/dockerfile
new file mode 100644
index 00000000..d3cd4959
--- /dev/null
+++ b/python/destinations/s3-file/dockerfile
@@ -0,0 +1,23 @@
+FROM python:3.11.1-slim-buster
+
+# Non-interactive installs and unbuffered, UTF-8 output
+ENV DEBIAN_FRONTEND=noninteractive \
+    PYTHONUNBUFFERED=1 \
+    PYTHONIOENCODING=UTF-8
+
+# Set the working directory inside the container
+WORKDIR /app
+
+# Copy only the requirements file first to leverage the Docker layer cache
+COPY ./requirements.txt ./
+
+# Install dependencies
+# `--no-cache-dir` avoids storing pip's cache and keeps the image smaller
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the rest of the application
+COPY . .
+
+# Set the command to run your application
+ENTRYPOINT ["python3", "main.py"]
\ No newline at end of file
diff --git a/python/destinations/s3-file/icon.png b/python/destinations/s3-file/icon.png
new file mode 100644
index 00000000..2940139d
Binary files /dev/null and b/python/destinations/s3-file/icon.png differ
diff --git a/python/destinations/s3-file/library.json b/python/destinations/s3-file/library.json
new file mode 100644
index 00000000..c6a9d005
--- /dev/null
+++ b/python/destinations/s3-file/library.json
@@ -0,0 +1,77 @@
+{
+    "libraryItemId": "s3-file",
+    "name": "AWS S3 File Sink",
+    "language": "Python",
+    "tags": {
+        "Pipeline Stage": ["Destination"],
+        "Type": ["Connectors"],
+        "Category": ["File Store"]
+    },
+    "shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.",
+    "DefaultFile": "main.py",
+    "EntryPoint": "dockerfile",
+    "RunEntryPoint": "main.py",
+    "IconFile": "icon.png",
+    "Variables": [
+        {
+            "Name": "input",
+            "Type": "EnvironmentVariable",
+            "InputType": "InputTopic",
+            "Description": "The input Kafka topic.",
+            "DefaultValue": "input",
+            "Required": true
+        },
+        {
+            "Name": "S3_BUCKET",
+            "Type": "EnvironmentVariable",
+            "InputType": "FreeText",
+            "Description": "The S3 bucket to use.",
+            "Required": true
+        },
+        {
+            "Name": "S3_BUCKET_DIRECTORY",
+            "Type": "EnvironmentVariable",
+            "InputType": "FreeText",
+            "Description": "An optional path within the S3 bucket to use, else uses root.",
+            "DefaultValue": "",
+            "Required": false
+        },
+        {
+            "Name": "AWS_REGION_NAME",
+            "Type": "EnvironmentVariable",
+            "InputType": "FreeText",
+            "Description": "The region of your S3 bucket.",
+            "Required": true
+        },
+        {
+            "Name": "AWS_SECRET_ACCESS_KEY",
+            "Type": "EnvironmentVariable",
+            "InputType": "Secret",
+            "Description": "Your AWS secret access key.",
+            "Required": true
+        },
+        {
+            "Name": "AWS_ACCESS_KEY_ID",
+            "Type": "EnvironmentVariable",
+            "InputType": "Secret",
+            "Description": "Your AWS access key ID.",
+            "Required": true
+        },
+        {
+            "Name": "FILE_FORMAT",
+            "Type": "EnvironmentVariable",
+            "InputType": "FreeText",
+            "Description": "The file format to publish data as; options: [parquet, json].",
+            "DefaultValue": "parquet",
+            "Required": false
+        }
+    ],
+    "DeploySettings": {
+        "DeploymentType": "Service",
+        "CpuMillicores": 200,
+        "MemoryInMb": 500,
+        "Replicas": 1,
+        "PublicAccess": false,
+        "ValidateConnection": false
+    }
+}
\ No newline at end of file
diff --git a/python/destinations/s3-file/main.py b/python/destinations/s3-file/main.py
new file mode 100644
index 00000000..cddb44c5
--- /dev/null
+++ b/python/destinations/s3-file/main.py
@@ -0,0 +1,37 @@
+import os
+from typing import get_args
+
+from quixstreams import Application
+from quixstreams.sinks.community.file.s3 import S3FileSink
+from quixstreams.sinks.community.file.formats import FormatName
+
+# For local dev: load environment variables from a .env file, if present
+from dotenv import load_dotenv
+load_dotenv()
+
+
+def get_file_format() -> FormatName:
+    """Validate FILE_FORMAT against the formats the sink supports."""
+    valid_formats = get_args(FormatName)
+    if (file_format := os.getenv("FILE_FORMAT", "parquet")) not in valid_formats:
+        raise ValueError(
+            f"`FILE_FORMAT` must be one of {valid_formats}; got {file_format}"
+        )
+    return file_format
+
+
+app = Application(
+    consumer_group="s3-file-destination",
+    auto_offset_reset="earliest",
+    commit_interval=5
+)
+
+s3_file_sink = S3FileSink(
+    bucket=os.environ["S3_BUCKET"],
+    directory=os.getenv("S3_BUCKET_DIRECTORY", ""),
+    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
+    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
+    region_name=os.environ["AWS_REGION_NAME"],
+    format=get_file_format(),
+)
+
+# Stream the input topic straight into the S3 sink (`.sink()` is terminal)
+sdf = app.dataframe(topic=app.topic(os.environ["input"]))
+sdf.sink(s3_file_sink)
+
+
+if __name__ == "__main__":
+    app.run()
diff --git a/python/destinations/s3-file/requirements.txt b/python/destinations/s3-file/requirements.txt
new file mode 100644
index 00000000..daf15166
--- /dev/null
+++ b/python/destinations/s3-file/requirements.txt
@@ -0,0 +1,3 @@
+# TODO: finalize version
+quixstreams[aws]==3.22.0
+python-dotenv