Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions case-study-1/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@
- Pub/Sub client libraries
- Cloud SQL w/ PostgreSQL
- Cloud Run

### Run application locally

See [run the application locally](docs/run-application-locally.md).
81 changes: 81 additions & 0 deletions case-study-1/docs/run-application-locally.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Run the application locally

## Start a local Postgres

```bash
docker run -d --name local-pg \
-e POSTGRES_DB=session_db \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=password \
-p 5432:5432 postgres:16
```

## Connect with psql

```bash
docker run -it --rm \
--network host \
postgres:16 \
psql -h localhost -U postgres -d session_db
```

## Create database

```sql
CREATE DATABASE session_db
WITH
OWNER = postgres
TEMPLATE = postgres
ENCODING = 'UTF-8'
LC_COLLATE = 'C'
LC_CTYPE = 'C'
TABLESPACE = pg_default
CONNECTION LIMIT = -1
IS_TEMPLATE = False;
```

## Create Spring Session tables

```sql
DROP TABLE IF EXISTS SPRING_SESSION_ATTRIBUTES;
DROP TABLE IF EXISTS SPRING_SESSION;

CREATE TABLE IF NOT EXISTS SPRING_SESSION
(
PRIMARY_ID CHAR(36) NOT NULL,
SESSION_ID CHAR(36) NOT NULL,
CREATION_TIME BIGINT NOT NULL,
LAST_ACCESS_TIME BIGINT NOT NULL,
MAX_INACTIVE_INTERVAL INT NOT NULL,
EXPIRY_TIME BIGINT NOT NULL,
PRINCIPAL_NAME VARCHAR(100),
CONSTRAINT SPRING_SESSION_PK PRIMARY KEY (PRIMARY_ID)
);

CREATE UNIQUE INDEX IF NOT EXISTS SPRING_SESSION_IX1 ON SPRING_SESSION (SESSION_ID);
CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX2 ON SPRING_SESSION (EXPIRY_TIME);
CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX3 ON SPRING_SESSION (PRINCIPAL_NAME);

CREATE TABLE IF NOT EXISTS SPRING_SESSION_ATTRIBUTES
(
SESSION_PRIMARY_ID CHAR(36) NOT NULL,
ATTRIBUTE_NAME VARCHAR(200) NOT NULL,
ATTRIBUTE_BYTES BYTEA NOT NULL,
CONSTRAINT SPRING_SESSION_ATTRIBUTES_PK PRIMARY KEY (SESSION_PRIMARY_ID, ATTRIBUTE_NAME),
CONSTRAINT SPRING_SESSION_ATTRIBUTES_FK FOREIGN KEY (SESSION_PRIMARY_ID) REFERENCES SPRING_SESSION (PRIMARY_ID) ON DELETE CASCADE
);
```

## Verify Spring Session via logs

```bash
docker logs local-pg
```

---

## Run the application

```bash
./mvnw spring-boot:run
```
93 changes: 83 additions & 10 deletions case-study-1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.squidmin.java.spring.maven</groupId>
<artifactId>case-study-1</artifactId>
<version>0.0.1-SNAPSHOT</version>

<name>case-study-1</name>
<description>case-study-1</description>
<url/>
Expand All @@ -26,35 +29,69 @@
<tag/>
<url/>
</scm>

<properties>
<java.version>21</java.version>
<spring-cloud-gcp.version>7.4.0</spring-cloud-gcp.version>
<spring-cloud-gcp.version>4.7.2</spring-cloud-gcp.version>
<spring-cloud.version>2025.0.0</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-storage</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
<version>${spring-cloud-gcp.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter</artifactId>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-configuration-processor</artifactId>-->
<!-- <optional>true</optional>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-storage</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
Expand All @@ -72,27 +109,63 @@
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>

<!-- Apache Avro (excluding commons-lang3 to avoid conflicts) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
Expand Down
26 changes: 26 additions & 0 deletions case-study-1/scripts/publish_message_to_topic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Set vars (adjust to your values)
PROJECT_ID="lofty-root-378503"
TOPIC_ID="java21-spring3-maven-reference-topic"

# Payload must be base64. On macOS:
DATA_BASE64="$(printf '{"eventId":"9a7b127e-23b5-48a1-93dc-03c5c619c6b3","timestamp":"2023-10-01T12:00:00Z","messageType":"WIDGET_CREATED","records":[{"id":"9a7b127e-23b5-48a1-93dc-03c5c619c6b3","name":"Full Widget","createdAt":"2023-10-01T12:00:00Z","meta":{"color":"blue","size":"large","features":["waterproof","shockproof"]}}]}' | base64 | tr -d '\n')"

# Get an OAuth2 access token via gcloud
ACCESS_TOKEN="$(gcloud auth print-access-token)"

# Publish
curl -sS -X POST \
"https://pubsub.googleapis.com/v1/projects/${PROJECT_ID}/topics/${TOPIC_ID}:publish" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json; charset=utf-8" \
-d '{
"messages": [
{
"data": "'"${DATA_BASE64}"'",
"attributes": {
"source": "case-study-1",
"env": "dev"
}
}
]
}'
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.squidmin.java.spring.maven.casestudies;
package org.squidmin.java.spring.maven.casestudies.casestudy1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.squidmin.java.spring.maven.casestudies.casestudy1.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.transaction.PlatformTransactionManager;
import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.NormalizedWidget;
import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget;
import org.squidmin.java.spring.maven.casestudies.casestudy1.io.JsonToAvroProcessor;
import org.squidmin.java.spring.maven.casestudies.casestudy1.listeners.JobLoggerListener;
import org.squidmin.java.spring.maven.casestudies.casestudy1.listeners.StepLoggerListener;
import org.squidmin.java.spring.maven.casestudies.casestudy1.policies.TransientSkips;

import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
factory.setTablePrefix("BATCH_");
factory.afterPropertiesSet();
return factory.getObject();
}

@Bean
public JobOperator jobOperator(JobExplorer jobExplorer, JobLauncher jobLauncher, JobRepository jobRepository, JobRegistry jobRegistry) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobLauncher(jobLauncher);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
return jobOperator;
}

@Bean
public JobRegistry jobRegistry() {
return new MapJobRegistry();
}

@Bean
public Step normalizeWidgetsStep(JobRepository jobRepository,
PlatformTransactionManager txManager,
JdbcPagingItemReader<Widget> widgetReader,
JsonToAvroProcessor processor,
ItemWriter<NormalizedWidget> widgetCompositeWriter,
StepLoggerListener stepLoggerListener,
TransientSkips skipPolicy) {

return new StepBuilder("normalizeWidgetsStep", jobRepository)
.<Widget, NormalizedWidget>chunk(500, txManager) // Process 500 items per chunk
.reader(widgetReader) // Read items from the database
.processor(processor) // Process items (normalize data)
.writer(widgetCompositeWriter) // Write to DB
.faultTolerant()
.skipPolicy(skipPolicy) // Skip invalid rows (e.g., missing field)
.retryLimit(5) // Retry transient errors up to 3 times
.retry(ConcurrencyFailureException.class) // Retry on database lock issues
.listener(stepLoggerListener) // Log step execution details
.build();
}

@Bean
public Job normalizeWidgetsJob(JobRepository jobRepository,
Step normalizeWidgetsStep,
JobLoggerListener jobLoggerListener) {
return new JobBuilder("normalizeWidgetsJob", jobRepository)
.listener(jobLoggerListener)
.start(normalizeWidgetsStep)
.build();
}

}
Loading