11package com .scalar .db .storage .jdbc ;
22
3+ import static org .assertj .core .api .Assertions .assertThat ;
4+
5+ import com .scalar .db .api .DistributedStorage ;
36import com .scalar .db .api .DistributedStorageIntegrationTestBase ;
7+ import com .scalar .db .api .Get ;
8+ import com .scalar .db .api .Put ;
9+ import com .scalar .db .api .Result ;
10+ import com .scalar .db .api .Scan ;
11+ import com .scalar .db .api .Scanner ;
412import com .scalar .db .config .DatabaseConfig ;
13+ import com .scalar .db .exception .storage .ExecutionException ;
14+ import com .scalar .db .io .Key ;
15+ import com .scalar .db .service .StorageFactory ;
16+ import java .io .IOException ;
17+ import java .util .List ;
18+ import java .util .Optional ;
519import java .util .Properties ;
20+ import org .junit .jupiter .api .Test ;
621
722public class JdbcDatabaseIntegrationTest extends DistributedStorageIntegrationTestBase {
823
@@ -25,4 +40,113 @@ protected int getLargeDataSizeInBytes() {
2540 return super .getLargeDataSizeInBytes ();
2641 }
2742 }
43+
44+ @ Test
45+ public void get_InStreamingMode_ShouldRetrieveSingleResult () throws ExecutionException {
46+ if (!JdbcTestUtils .isMysql (rdbEngine ) || JdbcTestUtils .isMariaDB (rdbEngine )) {
47+ // MySQL is the only RDB engine that supports streaming mode
48+ return ;
49+ }
50+
51+ try (DistributedStorage storage = getStorageInStreamingMode ()) {
52+ // Arrange
53+ int pKey = 0 ;
54+ int cKey = 1 ;
55+ int value = 2 ;
56+
57+ storage .put (
58+ Put .newBuilder ()
59+ .namespace (namespace )
60+ .table (TABLE )
61+ .partitionKey (Key .ofInt (COL_NAME1 , pKey ))
62+ .clusteringKey (Key .ofInt (COL_NAME4 , cKey ))
63+ .intValue (COL_NAME3 , value )
64+ .build ());
65+
66+ // Act
67+ Optional <Result > result =
68+ storage .get (
69+ Get .newBuilder ()
70+ .namespace (namespace )
71+ .table (TABLE )
72+ .partitionKey (Key .ofInt (COL_NAME1 , pKey ))
73+ .clusteringKey (Key .ofInt (COL_NAME4 , cKey ))
74+ .build ());
75+
76+ // Assert
77+ assertThat (result .isPresent ()).isTrue ();
78+ assertThat (result .get ().getInt (COL_NAME1 )).isEqualTo (pKey );
79+ assertThat (result .get ().getInt (COL_NAME4 )).isEqualTo (cKey );
80+ assertThat (result .get ().getInt (COL_NAME3 )).isEqualTo (value );
81+ }
82+ }
83+
84+ @ Test
85+ public void scan_InStreamingMode_ShouldRetrieveResults () throws IOException , ExecutionException {
86+ if (!JdbcTestUtils .isMysql (rdbEngine ) || JdbcTestUtils .isMariaDB (rdbEngine )) {
87+ // MySQL is the only RDB engine that supports streaming mode
88+ return ;
89+ }
90+
91+ try (DistributedStorage storage = getStorageInStreamingMode ()) {
92+ // Arrange
93+ int pKey = 0 ;
94+
95+ storage .put (
96+ Put .newBuilder ()
97+ .namespace (namespace )
98+ .table (TABLE )
99+ .partitionKey (Key .ofInt (COL_NAME1 , pKey ))
100+ .clusteringKey (Key .ofInt (COL_NAME4 , 0 ))
101+ .intValue (COL_NAME3 , 1 )
102+ .build ());
103+ storage .put (
104+ Put .newBuilder ()
105+ .namespace (namespace )
106+ .table (TABLE )
107+ .partitionKey (Key .ofInt (COL_NAME1 , pKey ))
108+ .clusteringKey (Key .ofInt (COL_NAME4 , 1 ))
109+ .intValue (COL_NAME3 , 2 )
110+ .build ());
111+ storage .put (
112+ Put .newBuilder ()
113+ .namespace (namespace )
114+ .table (TABLE )
115+ .partitionKey (Key .ofInt (COL_NAME1 , pKey ))
116+ .clusteringKey (Key .ofInt (COL_NAME4 , 2 ))
117+ .intValue (COL_NAME3 , 3 )
118+ .build ());
119+
120+ // Act
121+ Scanner scanner =
122+ storage .scan (
123+ Scan .newBuilder ()
124+ .namespace (namespace )
125+ .table (TABLE )
126+ .partitionKey (Key .ofInt (COL_NAME1 , pKey ))
127+ .build ());
128+ List <Result > results = scanner .all ();
129+ scanner .close ();
130+
131+ // Assert
132+ assertThat (results ).hasSize (3 );
133+ assertThat (results .get (0 ).getInt (COL_NAME1 )).isEqualTo (pKey );
134+ assertThat (results .get (0 ).getInt (COL_NAME4 )).isEqualTo (0 );
135+ assertThat (results .get (0 ).getInt (COL_NAME3 )).isEqualTo (1 );
136+
137+ assertThat (results .get (1 ).getInt (COL_NAME1 )).isEqualTo (pKey );
138+ assertThat (results .get (1 ).getInt (COL_NAME4 )).isEqualTo (1 );
139+ assertThat (results .get (1 ).getInt (COL_NAME3 )).isEqualTo (2 );
140+
141+ assertThat (results .get (2 ).getInt (COL_NAME1 )).isEqualTo (pKey );
142+ assertThat (results .get (2 ).getInt (COL_NAME4 )).isEqualTo (2 );
143+ assertThat (results .get (2 ).getInt (COL_NAME3 )).isEqualTo (3 );
144+ }
145+ }
146+
147+ private DistributedStorage getStorageInStreamingMode () {
148+ Properties properties = JdbcEnv .getProperties (TEST_NAME );
149+ properties .setProperty (DatabaseConfig .SCAN_FETCH_SIZE , Integer .toString (Integer .MIN_VALUE ));
150+ return StorageFactory .create (properties ).getStorage ();
151+ }
28152}
0 commit comments