diff --git a/case-study-1/README.md b/case-study-1/README.md index 99b8a29..862f73c 100644 --- a/case-study-1/README.md +++ b/case-study-1/README.md @@ -7,3 +7,7 @@ - Pub/Sub client libraries - Cloud SQL w/ PostgreSQL - Cloud Run + +### Run application locally + +See [run the application locally](docs/run-application-locally.md). diff --git a/case-study-1/docs/run-application-locally.md b/case-study-1/docs/run-application-locally.md new file mode 100644 index 0000000..53965c9 --- /dev/null +++ b/case-study-1/docs/run-application-locally.md @@ -0,0 +1,81 @@ +# Run the application locally + +## Start a local Postgres + +```bash +docker run -d --name local-pg \ + -e POSTGRES_DB=session_db \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=password \ + -p 5432:5432 postgres:16 +``` + +## Connect with psql + +```bash +docker run -it --rm \ + --network host \ + postgres:16 \ + psql -h localhost -U postgres -d session_db +``` + +## Create database + +```sql +CREATE DATABASE session_db + WITH + OWNER = postgres + TEMPLATE = postgres + ENCODING = 'UTF-8' + LC_COLLATE = 'C' + LC_CTYPE = 'C' + TABLESPACE = pg_default + CONNECTION LIMIT = -1 + IS_TEMPLATE = False; +``` + +## Create Spring Session tables + +```sql +DROP TABLE IF EXISTS SPRING_SESSION_ATTRIBUTES; +DROP TABLE IF EXISTS SPRING_SESSION; + +CREATE TABLE IF NOT EXISTS SPRING_SESSION +( + PRIMARY_ID CHAR(36) NOT NULL, + SESSION_ID CHAR(36) NOT NULL, + CREATION_TIME BIGINT NOT NULL, + LAST_ACCESS_TIME BIGINT NOT NULL, + MAX_INACTIVE_INTERVAL INT NOT NULL, + EXPIRY_TIME BIGINT NOT NULL, + PRINCIPAL_NAME VARCHAR(100), + CONSTRAINT SPRING_SESSION_PK PRIMARY KEY (PRIMARY_ID) +); + +CREATE UNIQUE INDEX IF NOT EXISTS SPRING_SESSION_IX1 ON SPRING_SESSION (SESSION_ID); +CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX2 ON SPRING_SESSION (EXPIRY_TIME); +CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX3 ON SPRING_SESSION (PRINCIPAL_NAME); + +CREATE TABLE IF NOT EXISTS SPRING_SESSION_ATTRIBUTES +( + SESSION_PRIMARY_ID CHAR(36) NOT NULL, + ATTRIBUTE_NAME VARCHAR(200) NOT NULL, + ATTRIBUTE_BYTES BYTEA NOT NULL, + CONSTRAINT SPRING_SESSION_ATTRIBUTES_PK PRIMARY KEY (SESSION_PRIMARY_ID, ATTRIBUTE_NAME), + CONSTRAINT SPRING_SESSION_ATTRIBUTES_FK FOREIGN KEY (SESSION_PRIMARY_ID) REFERENCES SPRING_SESSION (PRIMARY_ID) ON DELETE CASCADE +); +``` + +## Verify Spring Session via logs + +```bash +docker logs local-pg +``` + +--- + +## Run the application + +```bash +./mvnw spring-boot:run +``` diff --git a/case-study-1/pom.xml b/case-study-1/pom.xml index 804bf5a..5c6c2c1 100644 --- a/case-study-1/pom.xml +++ b/case-study-1/pom.xml @@ -2,15 +2,18 @@ 4.0.0 + org.springframework.boot spring-boot-starter-parent 3.5.7 + org.squidmin.java.spring.maven case-study-1 0.0.1-SNAPSHOT + case-study-1 case-study-1 @@ -26,35 +29,69 @@ + 21 - 7.4.0 + 4.7.2 2025.0.0 + + + com.google.cloud + spring-cloud-gcp-starter-pubsub + + + com.google.cloud + spring-cloud-gcp-starter-storage + + + com.google.cloud + spring-cloud-gcp-pubsub-stream-binder + ${spring-cloud-gcp.version} + + + com.google.cloud + spring-cloud-gcp-starter + + + + + + + org.springframework.boot spring-boot-starter-data-jpa + + org.springframework.boot + spring-boot-starter-batch + org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-integration + org.springframework.boot spring-boot-starter-webflux - com.google.cloud - spring-cloud-gcp-starter + org.springframework.boot + spring-boot-starter-validation - com.google.cloud - spring-cloud-gcp-starter-pubsub + org.springframework.boot + spring-boot-configuration-processor + true - com.google.cloud - spring-cloud-gcp-starter-storage + org.springframework.cloud + spring-cloud-starter org.springframework.cloud @@ -72,27 +109,63 @@ postgresql runtime + + - org.springframework.boot - spring-boot-configuration-processor - true + org.apache.avro + avro + 1.12.0 + + + org.apache.commons + commons-lang3 + + + org.projectlombok lombok true + + + com.h2database + h2 + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + postgresql + test + + + org.testcontainers + junit-jupiter + test + org.springframework.boot spring-boot-starter-test test + + org.springframework.batch + spring-batch-test + test + io.projectreactor reactor-test test + diff --git a/case-study-1/scripts/publish_message_to_topic.sh b/case-study-1/scripts/publish_message_to_topic.sh new file mode 100755 index 0000000..5c26048 --- /dev/null +++ b/case-study-1/scripts/publish_message_to_topic.sh @@ -0,0 +1,26 @@ +# Set vars (adjust to your values) +PROJECT_ID="lofty-root-378503" +TOPIC_ID="java21-spring3-maven-reference-topic" + +# Payload must be base64. On macOS: +DATA_BASE64="$(printf '{"eventId":"9a7b127e-23b5-48a1-93dc-03c5c619c6b3","timestamp":"2023-10-01T12:00:00Z","messageType":"WIDGET_CREATED","records":[{"id":"9a7b127e-23b5-48a1-93dc-03c5c619c6b3","name":"Full Widget","createdAt":"2023-10-01T12:00:00Z","meta":{"color":"blue","size":"large","features":["waterproof","shockproof"]}}]}' | base64 | tr -d '\n')" + +# Get an OAuth2 access token via gcloud +ACCESS_TOKEN="$(gcloud auth print-access-token)" + +# Publish +curl -sS -X POST \ + "https://pubsub.googleapis.com/v1/projects/${PROJECT_ID}/topics/${TOPIC_ID}:publish" \ + -H "Authorization: Bearer ${ACCESS_TOKEN}" \ + -H "Content-Type: application/json; charset=utf-8" \ + -d '{ + "messages": [ + { + "data": "'"${DATA_BASE64}"'", + "attributes": { + "source": "case-study-1", + "env": "dev" + } + } + ] + }' diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/CaseStudy1Application.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/CaseStudy1Application.java similarity index 82% rename from case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/CaseStudy1Application.java rename to case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/CaseStudy1Application.java index 1428cfa..5b946a0 100644 --- a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/CaseStudy1Application.java +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/CaseStudy1Application.java @@ -1,4 +1,4 @@ -package org.squidmin.java.spring.maven.casestudies; +package org.squidmin.java.spring.maven.casestudies.casestudy1; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/BatchJobConfig.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/BatchJobConfig.java new file mode 100644 index 0000000..74ee1de --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/BatchJobConfig.java @@ -0,0 +1,93 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.JobRegistry; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.support.MapJobRegistry; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.launch.support.SimpleJobOperator; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.database.JdbcPagingItemReader; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.dao.ConcurrencyFailureException; +import org.springframework.transaction.PlatformTransactionManager; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.NormalizedWidget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.io.JsonToAvroProcessor; +import org.squidmin.java.spring.maven.casestudies.casestudy1.listeners.JobLoggerListener; +import org.squidmin.java.spring.maven.casestudies.casestudy1.listeners.StepLoggerListener; +import org.squidmin.java.spring.maven.casestudies.casestudy1.policies.TransientSkips; + +import javax.sql.DataSource; + +@Configuration +@EnableBatchProcessing +public class BatchJobConfig { + + @Bean + public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDataSource(dataSource); + factory.setTransactionManager(transactionManager); + factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ"); + factory.setTablePrefix("BATCH_"); + factory.afterPropertiesSet(); + return factory.getObject(); + } + + @Bean + public JobOperator jobOperator(JobExplorer jobExplorer, JobLauncher jobLauncher, JobRepository jobRepository, JobRegistry jobRegistry) { + SimpleJobOperator jobOperator = new SimpleJobOperator(); + jobOperator.setJobExplorer(jobExplorer); + jobOperator.setJobLauncher(jobLauncher); + jobOperator.setJobRepository(jobRepository); + jobOperator.setJobRegistry(jobRegistry); + return jobOperator; + } + + @Bean + public JobRegistry jobRegistry() { + return new MapJobRegistry(); + } + + @Bean + public Step normalizeWidgetsStep(JobRepository jobRepository, + PlatformTransactionManager txManager, + JdbcPagingItemReader widgetReader, + JsonToAvroProcessor processor, + ItemWriter widgetCompositeWriter, + StepLoggerListener stepLoggerListener, + TransientSkips skipPolicy) { + + return new StepBuilder("normalizeWidgetsStep", jobRepository) + .chunk(500, txManager) // Process 500 items per chunk + .reader(widgetReader) // Read items from the database + .processor(processor) // Process items (normalize data) + .writer(widgetCompositeWriter) // Write to DB + .faultTolerant() + .skipPolicy(skipPolicy) // Skip invalid rows (e.g., missing field) + .retryLimit(5) // Retry transient errors up to 3 times + .retry(ConcurrencyFailureException.class) // Retry on database lock issues + .listener(stepLoggerListener) // Log step execution details + .build(); + } + + @Bean + public Job normalizeWidgetsJob(JobRepository jobRepository, + Step normalizeWidgetsStep, + JobLoggerListener jobLoggerListener) { + return new JobBuilder("normalizeWidgetsJob", jobRepository) + .listener(jobLoggerListener) + .start(normalizeWidgetsStep) + .build(); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/GcsConfig.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/GcsConfig.java new file mode 100644 index 0000000..4be2e3e --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/GcsConfig.java @@ -0,0 +1,89 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.config; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ImpersonatedCredentials; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.io.IOException; +import java.util.List; + +@Configuration +@Slf4j +public class GcsConfig { + + private final String projectId; + private final String gcsPrefix; + private final String bucketName; + private final String batchUploadPrefix; + private final String impersonationTarget; + private final String accessToken; + private final String gkmsKeyName; + + public GcsConfig(@Value("${spring.cloud.gcp.project-id:#{systemEnvironment['PROJECT_ID']}}") String projectId, + @Value("${gcp.storage.gcs-prefix:#{systemEnvironment['GCS_PREFIX']}}") String gcsPrefix, + @Value("${gcp.storage.bucket.name:#{systemEnvironment['GCS_BUCKET_NAME']}}") String bucketName, + @Value("${gcp.storage.bucket.batch-upload-prefix:#{systemEnvironment['BATCH_UPLOAD_PREFIX']}}") String batchUploadPrefix, + @Value("${gcp.auth.impersonation-target:#{systemEnvironment['IMPERSONATION_TARGET']}}") String impersonationTarget, + @Value("${gcp.auth.access-token:#{systemEnvironment['OAUTH_ACCESS_TOKEN']}}") String accessToken, + @Value("${gcp.kms.key-name:#{systemEnvironment['GKMS_KEY_NAME']}}") String gkmsKeyName) { + + this.projectId = projectId; + this.gcsPrefix = gcsPrefix; + this.bucketName = bucketName; + this.batchUploadPrefix = batchUploadPrefix; + this.impersonationTarget = impersonationTarget; + this.accessToken = accessToken; + this.gkmsKeyName = gkmsKeyName; + + } + + @Bean + public Storage storage() throws IOException { + GoogleCredentials userCreds = GoogleCredentials.getApplicationDefault() + .createScoped("https://www.googleapis.com/auth/cloud-platform"); + + ImpersonatedCredentials saCreds = ImpersonatedCredentials.create( + userCreds, + getImpersonationTarget(), // e.g. "gcs-cmek-test-sa@lofty-root-378503.iam.gserviceaccount.com" + null, // delegates + List.of("https://www.googleapis.com/auth/cloud-platform"), + 3600 // lifetime in seconds + ); + + return StorageOptions.newBuilder() + .setProjectId(projectId) + .setCredentials(saCreds) + .build() + .getService(); + } + + public String getGcsPrefix() { + return gcsPrefix; + } + + public String getBucketName() { + return bucketName; + } + + public String getBatchUploadPrefix() { + return batchUploadPrefix; + } + + public String getImpersonationTarget() { + return impersonationTarget; + } + + public String getAccessToken() { + return accessToken; + } + + public String getGkmsKeyName() { + return gkmsKeyName; + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/GoogleCredentialsConfig.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/GoogleCredentialsConfig.java new file mode 100644 index 0000000..8bbe8ff --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/GoogleCredentialsConfig.java @@ -0,0 +1,20 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.config; + +import com.google.auth.oauth2.GoogleCredentials; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.io.IOException; + +@Configuration +public class GoogleCredentialsConfig { + + @Bean + public GoogleCredentials applicationDefaultCredentials() throws IOException { + GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault() + .createScoped("https://www.googleapis.com/auth/cloud-platform"); + googleCredentials.refreshIfExpired(); + return googleCredentials; + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/PubSubConfig.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/PubSubConfig.java new file mode 100644 index 0000000..6e76524 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/PubSubConfig.java @@ -0,0 +1,55 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class PubSubConfig { + + private final String projectId; + private final String topic; + private final String subscription; + private final String role; + private final String orderingKey; + private final String maxRetries; + + public PubSubConfig(@Value("${spring.cloud.gcp.project-id}") String projectId, + @Value("${gcp.pubsub.topic.name}") String topic, + @Value("${gcp.pubsub.subscription.name}") String subscription, + @Value("${gcp.pubsub.role.name:roles/pubsub.subscriber}") String role, + @Value("${gcp.pubsub.ordering-key:}") String orderingKey, + @Value("${gcp.pubsub.max-retries:5}") String maxRetries) { + + this.projectId = projectId; + this.topic = topic; + this.subscription = subscription; + this.role = role; + this.orderingKey = orderingKey; + this.maxRetries = maxRetries; + } + + public String getProjectId() { + return projectId; + } + + public String getTopic() { + return topic; + } + + public String getSubscription() { + return subscription; + } + + public String getRole() { + return role; + } + + public String getOrderingKey() { + return orderingKey; + } + + public String getMaxRetries() { + return maxRetries; + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/SkipPolicies.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/SkipPolicies.java new file mode 100644 index 0000000..6626351 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/SkipPolicies.java @@ -0,0 +1,41 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.config; + +import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.SkipPolicy; +import org.springframework.stereotype.Component; + +import java.sql.SQLTransientException; + +/** + * Defines various skip policies for handling errors during batch processing. + * These policies determine whether to skip certain exceptions or not. + * You can customize these policies based on your application's requirements. + * For example, you might want to skip transient errors but fail on validation errors. + * Each policy is implemented as a Spring component for easy integration. + * + * @see org.springframework.batch.core.step.skip.SkipPolicy + */ +@Component +public class SkipPolicies { + + /** + * Example: skip common transient/validation issues; fail on others. + */ + @Component + public static class TransientSkips implements SkipPolicy { + @Override + public boolean shouldSkip(Throwable t, long skipCount) { + if (t instanceof IllegalArgumentException) { + return true; // invalid row content + } else return t instanceof SQLTransientException; // transient DB issues + } + } + + /** + * Drop-in to skip everything (for quick tests) + */ + @Component + public static class SkipAll extends AlwaysSkipItemSkipPolicy { + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/Validators.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/Validators.java new file mode 100644 index 0000000..a74afb8 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/config/Validators.java @@ -0,0 +1,18 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.config; + +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Configuration class to provide a Validator bean for validating objects. + * This is useful for ensuring data integrity and enforcing constraints. + */ +@Configuration +public class Validators { + @Bean + public Validator validator() { + return Validation.buildDefaultValidatorFactory().getValidator(); + } +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/controller/WidgetController.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/controller/WidgetController.java new file mode 100644 index 0000000..38c17f1 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/controller/WidgetController.java @@ -0,0 +1,39 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.controller; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.service.WidgetService; + +import java.util.List; +import java.util.UUID; + +@RestController("case-study-1/api/widgets") +public class WidgetController { + + private static final Logger log = LoggerFactory.getLogger(WidgetController.class); + + private final WidgetService widgetService; + + public WidgetController(WidgetService widgetService) { + this.widgetService = widgetService; + } + + @GetMapping + public List list(@RequestParam(name = "q", required = false) String q) { + return widgetService.searchByName(q); + } + + @GetMapping("{id}") + public ResponseEntity get(@PathVariable UUID id) { + return widgetService.findById(id) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/NormalizedWidget.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/NormalizedWidget.java new file mode 100644 index 0000000..1351c78 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/NormalizedWidget.java @@ -0,0 +1,63 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.domain; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import java.time.OffsetDateTime; + +@Entity +@Table(name = "normalized_widget") +public class NormalizedWidget { + + @Id + private Long id; // same as source for idempotency + + @Column(nullable = false, unique = true) + private String externalOrderId; + + @Column(nullable = false, unique = true) + private String name; + + @Column(nullable = false) + private OffsetDateTime createdAt; + + @Column(nullable = false) + private boolean cancelled; + + protected NormalizedWidget() {} + + public NormalizedWidget(Long id, + String name, + String externalId, + OffsetDateTime createdAt, + boolean cancelled) { + + this.id = id; + this.name = name; + this.externalOrderId = externalId; + this.createdAt = createdAt; + this.cancelled = cancelled; + } + + public Long getId() { return id; } + + public void setId(Long id) { this.id = id; } + + public String getName() { return name; } + + public String getExternalOrderId() { return externalOrderId; } + + public void setExternalOrderId(String externalOrderId) { this.externalOrderId = externalOrderId; } + + public OffsetDateTime getCreatedAt() { return createdAt; } + + public void setCreatedAt(OffsetDateTime createdAt) { this.createdAt = createdAt; } + + public boolean isCancelled() { return cancelled; } + + public void setCancelled(boolean cancelled) { this.cancelled = cancelled; } + +} + diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/TextMessage.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/TextMessage.java new file mode 100644 index 0000000..b7ee578 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/TextMessage.java @@ -0,0 +1,4 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.domain; + +public class TextMessage { +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/Widget.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/Widget.java new file mode 100644 index 0000000..4691630 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/domain/Widget.java @@ -0,0 +1,58 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.domain; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import org.hibernate.annotations.JdbcTypeCode; +import org.hibernate.type.SqlTypes; + +import java.time.Instant; +import java.util.Map; +import java.util.UUID; + +@Entity +@Table(name = "widgets") +public class Widget { + + @Id + @Column(name = "id", nullable = false, columnDefinition = "uuid") + private UUID id; + + @Column(name = "name", nullable = false) + private String name; + + // DB sets default now(); mark read-only so we don’t overwrite it + @Column(name = "created_at", columnDefinition = "timestamptz", updatable = false, insertable = false) + private Instant createdAt; + + @JdbcTypeCode(SqlTypes.JSON) + @Column(name = "meta", columnDefinition = "jsonb") + private Map meta; + + public Widget() {} + + public Widget(UUID id, String name) { + this.id = id; + this.name = name; + } + + public Widget(UUID id, String name, Instant createdAt, Map meta) { + this.id = id; + this.name = name; + this.createdAt = createdAt; + this.meta = meta; + } + + public UUID getId() { return id; } + public void setId(UUID id) { this.id = id; } + + public String getName() { return name; } + public void setName(String name) { this.name = name; } + + public Instant getCreatedAt() { return createdAt; } + + public Map getMeta() { return meta; } + public void setMeta(Map meta) { this.meta = meta; } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/ExampleInboundMessage.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/ExampleInboundMessage.java new file mode 100644 index 0000000..c015150 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/ExampleInboundMessage.java @@ -0,0 +1,5 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.dto; + +public sealed abstract class ExampleInboundMessage permits TextMessagePayload, WidgetPayload { + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/TextMessagePayload.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/TextMessagePayload.java new file mode 100644 index 0000000..1ba0c0f --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/TextMessagePayload.java @@ -0,0 +1,20 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.dto; + +import lombok.*; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.TextMessage; + +import java.util.List; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public final class TextMessagePayload extends ExampleInboundMessage { + + private String messageId; + private String messageType; + + private List records; + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/WidgetPayload.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/WidgetPayload.java new file mode 100644 index 0000000..05a09bd --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/dto/WidgetPayload.java @@ -0,0 +1,21 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.dto; + +import lombok.*; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; + +import java.util.List; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public final class WidgetPayload extends ExampleInboundMessage { + + private String eventId; + private String timestamp; + private String messageType; + + private List records; + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/GcsWriter.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/GcsWriter.java new file mode 100644 index 0000000..48d5d86 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/GcsWriter.java @@ -0,0 +1,16 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.io; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.stereotype.Component; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.NormalizedWidget; + +@Component +public class GcsWriter implements ItemWriter { + + @Override + public void write(Chunk chunk) { + + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/JsonToAvroProcessor.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/JsonToAvroProcessor.java new file mode 100644 index 0000000..93213f3 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/JsonToAvroProcessor.java @@ -0,0 +1,42 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.io; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.NormalizedWidget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.util.AvroUtil; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Random; +import java.util.UUID; + +@Component +public class JsonToAvroProcessor implements ItemProcessor { + + private final AvroUtil avroUtil; + private final ObjectMapper objectMapper; + + public JsonToAvroProcessor(AvroUtil avroUtil, ObjectMapper objectMapper) { + this.avroUtil = avroUtil; + this.objectMapper = objectMapper; + } + + @Override + public NormalizedWidget process(Widget in) { + // Randomly fail to simulate transient errors +// if (null == in.getMeta()) { +// return null; +// } + // Convert widget to Avro format + return new NormalizedWidget( + new Random().nextLong(1000), + in.getName(), + UUID.randomUUID().toString(), + LocalDateTime.now().atOffset(ZoneOffset.UTC), + false + ); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/PostgresReader.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/PostgresReader.java new file mode 100644 index 0000000..6cad3ff --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/io/PostgresReader.java @@ -0,0 +1,68 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.io; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.batch.item.database.JdbcPagingItemReader; +import org.springframework.batch.item.database.Order; +import org.springframework.batch.item.database.PagingQueryProvider; +import org.springframework.batch.item.database.support.PostgresPagingQueryProvider; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Component; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; + +import javax.sql.DataSource; +import java.sql.ResultSet; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.UUID; + +@Component +public class PostgresReader { + + private final ObjectMapper objectMapper; + + public PostgresReader() { + this.objectMapper = new ObjectMapper(); + } + + @Bean + public JdbcPagingItemReader widgetReader(DataSource dataSource) { + var reader = new JdbcPagingItemReader(); + reader.setName("widgetReader"); + reader.setDataSource(dataSource); + reader.setPageSize(500); + reader.setRowMapper(widgetRowMapper()); + reader.setQueryProvider(queryProvider()); + reader.setParameterValues(Map.of("createdAfter", OffsetDateTime.now().minusMinutes(60))); + return reader; + } + + private PagingQueryProvider queryProvider() { + var provider = new PostgresPagingQueryProvider(); + provider.setSelectClause("SELECT id, name, meta, created_at"); + provider.setFromClause("FROM widgets"); + provider.setWhereClause("WHERE created_at >= :createdAfter"); + provider.setSortKeys(Map.of("id", Order.ASCENDING)); + return provider; + } + + @SuppressWarnings("unchecked") + private RowMapper widgetRowMapper() { + return (ResultSet rs, int rowNum) -> { + UUID id = (UUID) rs.getObject("id"); + String name = rs.getString("name"); + Instant createdAt = rs.getTimestamp("created_at").toInstant(); + String metaJson = rs.getString("meta"); + Map meta = null; + try { + meta = objectMapper.readValue(metaJson, Map.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + return new Widget(id, name, createdAt, meta); + }; + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/listeners/JobLoggerListener.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/listeners/JobLoggerListener.java new file mode 100644 index 0000000..7a005ec --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/listeners/JobLoggerListener.java @@ -0,0 +1,20 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.listeners; + +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.stereotype.Component; + +@Component +public class JobLoggerListener implements JobExecutionListener { + + @Override + public void beforeJob(JobExecution jobExecution) { + System.out.println("Job '" + jobExecution.getJobInstance().getJobName() + "' starting."); + } + + @Override + public void afterJob(JobExecution jobExecution) { + System.out.println("Job '" + jobExecution.getJobInstance().getJobName() + "' finished with status: " + jobExecution.getStatus()); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/listeners/StepLoggerListener.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/listeners/StepLoggerListener.java new file mode 100644 index 0000000..e2f20cc --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/listeners/StepLoggerListener.java @@ -0,0 +1,16 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.listeners; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.stereotype.Component; + +@Component +public class StepLoggerListener implements StepExecutionListener { + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + return stepExecution.getExitStatus(); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/policies/TransientSkips.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/policies/TransientSkips.java new file mode 100644 index 0000000..e4bc416 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/policies/TransientSkips.java @@ -0,0 +1,16 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.policies; + +import org.springframework.batch.core.step.skip.SkipLimitExceededException; +import org.springframework.batch.core.step.skip.SkipPolicy; +import org.springframework.batch.item.ParseException; +import org.springframework.stereotype.Component; + +@Component +public class TransientSkips implements SkipPolicy { + + @Override + public boolean shouldSkip(Throwable t, long skipCount) throws SkipLimitExceededException { + return (t instanceof ParseException && skipCount <= 10) || (t instanceof IllegalArgumentException); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/GcsRepository.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/GcsRepository.java new file mode 100644 index 0000000..79083a8 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/GcsRepository.java @@ -0,0 +1,4 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.repository; + +public interface GcsRepository { +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/WidgetRepository.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/WidgetRepository.java new file mode 100644 index 0000000..23e0576 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/WidgetRepository.java @@ -0,0 +1,13 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.repository; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; + +import java.util.List; +import java.util.UUID; + +public interface WidgetRepository extends JpaRepository { + + List findByNameContainingIgnoreCase(String q); + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/impl/GcsRepositoryImpl.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/impl/GcsRepositoryImpl.java new file mode 100644 index 0000000..5708e1e --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/repository/impl/GcsRepositoryImpl.java @@ -0,0 +1,13 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.repository.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Repository; +import org.squidmin.java.spring.maven.casestudies.casestudy1.repository.GcsRepository; + +@Repository +public class GcsRepositoryImpl implements GcsRepository { + + private static final Logger log = LoggerFactory.getLogger(GcsRepositoryImpl.class); + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/GcsService.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/GcsService.java new file mode 100644 index 0000000..01b9673 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/GcsService.java @@ -0,0 +1,5 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.service; + +public interface GcsService { + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/GcsUploadJobRunner.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/GcsUploadJobRunner.java new file mode 100644 index 0000000..2775099 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/GcsUploadJobRunner.java @@ -0,0 +1,33 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.service; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + +@ConditionalOnProperty(name = "app.jobs.enabled", havingValue = "true", matchIfMissing = false) +@Profile("!test") +@Component +public class GcsUploadJobRunner implements CommandLineRunner { + + private final JobLauncher jobLauncher; + private final Job postgresToGcsJob; + + public GcsUploadJobRunner(JobLauncher jobLauncher, Job postgresToGcsJob) { + this.jobLauncher = jobLauncher; + this.postgresToGcsJob = postgresToGcsJob; + } + + @Override + public void run(String... args) throws Exception { + JobParameters jobParameters = new JobParametersBuilder() + .addLong("startTime", System.currentTimeMillis()) + .toJobParameters(); + jobLauncher.run(postgresToGcsJob, jobParameters); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/WidgetService.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/WidgetService.java new file mode 100644 index 0000000..8ec3a3a --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/WidgetService.java @@ -0,0 +1,42 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.service; + +import org.springframework.stereotype.Service; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.repository.WidgetRepository; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +@Service +public class WidgetService { + + private final WidgetRepository repo; + + public WidgetService(WidgetRepository repo) { + this.repo = repo; + } + + /** + * Read all rows + */ + public List findAll() { + return repo.findAll(); + } + + /** + * Read by primary key + */ + public Optional findById(UUID id) { + return repo.findById(id); + } + + /** + * Read by name (case-insensitive contains) + */ + public List searchByName(String q) { + return (q == null || q.isBlank()) ? repo.findAll() + : repo.findByNameContainingIgnoreCase(q); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/impl/GcsServiceImpl.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/impl/GcsServiceImpl.java new file mode 100644 index 0000000..89375f7 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/service/impl/GcsServiceImpl.java @@ -0,0 +1,11 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.service.impl; + +import org.springframework.stereotype.Service; +import org.squidmin.java.spring.maven.casestudies.casestudy1.service.GcsService; + +@Service +public class GcsServiceImpl implements GcsService { + + + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/subscribe/ExampleSubscriber.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/subscribe/ExampleSubscriber.java new file mode 100644 index 0000000..699bed8 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/subscribe/ExampleSubscriber.java @@ -0,0 +1,38 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.subscribe; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; +import org.squidmin.java.spring.maven.casestudies.casestudy1.dto.WidgetPayload; +import org.squidmin.java.spring.maven.casestudies.casestudy1.util.Loggable; + +import java.util.function.Consumer; + +@Component +@Loggable +public class ExampleSubscriber { + + private final ObjectMapper objectMapper; + + private Logger log; + + public ExampleSubscriber(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Bean + public Consumer> ingest() { + return message -> { + try { + WidgetPayload payload = objectMapper.readValue(message.getPayload(), WidgetPayload.class); + log.info("Received message payload: {}", objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(payload)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }; + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/AvroUtil.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/AvroUtil.java new file mode 100644 index 0000000..fb79ced --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/AvroUtil.java @@ -0,0 +1,51 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.util; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.logging.log4j.util.Strings; +import org.springframework.stereotype.Component; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Component +public class AvroUtil { + + public String getAvroSchema() throws IOException { + String schema = Strings.EMPTY; + try (InputStream is = AvroUtil.class.getClassLoader().getResourceAsStream("avro/schema.json")) { + if (is != null) { + schema = new String(is.readAllBytes(), StandardCharsets.UTF_8); + } + } + return schema; + } + + public byte[] serializeToAvro(List items) throws IOException { + Schema schema = new Schema.Parser().parse(getAvroSchema()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GenericDatumWriter datumWriter = new GenericDatumWriter<>(schema); + DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.create(schema, out); + + for (Widget item : items) { + GenericRecord record = new GenericData.Record(schema); + record.put("id", item.getId()); + record.put("createdAt", item.getCreatedAt()); + record.put("name", item.getName()); + record.put("meta", item.getMeta()); + dataFileWriter.append(record); + } + + dataFileWriter.close(); + return out.toByteArray(); + } + +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/Loggable.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/Loggable.java new file mode 100644 index 0000000..0456142 --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/Loggable.java @@ -0,0 +1,11 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.util; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface Loggable { +} diff --git a/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/LoggerAspect.java b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/LoggerAspect.java new file mode 100644 index 0000000..b7e96ae --- /dev/null +++ b/case-study-1/src/main/java/org/squidmin/java/spring/maven/casestudies/casestudy1/util/LoggerAspect.java @@ -0,0 +1,31 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1.util; + +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.lang.reflect.Field; + +@Aspect +@Component +public class LoggerAspect { + + @Before(value = "@within(org.squidmin.java.spring.maven.casestudies.casestudy1.util.Loggable) && target(bean)") + public void injectLogger(Object bean) { + Class clazz = bean.getClass(); + try { + Field logField = clazz.getDeclaredField("log"); + if (logField != null && Logger.class.isAssignableFrom(logField.getType())) { + logField.setAccessible(true); + Logger logger = LoggerFactory.getLogger(clazz); + logField.set(bean, logger); + } + } catch (NoSuchFieldException ignored) { + // No field named 'log', skip injection + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to inject logger into " + clazz.getName(), e); + } + } +} diff --git a/case-study-1/src/main/resources/application.yml b/case-study-1/src/main/resources/application.yml index 725abd7..d15ad61 100644 --- a/case-study-1/src/main/resources/application.yml +++ b/case-study-1/src/main/resources/application.yml @@ -1,13 +1,29 @@ spring: application: name: case-study-1 - session: - store-type: jdbc # Use JDBC for session management + batch: jdbc: - initialize-schema: always # Ensures schema.sql runs on startup (use 'never' in production) + initialize-schema: always # Ensures Spring Batch schema is created on startup. +# job: +# enabled: false # Disable automatic job execution on startup + cloud: + gcp: + project-id: lofty-root-378503 + stream: + bindings: + ingest-in-0: + destination: ${gcp.pubsub.topic.name} + content-type: text/plain + group: ${gcp.pubsub.subscription.name} + gcp: + pubsub: + bindings: + input-in-0: + consumer: + ack-mode: MANUAL test: database: - replace: none # Prevents Spring Boot from replacing the datasource with an in-memory database during tests + replace: none # Prevents Spring Boot from replacing the datasource with an in-memory database during tests. security: user: name: admin @@ -19,8 +35,8 @@ spring: driver-class-name: org.postgresql.Driver sql: init: - mode: always # Ensures schema.sql runs on startup (use 'never' in production) - platform: postgres # Prevents Hibernate from overriding the jsonb data type with varchar data type. + mode: always # Ensures schema.sql runs on startup (use 'never' in production). + platform: postgres # Prevents Hibernate from overriding the jsonb data type with varchar data type. jpa: hibernate: ddl-auto: update @@ -29,11 +45,32 @@ spring: hibernate: format_sql: true dialect: org.hibernate.dialect.PostgreSQLDialect +# main: +# allow-bean-definition-overriding: true + +gcp: + storage: + gcs-prefix: gs:// + bucket: + batch-upload-prefix: batch_uploads/ + name: jm_lofty-root-cmek-test + pubsub: + topic: + name: java21-spring3-maven-reference-topic + subscription: + name: java21-spring3-maven-reference-subscription + role: + name: roles/pubsub.editor + ordering-key: test-ordering-key + max-retries: 5 server: port: 8080 management: + endpoint: + health: + show-details: always endpoints: web: exposure: @@ -44,3 +81,7 @@ logging: org: springframework: security: DEBUG + +app: + jobs: + enabled: true diff --git a/case-study-1/src/main/resources/avro/schema.json b/case-study-1/src/main/resources/avro/schema.json new file mode 100644 index 0000000..87f6661 --- /dev/null +++ b/case-study-1/src/main/resources/avro/schema.json @@ -0,0 +1,10 @@ +{ + "type": "record", + "name": "Widget", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "name", "type": "string" }, + { "name": "createdAt", "type": "string" }, + { "name": "meta", "type": "record" } + ] +} diff --git a/case-study-1/src/main/resources/schema.sql b/case-study-1/src/main/resources/schema.sql new file mode 100644 index 0000000..96dd256 --- /dev/null +++ b/case-study-1/src/main/resources/schema.sql @@ -0,0 +1,108 @@ +DROP TABLE IF EXISTS SPRING_SESSION_ATTRIBUTES; +DROP TABLE IF EXISTS SPRING_SESSION; + +CREATE TABLE IF NOT EXISTS SPRING_SESSION +( + PRIMARY_ID CHAR(36) NOT NULL, + SESSION_ID CHAR(36) NOT NULL, + CREATION_TIME BIGINT NOT NULL, + LAST_ACCESS_TIME BIGINT NOT NULL, + MAX_INACTIVE_INTERVAL INT NOT NULL, + EXPIRY_TIME BIGINT NOT NULL, + PRINCIPAL_NAME VARCHAR(100), + CONSTRAINT SPRING_SESSION_PK PRIMARY KEY (PRIMARY_ID) +); + +CREATE UNIQUE INDEX IF NOT EXISTS SPRING_SESSION_IX1 ON SPRING_SESSION (SESSION_ID); +CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX2 ON SPRING_SESSION (EXPIRY_TIME); +CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX3 ON SPRING_SESSION (PRINCIPAL_NAME); + +CREATE TABLE IF NOT EXISTS SPRING_SESSION_ATTRIBUTES +( + SESSION_PRIMARY_ID CHAR(36) NOT NULL, + ATTRIBUTE_NAME VARCHAR(200) NOT NULL, + ATTRIBUTE_BYTES BYTEA NOT NULL, + CONSTRAINT SPRING_SESSION_ATTRIBUTES_PK PRIMARY KEY (SESSION_PRIMARY_ID, ATTRIBUTE_NAME), + CONSTRAINT SPRING_SESSION_ATTRIBUTES_FK FOREIGN KEY (SESSION_PRIMARY_ID) REFERENCES SPRING_SESSION (PRIMARY_ID) ON DELETE CASCADE +); + +-- Autogenerated: do not edit this file + +CREATE TABLE IF NOT EXISTS BATCH_JOB_INSTANCE ( + JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT, + JOB_NAME VARCHAR(100) NOT NULL, + JOB_KEY VARCHAR(32) NOT NULL, + constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION ( + JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT, + JOB_INSTANCE_ID BIGINT NOT NULL, + CREATE_TIME TIMESTAMP NOT NULL, + START_TIME TIMESTAMP DEFAULT NULL, + END_TIME TIMESTAMP DEFAULT NULL, + STATUS VARCHAR(10), + EXIT_CODE VARCHAR(2500), + EXIT_MESSAGE VARCHAR(2500), + LAST_UPDATED TIMESTAMP, + constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID) + references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_PARAMS ( + JOB_EXECUTION_ID BIGINT NOT NULL, + PARAMETER_NAME VARCHAR(100) NOT NULL, + PARAMETER_TYPE VARCHAR(100) NOT NULL, + PARAMETER_VALUE VARCHAR(2500), + IDENTIFYING CHAR(1) NOT NULL, + constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) + references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION ( + STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT NOT NULL, + STEP_NAME VARCHAR(100) NOT NULL, + JOB_EXECUTION_ID BIGINT NOT NULL, + CREATE_TIME TIMESTAMP NOT NULL, + START_TIME TIMESTAMP DEFAULT NULL, + END_TIME TIMESTAMP DEFAULT NULL, + STATUS VARCHAR(10), + COMMIT_COUNT BIGINT, + READ_COUNT BIGINT, + FILTER_COUNT BIGINT, + WRITE_COUNT BIGINT, + READ_SKIP_COUNT BIGINT, + WRITE_SKIP_COUNT BIGINT, + PROCESS_SKIP_COUNT BIGINT, + ROLLBACK_COUNT BIGINT, + EXIT_CODE VARCHAR(2500), + EXIT_MESSAGE VARCHAR(2500), + LAST_UPDATED TIMESTAMP, + constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID) + references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION_CONTEXT ( + STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + SHORT_CONTEXT VARCHAR(2500) NOT NULL, + SERIALIZED_CONTEXT TEXT, + constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID) + references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_CONTEXT ( + JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + SHORT_CONTEXT VARCHAR(2500) NOT NULL, + SERIALIZED_CONTEXT TEXT, + constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) + references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ; + +CREATE SEQUENCE IF NOT EXISTS BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE; +CREATE SEQUENCE IF NOT EXISTS BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE; +CREATE SEQUENCE IF NOT EXISTS BATCH_JOB_INSTANCE_SEQ MAXVALUE 9223372036854775807 NO CYCLE; + +CREATE SEQUENCE IF NOT EXISTS BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE; diff --git a/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/CaseStudy1ApplicationTests.java b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/CaseStudy1ApplicationTests.java deleted file mode 100644 index c86c4c8..0000000 --- a/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/CaseStudy1ApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.squidmin.java.spring.maven.casestudies; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class CaseStudy1ApplicationTests { - - @Test - void contextLoads() { - } - -} diff --git a/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/BatchJobIntegrationTest.java b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/BatchJobIntegrationTest.java new file mode 100644 index 0000000..da4c3b8 --- /dev/null +++ b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/BatchJobIntegrationTest.java @@ -0,0 +1,155 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.postgresql.util.PGobject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.*; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest +@SpringBatchTest +@Testcontainers +@Import(BatchJobIntegrationTest.TestSupport.class) +@Disabled +class BatchJobIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(BatchJobIntegrationTest.class); + + @Container + static final PostgreSQLContainer POSTGRES = + new PostgreSQLContainer<>("postgres:15-alpine") + .withDatabaseName("testdb") + .withUsername("test") + .withPassword("test"); + + @DynamicPropertySource + static void registerProps(DynamicPropertyRegistry registry) { + registry.add("spring.datasource.url", POSTGRES::getJdbcUrl); + registry.add("spring.datasource.username", POSTGRES::getUsername); + registry.add("spring.datasource.password", POSTGRES::getPassword); + registry.add("spring.jpa.hibernate.ddl-auto", () -> "update"); + } + + @Autowired + private JdbcTemplate jdbc; + @Autowired + private Job normalizeWidgetsJob; + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @BeforeEach + void setupSchemaAndData() { + try { + jdbc.execute("DELETE FROM widgets"); + + jdbc.update(""" + INSERT INTO widgets(id, name, meta, created_at) + VALUES (?, ?, ?, ?)""", + UUID.randomUUID(), + "Widget A", + createJsonbObject("{\"color\": \"red\", \"size\": \"large\"}"), + LocalDateTime.now(ZoneOffset.UTC)); + + jdbc.update(""" + INSERT INTO widgets(id, name, meta, created_at) + VALUES (?, ?, ?, ?)""", + UUID.randomUUID(), + "Widget B", + createJsonbObject("{\"color\": \"blue\", \"size\": \"medium\"}"), + LocalDateTime.now(ZoneOffset.UTC)); + + int count = jdbc.queryForObject("SELECT COUNT(*) FROM widgets", Integer.class); + log.info("Number of widgets in the database: {}", count); + + log.info("Test data inserted into widgets table."); + } catch (Exception e) { + log.error("Error setting up schema and data", e); + throw e; + } + } + +// @AfterEach +// void cleanup() { +// jdbc.execute("DROP TABLE IF EXISTS widgets"); +// } + + @Test + void job_end_to_end_processes_and_writes_data() throws Exception { + try { + jdbc.execute("DELETE FROM widgets"); + + jdbc.update(""" + INSERT INTO widgets(id, name, meta, created_at) + VALUES (?, ?, ?, ?)""", + UUID.randomUUID(), + "Widget A", + createJsonbObject("{\"color\": \"red\", \"size\": \"large\"}"), + LocalDateTime.now(ZoneOffset.UTC)); + + jdbc.update(""" + INSERT INTO widgets(id, name, meta, created_at) + VALUES (?, ?, ?, ?)""", + UUID.randomUUID(), + "Widget B", + createJsonbObject("{\"color\": \"blue\", \"size\": \"medium\"}"), + LocalDateTime.now(ZoneOffset.UTC)); + + int count = jdbc.queryForObject("SELECT COUNT(*) FROM widgets", Integer.class); + log.info("Number of widgets in the database: {}", count); + + log.info("Test data inserted into widgets table."); + } catch (Exception e) { + log.error("Error setting up schema and data", e); + throw e; + } + + JobParameters params = new JobParametersBuilder() + .addLong("run.id", System.nanoTime()) + .toJobParameters(); + + jobLauncherTestUtils.setJob(normalizeWidgetsJob); + JobExecution exec = jobLauncherTestUtils.launchJob(params); + + assertThat(exec.getStatus()).isEqualTo(BatchStatus.COMPLETED); + + StepExecution step = exec.getStepExecutions().iterator().next(); + assertThat(step.getReadCount()).isEqualTo(2); // 2 rows read + assertThat(step.getWriteCount()).isEqualTo(2); // 2 rows written + + // Verify data processing (mocked GCS writer can be verified separately) + } + + private PGobject createJsonbObject(String json) { + try { + PGobject jsonbObject = new PGobject(); + jsonbObject.setType("jsonb"); + jsonbObject.setValue(json); + return jsonbObject; + } catch (Exception e) { + throw new RuntimeException("Failed to create JSONB object", e); + } + } + + static class TestSupport { + } + +} diff --git a/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/CaseStudy1ApplicationTests.java b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/CaseStudy1ApplicationTests.java new file mode 100644 index 0000000..e7ef55e --- /dev/null +++ b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/CaseStudy1ApplicationTests.java @@ -0,0 +1,167 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1; + +import jakarta.validation.Validator; +import jakarta.validation.executable.ExecutableValidator; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.configuration.JobRegistry; +import org.springframework.batch.item.database.JdbcPagingItemReader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.test.context.ActiveProfiles; +import org.squidmin.java.spring.maven.casestudies.casestudy1.config.*; +import org.squidmin.java.spring.maven.casestudies.casestudy1.controller.WidgetController; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.NormalizedWidget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.domain.Widget; +import org.squidmin.java.spring.maven.casestudies.casestudy1.io.GcsWriter; +import org.squidmin.java.spring.maven.casestudies.casestudy1.io.JsonToAvroProcessor; +import org.squidmin.java.spring.maven.casestudies.casestudy1.io.PostgresReader; +import org.squidmin.java.spring.maven.casestudies.casestudy1.listeners.JobLoggerListener; +import org.squidmin.java.spring.maven.casestudies.casestudy1.listeners.StepLoggerListener; +import org.squidmin.java.spring.maven.casestudies.casestudy1.policies.TransientSkips; +import org.squidmin.java.spring.maven.casestudies.casestudy1.repository.WidgetRepository; +import org.squidmin.java.spring.maven.casestudies.casestudy1.repository.impl.GcsRepositoryImpl; +import org.squidmin.java.spring.maven.casestudies.casestudy1.service.WidgetService; +import org.squidmin.java.spring.maven.casestudies.casestudy1.service.impl.GcsServiceImpl; +import org.squidmin.java.spring.maven.casestudies.casestudy1.subscribe.ExampleSubscriber; +import org.squidmin.java.spring.maven.casestudies.casestudy1.util.AvroUtil; + +import javax.sql.DataSource; +import java.time.Instant; +import java.util.HashMap; +import java.util.UUID; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles("test") +class CaseStudy1ApplicationTests { + + private final Logger log = LoggerFactory.getLogger(CaseStudy1ApplicationTests.class); + + @Autowired + private ConfigurableApplicationContext context; + + @Autowired + private DataSource dataSource; + + @Test + void contextLoads() { + BatchJobConfig batchJobConfig = context.getBean(BatchJobConfig.class); + validateBatchJobConfig(batchJobConfig); + + GcsConfig gcsConfig = context.getBean(GcsConfig.class); + validateGcsConfig(gcsConfig); + + PubSubConfig pubSubConfig = context.getBean(PubSubConfig.class); + validatePubSubConfig(pubSubConfig); + + SkipPolicies skipPolicies = context.getBean(SkipPolicies.class); + Assertions.assertNotNull(skipPolicies); + + Validators validators = context.getBean(Validators.class); + validateValidators(validators); + + WidgetController widgetController = context.getBean(WidgetController.class); + Assertions.assertNotNull(widgetController); + + PostgresReader postgresReader = context.getBean(PostgresReader.class); + validatePostgresReader(postgresReader); + + JsonToAvroProcessor jsonToAvroProcessor = context.getBean(JsonToAvroProcessor.class); + validateJsonToAvroProcessor(jsonToAvroProcessor); + + GcsWriter gcsWriter = context.getBean(GcsWriter.class); + Assertions.assertNotNull(gcsWriter); + + JobLoggerListener jobLoggerListener = context.getBean(JobLoggerListener.class); + Assertions.assertNotNull(jobLoggerListener); + + StepLoggerListener stepLoggerListener = context.getBean(StepLoggerListener.class); + Assertions.assertNotNull(stepLoggerListener); + + SkipPolicies.TransientSkips transientSkips = context.getBean(SkipPolicies.TransientSkips.class); + Assertions.assertNotNull(transientSkips); + + TransientSkips _transientSkips = context.getBean(TransientSkips.class); + Assertions.assertNotNull(_transientSkips); + + AvroUtil avroUtil = context.getBean(AvroUtil.class); + Assertions.assertNotNull(avroUtil); + + GcsRepositoryImpl gcsRepository = context.getBean(GcsRepositoryImpl.class); + Assertions.assertNotNull(gcsRepository); + + GcsServiceImpl gcsService = context.getBean(GcsServiceImpl.class); + Assertions.assertNotNull(gcsService); + + WidgetRepository widgetRepository = context.getBean(WidgetRepository.class); + Assertions.assertNotNull(widgetRepository); + + WidgetService widgetService = context.getBean(WidgetService.class); + Assertions.assertNotNull(widgetService); + + WidgetController controller = context.getBean(WidgetController.class); + Assertions.assertNotNull(controller); + + ExampleSubscriber exampleSubscriber = context.getBean(ExampleSubscriber.class); + Assertions.assertNotNull(exampleSubscriber); + } + + private void validateBatchJobConfig(BatchJobConfig batchJobConfig) { + JobRegistry jobRegistry = batchJobConfig.jobRegistry(); + jobRegistry.getJobNames().forEach(log::info); + Assertions.assertNotNull(batchJobConfig); + Assertions.assertNotNull(jobRegistry); + Assertions.assertNotNull(jobRegistry.getJobNames()); + Assertions.assertTrue(0 < jobRegistry.getJobNames().size()); + } + + private void validateGcsConfig(GcsConfig gcsConfig) { + Assertions.assertNotNull(gcsConfig); + Assertions.assertEquals("gs://", gcsConfig.getGcsPrefix()); + Assertions.assertEquals("batch_uploads/", gcsConfig.getBatchUploadPrefix()); + Assertions.assertEquals("jm_lofty-root-cmek-test", gcsConfig.getBucketName()); + } + + private void validatePubSubConfig(PubSubConfig pubSubConfig) { + Assertions.assertNotNull(pubSubConfig); + Assertions.assertEquals("lofty-root-378503", pubSubConfig.getProjectId()); + Assertions.assertEquals("java21-spring3-maven-reference-topic", pubSubConfig.getTopic()); + Assertions.assertEquals("java21-spring3-maven-reference-subscription", pubSubConfig.getSubscription()); + Assertions.assertEquals("roles/pubsub.editor", pubSubConfig.getRole()); + Assertions.assertEquals("test-ordering-key", pubSubConfig.getOrderingKey()); + Assertions.assertEquals("5", pubSubConfig.getMaxRetries()); + } + + private void validateValidators(Validators validators) { + Assertions.assertNotNull(validators); + Validator validator = validators.validator(); + Assertions.assertNotNull(validator); + ExecutableValidator executableValidator = validator.forExecutables(); + Assertions.assertNotNull(executableValidator); + } + + private void validatePostgresReader(PostgresReader postgresReader) { + Assertions.assertNotNull(postgresReader); + JdbcPagingItemReader widgetJdbcPagingItemReader = postgresReader.widgetReader(dataSource); + Assertions.assertNotNull(widgetJdbcPagingItemReader); + Assertions.assertEquals("widgetReader", widgetJdbcPagingItemReader.getName()); + } + + private void validateJsonToAvroProcessor(JsonToAvroProcessor jsonToAvroProcessor) { + Assertions.assertNotNull(jsonToAvroProcessor); + NormalizedWidget testWidget = jsonToAvroProcessor.process( + new Widget( + UUID.randomUUID(), + "Test Widget", + Instant.now(), + new HashMap<>() + ) + ); + Assertions.assertNotNull(testWidget); + Assertions.assertEquals("Test Widget", testWidget.getName()); + } + +} diff --git a/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/PostgresToGcsJobIntegrationTest.java b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/PostgresToGcsJobIntegrationTest.java new file mode 100644 index 0000000..f318435 --- /dev/null +++ b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/PostgresToGcsJobIntegrationTest.java @@ -0,0 +1,44 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.*; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.JobRepositoryTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.jdbc.Sql; + +@SpringBootTest +@SpringBatchTest +public class PostgresToGcsJobIntegrationTest { + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Autowired + private JobRepositoryTestUtils jobRepositoryTestUtils; + + @Autowired + private Job postgresToGcsJob; + + @Test + @Sql(scripts = {"/init.sql", "/create_spring_batch_tables.sql"}) + void testBatchJob() throws Exception { + jobRepositoryTestUtils.removeJobExecutions(); + JobParameters jobParameters = new JobParametersBuilder() + .addLong("time", System.currentTimeMillis()) + .toJobParameters(); + + jobLauncherTestUtils.setJob(postgresToGcsJob); + var exec = jobLauncherTestUtils.launchJob(jobParameters); + Assertions.assertThat(exec.getExitStatus()).isEqualTo(ExitStatus.COMPLETED); + StepExecution step = exec.getStepExecutions().iterator().next(); + Assertions.assertThat(step.getReadCount()).isEqualTo(4); + Assertions.assertThat(step.getFilterCount()).isEqualTo(0); + Assertions.assertThat(step.getSkipCount()).isEqualTo(0); + Assertions.assertThat(step.getWriteCount()).isEqualTo(4); + } + +} diff --git a/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/UnitTestConfig.java b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/UnitTestConfig.java new file mode 100644 index 0000000..23302b2 --- /dev/null +++ b/case-study-1/src/test/java/org/squidmin/java/spring/maven/casestudies/casestudy1/UnitTestConfig.java @@ -0,0 +1,10 @@ +package org.squidmin.java.spring.maven.casestudies.casestudy1; + +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.ActiveProfiles; + +@Configuration +@ActiveProfiles("test") +public class UnitTestConfig { + +} diff --git a/case-study-1/src/test/resources/application-test.yml b/case-study-1/src/test/resources/application-test.yml new file mode 100644 index 0000000..0d51044 --- /dev/null +++ b/case-study-1/src/test/resources/application-test.yml @@ -0,0 +1,10 @@ +spring: + application: + name: case-study-1 + batch: + job: + enabled: false + +app: + jobs: + enabled: false diff --git a/case-study-1/src/test/resources/create_spring_batch_tables.sql b/case-study-1/src/test/resources/create_spring_batch_tables.sql new file mode 100644 index 0000000..3408312 --- /dev/null +++ b/case-study-1/src/test/resources/create_spring_batch_tables.sql @@ -0,0 +1,78 @@ +CREATE TABLE IF NOT EXISTS BATCH_JOB_INSTANCE ( + JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT, + JOB_NAME VARCHAR(100) NOT NULL, + JOB_KEY VARCHAR(32) NOT NULL, + constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION ( + JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT, + JOB_INSTANCE_ID BIGINT NOT NULL, + CREATE_TIME TIMESTAMP NOT NULL, + START_TIME TIMESTAMP DEFAULT NULL, + END_TIME TIMESTAMP DEFAULT NULL, + STATUS VARCHAR(10), + EXIT_CODE VARCHAR(2500), + EXIT_MESSAGE VARCHAR(2500), + LAST_UPDATED TIMESTAMP, + constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID) + references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_PARAMS ( + JOB_EXECUTION_ID BIGINT NOT NULL, + PARAMETER_NAME VARCHAR(100) NOT NULL, + PARAMETER_TYPE VARCHAR(100) NOT NULL, + PARAMETER_VALUE VARCHAR(2500), + IDENTIFYING CHAR(1) NOT NULL, + constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) + references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION ( + STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT NOT NULL, + STEP_NAME VARCHAR(100) NOT NULL, + JOB_EXECUTION_ID BIGINT NOT NULL, + CREATE_TIME TIMESTAMP NOT NULL, + START_TIME TIMESTAMP DEFAULT NULL, + END_TIME TIMESTAMP DEFAULT NULL, + STATUS VARCHAR(10), + COMMIT_COUNT BIGINT, + READ_COUNT BIGINT, + FILTER_COUNT BIGINT, + WRITE_COUNT BIGINT, + READ_SKIP_COUNT BIGINT, + WRITE_SKIP_COUNT BIGINT, + PROCESS_SKIP_COUNT BIGINT, + ROLLBACK_COUNT BIGINT, + EXIT_CODE VARCHAR(2500), + EXIT_MESSAGE VARCHAR(2500), + LAST_UPDATED TIMESTAMP, + constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID) + references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION_CONTEXT ( + STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + SHORT_CONTEXT VARCHAR(2500) NOT NULL, + SERIALIZED_CONTEXT TEXT, + constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID) + references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID) +) ; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_CONTEXT ( + JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + SHORT_CONTEXT VARCHAR(2500) NOT NULL, + SERIALIZED_CONTEXT TEXT, + constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) + references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ; + +CREATE SEQUENCE IF NOT EXISTS BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE; +CREATE SEQUENCE IF NOT EXISTS BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE; +CREATE SEQUENCE IF NOT EXISTS BATCH_JOB_INSTANCE_SEQ MAXVALUE 9223372036854775807 NO CYCLE; + +CREATE SEQUENCE IF NOT EXISTS BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE; diff --git a/case-study-1/src/test/resources/create_spring_session_tables.sql b/case-study-1/src/test/resources/create_spring_session_tables.sql new file mode 100644 index 0000000..9ad6bc7 --- /dev/null +++ b/case-study-1/src/test/resources/create_spring_session_tables.sql @@ -0,0 +1,27 @@ +DROP TABLE IF EXISTS SPRING_SESSION_ATTRIBUTES; +DROP TABLE IF EXISTS SPRING_SESSION; + +CREATE TABLE IF NOT EXISTS SPRING_SESSION +( + PRIMARY_ID CHAR(36) NOT NULL, + SESSION_ID CHAR(36) NOT NULL, + CREATION_TIME BIGINT NOT NULL, + LAST_ACCESS_TIME BIGINT NOT NULL, + MAX_INACTIVE_INTERVAL INT NOT NULL, + EXPIRY_TIME BIGINT NOT NULL, + PRINCIPAL_NAME VARCHAR(100), + CONSTRAINT SPRING_SESSION_PK PRIMARY KEY (PRIMARY_ID) +); + +CREATE UNIQUE INDEX IF NOT EXISTS SPRING_SESSION_IX1 ON SPRING_SESSION (SESSION_ID); +CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX2 ON SPRING_SESSION (EXPIRY_TIME); +CREATE INDEX IF NOT EXISTS SPRING_SESSION_IX3 ON SPRING_SESSION (PRINCIPAL_NAME); + +CREATE TABLE IF NOT EXISTS SPRING_SESSION_ATTRIBUTES +( + SESSION_PRIMARY_ID CHAR(36) NOT NULL, + ATTRIBUTE_NAME VARCHAR(200) NOT NULL, + ATTRIBUTE_BYTES BYTEA NOT NULL, + CONSTRAINT SPRING_SESSION_ATTRIBUTES_PK PRIMARY KEY (SESSION_PRIMARY_ID, ATTRIBUTE_NAME), + CONSTRAINT SPRING_SESSION_ATTRIBUTES_FK FOREIGN KEY (SESSION_PRIMARY_ID) REFERENCES SPRING_SESSION (PRIMARY_ID) ON DELETE CASCADE +); diff --git a/case-study-1/src/test/resources/init.sql b/case-study-1/src/test/resources/init.sql new file mode 100644 index 0000000..a245bfe --- /dev/null +++ b/case-study-1/src/test/resources/init.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS widgets; + +CREATE TABLE IF NOT EXISTS widgets +( + id UUID PRIMARY KEY, + name TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + meta JSONB +); + +INSERT INTO widgets (id, name, meta) +VALUES (gen_random_uuid(), 'Widget X', '{"size":42}'), + (gen_random_uuid(), 'Widget Y', '{"color":"blue"}'); + +-- Include updated_at value in insertion +INSERT INTO widgets (id, name, meta, created_at) +VALUES (gen_random_uuid(), 'Widget Z', '{"weight":10}', NOW()); + +-- Invalid record to test error handling +INSERT INTO widgets (id, name, meta) +VALUES (gen_random_uuid(), 'Another widget', '{"invalid":false}'); diff --git a/case-study-1/src/test/resources/input/widget_additional-metadata.json b/case-study-1/src/test/resources/input/widget_additional-metadata.json new file mode 100644 index 0000000..2728a3c --- /dev/null +++ b/case-study-1/src/test/resources/input/widget_additional-metadata.json @@ -0,0 +1,17 @@ +{ + "eventId": "123e4567-e89b-12d3-a456-426614174000", + "timestamp": "2023-10-01T10:00:00Z", + "messageType": "WidgetTransaction", + "records": [ + { + "id": "123e4567-e89b-12d3-a456-426614174004", + "name": "Custom Widget", + "createdAt": "2023-10-01T20:00:00Z", + "meta": { + "customField1": "value1", + "customField2": 42, + "customField3": true + } + } + ] +} diff --git a/case-study-1/src/test/resources/input/widget_empty-metadata.json b/case-study-1/src/test/resources/input/widget_empty-metadata.json new file mode 100644 index 0000000..b860ad3 --- /dev/null +++ b/case-study-1/src/test/resources/input/widget_empty-metadata.json @@ -0,0 +1,43 @@ +{ + "eventId": "123e4567-e89b-12d3-a456-426614174000", + "timestamp": "2023-10-01T10:00:00Z", + "messageType": "WidgetTransaction", + "records": [ + { + "id": "123e4567-e89b-12d3-a456-426614174000", + "name": "Basic Widget" + }, + { + "id": "123e4567-e89b-12d3-a456-426614174001", + "name": "Advanced Widget", + "createdAt": "2023-10-01T12:00:00Z", + "meta": { + "color": "blue", + "size": "large", + "features": ["waterproof", "shockproof"] + } + }, + { + "id": "123e4567-e89b-12d3-a456-426614174002", + "name": "Nested Widget", + "createdAt": "2023-10-01T15:30:00Z", + "meta": { + "dimensions": { + "width": 10, + "height": 20, + "depth": 5 + }, + "manufacturer": { + "name": "WidgetCorp", + "location": "USA" + } + } + }, + { + "id": "123e4567-e89b-12d3-a456-426614174003", + "name": "Empty Meta Widget", + "createdAt": "2023-10-01T18:45:00Z", + "meta": {} + } + ] +} diff --git a/case-study-1/src/test/resources/input/widget_full-payload.json b/case-study-1/src/test/resources/input/widget_full-payload.json new file mode 100644 index 0000000..fbbdf02 --- /dev/null +++ b/case-study-1/src/test/resources/input/widget_full-payload.json @@ -0,0 +1,17 @@ +{ + "eventId": "123e4567-e89b-12d3-a456-426614174000", + "timestamp": "2023-10-01T10:00:00Z", + "messageType": "WidgetTransaction", + "records": [ + { + "id": "123e4567-e89b-12d3-a456-426614174001", + "name": "Advanced Widget", + "createdAt": "2023-10-01T12:00:00Z", + "meta": { + "color": "blue", + "size": "large", + "features": ["waterproof", "shockproof"] + } + } + ] +} diff --git a/case-study-1/src/test/resources/input/widget_minimal-payload.json b/case-study-1/src/test/resources/input/widget_minimal-payload.json new file mode 100644 index 0000000..3624cc7 --- /dev/null +++ b/case-study-1/src/test/resources/input/widget_minimal-payload.json @@ -0,0 +1,24 @@ +{ + "eventId": "123e4567-e89b-12d3-a456-426614174000", + "timestamp": "2023-10-01T10:00:00Z", + "messageType": "WidgetTransaction", + "records": [ + { + "id": "123e4567-e89b-12d3-a456-426614174000", + "name": "Widget A" + }, + { + "id": "123e4567-e89b-12d3-a456-426614174001", + "name": "Widget B", + "createdAt": "2023-10-01T12:00:00Z", + "meta": { + "color": "blue", + "size": "large", + "features": [ + "waterproof", + "shockproof" + ] + } + } + ] +} diff --git a/case-study-1/src/test/resources/input/widget_nested-metadata.json b/case-study-1/src/test/resources/input/widget_nested-metadata.json new file mode 100644 index 0000000..ef8931f --- /dev/null +++ b/case-study-1/src/test/resources/input/widget_nested-metadata.json @@ -0,0 +1,23 @@ +{ + "eventId": "123e4567-e89b-12d3-a456-426614174000", + "timestamp": "2023-10-01T10:00:00Z", + "messageType": "WidgetTransaction", + "records": [ + { + "id": "123e4567-e89b-12d3-a456-426614174002", + "name": "Nested Widget", + "createdAt": "2023-10-01T15:30:00Z", + "meta": { + "dimensions": { + "width": 10, + "height": 20, + "depth": 5 + }, + "manufacturer": { + "name": "WidgetCorp", + "location": "USA" + } + } + } + ] +} diff --git a/case-study-1/src/test/resources/insert_widgets.sql b/case-study-1/src/test/resources/insert_widgets.sql new file mode 100644 index 0000000..44eba56 --- /dev/null +++ b/case-study-1/src/test/resources/insert_widgets.sql @@ -0,0 +1,11 @@ +INSERT INTO widgets (id, name, meta) +VALUES (gen_random_uuid(), 'Widget X', '{"size":42}'), + (gen_random_uuid(), 'Widget Y', '{"color":"blue"}'); + +-- Include updated_at value in insertion +INSERT INTO widgets (id, name, meta, created_at) +VALUES (gen_random_uuid(), 'Widget Z', '{"weight":10}', NOW()); + +-- Invalid record to test error handling +INSERT INTO widgets (id, name, meta) +VALUES (gen_random_uuid(), 'Another widget', '{"invalid":false}'); diff --git a/spring-batch/exports/normalized_orders.csv b/spring-batch/exports/normalized_orders.csv index e69de29..73deefe 100644 --- a/spring-batch/exports/normalized_orders.csv +++ b/spring-batch/exports/normalized_orders.csv @@ -0,0 +1,2 @@ +1,EXT-1,alice@example.com,USD,12.34,2025-10-27T04:37:48.164328Z,false +2,EXT-2,bob@example.com,EUR,50.00,2025-10-28T01:37:48.164328Z,false diff --git a/spring-batch/src/main/java/org/squidmin/java/spring/maven/batch/io/OrderItemReader.java b/spring-batch/src/main/java/org/squidmin/java/spring/maven/batch/io/OrderItemReader.java index ab3569b..28b5120 100644 --- a/spring-batch/src/main/java/org/squidmin/java/spring/maven/batch/io/OrderItemReader.java +++ b/spring-batch/src/main/java/org/squidmin/java/spring/maven/batch/io/OrderItemReader.java @@ -50,4 +50,4 @@ private RowMapper orderRowMapper() { ); } -} \ No newline at end of file +}