|
| 1 | ++++ |
| 2 | +archetype = "page" |
| 3 | +title = "Spring Cloud Stream Binder" |
| 4 | +weight = 3 |
| 5 | ++++ |
| 6 | + |
| 7 | +[Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) is a Java framework that is designed for building event-driven microservices backed by a scalable, fault-tolerant messaging system. The [Oracle Database Transactional Event Queues stream binder implementation](https://github.com/oracle/spring-cloud-oracle/tree/main/database/spring-cloud-stream-binder-oracle-txeventq) allows developers to leverage Oracle’s database messaging platform within the Spring Cloud Stream ecosystem. |
| 8 | + |
| 9 | +This section covers the key features of the Spring Cloud Stream Binder for Oracle Database Transactional Event Queues and getting started examples for developers. |
| 10 | + |
| 11 | +## Key Features of the Transactional Event Queues Stream Binder |
| 12 | + |
| 13 | +The Spring Cloud Stream Binder for Oracle Database Transactional Event Queues provides a high-throughput, reliable messaging platform built directly into the database. |
| 14 | + |
| 15 | +- Real-time messaging with multiple publishers, consumers, and topics — all with a simple functional interface. |
| 16 | +- Convergence of data: Your messaging infrastructure integrates directly with the database, eliminating the need for external brokers. |
| 17 | +- Integration with Spring Cloud Stream provides an interface that's easy-to-use and quick to get started. |
| 18 | + |
| 19 | +## Configuring the Transactional Event Queues Stream Binder |
| 20 | + |
| 21 | +In this section, we'll cover how to configure the Transactional Event Queues Stream Binder for a Spring Boot project. |
| 22 | + |
| 23 | +### Project Dependencies |
| 24 | + |
| 25 | +To start developing with the Stream Binder, add the [spring-cloud-stream-binder-oracle-txeventq](https://central.sonatype.com/artifact/com.oracle.database.spring.cloud-stream-binder/spring-cloud-stream-binder-oracle-txeventq) dependency to your Maven project: |
| 26 | + |
| 27 | +```xml |
| 28 | +<dependency> |
| 29 | + <groupId>com.oracle.database.spring.cloud-stream-binder</groupId> |
| 30 | + <artifactId>spring-cloud-stream-binder-oracle-txeventq</artifactId> |
| 31 | + <version>${txeventq-stream-binder.version}</version> |
| 32 | +</dependency> |
| 33 | +``` |
| 34 | + |
| 35 | +For Gradle users, add the following dependency to your project: |
| 36 | + |
| 37 | +```groovy |
| 38 | +implementation "com.oracle.database.spring.cloud-stream-binder:spring-cloud-stream-binder-oracle-txeventq:${txeventqStreamBinderVersion}" |
| 39 | +``` |
| 40 | + |
| 41 | +### Stream Binder Permissions |
| 42 | + |
| 43 | +The database user producing/consuming events with the Stream Binder requires the following database permissions. Modify the username and tablespace grant as appropriate for your application: |
| 44 | + |
| 45 | +```sql |
| 46 | +grant unlimited tablespace to testuser; |
| 47 | +grant select_catalog_role to testuser; |
| 48 | +grant execute on dbms_aq to testuser; |
| 49 | +grant execute on dbms_aqadm to testuser; |
| 50 | +grant execute on dbms_aqin to testuser; |
| 51 | +grant execute on dbms_aqjms_internal to testuser; |
| 52 | +grant execute on dbms_teqk to testuser; |
| 53 | +grant execute on DBMS_RESOURCE_MANAGER to testuser; |
| 54 | +grant select on sys.aq$_queue_shards to testuser; |
| 55 | +grant select on user_queue_partition_assignment_table to testuser; |
| 56 | +``` |
| 57 | + |
| 58 | +### Stream Binder Database Connection |
| 59 | + |
| 60 | +The Stream Binder uses a standard JDBC connection to produce and consume messages. With YAML-style Spring application properties, it'll look something like this: |
| 61 | + |
| 62 | +```yaml |
| 63 | + |
| 64 | +spring: |
| 65 | + datasource: |
| 66 | + username: ${USERNAME} |
| 67 | + password: ${PASSWORD} |
| 68 | + url: ${JDBC_URL} |
| 69 | + driver-class-name: oracle.jdbc.OracleDriver |
| 70 | + type: oracle.ucp.jdbc.PoolDataSourceImpl |
| 71 | + oracleucp: |
| 72 | + initial-pool-size: 1 |
| 73 | + min-pool-size: 1 |
| 74 | + max-pool-size: 30 |
| 75 | + connection-pool-name: StreamBinderSample |
| 76 | + connection-factory-class-name: oracle.jdbc.pool.OracleDataSource |
| 77 | +``` |
| 78 | +
|
| 79 | +## Suppliers, Functions and Consumers |
| 80 | +
|
| 81 | +Spring Cloud Stream uses Java suppliers, functions, and consumers to abstract references to the underlying messaging system (in this case, Transactional Event Queues). To illustrate how this works, we'll create a basic Supplier, Function, and Consumer with Spring Cloud Stream. |
| 82 | +
|
| 83 | +Our example workflow will use three functional interfaces: |
| 84 | +
|
| 85 | +- A Supplier that streams a phrase word-by-word |
| 86 | +- A Function that processes each word from supplier and capitalizes it |
| 87 | +- A Consumer that receives each capitalized word from the function and prints it to `stdout` |
| 88 | + |
| 89 | +Once we've implemented the Java interfaces, we'll wire them together with Spring Cloud Stream. |
| 90 | + |
| 91 | +### Supplier Implementation |
| 92 | + |
| 93 | +The following Supplier implementation supplies a phrase word-by-word, indicating when it has processed the whole phrase. |
| 94 | + |
| 95 | +```java |
| 96 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 97 | +import java.util.concurrent.atomic.AtomicInteger; |
| 98 | +import java.util.function.Supplier; |
| 99 | +
|
| 100 | +public class WordSupplier implements Supplier<String> { |
| 101 | + private final String[] words; |
| 102 | + private final AtomicInteger idx = new AtomicInteger(0); |
| 103 | + private final AtomicBoolean done = new AtomicBoolean(false); |
| 104 | +
|
| 105 | + public WordSupplier(String phrase) { |
| 106 | + this.words = phrase.split(" "); |
| 107 | + } |
| 108 | +
|
| 109 | + @Override |
| 110 | + public String get() { |
| 111 | + int i = idx.getAndAccumulate(words.length, (x, y) -> { |
| 112 | + if (x < words.length - 1) { |
| 113 | + return x + 1; |
| 114 | + } |
| 115 | + done.set(true); |
| 116 | + return 0; |
| 117 | + }); |
| 118 | + return words[i]; |
| 119 | + } |
| 120 | +
|
| 121 | + public boolean done() { |
| 122 | + return done.get(); |
| 123 | + } |
| 124 | +} |
| 125 | +``` |
| 126 | + |
| 127 | +Next, let’s add Spring beans for our Supplier, a Function, and a Consumer. The `toUpperCase` Function takes a string and capitalizes it, and the `stdoutConsumer` Consumer prints each string it receives to `stdout`. |
| 128 | +```java |
| 129 | +import java.util.function.Consumer; |
| 130 | +import java.util.function.Function; |
| 131 | +
|
| 132 | +import org.springframework.beans.factory.annotation.Value; |
| 133 | +import org.springframework.context.annotation.Bean; |
| 134 | +import org.springframework.context.annotation.Configuration; |
| 135 | +
|
| 136 | +@Configuration |
| 137 | +public class StreamConfiguration { |
| 138 | + |
| 139 | + // Input phrase for the producer |
| 140 | + @Value("${phrase}") |
| 141 | + private String phrase; |
| 142 | +
|
| 143 | + // Function, Capitalizes each input |
| 144 | + @Bean |
| 145 | + public Function<String, String> toUpperCase() { |
| 146 | + return String::toUpperCase; |
| 147 | + } |
| 148 | +
|
| 149 | + // Consumer, Prints each input |
| 150 | + @Bean |
| 151 | + public Consumer<String> stdoutConsumer() { |
| 152 | + return s -> System.out.println("Consumed: " + s); |
| 153 | + } |
| 154 | +
|
| 155 | + // Supplier, WordSupplier |
| 156 | + @Bean |
| 157 | + public WordSupplier wordSupplier() { |
| 158 | + return new WordSupplier(phrase); |
| 159 | + } |
| 160 | +} |
| 161 | +``` |
| 162 | + |
| 163 | +### Configure Beans with Spring Cloud Stream |
| 164 | + |
| 165 | +Returning to the application properties, let’s configure the Spring Cloud Stream bindings for each bean defined previously. |
| 166 | + |
| 167 | +In our binding configuration, the `wordSupplier` Supplier has the `toUpperCase` Function as a destination, and the `stdoutConsumer` Consumer reads from toUpperCase. The result of this acyclic configuration is that each word from the phrase is converted to uppercase and sent to stdout. |
| 168 | + |
| 169 | +```yaml |
| 170 | +# Input phrase for the wordSupplier |
| 171 | +phrase: "Spring Cloud Stream simplifies event-driven microservices with powerful messaging capabilities." |
| 172 | +
|
| 173 | +spring: |
| 174 | + cloud: |
| 175 | + stream: |
| 176 | + bindings: |
| 177 | + wordSupplier-out-0: |
| 178 | + # wordSupplier output |
| 179 | + destination: toUpperCase-in-0 |
| 180 | + group: t1 |
| 181 | + producer: |
| 182 | + required-groups: |
| 183 | + - t1 |
| 184 | + stdoutConsumer-in-0: |
| 185 | + # stdoutConsumer input |
| 186 | + destination: toUpperCase-out-0 |
| 187 | + group: t1 |
| 188 | + function: |
| 189 | + # defines the stream flow, toUppercase bridges |
| 190 | + # wordSupplier and stdoutConsumer |
| 191 | + definition: wordSupplier;toUpperCase;stdoutConsumer |
| 192 | +``` |
| 193 | + |
| 194 | +If you’ve added the prior code to a Spring Boot application, you should see the following messages sent to `stdout` when it is run: |
| 195 | + |
| 196 | +``` |
| 197 | +Consumed: SPRING |
| 198 | +Consumed: CLOUD |
| 199 | +Consumed: STREAM |
| 200 | +Consumed: SIMPLIFIES |
| 201 | +Consumed: EVENT-DRIVEN |
| 202 | +Consumed: MICROSERVICES |
| 203 | +Consumed: WITH |
| 204 | +Consumed: POWERFUL |
| 205 | +Consumed: MESSAGING |
| 206 | +Consumed: CAPABILITIES. |
| 207 | +``` |
0 commit comments