diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..2153956
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,20 @@
+**/__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.git
+.venv
+.pytest_cache
+.coverage
+*.log
+.DS_Store
+Thumbs.db
+*.tmp
+*.swp
+*.swo
+*~
+docker/
+Readme.md
+image.png
+remarks.txt
+pom.xml
diff --git a/.gitignore b/.gitignore
index cb1348a..83396f7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,6 @@
 **/__pycache__/
-*.env
\ No newline at end of file
+*.env
+
+.venv/
+target/
+dist/
\ No newline at end of file
diff --git a/docker/.dockerignore b/docker/.dockerignore
deleted file mode 100644
index 30a7cb5..0000000
--- a/docker/.dockerignore
+++ /dev/null
@@ -1,7 +0,0 @@
-**/__pycache__/
-*.pyc
-*.pyo
-*.pyd
-*.env
-*.db
-*.git
diff --git a/docker/Dockerfile_coinbase_producer b/docker/Dockerfile_coinbase_producer
index e65e805..e41a721 100644
--- a/docker/Dockerfile_coinbase_producer
+++ b/docker/Dockerfile_coinbase_producer
@@ -1,18 +1,41 @@
-FROM python:3.12-slim
+# Multi-stage build for Coinbase Producer
+FROM python:3.12-slim AS builder
 
-# Zet werkdirectory
+# Install uv
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
+
+# Set working directory
 WORKDIR /app
 
-# Kopieer alleen wat nodig is voor installatie (betere caching)
-COPY ../pyproject.toml .
-COPY ../src/ ./src
+# Copy dependency files
+COPY pyproject.toml uv.lock ./
+
+# Install locked dependencies only; src/ is not present yet, so the project itself is installed in the runtime stage
+RUN uv sync --frozen --no-dev --no-install-project
+
+# Production stage
+FROM python:3.12-slim AS runtime
 
-# Installeer afhankelijkheden via pip (hatch + pyproject.toml)
-RUN pip install --upgrade pip && pip install .
+# Install uv in runtime stage
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
+
+# Set working directory
+WORKDIR /app
 
-# Zet PYTHONPATH zodat src importeerbaar is
-ENV PYTHONPATH=/app/src
+# Copy the virtual environment from builder stage
+COPY --from=builder /app/.venv /app/.venv
+
+# Copy source code and configuration
+COPY src/ ./src/
+COPY pyproject.toml uv.lock ./
+
+# Set environment variables
 ENV PYTHONUNBUFFERED=1
 ENV IN_DOCKER=1
-# Start de app
-CMD ["python", "src/producers/coinbase_producer/main.py", "--topics", "BTC-USD,XRP-USD,ETH-USD"]
+ENV PATH="/app/.venv/bin:$PATH"
+
+# Install the package in editable mode
+RUN uv sync --frozen --no-dev
+
+# Start the app using uv
+CMD ["uv", "run", "python", "src/python_services/producers/coinbase_producer/main.py", "--topics", "BTC-USD,XRP-USD,ETH-USD"]
diff --git a/docker/Dockerfile_reddit_producer b/docker/Dockerfile_reddit_producer
index 126bcd2..c1af0fb 100644
--- a/docker/Dockerfile_reddit_producer
+++ b/docker/Dockerfile_reddit_producer
@@ -1,19 +1,42 @@
-FROM python:3.12-slim
+# Multi-stage build for Reddit Producer
+FROM python:3.12-slim AS builder
 
-# Zet werkdirectory
+# Install uv
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
+
+# Set working directory
+WORKDIR /app
+
+# Copy dependency files
+COPY pyproject.toml uv.lock ./
+
+# Install locked dependencies only; src/ is not present yet, so the project itself is installed in the runtime stage
+RUN uv sync --frozen --no-dev --no-install-project
+
+# Production stage
+FROM python:3.12-slim AS runtime
+
+# Install uv in runtime stage
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
+
+# Set working directory
 WORKDIR /app
 
