Skip to content

Commit 4114cab

Browse files
authored
Read from Kafka and write to PostgreSQL (#149)
* Add KafkaReadingService * Add PostgreSQL * Add test * Fix compose file * Add Spring Boot 2 example
1 parent acabbc0 commit 4114cab

File tree

24 files changed

+361
-30
lines changed

24 files changed

+361
-30
lines changed

.editorconfig

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ end_of_line = crlf
44
indent_size = 4
55
indent_style = space
66
insert_final_newline = true
7-
max_line_length = 130
7+
max_line_length = 199
88
tab_width = 4
9-
ij_continuation_indent_size = 8
9+
ij_continuation_indent_size = 4
1010

1111
[*.java]
12-
ij_java_imports_layout = *,|,javax.**,java.**,|,$*
12+
ij_java_imports_layout = *,|,java.**,javax.**,|,$*
1313
ij_java_align_multiline_parameters = true
1414
ij_java_blank_lines_after_class_header = 1
1515
ij_java_blank_lines_after_imports = 1
@@ -32,7 +32,7 @@ ij_java_keep_simple_lambdas_in_one_line = true
3232
ij_java_keep_simple_methods_in_one_line = true
3333
ij_java_names_count_to_use_import_on_demand = 101
3434
ij_java_class_count_to_use_import_on_demand = 101
35-
ij_java_packages_to_use_import_on_demand = javax.swing.*
35+
ij_java_packages_to_use_import_on_demand = java.awt.*, javax.swing.*
3636
ij_java_prefer_longer_names = true
3737
ij_java_space_after_closing_angle_bracket_in_type_argument = false
3838
ij_java_space_after_colon = true

common-internal-bom/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,11 @@ dependencies {
1212
api(platform("org.assertj:assertj-bom:3.26.3"))
1313
api(platform("org.testcontainers:testcontainers-bom:1.20.0"))
1414
api(platform("org.junit:junit-bom:5.10.3"))
15+
api(platform("io.github.mfvanek:pg-index-health-bom:0.13.0"))
16+
17+
constraints {
18+
api("org.liquibase:liquibase-core:4.28.0")
19+
api("com.github.blagerweij:liquibase-sessionlock:1.6.9")
20+
api("org.awaitility:awaitility:4.2.1")
21+
}
1522
}

db-migrations/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
plugins {
2+
id("java")
3+
id("sb-ot-demo.java-conventions")
4+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
databaseChangeLog:
2+
- include:
3+
file: db/changelog/sql/schema.sql
4+
- include:
5+
file: db/changelog/sql/storage.sql
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
--liquibase formatted sql
2+
3+
--changeset ivan.vakhrushev:2024.08.04:create.schema
4+
create schema if not exists otel_demo;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
--liquibase formatted sql
2+
3+
--changeset ivan.vakhrushev:2024.08.04:storage.table
4+
create table if not exists otel_demo.storage
5+
(
6+
id bigint generated always as identity,
7+
message text not null,
8+
trace_id varchar(64) not null unique,
9+
created_at timestamptz not null
10+
);
11+
12+
--changeset ivan.vakhrushev:2024.08.04:storage.comments.on.table.and.columns
13+
comment on table otel_demo.storage is 'Information about messages from Kafka';
14+
comment on column otel_demo.storage.id is 'Unique identifier of the record in the current table';
15+
comment on column otel_demo.storage.message is 'Message from Kafka';
16+
comment on column otel_demo.storage.trace_id is 'Unique traceId of operation';
17+
comment on column otel_demo.storage.created_at is 'Date and time of operation';

docker/docker-compose-base.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,24 @@ services:
8484
networks:
8585
- jaeger-example
8686

87+
postgres:
88+
container_name: postgres
89+
image: postgres:16.2
90+
shm_size: "2gb"
91+
environment:
92+
POSTGRES_DB: "otel_demo_db"
93+
POSTGRES_USER: "otel_demo_user"
94+
POSTGRES_PASSWORD: "otel_demo_password"
95+
PGDATA: "/var/lib/postgresql/data/pgdata"
96+
volumes:
97+
- otel_demo_db_data:/var/lib/postgresql/data
98+
ports:
99+
- "6432:5432"
100+
networks:
101+
- jaeger-example
102+
87103
networks:
88104
jaeger-example:
105+
106+
volumes:
107+
otel_demo_db_data:

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ rootProject.name = "spring-boot-open-telemetry-demo"
22
include("spring-boot-3-demo-app")
33
include("common-internal-bom")
44
include("spring-boot-2-demo-app")
5+
include("db-migrations")

spring-boot-2-demo-app/build.gradle.kts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,22 @@ dependencies {
2424
}
2525
implementation("org.springframework.cloud:spring-cloud-sleuth-otel-autoconfigure")
2626

27+
implementation("org.springframework.boot:spring-boot-starter-jdbc")
28+
implementation("org.postgresql:postgresql")
29+
implementation("com.zaxxer:HikariCP")
30+
implementation(project(":db-migrations"))
31+
implementation("org.liquibase:liquibase-core")
32+
implementation("com.github.blagerweij:liquibase-sessionlock")
33+
implementation("net.ttddyy:datasource-proxy:1.9") {
34+
because("https://github.com/jdbc-observations/datasource-proxy/issues/111")
35+
}
36+
2737
testImplementation("org.springframework.boot:spring-boot-starter-test")
2838
testImplementation("org.springframework.boot:spring-boot-starter-webflux")
39+
testImplementation("org.testcontainers:postgresql")
2940
testImplementation("org.testcontainers:kafka")
3041
testImplementation("org.springframework.kafka:spring-kafka-test")
42+
testImplementation("org.awaitility:awaitility")
3143
}
3244

3345
springBoot {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.github.mfvanek.spring.boot2.test.service;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.springframework.cloud.sleuth.Span;
7+
import org.springframework.cloud.sleuth.Tracer;
8+
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
9+
import org.springframework.kafka.annotation.KafkaListener;
10+
import org.springframework.kafka.support.Acknowledgment;
11+
import org.springframework.stereotype.Service;
12+
13+
import java.time.Clock;
14+
import java.time.LocalDateTime;
15+
import java.util.Map;
16+
import java.util.UUID;
17+
18+
@Slf4j
19+
@Service
20+
@RequiredArgsConstructor
21+
public class KafkaReadingService {
22+
23+
private final Tracer tracer;
24+
private final Clock clock;
25+
private final NamedParameterJdbcTemplate jdbcTemplate;
26+
27+
@KafkaListener(topics = "${spring.kafka.template.default-topic}")
28+
public void listen(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
29+
final Span currentSpan = tracer.currentSpan();
30+
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
31+
log.info("Received record: {} with traceId {}", message.value(), traceId);
32+
jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
33+
Map.ofEntries(
34+
Map.entry("msg", message.value()),
35+
Map.entry("traceId", traceId),
36+
Map.entry("createdAt", LocalDateTime.now(clock))
37+
)
38+
);
39+
ack.acknowledge();
40+
}
41+
}

0 commit comments

Comments
 (0)