Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions data/data-r2dbc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id "java"
id "org.springframework.boot"
id "org.springframework.cr.smoke-test"
}

dependencies {
implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.postgresql:r2dbc-postgresql")

implementation("org.crac:crac:$cracVersion")
implementation(project(":cr-listener"))

testImplementation("org.springframework.boot:spring-boot-starter-test")

appTestImplementation(project(":cr-smoke-test-support"))
appTestImplementation("org.awaitility:awaitility:4.2.0")
}

crSmokeTest {
webApplication = false
}
7 changes: 7 additions & 0 deletions data/data-r2dbc/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
services:
postgres:
image: 'postgres:14'
environment:
- 'POSTGRES_PASSWORD=password'
ports:
- '5432:5432'
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.data.r2dbc;

import static org.assertj.core.api.Assertions.*;

import java.time.Duration;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.springframework.cr.smoketest.support.assertj.AssertableOutput;
import org.springframework.cr.smoketest.support.junit.ApplicationTest;

@ApplicationTest
public class R2dbcApplicationTests {

@Test
void connectionTest(AssertableOutput output) {
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
assertThat(output).hasLineMatching("Starting R2dbcReader: was (INITIALIZED|STOPPED)");
assertThat(output).hasLineMatching("count \\d+: Customer\\[id=\\d, firstname=.*, lastname=.*\\]");
});
}

}
39 changes: 39 additions & 0 deletions data/data-r2dbc/src/main/java/com/example/data/r2dbc/Customer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.data.r2dbc;

import java.util.Objects;

import org.springframework.data.annotation.Id;

record Customer(@Id Integer id, String firstname, String lastname) {

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Customer customer)) {
return false;
}
return Objects.equals(firstname, customer.firstname) && Objects.equals(lastname, customer.lastname);
}

@Override
public int hashCode() {
return Objects.hash(firstname, lastname);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.data.r2dbc;

import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {

@Query("select id, firstname, lastname from customer c where c.lastname = :lastname")
Flux<Customer> findByLastname(String lastname);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.data.r2dbc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
* @author Christoph Strobl
* @since 2023/09
*/
@SpringBootApplication
@EnableTransactionManagement
public class R2dbcApplication {

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(R2dbcApplication.class, args);
}

}
127 changes: 127 additions & 0 deletions data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.data.r2dbc;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.r2dbc.spi.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

@Component
@EnableScheduling
class R2dbcReader implements SmartLifecycle {

private final Object lifecycleMonitor = new Object();

private final AtomicInteger counter = new AtomicInteger(0);

List<String> customerNames;

private State state = State.INITIALIZED;

@Autowired
CustomerRepository customers;

@Autowired
DatabaseClient database;

@Autowired
ConnectionFactory connectionFactory;

enum State {

INITIALIZED, STARTED, STOPPED

}

@Override
public void start() {

synchronized (lifecycleMonitor) {

System.out.printf("Starting R2dbcReader: was %s\n", state);

switch (state) {
case INITIALIZED -> {

var statements = Arrays.asList(//
"DROP TABLE IF EXISTS customer;",
"CREATE TABLE customer ( id SERIAL PRIMARY KEY, firstname VARCHAR(100) NOT NULL, lastname VARCHAR(100) NOT NULL);");

statements.forEach(it -> database.sql(it) //
.fetch() //
.rowsUpdated() //
.block());

if (ObjectUtils.isEmpty(customerNames)) {

var dave = new Customer(null, "Dave", "Matthews");
var carter = new Customer(null, "Carter", "Beauford");
this.customerNames = insertCustomers(dave, carter);
}
state = State.STARTED;
}
case STOPPED -> {

}
}
}
}

@Scheduled(fixedDelay = 1000)
public void scheduled() {

if (isRunning()) {

customers.findByLastname(customerNames.get(counter.get() % customerNames.size())).doOnNext(customer -> {
int count = counter.incrementAndGet();
System.out.printf("count %03d: %s\n", count, customer);
}).blockFirst(Duration.ofMillis(500));
}
}

@Override
public void stop() {

synchronized (lifecycleMonitor) {
switch (state) {
case INITIALIZED, STARTED -> {
System.out.printf("Stopping DataReader: was %s\n", state);
state = State.STOPPED;
}
}
}
}

@Override
public boolean isRunning() {
return State.STARTED.equals(state);
}

private List<String> insertCustomers(Customer... customers) {
return this.customers.saveAll(Arrays.asList(customers)).map(Customer::lastname).collectList().block();
}

}
3 changes: 3 additions & 0 deletions data/data-r2dbc/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
spring.r2dbc.url=r2dbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT_5432:5432}/postgres
spring.r2dbc.username=postgres
spring.r2dbc.password=password