-# Kopieer alleen wat nodig is voor installatie (betere caching)
-COPY ../pyproject.toml .
-COPY ../src/ ./src
+# Copy the virtual environment from builder stage
+COPY --from=builder /app/.venv /app/.venv
 
-# Installeer afhankelijkheden via pip (hatch + pyproject.toml)
-RUN pip install --upgrade pip && pip install .
+# Copy source code and configuration +COPY src/ ./src/ +COPY pyproject.toml uv.lock ./ +COPY Reddit.env ./ -# Zet PYTHONPATH zodat src importeerbaar is -ENV PYTHONPATH=/app/src +# Set environment variables ENV PYTHONUNBUFFERED=1 ENV IN_DOCKER=1 +ENV PATH="/app/.venv/bin:$PATH" + +# Install the package in editable mode +RUN uv sync --frozen --no-dev -# Start de app -CMD ["python", "src/producers/reddit_producer/main.py", "--subreddits", "Bitcoin,CryptoCurrency,CryptoMarkets,funny,AskReddit,gaming"] +# Start the app using uv +CMD ["uv", "run", "python", "src/python_services/producers/reddit_producer/main.py", "--subreddits", "Bitcoin,CryptoCurrency,CryptoMarkets,funny,AskReddit,gaming"] diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..9e5555a --- /dev/null +++ b/pom.xml @@ -0,0 +1,114 @@ + + 4.0.0 + com.streaming.project + java-streams + jar + 1.0-SNAPSHOT + java-streams + http://maven.apache.org + + 11 + 11 + UTF-8 + + + + src/java-streams/main/java + src/java-streams/test/java + + + src/java-streams/main/resources + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 11 + 11 + + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + org.apache.kafka + kafka-streams + 8.0.0-ccs + + + org.apache.kafka + kafka-clients + 8.0.0-ccs + + + + + + io.confluent + kafka-avro-serializer + 8.0.0 + + + io.confluent + kafka-streams-avro-serde + 8.0.0 + + + org.apache.avro + avro + 1.11.3 + + + org.apache.avro + avro-maven-plugin + 1.11.3 + + + + + com.fasterxml.jackson.core + jackson-core + 2.17.2 + + + com.fasterxml.jackson.core + jackson-databind + 2.17.2 + + + com.fasterxml.jackson.core + jackson-annotations + 2.17.2 + + + + + org.slf4j + slf4j-api + 1.7.36 + + + org.slf4j + slf4j-simple + 1.7.36 + + + + junit + junit + 4.13.2 + test + + + diff --git a/pyproject.toml b/pyproject.toml index c965498..9e04a02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,11 @@ dependencies = [ ] [tool.hatch.build.targets.wheel] 
-packages = ["src/"]
+packages = ["src"]
+exclude = ["src/java-streams"]
+
+[tool.hatch.metadata]
+allow-direct-references = true
 
 [build-system]
 requires = ["hatchling"]
