31
31
import com .datastax .oss .driver .api .core .cql .ExecutionInfo ;
32
32
import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
33
33
import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
34
+ import com .datastax .oss .driver .api .core .cql .Statement ;
34
35
import com .datastax .oss .driver .api .testinfra .ccm .CcmRule ;
35
36
import com .datastax .oss .driver .api .testinfra .ccm .SchemaChangeSynchronizer ;
36
37
import com .datastax .oss .driver .api .testinfra .session .SessionRule ;
37
- import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
38
38
import com .datastax .oss .driver .categories .ParallelizableTests ;
39
39
import com .datastax .oss .driver .internal .core .cql .EmptyColumnDefinitions ;
40
40
import com .tngtech .java .junit .dataprovider .DataProvider ;
41
41
import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
42
42
import edu .umd .cs .findbugs .annotations .NonNull ;
43
43
import io .reactivex .Flowable ;
44
- import java .time .Duration ;
45
44
import java .util .LinkedHashSet ;
46
45
import java .util .List ;
47
46
import java .util .Set ;
@@ -60,14 +59,7 @@ public class DefaultReactiveResultSetIT {
60
59
61
60
private static CcmRule ccmRule = CcmRule .getInstance ();
62
61
63
- private static SessionRule <CqlSession > sessionRule =
64
- SessionRule .builder (ccmRule )
65
- .withConfigLoader (
66
- SessionUtils .configLoaderBuilder ()
67
- .withDuration (
68
- DefaultDriverOption .METADATA_SCHEMA_REQUEST_TIMEOUT , Duration .ofSeconds (20 ))
69
- .build ())
70
- .build ();
62
+ private static SessionRule <CqlSession > sessionRule = SessionRule .builder (ccmRule ).build ();
71
63
72
64
@ ClassRule public static TestRule chain = RuleChain .outerRule (ccmRule ).around (sessionRule );
73
65
@@ -76,19 +68,15 @@ public static void initialize() {
76
68
CqlSession session = sessionRule .session ();
77
69
SchemaChangeSynchronizer .withLock (
78
70
() -> {
79
- session .execute ("DROP TABLE IF EXISTS test_reactive_read" );
80
- session .execute ("DROP TABLE IF EXISTS test_reactive_write" );
71
+ session .execute (createSlowStatement ( "DROP TABLE IF EXISTS test_reactive_read" ) );
72
+ session .execute (createSlowStatement ( "DROP TABLE IF EXISTS test_reactive_write" ) );
81
73
session .checkSchemaAgreement ();
82
74
session .execute (
83
- SimpleStatement .builder (
84
- "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))" )
85
- .setExecutionProfile (sessionRule .slowProfile ())
86
- .build ());
75
+ createSlowStatement (
76
+ "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))" ));
87
77
session .execute (
88
- SimpleStatement .builder (
89
- "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))" )
90
- .setExecutionProfile (sessionRule .slowProfile ())
91
- .build ());
78
+ createSlowStatement (
79
+ "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))" ));
92
80
session .checkSchemaAgreement ();
93
81
});
94
82
for (int i = 0 ; i < 1000 ; i ++) {
@@ -101,6 +89,12 @@ public static void initialize() {
101
89
}
102
90
}
103
91
92
+ static Statement <?> createSlowStatement (String statement ) {
93
+ return SimpleStatement .builder (statement )
94
+ .setExecutionProfile (sessionRule .slowProfile ())
95
+ .build ();
96
+ }
97
+
104
98
@ Before
105
99
public void truncateTables () throws Exception {
106
100
CqlSession session = sessionRule .session ();
0 commit comments