Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
**/__pycache__/
*.pyc
*.pyo
*.pyd
.git
.venv
.pytest_cache
.coverage
*.log
.DS_Store
Thumbs.db
*.tmp
*.swp
*.swo
*~
docker/
Readme.md
image.png
remarks.txt
pom.xml
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
**/__pycache__/
*.env
*.env

.venv/
target/
dist/
7 changes: 0 additions & 7 deletions docker/.dockerignore

This file was deleted.

45 changes: 34 additions & 11 deletions docker/Dockerfile_coinbase_producer
Original file line number Diff line number Diff line change
@@ -1,18 +1,41 @@
FROM python:3.12-slim
# Multi-stage build for Coinbase Producer
FROM python:3.12-slim AS builder

# Zet werkdirectory
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/

# Set working directory
WORKDIR /app

# Kopieer alleen wat nodig is voor installatie (betere caching)
COPY ../pyproject.toml .
COPY ../src/ ./src
# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies
RUN uv sync --frozen --no-dev

# Production stage
FROM python:3.12-slim AS runtime

# Installeer afhankelijkheden via pip (hatch + pyproject.toml)
RUN pip install --upgrade pip && pip install .
# Install uv in runtime stage
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/

# Set working directory
WORKDIR /app

# Zet PYTHONPATH zodat src importeerbaar is
ENV PYTHONPATH=/app/src
# Copy the virtual environment from builder stage
COPY --from=builder /app/.venv /app/.venv

# Copy source code and configuration
COPY src/ ./src/
COPY pyproject.toml uv.lock ./

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV IN_DOCKER=1
# Start de app
CMD ["python", "src/producers/coinbase_producer/main.py", "--topics", "BTC-USD,XRP-USD,ETH-USD"]
ENV PATH="/app/.venv/bin:$PATH"

# Install the package in editable mode
RUN uv sync --frozen --no-dev

# Start the app using uv
CMD ["uv", "run", "python", "src/python_services/producers/coinbase_producer/main.py", "--topics", "BTC-USD,XRP-USD,ETH-USD"]
45 changes: 34 additions & 11 deletions docker/Dockerfile_reddit_producer
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
FROM python:3.12-slim
# Multi-stage build for Reddit Producer
FROM python:3.12-slim AS builder

# Zet werkdirectory
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/

# Set working directory
WORKDIR /app

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies
RUN uv sync --frozen --no-dev

# Production stage
FROM python:3.12-slim AS runtime

# Install uv in runtime stage
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/

# Set working directory
WORKDIR /app

# Kopieer alleen wat nodig is voor installatie (betere caching)
COPY ../pyproject.toml .
COPY ../src/ ./src
# Copy the virtual environment from builder stage
COPY --from=builder /app/.venv /app/.venv

# Installeer afhankelijkheden via pip (hatch + pyproject.toml)
RUN pip install --upgrade pip && pip install .
# Copy source code and configuration
COPY src/ ./src/
COPY pyproject.toml uv.lock ./
COPY Reddit.env ./

# Zet PYTHONPATH zodat src importeerbaar is
ENV PYTHONPATH=/app/src
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV IN_DOCKER=1
ENV PATH="/app/.venv/bin:$PATH"

# Install the package in editable mode
RUN uv sync --frozen --no-dev

# Start de app
CMD ["python", "src/producers/reddit_producer/main.py", "--subreddits", "Bitcoin,CryptoCurrency,CryptoMarkets,funny,AskReddit,gaming"]
# Start the app using uv
CMD ["uv", "run", "python", "src/python_services/producers/reddit_producer/main.py", "--subreddits", "Bitcoin,CryptoCurrency,CryptoMarkets,funny,AskReddit,gaming"]
114 changes: 114 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.streaming.project</groupId>
<artifactId>java-streams</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>java-streams</name>
<url>http://maven.apache.org</url>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<build>
<sourceDirectory>src/java-streams/main/java</sourceDirectory>
<testSourceDirectory>src/java-streams/test/java</testSourceDirectory>
<resources>
<resource>
<directory>src/java-streams/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>8.0.0-ccs</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>8.0.0-ccs</version>
</dependency>


<!-- Dependencies below are required/recommended only when using Apache Avro. -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>8.0.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>8.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
</dependency>

<!-- Jackson dependencies for Avro compatibility -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.17.2</version>
</dependency>

<!-- SLF4J logging dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ dependencies = [
]

[tool.hatch.build.targets.wheel]
packages = ["src/"]
packages = ["src"]
exclude = ["src/java-streams"]

[tool.hatch.metadata]
allow-direct-references = true

[build-system]
requires = ["hatchling"]
Expand Down
46 changes: 46 additions & 0 deletions src/java-streams/main/java/com/streaming/project/StreamReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.streaming.project;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ForeachAction;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;

import java.util.Collections;

public class StreamReader {
private final String topicName;
private final String streamName;
private final ForeachAction<String, GenericRecord> processor;
private final GenericAvroSerde genericAvroSerde;

public StreamReader(String topicName, String streamName, ForeachAction<String, GenericRecord> processor,
String schemaRegistryUrl) {
this.topicName = topicName;
this.streamName = streamName;
this.processor = processor;

// Configure Avro serde for values
this.genericAvroSerde = new GenericAvroSerde();
this.genericAvroSerde.configure(
Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
false); // false for value serializer
}

public void addToBuilder(StreamsBuilder builder) {
KStream<String, GenericRecord> stream = builder.stream(topicName,
Consumed.with(Serdes.String(), genericAvroSerde));

stream.peek(processor);
}

public String getTopicName() {
return topicName;
}

public String getStreamName() {
return streamName;
}
}
Loading