diff --git a/data/data-r2dbc/build.gradle b/data/data-r2dbc/build.gradle new file mode 100644 index 0000000..c27ea07 --- /dev/null +++ b/data/data-r2dbc/build.gradle @@ -0,0 +1,23 @@ +plugins { + id "java" + id "org.springframework.boot" + id "org.springframework.cr.smoke-test" +} + +dependencies { + implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)) + implementation("org.springframework.boot:spring-boot-starter-data-r2dbc") + implementation("org.postgresql:r2dbc-postgresql") + + implementation("org.crac:crac:$cracVersion") + implementation(project(":cr-listener")) + + testImplementation("org.springframework.boot:spring-boot-starter-test") + + appTestImplementation(project(":cr-smoke-test-support")) + appTestImplementation("org.awaitility:awaitility:4.2.0") +} + +crSmokeTest { + webApplication = false +} diff --git a/data/data-r2dbc/docker-compose.yml b/data/data-r2dbc/docker-compose.yml new file mode 100644 index 0000000..52ec993 --- /dev/null +++ b/data/data-r2dbc/docker-compose.yml @@ -0,0 +1,7 @@ +services: + postgres: + image: 'postgres:14' + environment: + - 'POSTGRES_PASSWORD=password' + ports: + - '5432:5432' diff --git a/data/data-r2dbc/src/appTest/java/com/example/data/r2dbc/R2dbcApplicationTests.java b/data/data-r2dbc/src/appTest/java/com/example/data/r2dbc/R2dbcApplicationTests.java new file mode 100644 index 0000000..a9d062d --- /dev/null +++ b/data/data-r2dbc/src/appTest/java/com/example/data/r2dbc/R2dbcApplicationTests.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.data.r2dbc; + +import static org.assertj.core.api.Assertions.*; + +import java.time.Duration; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.springframework.cr.smoketest.support.assertj.AssertableOutput; +import org.springframework.cr.smoketest.support.junit.ApplicationTest; + +@ApplicationTest +public class R2dbcApplicationTests { + + @Test + void connectionTest(AssertableOutput output) { + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + assertThat(output).hasLineMatching("Starting R2dbcReader: was (INITIALIZED|STOPPED)"); + assertThat(output).hasLineMatching("count \\d+: Customer\\[id=\\d, firstname=.*, lastname=.*\\]"); + }); + } + +} diff --git a/data/data-r2dbc/src/main/java/com/example/data/r2dbc/Customer.java b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/Customer.java new file mode 100644 index 0000000..922c75e --- /dev/null +++ b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/Customer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.data.r2dbc; + +import java.util.Objects; + +import org.springframework.data.annotation.Id; + +record Customer(@Id Integer id, String firstname, String lastname) { + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Customer customer)) { + return false; + } + return Objects.equals(firstname, customer.firstname) && Objects.equals(lastname, customer.lastname); + } + + @Override + public int hashCode() { + return Objects.hash(firstname, lastname); + } +} diff --git a/data/data-r2dbc/src/main/java/com/example/data/r2dbc/CustomerRepository.java b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/CustomerRepository.java new file mode 100644 index 0000000..d43bca7 --- /dev/null +++ b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/CustomerRepository.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.data.r2dbc; + +import org.springframework.data.r2dbc.repository.Query; +import org.springframework.data.repository.reactive.ReactiveCrudRepository; +import reactor.core.publisher.Flux; + +interface CustomerRepository extends ReactiveCrudRepository { + + @Query("select id, firstname, lastname from customer c where c.lastname = :lastname") + Flux findByLastname(String lastname); + +} diff --git a/data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcApplication.java b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcApplication.java new file mode 100644 index 0000000..7b14e2a --- /dev/null +++ b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcApplication.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.data.r2dbc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.transaction.annotation.EnableTransactionManagement; + +/** + * @author Christoph Strobl + * @since 2023/09 + */ +@SpringBootApplication +@EnableTransactionManagement +public class R2dbcApplication { + + public static void main(String[] args) throws InterruptedException { + SpringApplication.run(R2dbcApplication.class, args); + } + +} diff --git a/data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcReader.java b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcReader.java new file mode 100644 index 0000000..a7d98f7 --- /dev/null +++ b/data/data-r2dbc/src/main/java/com/example/data/r2dbc/R2dbcReader.java @@ -0,0 +1,127 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.data.r2dbc; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import io.r2dbc.spi.ConnectionFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.SmartLifecycle; +import org.springframework.r2dbc.core.DatabaseClient; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +@Component +@EnableScheduling +class R2dbcReader implements SmartLifecycle { + + private final Object lifecycleMonitor = new Object(); + + private final AtomicInteger counter = new AtomicInteger(0); + + List customerNames; + + private State state = State.INITIALIZED; + + @Autowired + CustomerRepository customers; + + @Autowired + DatabaseClient database; + + @Autowired + ConnectionFactory connectionFactory; + + enum State { + + INITIALIZED, STARTED, STOPPED + + } + + @Override + public void start() { + + synchronized (lifecycleMonitor) { + + System.out.printf("Starting R2dbcReader: was %s\n", state); + + switch (state) { + case INITIALIZED -> { + + var statements = Arrays.asList(// + "DROP TABLE IF EXISTS customer;", + "CREATE TABLE customer ( id SERIAL PRIMARY KEY, firstname VARCHAR(100) NOT NULL, lastname VARCHAR(100) NOT NULL);"); + + statements.forEach(it -> database.sql(it) // + .fetch() // + .rowsUpdated() // + .block()); + + if (ObjectUtils.isEmpty(customerNames)) { + + var dave = new Customer(null, "Dave", "Matthews"); + var carter = new Customer(null, "Carter", "Beauford"); + this.customerNames = insertCustomers(dave, carter); + } + state = State.STARTED; + } + case STOPPED -> { + + } + } + } + } + + @Scheduled(fixedDelay = 1000) + public void scheduled() { + + if (isRunning()) { + + customers.findByLastname(customerNames.get(counter.get() % customerNames.size())).doOnNext(customer -> { + int count = counter.incrementAndGet(); + System.out.printf("count %03d: %s\n", count, customer); + }).blockFirst(Duration.ofMillis(500)); + } + } + + @Override + public void stop() { + + synchronized (lifecycleMonitor) { + switch (state) { + case INITIALIZED, STARTED -> { + System.out.printf("Stopping DataReader: was %s\n", state); + state = State.STOPPED; + } + } + } + } + + @Override + public boolean isRunning() { + return State.STARTED.equals(state); + } + + private List insertCustomers(Customer... customers) { + return this.customers.saveAll(Arrays.asList(customers)).map(Customer::lastname).collectList().block(); + } + +} diff --git a/data/data-r2dbc/src/main/resources/application.properties b/data/data-r2dbc/src/main/resources/application.properties new file mode 100644 index 0000000..6b3705e --- /dev/null +++ b/data/data-r2dbc/src/main/resources/application.properties @@ -0,0 +1,3 @@ +spring.r2dbc.url=r2dbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT_5432:5432}/postgres +spring.r2dbc.username=postgres +spring.r2dbc.password=password