1
+ package swierzy .oraclejdbc23newfeatures ;
2
+ import java .sql .*;
3
+ import oracle .jdbc .*;
4
+
5
+ import java .util .UUID ;
6
+ import java .util .concurrent .*;
7
+ import oracle .rsi .*;
8
+ import java .time .*;
9
+
10
+ /*
11
+ main application class
12
+ it uses two additional subscriber classes
13
+ 1. DMLSubscriber as a subscriber for the INSERT statement being called asynchronously
14
+ 2. QuerySubscriber as a subscriber for the SELECT statement being called asynchronously
15
+ it provides two methods demonstrating JDBC 23c new features
16
+ 1. reactiveCallsDemo() which demonstrates Oracle JDBC 23c driver support for reactive programming
17
+ 2. streamsIngestionDemo() which demonstrates Oracje JDBC 23c driver support for streams ingestion
18
+ */
19
+ public class Main {
20
+ private static CountDownLatch latch = new CountDownLatch (2 );
21
+ /*
22
+ reactiveCallsDemo: method, which demonstrates reactive programming support
23
+ provided by Oracle23c JDBC driver
24
+ */
25
+ public static void reactiveCallsDemo () throws Exception {
26
+ // database connection used by psDML and psQuery PreparedStatement objects
27
+ Connection con ;
28
+ // prepared statements used to call INSERT and SELECT SQL statements in NOWAIT mode
29
+ PreparedStatement psDML , psQuery ;
30
+
31
+ // database connection creation
32
+ System .out .println ("Main.reactiveCallsDemo: connecting to the database." );
33
+ con = DriverManager .getConnection (
34
+ "<db_url>" ,
35
+ "<db_username>" ,
36
+ "<db_password>" );
37
+ System .out .println ("Main.reactiveeCallsDemo: Connected to the database" );
38
+
39
+ // prepared statements for INSERT and SELECT calls creation
40
+ psDML = con .prepareStatement ("INSERT INTO TEST (VAL)" +
41
+ "SELECT DBMS_RANDOM.STRING('a',2000) " +
42
+ "FROM TEST" );
43
+
44
+ psQuery = con .prepareStatement ("SELECT COUNT(*) FROM TEST" );
45
+
46
+ /*
47
+ INSERT and SELECT statements reactive calls
48
+ notes:
49
+ 1. reactive methods are implemented ONLY as Oracle extensions to the standard JDBC interfaces
50
+ - it is not possible to call them without using OraclePrepareStatement class directly
51
+ 2. traditional JAVA casting due to Oracle official documentation is not recommended
52
+ - it is needed to use class.unwrap method to get the OraclePreparedStatement object
53
+ */
54
+ Flow .Publisher <OracleResultSet > fpQuery = psQuery .unwrap (OraclePreparedStatement .class ).executeQueryAsyncOracle ();
55
+ fpQuery .subscribe (new QuerySubscriber (latch ));
56
+ System .out .println ("Main.reactiveCallsDemo: SELECT started in NOWAIT mode" );
57
+
58
+ Flow .Publisher <Long > fpDML = psDML .unwrap (OraclePreparedStatement .class ).executeUpdateAsyncOracle ();
59
+ fpDML .subscribe (new DMLSubscriber (latch ));
60
+ System .out .println ("Main.reactiveCallsDemo: INSERT started in NOWAIT mode" );
61
+ /*
62
+ As this demo uses reactive methods, the execution does not wait for the end of SQL statements
63
+ reactiveCallsDemo() execution is completed BEFORE completion of INSERT and SELECT statements reactive execution
64
+ */
65
+ System .out .println ("Main.reactiveCallsDemo: end of method and return to Main.main." );
66
+
67
+ }
68
+
69
+ /*
70
+ stramsIngestionDemo(): method, which demonstrates support for Streams Ingestion
71
+ provided by Oracle 23c JDBC driver
72
+ */
73
+ public static void streamsIngestionDemo () throws Exception {
74
+ String value ;
75
+
76
+ // auxiliary, traditional database connection used to check number of rows in TEST table
77
+ Connection conAux = DriverManager .getConnection (
78
+ "<db_url>" ,
79
+ "<db_username>" ,
80
+ "<db_password>" );
81
+
82
+ // auxiliary PreparedStatement object used to check number of rows in TEST table
83
+ PreparedStatement psAux = conAux .prepareStatement ("SELECT COUNT(*) FROM TEST" );
84
+ // auxiliary ResultSet object used to check number of rows in TEST table
85
+ ResultSet rsAux ;
86
+ int countInt ;
87
+
88
+ /*
89
+ Streams Ingestion additionally to traditional JDBC parameters, like
90
+ connection string, username and password, requires providing the following data
91
+ 1. ExecutorService: pool of threads responsible for execution
92
+ 2. buffer size
93
+ 3. interval (in seconds) between pushing the data into the database
94
+ 4. names of target schema, table and columns
95
+ */
96
+
97
+ System .out .println ("Main.streamsIngestDemo: connecting to the database." );
98
+ ExecutorService es = Executors .newFixedThreadPool (5 );
99
+ ReactiveStreamsIngestion rs = ReactiveStreamsIngestion
100
+ .builder ()
101
+ .url ("<db_url>" )
102
+ .username ("<db_username>" )
103
+ .password ("<db_password>" )
104
+ .executor (es )
105
+ .bufferRows (60 )
106
+ .bufferInterval (Duration .ofSeconds (2 ))
107
+ .schema ("<db_username>" )
108
+ .table ("TEST" )
109
+ .columns (new String [] {"VAL" })
110
+ .build ();
111
+ System .out .println ("Main.streamsIngestiondemo: connected to the database" );
112
+
113
+ // Streams Ingestion uses reactive calls and provides its own publisher and subscriber
114
+ PushPublisher <Object []> pushPublisher = ReactiveStreamsIngestion .pushPublisher ();
115
+ pushPublisher .subscribe (rs .subscriber ());
116
+ System .out .println ("Main.streamsIngestionDemo: ReactiveStreamsIngestion configured." );
117
+
118
+ for (int i = 1 ; i <= 60 ; i ++) {
119
+ // generation of a random value
120
+ value = UUID .randomUUID ().toString ();
121
+ // pushing the data into the database
122
+ pushPublisher .accept (new Object [] {value });
123
+ System .out .println ("Main.streamsIngestionDemo: A random string #" +i +" has been generated and pushed into the database stream." );
124
+
125
+ /*
126
+ Auxiliary PreparedStatement object got from the auxiliary, separate database connection,
127
+ checks the number of rows in the TEST table
128
+ */
129
+ rsAux = psAux .executeQuery ();
130
+ rsAux .next ();
131
+ countInt = rsAux .getInt (1 );
132
+ System .out .println ("Main.streamsIngestionDemo: Number of rows in TEST table: " +countInt );
133
+ Thread .sleep (200 );
134
+ }
135
+
136
+ // closing the stream and the executor service
137
+ pushPublisher .close ();
138
+ rs .close ();
139
+ es .shutdown ();
140
+ System .out .println ("Main.streamsIngestionDemo: end of method and return to Main.main." );
141
+ }
142
+ public static void main (String [] args ) {
143
+ try {
144
+ System .out .println ("Main.main: begin of the demonstration" );
145
+ Class .forName ("oracle.jdbc.driver.OracleDriver" );
146
+ reactiveCallsDemo ();
147
+ streamsIngestionDemo ();
148
+ System .out .println ("Main.main: waiting for DML and Query subscribers" );
149
+ latch .await ();
150
+ System .out .println ("Main.main: end of the demonstration" );
151
+ }
152
+ catch (Exception e ) {
153
+ e .printStackTrace ();
154
+ }
155
+ }
156
+ }
0 commit comments