From 75eae3c2ec7d4a703031f192b6a4105d709e850d Mon Sep 17 00:00:00 2001
From: Tim Sawicki
Date: Tue, 12 Aug 2025 19:48:26 -0400
Subject: [PATCH 1/2] add s3-file-sink template

---
 python/destinations/s3-file/README.md        | 49 +++++++++++
 python/destinations/s3-file/dockerfile       | 23 +++
 python/destinations/s3-file/icon.png         | Bin 0 -> 4851 bytes
 python/destinations/s3-file/library.json     | 84 +++++++++++++++++++
 python/destinations/s3-file/main.py          | 42 ++++++++++
 python/destinations/s3-file/requirements.txt |  3 +
 6 files changed, 201 insertions(+)
 create mode 100644 python/destinations/s3-file/README.md
 create mode 100644 python/destinations/s3-file/dockerfile
 create mode 100644 python/destinations/s3-file/icon.png
 create mode 100644 python/destinations/s3-file/library.json
 create mode 100644 python/destinations/s3-file/main.py
 create mode 100644 python/destinations/s3-file/requirements.txt

diff --git a/python/destinations/s3-file/README.md b/python/destinations/s3-file/README.md
new file mode 100644
index 000000000..61efe9a8b
--- /dev/null
+++ b/python/destinations/s3-file/README.md
@@ -0,0 +1,49 @@
+# AWS S3 File Destination
+
+[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/s3-file) demonstrates how to consume data from a Kafka topic and write it to an AWS S3 bucket.
+
+## How to run
+
+Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.
+
+Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
+
+Then either:
+* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
+
+* or click `Customise connector` to inspect or alter the code before deployment.
+
+## Environment Variables
+
+The connector uses the following environment variables (which generally correspond to the
+[`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) parameter names):
+
+### Required
+- `input`: The input Kafka topic.
+- `S3_BUCKET`: The S3 bucket to use.
+- `AWS_ENDPOINT_URL`: The URL to your S3 instance.
+- `AWS_REGION_NAME`: The region of your S3 bucket.
+- `AWS_SECRET_ACCESS_KEY`: Your AWS secret access key.
+- `AWS_ACCESS_KEY_ID`: Your AWS access key ID.
+- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
+
+### Optional
+Unless explicitly defined, these fall back to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults.
+
+- `S3_BUCKET_DIRECTORY`: An optional path within the S3 bucket to use.
+  **Default**: "" (root)
+- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
+  **Default**: "parquet"
+
+
+## Requirements / Prerequisites
+
+You will need the appropriate AWS features and access to use this connector.
+
+## Contribute
+
+Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
+
+## Open Source
+
+This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
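The README's environment variables map directly onto the sink's constructor arguments in `main.py` below. For a local smoke test, a few records can be pushed into the connector's input topic with the Quix Streams producer; this is a sketch only, not part of the template: `BROKER_ADDRESS` is a hypothetical variable for a locally reachable broker, and the payload shape is purely illustrative.

```python
import json
import os

from quixstreams import Application

# Sketch: BROKER_ADDRESS is an assumed variable (not one the connector
# defines); the message contents are illustrative test data.
app = Application(broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"))
topic = app.topic(os.environ["input"])

with app.get_producer() as producer:
    for i in range(3):
        producer.produce(
            topic=topic.name,
            key=f"sensor-{i}",
            value=json.dumps({"id": i, "temperature": 20 + i}),
        )
```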
diff --git a/python/destinations/s3-file/dockerfile b/python/destinations/s3-file/dockerfile
new file mode 100644
index 000000000..d3cd4959a
--- /dev/null
+++ b/python/destinations/s3-file/dockerfile
@@ -0,0 +1,23 @@
+FROM python:3.11.1-slim-buster
+
+# Set environment variables for non-interactive installs and unbuffered output
+ENV DEBIAN_FRONTEND=noninteractive \
+    PYTHONUNBUFFERED=1 \
+    PYTHONIOENCODING=UTF-8
+
+# Set the working directory inside the container
+WORKDIR /app
+
+# Copy only the requirements file(s) first to leverage the Docker build cache
+# Assumes all requirements files are in the build context root or subdirectories
+COPY ./requirements.txt ./
+
+# Install dependencies
+# `--no-cache-dir` avoids storing pip's cache and keeps the image smaller
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the rest of the application
+COPY . .
+
+# Set the command to run the application
+ENTRYPOINT ["python3", "main.py"]
\ No newline at end of file
diff --git a/python/destinations/s3-file/icon.png b/python/destinations/s3-file/icon.png
new file mode 100644
index 0000000000000000000000000000000000000000..2940139df862529596b3601ee8abfbd96e436f60
Binary files /dev/null and b/python/destinations/s3-file/icon.png differ
"Required": true + }, + { + "Name": "AWS_SECRET_ACCESS_KEY", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "Your AWS secret.", + "Required": true + }, + { + "Name": "AWS_ACCESS_KEY_ID", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "Your AWS Access Key.", + "Required": true + }, + { + "Name": "FILE_FORMAT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The file format to publish data as; options: [parquet, json].", + "DefaultValue": "parquet", + "Required": false + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 500, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": false + } +} \ No newline at end of file diff --git a/python/destinations/s3-file/main.py b/python/destinations/s3-file/main.py new file mode 100644 index 000000000..e81d1c312 --- /dev/null +++ b/python/destinations/s3-file/main.py @@ -0,0 +1,42 @@ +from typing import get_args + +from quixstreams import Application +from quixstreams.sinks.community.file.s3 import S3FileSink +from quixstreams.sinks.community.file.formats import FormatName +import os + +from dotenv import load_dotenv +load_dotenv() + + +def get_file_format() -> FormatName: + valid_formats = get_args(FormatName) + if (file_format := os.getenv("FILE_FORMAT", "parquet")) not in valid_formats: + raise ValueError( + f"`FILE_FORMAT` must be one of {valid_formats}; got {file_format}" + ) + return file_format + + +app = Application( + consumer_group="s3-destination", + auto_offset_reset="earliest", + commit_interval=5 +) + +s3_file_sink = S3FileSink( + bucket=os.environ["S3_BUCKET"], + directory=os.getenv("S3_BUCKET_DIRECTORY", ""), + aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"], + aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"], + endpoint_url=os.environ["AWS_ENDPOINT_URL"], + region_name=os.environ["AWS_REGION_NAME"], + format=get_file_format(), +) + +sdf = app.dataframe(app.topic(os.environ["input"])).sink(s3_file_sink) +sdf.sink() + + +if __name__ == "__main__": + app.run() diff --git a/python/destinations/s3-file/requirements.txt b/python/destinations/s3-file/requirements.txt new file mode 100644 index 000000000..daf15166c --- /dev/null +++ b/python/destinations/s3-file/requirements.txt @@ -0,0 +1,3 @@ +# TODO: finalize version +quixstreams[aws]==3.22.0 +python-dotenv From 8da3c75833d809ba724d8d60860aed4a1395c02d Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Wed, 13 Aug 2025 00:16:29 -0400 Subject: [PATCH 2/2] more cleanup --- python/destinations/s3-file/README.md | 1 - python/destinations/s3-file/library.json | 13 +++---------- python/destinations/s3-file/main.py | 11 +++-------- 3 files changed, 6 insertions(+), 19 deletions(-) diff --git a/python/destinations/s3-file/README.md b/python/destinations/s3-file/README.md index 61efe9a8b..338a67cec 100644 --- a/python/destinations/s3-file/README.md +++ b/python/destinations/s3-file/README.md @@ -25,7 +25,6 @@ The connector uses the following environment variables (which generally correspo - `AWS_REGION_NAME`: The region of your S3 bucket. - `AWS_SECRET_ACCESS_KEY`: Your AWS secret. - `AWS_ACCESS_KEY_ID`: Your AWS Access Key. -- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\]. ### Optional Unless explicitly defined, these are optional, or generally set to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults. 
From 8da3c75833d809ba724d8d60860aed4a1395c02d Mon Sep 17 00:00:00 2001
From: Tim Sawicki
Date: Wed, 13 Aug 2025 00:16:29 -0400
Subject: [PATCH 2/2] more cleanup

---
 python/destinations/s3-file/README.md    |  1 -
 python/destinations/s3-file/library.json | 13 +++----------
 python/destinations/s3-file/main.py      | 11 +++--------
 3 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/python/destinations/s3-file/README.md b/python/destinations/s3-file/README.md
index 61efe9a8b..338a67cec 100644
--- a/python/destinations/s3-file/README.md
+++ b/python/destinations/s3-file/README.md
@@ -25,7 +25,6 @@ The connector uses the following environment variables (which generally correspo
 - `AWS_REGION_NAME`: The region of your S3 bucket.
 - `AWS_SECRET_ACCESS_KEY`: Your AWS secret access key.
 - `AWS_ACCESS_KEY_ID`: Your AWS access key ID.
-- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
 
 ### Optional
 Unless explicitly defined, these fall back to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults.
diff --git a/python/destinations/s3-file/library.json b/python/destinations/s3-file/library.json
index 3db5626ca..c6a9d0053 100644
--- a/python/destinations/s3-file/library.json
+++ b/python/destinations/s3-file/library.json
@@ -1,11 +1,11 @@
 {
-    "libraryItemId": "s3-iceberg-destination",
-    "name": "AWS S3 Iceberg Sink",
+    "libraryItemId": "s3-file",
+    "name": "AWS S3 File Sink",
     "language": "Python",
     "tags": {
         "Pipeline Stage": ["Destination"],
         "Type": ["Connectors"],
-        "Category": ["Open table format"]
+        "Category": ["File Store"]
     },
     "shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.",
     "DefaultFile": "main.py",
@@ -36,13 +36,6 @@
             "DefaultValue": "",
             "Required": false
         },
-        {
-            "Name": "AWS_ENDPOINT_URL",
-            "Type": "EnvironmentVariable",
-            "InputType": "FreeText",
-            "Description": "The URL to your S3 instance.",
-            "Required": true
-        },
         {
             "Name": "AWS_REGION_NAME",
             "Type": "EnvironmentVariable",
diff --git a/python/destinations/s3-file/main.py b/python/destinations/s3-file/main.py
index e81d1c312..cddb44c51 100644
--- a/python/destinations/s3-file/main.py
+++ b/python/destinations/s3-file/main.py
@@ -1,12 +1,9 @@
 from typing import get_args
+import os
 
 from quixstreams import Application
 from quixstreams.sinks.community.file.s3 import S3FileSink
 from quixstreams.sinks.community.file.formats import FormatName
-import os
-
-from dotenv import load_dotenv
-load_dotenv()
 
 
 def get_file_format() -> FormatName:
@@ -19,7 +16,7 @@
 
 
 app = Application(
-    consumer_group="s3-destination",
+    consumer_group="s3-file-destination",
     auto_offset_reset="earliest",
     commit_interval=5
 )
@@ -29,13 +26,11 @@ def get_file_format() -> FormatName:
     directory=os.getenv("S3_BUCKET_DIRECTORY", ""),
     aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
     aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
-    endpoint_url=os.environ["AWS_ENDPOINT_URL"],
     region_name=os.environ["AWS_REGION_NAME"],
     format=get_file_format(),
 )
 
-sdf = app.dataframe(app.topic(os.environ["input"])).sink(s3_file_sink)
-sdf.sink()
+sdf = app.dataframe(topic=app.topic(os.environ["input"])).sink(s3_file_sink)
 
 
 if __name__ == "__main__":
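After deployment, one way to confirm that files are actually landing in the bucket is to list objects with `boto3` (an assumed extra dependency, not in the template's `requirements.txt`), reusing the connector's own environment variables:

```python
import os

import boto3  # assumed dependency; not part of the template

# boto3 picks up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the
# environment automatically; the region is passed explicitly because the
# template uses AWS_REGION_NAME rather than boto3's AWS_DEFAULT_REGION.
s3 = boto3.client("s3", region_name=os.environ["AWS_REGION_NAME"])

response = s3.list_objects_v2(
    Bucket=os.environ["S3_BUCKET"],
    Prefix=os.getenv("S3_BUCKET_DIRECTORY", ""),
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```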