diff --git a/src/java-streams/main/java/com/streaming/project/StreamReader.java b/src/java-streams/main/java/com/streaming/project/StreamReader.java
new file mode 100644
index 0000000..ec2a3d7
--- /dev/null
+++ b/src/java-streams/main/java/com/streaming/project/StreamReader.java
@@ -0,0 +1,74 @@
+package com.streaming.project;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * Attaches a per-record action to a single Kafka topic whose values are Avro records.
+ *
+ * <p>Keys are read as strings and values through a {@link GenericAvroSerde} configured
+ * with the schema registry URL passed to the constructor.
+ */
+public class StreamReader {
+    private final String topicName;
+    private final String streamName;
+    private final ForeachAction<? super String, ? super GenericRecord> processor;
+    private final GenericAvroSerde genericAvroSerde;
+
+    /**
+     * Creates a reader and configures its value serde.
+     *
+     * @param topicName         topic to consume; must not be null
+     * @param streamName        label returned by {@link #getStreamName()}; not used for processing
+     * @param processor         action invoked for every consumed record; must not be null
+     * @param schemaRegistryUrl value for the serde's {@code schema.registry.url}; must not be null
+     * @throws NullPointerException if a required argument is null
+     */
+    public StreamReader(String topicName, String streamName,
+            ForeachAction<? super String, ? super GenericRecord> processor,
+            String schemaRegistryUrl) {
+        // Fail here with a named argument instead of deep inside the serde or topology builder.
+        this.topicName = Objects.requireNonNull(topicName, "topicName must not be null");
+        this.streamName = streamName;
+        this.processor = Objects.requireNonNull(processor, "processor must not be null");
+        Objects.requireNonNull(schemaRegistryUrl, "schemaRegistryUrl must not be null");
+
+        // Configure Avro serde for values (isKey = false).
+        this.genericAvroSerde = new GenericAvroSerde();
+        this.genericAvroSerde.configure(
+                Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
+                false);
+    }
+
+    /**
+     * Adds a source for {@link #getTopicName()} to {@code builder} and runs the processor on each record.
+     *
+     * @param builder topology builder to extend
+     */
+    public void addToBuilder(StreamsBuilder builder) {
+        KStream<String, GenericRecord> stream = builder.stream(topicName,
+                Consumed.with(Serdes.String(), genericAvroSerde));
+
+        // Terminal operation: nothing consumes the stream afterwards, so foreach replaces
+        // peek, whose returned stream was being discarded.
+        stream.foreach(processor);
+    }
+
+    /** Returns the topic this reader consumes. */
+    public String getTopicName() {
+        return topicName;
+    }
+
+    /** Returns the label given at construction. */
+    public String getStreamName() {
+        return streamName;
+    }
+}
diff --git a/src/java-streams/main/java/com/streaming/project/StreamingApplication.java
b/src/java-streams/main/java/com/streaming/project/StreamingApplication.java
new file mode 100644
index 0000000..1736c80
--- /dev/null
+++ b/src/java-streams/main/java/com/streaming/project/StreamingApplication.java
@@ -0,0 +1,170 @@
+package com.streaming.project;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Entry point: loads {@code application.properties}, builds a one-topic topology with
+ * {@link StreamReader}, and runs it until the JVM is asked to stop or the client fails.
+ */
+public class StreamingApplication {
+
+    // Location used when the application is launched from the repository root.
+    private static final Path CONFIG_PATH =
+            Paths.get("src/java-streams/main/resources/application.properties");
+    // Classpath fallback for launches from another working directory (e.g. a packaged jar).
+    private static final String CONFIG_RESOURCE = "application.properties";
+
+    /**
+     * Starts the streaming application and blocks until it stops.
+     *
+     * <p>Exits with status 1 if the Kafka Streams client ends in the ERROR state.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        Properties appConfig = loadConfiguration();
+        Properties streamSettings = createStreamConfig(appConfig);
+
+        // Build topology
+        StreamsBuilder builder = new StreamsBuilder();
+        StreamReader coinbaseReader = new StreamReader(
+                requireProperty(appConfig, "coinbase.topic.name"),
+                appConfig.getProperty("coinbase.source.name"),
+                (key, value) -> System.out.println("Key: " + key + ", Value: " + value),
+                requireProperty(appConfig, "schema.registry.url"));
+        coinbaseReader.addToBuilder(builder);
+
+        // Create and configure streams
+        KafkaStreams streams = new KafkaStreams(builder.build(), streamSettings);
+        CountDownLatch stopped = new CountDownLatch(1);
+        configureStreamsHandlers(streams, stopped);
+
+        // Register the hook before start() so a signal during startup still closes the client.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            stopped.countDown();
+        }, "streams-shutdown-hook"));
+
+        System.out.println("Starting streaming application for topic: " + coinbaseReader.getTopicName());
+        streams.start();
+
+        // Wait for either the shutdown hook or a terminal client state. The previous
+        // Thread.currentThread().join() could never return, so a failed client left the JVM hanging.
+        try {
+            stopped.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        if (streams.state() == KafkaStreams.State.ERROR) {
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Loads the application properties.
+     *
+     * @return the loaded properties
+     * @throws RuntimeException if the file cannot be found or read
+     */
+    private static Properties loadConfiguration() {
+        Properties props = new Properties();
+        try (InputStream inputStream = openConfiguration()) {
+            props.load(inputStream);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to load configuration from: " + CONFIG_PATH, e);
+        }
+        return props;
+    }
+
+    /** Opens the configuration from the working directory, falling back to the classpath. */
+    private static InputStream openConfiguration() throws IOException {
+        if (Files.isRegularFile(CONFIG_PATH)) {
+            return new FileInputStream(CONFIG_PATH.toFile());
+        }
+        InputStream resource = StreamingApplication.class.getClassLoader()
+                .getResourceAsStream(CONFIG_RESOURCE);
+        if (resource == null) {
+            throw new FileNotFoundException(CONFIG_PATH + " (also not on classpath as " + CONFIG_RESOURCE + ")");
+        }
+        return resource;
+    }
+
+    /**
+     * Translates application properties into Kafka Streams settings.
+     *
+     * @param appConfig properties loaded by {@link #loadConfiguration()}
+     * @return configuration for the {@link KafkaStreams} constructor
+     * @throws IllegalStateException if a required property is missing or not an integer
+     */
+    private static Properties createStreamConfig(Properties appConfig) {
+        Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, requireProperty(appConfig, "kafka.application.id"));
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, requireProperty(appConfig, "kafka.bootstrap.servers"));
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, requireProperty(appConfig, "kafka.default.key.serde"));
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requireProperty(appConfig, "kafka.default.value.serde"));
+        config.put("schema.registry.url", requireProperty(appConfig, "kafka.schema.registry.url"));
+        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, requireProperty(appConfig, "kafka.processing.guarantee"));
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, requireInt(appConfig, "kafka.commit.interval.ms"));
+        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, requireInt(appConfig, "kafka.num.stream.threads"));
+        return config;
+    }
+
+    /** Returns the trimmed value of {@code key}, failing with the key name if it is absent or blank. */
+    private static String requireProperty(Properties props, String key) {
+        String value = props.getProperty(key);
+        if (value == null || value.trim().isEmpty()) {
+            throw new IllegalStateException("Missing required configuration property: " + key);
+        }
+        return value.trim();
+    }
+
+    /** Returns {@code key} parsed as an int, failing with the key name if it is not a valid integer. */
+    private static int requireInt(Properties props, String key) {
+        String raw = requireProperty(props, key);
+        try {
+            return Integer.parseInt(raw);
+        } catch (NumberFormatException e) {
+            throw new IllegalStateException(
+                    "Configuration property " + key + " must be an integer, got: " + raw, e);
+        }
+    }
+
+    /**
+     * Installs the state listener and uncaught-exception handler.
+     *
+     * @param streams client to configure; must not have been started yet
+     * @param stopped released once the client reaches ERROR or NOT_RUNNING
+     */
+    private static void configureStreamsHandlers(KafkaStreams streams, CountDownLatch stopped) {
+        streams.setStateListener((newState, oldState) -> {
+            System.out.println("State changed: " + oldState + " -> " + newState);
+            if (newState == KafkaStreams.State.ERROR) {
+                System.err.println("Application entered ERROR state!");
+            }
+            // Terminal states: wake the main thread so the process can exit.
+            if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.NOT_RUNNING) {
+                stopped.countDown();
+            }
+        });
+
+        streams.setUncaughtExceptionHandler(exception -> {
+            System.err.println("Uncaught exception: " + exception.getMessage());
+            exception.printStackTrace();
+            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+        });
+    }
+}
diff --git a/src/java-streams/main/resources/application.properties b/src/java-streams/main/resources/application.properties
new file mode 100644
index 0000000..cda7e43
--- /dev/null
+++ b/src/java-streams/main/resources/application.properties
@@ -0,0
+1,16 @@ +# Kafka Streams Configuration +kafka.application.id=streaming-app +kafka.bootstrap.servers=localhost:9092,localhost:9093 +kafka.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde +kafka.default.value.serde=io.confluent.kafka.streams.serdes.avro.GenericAvroSerde +kafka.schema.registry.url=http://localhost:8081 + +# Stream Processing Configuration +kafka.processing.guarantee=at_least_once +kafka.commit.interval.ms=10000 +kafka.num.stream.threads=1 + +# Application Configuration +coinbase.topic.name=coinbase_BTC-USD +coinbase.source.name=Coinbase +schema.registry.url=http://localhost:8081 diff --git a/src/interfaces/__init__.py b/src/python_services/__init__.py similarity index 100% rename from src/interfaces/__init__.py rename to src/python_services/__init__.py diff --git a/src/constants/Dataclass.py b/src/python_services/constants/Dataclass.py similarity index 100% rename from src/constants/Dataclass.py rename to src/python_services/constants/Dataclass.py diff --git a/src/constants/Enums.py b/src/python_services/constants/Enums.py similarity index 100% rename from src/constants/Enums.py rename to src/python_services/constants/Enums.py diff --git a/src/generic/KafkaProducer.py b/src/python_services/generic/KafkaProducer.py similarity index 94% rename from src/generic/KafkaProducer.py rename to src/python_services/generic/KafkaProducer.py index 519313b..5558a44 100644 --- a/src/generic/KafkaProducer.py +++ b/src/python_services/generic/KafkaProducer.py @@ -1,9 +1,9 @@ from confluent_kafka import Producer from confluent_kafka.admin import AdminClient, NewTopic -from src.constants.Enums import ProducerApplicationEnum +from src.python_services.constants.Enums import ProducerApplicationEnum -from src.generic.SchemaRegistryClient import SchemaRegistryClient -from src.generic.LoggingDecorator import log_filtered_message +from src.python_services.generic.SchemaRegistryClient import SchemaRegistryClient +from 
src.python_services.generic.LoggingDecorator import log_filtered_message from confluent_kafka.serialization import ( SerializationContext, MessageField, diff --git a/src/generic/LoggingDecorator.py b/src/python_services/generic/LoggingDecorator.py similarity index 97% rename from src/generic/LoggingDecorator.py rename to src/python_services/generic/LoggingDecorator.py index 4082b31..4c3a8f6 100644 --- a/src/generic/LoggingDecorator.py +++ b/src/python_services/generic/LoggingDecorator.py @@ -1,6 +1,6 @@ import logging import json -from src.constants.Dataclass import LogMessage +from src.python_services.constants.Dataclass import LogMessage from functools import wraps # Configure root logger if not already configured diff --git a/src/generic/SchemaRegistryClient.py b/src/python_services/generic/SchemaRegistryClient.py similarity index 94% rename from src/generic/SchemaRegistryClient.py rename to src/python_services/generic/SchemaRegistryClient.py index e8d76b7..df7b602 100644 --- a/src/generic/SchemaRegistryClient.py +++ b/src/python_services/generic/SchemaRegistryClient.py @@ -1,7 +1,7 @@ from confluent_kafka.schema_registry import ( SchemaRegistryClient as ConfluentSchemaRegistryClient, ) -from src.constants.Enums import SerializerEnum +from src.python_services.constants.Enums import SerializerEnum class SchemaRegistryClient: diff --git a/src/interfaces/BaseStreamProducer.py b/src/python_services/interfaces/BaseStreamProducer.py similarity index 100% rename from src/interfaces/BaseStreamProducer.py rename to src/python_services/interfaces/BaseStreamProducer.py diff --git a/src/producers/coinbase_producer/__init__.py b/src/python_services/interfaces/__init__.py similarity index 100% rename from src/producers/coinbase_producer/__init__.py rename to src/python_services/interfaces/__init__.py diff --git a/src/producers/coinbase_producer/CoinbaseProducer.py b/src/python_services/producers/coinbase_producer/CoinbaseProducer.py similarity index 90% rename from 
src/producers/coinbase_producer/CoinbaseProducer.py rename to src/python_services/producers/coinbase_producer/CoinbaseProducer.py index 2a98be2..a0ccba7 100644 --- a/src/producers/coinbase_producer/CoinbaseProducer.py +++ b/src/python_services/producers/coinbase_producer/CoinbaseProducer.py @@ -1,11 +1,11 @@ import json import websocket import logging -from src.interfaces.BaseStreamProducer import BaseStreamProducer -from src.generic.KafkaProducer import KafkaProducer -from src.constants.Enums import ProducerApplicationEnum -from src.constants.Dataclass import CoinbaseMessage -from src.generic.LoggingDecorator import log_method +from src.python_services.interfaces.BaseStreamProducer import BaseStreamProducer +from src.python_services.generic.KafkaProducer import KafkaProducer +from src.python_services.constants.Enums import ProducerApplicationEnum +from src.python_services.constants.Dataclass import CoinbaseMessage +from src.python_services.generic.LoggingDecorator import log_method from typing import List diff --git a/src/python_services/producers/coinbase_producer/__init__.py b/src/python_services/producers/coinbase_producer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/producers/coinbase_producer/main.py b/src/python_services/producers/coinbase_producer/main.py similarity index 88% rename from src/producers/coinbase_producer/main.py rename to src/python_services/producers/coinbase_producer/main.py index 27cfb6a..94a8379 100644 --- a/src/producers/coinbase_producer/main.py +++ b/src/python_services/producers/coinbase_producer/main.py @@ -1,7 +1,7 @@ import argparse from dotenv import load_dotenv -from src.producers.coinbase_producer.CoinbaseProducer import CoinbaseProducer +from src.python_services.producers.coinbase_producer.CoinbaseProducer import CoinbaseProducer load_dotenv() diff --git a/src/producers/reddit_producer/RedditProducer.py b/src/python_services/producers/reddit_producer/RedditProducer.py similarity index 92% rename from 
src/producers/reddit_producer/RedditProducer.py rename to src/python_services/producers/reddit_producer/RedditProducer.py index 5614801..c7ad355 100644 --- a/src/producers/reddit_producer/RedditProducer.py +++ b/src/python_services/producers/reddit_producer/RedditProducer.py @@ -1,11 +1,11 @@ import praw import logging from textblob import TextBlob -from src.interfaces.BaseStreamProducer import BaseStreamProducer -from src.constants.Dataclass import RedditMessage -from src.constants.Enums import ProducerApplicationEnum -from src.generic.KafkaProducer import KafkaProducer -from src.generic.LoggingDecorator import log_method +from src.python_services.interfaces.BaseStreamProducer import BaseStreamProducer +from src.python_services.constants.Dataclass import RedditMessage +from src.python_services.constants.Enums import ProducerApplicationEnum +from src.python_services.generic.KafkaProducer import KafkaProducer +from src.python_services.generic.LoggingDecorator import log_method from typing import List diff --git a/src/producers/reddit_producer/main.py b/src/python_services/producers/reddit_producer/main.py similarity index 92% rename from src/producers/reddit_producer/main.py rename to src/python_services/producers/reddit_producer/main.py index 483e0e0..145fa29 100644 --- a/src/producers/reddit_producer/main.py +++ b/src/python_services/producers/reddit_producer/main.py @@ -2,7 +2,7 @@ from dotenv import load_dotenv import argparse -from src.producers.reddit_producer.RedditProducer import RedditProducer +from src.python_services.producers.reddit_producer.RedditProducer import RedditProducer load_dotenv(dotenv_path='Reddit.env')