Skip to content

Commit ee1624d

Browse files
committed
Add e2e test for storing large blob data
1 parent eef69f4 commit ee1624d

File tree

5 files changed

+246
-199
lines changed

5 files changed

+246
-199
lines changed

core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseColumnValueIntegrationTest.java

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,38 @@
11
package com.scalar.db.storage.jdbc;
22

3+
import static org.assertj.core.api.Assertions.assertThat;
4+
35
import com.scalar.db.api.DistributedStorageColumnValueIntegrationTestBase;
6+
import com.scalar.db.api.Get;
7+
import com.scalar.db.api.Put;
8+
import com.scalar.db.api.PutBuilder;
9+
import com.scalar.db.api.Result;
10+
import com.scalar.db.api.TableMetadata;
411
import com.scalar.db.config.DatabaseConfig;
12+
import com.scalar.db.exception.storage.ExecutionException;
13+
import com.scalar.db.io.BigIntColumn;
14+
import com.scalar.db.io.BlobColumn;
15+
import com.scalar.db.io.BooleanColumn;
516
import com.scalar.db.io.Column;
617
import com.scalar.db.io.DataType;
18+
import com.scalar.db.io.DateColumn;
19+
import com.scalar.db.io.DoubleColumn;
20+
import com.scalar.db.io.FloatColumn;
21+
import com.scalar.db.io.IntColumn;
22+
import com.scalar.db.io.Key;
23+
import com.scalar.db.io.TextColumn;
24+
import com.scalar.db.io.TimeColumn;
25+
import com.scalar.db.io.TimestampColumn;
26+
import com.scalar.db.io.TimestampTZColumn;
727
import com.scalar.db.util.TestUtils;
28+
import java.util.Optional;
829
import java.util.Properties;
930
import java.util.Random;
31+
import java.util.stream.Stream;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.Arguments;
35+
import org.junit.jupiter.params.provider.MethodSource;
1036

1137
public class JdbcDatabaseColumnValueIntegrationTest
1238
extends DistributedStorageColumnValueIntegrationTestBase {
@@ -68,4 +94,163 @@ protected Column<?> getColumnWithMaxValue(String columnName, DataType dataType)
6894
}
6995
return super.getColumnWithMaxValue(columnName, dataType);
7096
}
97+
98+
@ParameterizedTest()
99+
@MethodSource("provideBlobSizes")
100+
public void put_largeBlobData_ShouldWorkCorrectly(int blobSize, String humanReadableBlobSize)
101+
throws ExecutionException {
102+
String tableName = TABLE + "_large_single_blob_single";
103+
try {
104+
// Arrange
105+
TableMetadata.Builder metadata =
106+
TableMetadata.newBuilder()
107+
.addColumn(COL_NAME1, DataType.INT)
108+
.addColumn(COL_NAME2, DataType.BLOB)
109+
.addPartitionKey(COL_NAME1);
110+
111+
admin.createTable(namespace, tableName, metadata.build(), true, getCreationOptions());
112+
admin.truncateTable(namespace, tableName);
113+
byte[] blobData = createLargeBlob(blobSize);
114+
Put put =
115+
Put.newBuilder()
116+
.namespace(namespace)
117+
.table(tableName)
118+
.partitionKey(Key.ofInt(COL_NAME1, 1))
119+
.blobValue(COL_NAME2, blobData)
120+
.build();
121+
122+
// Act
123+
storage.put(put);
124+
125+
// Assert
126+
Optional<Result> optionalResult =
127+
storage.get(
128+
Get.newBuilder()
129+
.namespace(namespace)
130+
.table(tableName)
131+
.partitionKey(Key.ofInt(COL_NAME1, 1))
132+
.build());
133+
assertThat(optionalResult).isPresent();
134+
Result result = optionalResult.get();
135+
assertThat(result.getColumns().get(COL_NAME2).getBlobValueAsBytes()).isEqualTo(blobData);
136+
} finally {
137+
admin.dropTable(namespace, tableName, true);
138+
}
139+
}
140+
141+
Stream<Arguments> provideBlobSizes() {
142+
return Stream.of(
143+
Arguments.of(32_766, "32,766 KB"),
144+
Arguments.of(32_767, "32,767 KB"),
145+
Arguments.of(100_000_000, "100 MB"));
146+
}
147+
148+
@Test
149+
public void put_largeBlobData_WithMultipleBlobColumnsShouldWorkCorrectly()
150+
throws ExecutionException {
151+
String tableName = TABLE + "_large_multiples_blob";
152+
try {
153+
// Arrange
154+
TableMetadata.Builder metadata =
155+
TableMetadata.newBuilder()
156+
.addColumn(COL_NAME1, DataType.INT)
157+
.addColumn(COL_NAME2, DataType.BLOB)
158+
.addColumn(COL_NAME3, DataType.BLOB)
159+
.addPartitionKey(COL_NAME1);
160+
161+
admin.createTable(namespace, tableName, metadata.build(), true, getCreationOptions());
162+
admin.truncateTable(namespace, tableName);
163+
byte[] blobDataCol2 = createLargeBlob(32_766);
164+
byte[] blobDataCol3 = createLargeBlob(5000);
165+
Put put =
166+
Put.newBuilder()
167+
.namespace(namespace)
168+
.table(tableName)
169+
.partitionKey(Key.ofInt(COL_NAME1, 1))
170+
.blobValue(COL_NAME2, blobDataCol2)
171+
.blobValue(COL_NAME3, blobDataCol3)
172+
.build();
173+
174+
// Act
175+
storage.put(put);
176+
177+
// Assert
178+
Optional<Result> optionalResult =
179+
storage.get(
180+
Get.newBuilder()
181+
.namespace(namespace)
182+
.table(tableName)
183+
.partitionKey(Key.ofInt(COL_NAME1, 1))
184+
.build());
185+
assertThat(optionalResult).isPresent();
186+
Result result = optionalResult.get();
187+
assertThat(result.getColumns().get(COL_NAME2).getBlobValueAsBytes()).isEqualTo(blobDataCol2);
188+
assertThat(result.getColumns().get(COL_NAME3).getBlobValueAsBytes()).isEqualTo(blobDataCol3);
189+
} finally {
190+
admin.dropTable(namespace, tableName, true);
191+
}
192+
}
193+
194+
@Test
195+
public void put_largeBlobData_WithAllColumnsTypesShouldWorkCorrectly() throws ExecutionException {
196+
// Arrange
197+
IntColumn partitionKeyValue = (IntColumn) getColumnWithMaxValue(PARTITION_KEY, DataType.INT);
198+
BooleanColumn col1Value = (BooleanColumn) getColumnWithMaxValue(COL_NAME1, DataType.BOOLEAN);
199+
IntColumn col2Value = (IntColumn) getColumnWithMaxValue(COL_NAME2, DataType.INT);
200+
BigIntColumn col3Value = (BigIntColumn) getColumnWithMaxValue(COL_NAME3, DataType.BIGINT);
201+
FloatColumn col4Value = (FloatColumn) getColumnWithMaxValue(COL_NAME4, DataType.FLOAT);
202+
DoubleColumn col5Value = (DoubleColumn) getColumnWithMaxValue(COL_NAME5, DataType.DOUBLE);
203+
TextColumn col6Value = (TextColumn) getColumnWithMaxValue(COL_NAME6, DataType.TEXT);
204+
BlobColumn col7Value = BlobColumn.of(COL_NAME7, createLargeBlob(32_766));
205+
DateColumn col8Value = (DateColumn) getColumnWithMaxValue(COL_NAME8, DataType.DATE);
206+
TimeColumn col9Value = (TimeColumn) getColumnWithMaxValue(COL_NAME9, DataType.TIME);
207+
TimestampTZColumn col10Value =
208+
(TimestampTZColumn) getColumnWithMaxValue(COL_NAME10, DataType.TIMESTAMPTZ);
209+
TimestampColumn column11Value = null;
210+
if (isTimestampTypeSupported()) {
211+
column11Value = (TimestampColumn) getColumnWithMaxValue(COL_NAME11, DataType.TIMESTAMP);
212+
}
213+
214+
PutBuilder.Buildable put =
215+
Put.newBuilder()
216+
.namespace(namespace)
217+
.table(TABLE)
218+
.partitionKey(Key.newBuilder().add(partitionKeyValue).build())
219+
.value(col1Value)
220+
.value(col2Value)
221+
.value(col3Value)
222+
.value(col4Value)
223+
.value(col5Value)
224+
.value(col6Value)
225+
.value(col7Value)
226+
.value(col8Value)
227+
.value(col9Value)
228+
.value(col10Value);
229+
if (isTimestampTypeSupported()) {
230+
put.value(column11Value);
231+
}
232+
// Act
233+
storage.put(put.build());
234+
235+
// Assert
236+
assertResult(
237+
partitionKeyValue,
238+
col1Value,
239+
col2Value,
240+
col3Value,
241+
col4Value,
242+
col5Value,
243+
col6Value,
244+
col7Value,
245+
col8Value,
246+
col9Value,
247+
col10Value,
248+
column11Value);
249+
}
250+
251+
private byte[] createLargeBlob(int size) {
252+
byte[] blob = new byte[size];
253+
random.nextBytes(blob);
254+
return blob;
255+
}
71256
}

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineOracle.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
import com.scalar.db.storage.jdbc.query.SelectQuery;
1616
import com.scalar.db.storage.jdbc.query.SelectWithFetchFirstNRowsOnly;
1717
import com.scalar.db.storage.jdbc.query.UpsertQuery;
18+
import java.io.ByteArrayInputStream;
19+
import java.io.InputStream;
1820
import java.sql.Driver;
1921
import java.sql.JDBCType;
22+
import java.sql.PreparedStatement;
2023
import java.sql.SQLException;
2124
import java.sql.Types;
2225
import java.time.LocalDate;
@@ -31,6 +34,7 @@
3134
import org.slf4j.LoggerFactory;
3235

3336
class RdbEngineOracle extends AbstractRdbEngine {
37+
3438
private static final Logger logger = LoggerFactory.getLogger(RdbEngineOracle.class);
3539
private final String keyColumnSize;
3640
private final RdbEngineTimeTypeOracle timeTypeEngine;
@@ -484,4 +488,34 @@ public void throwIfCrossPartitionScanConditionOnBlobColumnNotSupported(
484488
getTimeTypeStrategy() {
485489
return timeTypeEngine;
486490
}
491+
492+
@Override
493+
public void bindBlobColumnToPreparedStatement(PreparedStatement preparedStatement, int index, byte[] bytes)
494+
throws SQLException {
495+
// When writing to the BLOB data type with a BLOB size greater than 32766 using a MERGE INTO
496+
// statement, an internal error ORA-03137 on the server side occurs so we needed to use a
497+
// workaround. Below is a detailed explanation of the workaround.
498+
//
499+
// Depending on the byte array size, the JDBC driver automatically choose one the following mode
500+
// to transfer the BLOB data to the server:
501+
// - DIRECT: the most efficient mode. It's used when the byte array length is less than 32767.
502+
// - STREAM: this mode is less efficient. It's used when the byte array length is greater than
503+
// 32766.
504+
// - LOB BINDING: this mode is the least efficient. It's used when an input stream without
505+
// specifying the length is specified.
506+
//
507+
// Through testing, it was conjectured that when the driver selects the STREAM mode, the error
508+
// ORA-03137 occurs. So, we work around the issue by making sure to use the driver in a way so
509+
// that it should never selects the STREAM mode.
510+
// For more details about the modes, see the following documentation:
511+
// https://docs.oracle.com/en/database/oracle/oracle-database/23/jjdbc/LOBs-and-BFiles.html#GUID-8FD40D53-8D64-4187-9F6F-FF78242188AD
512+
if (bytes.length < 32767) {
513+
// the DIRECT mode is used to send BLOB data of small size
514+
preparedStatement.setBytes(index, bytes);
515+
} else {
516+
// the LOB BINDING mode is used to send BLOB data of large size
517+
InputStream inputStream = new ByteArrayInputStream(bytes);
518+
preparedStatement.setBinaryStream(index, inputStream);
519+
}
520+
}
487521
}

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.sql.Connection;
1515
import java.sql.Driver;
1616
import java.sql.JDBCType;
17+
import java.sql.PreparedStatement;
1718
import java.sql.ResultSet;
1819
import java.sql.SQLException;
1920
import java.sql.SQLWarning;
@@ -221,6 +222,11 @@ default OffsetDateTime encode(TimestampTZColumn column) {
221222
return column.getTimestampTZValue().atOffset(ZoneOffset.UTC);
222223
}
223224

225+
/**
 * Binds the given BLOB bytes to the prepared statement at the given parameter index.
 *
 * <p>The default implementation binds the whole byte array directly via
 * {@link PreparedStatement#setBytes(int, byte[])}. Engines that need special handling for
 * large BLOB payloads can override this method.
 *
 * @param preparedStatement the statement to bind the value to
 * @param index the 1-based parameter index
 * @param bytes the BLOB value to bind
 * @throws SQLException if the underlying driver fails to bind the value
 */
default void bindBlobColumnToPreparedStatement(
    PreparedStatement preparedStatement, int index, byte[] bytes) throws SQLException {
  preparedStatement.setBytes(index, bytes);
}
229+
224230
default DateColumn parseDateColumn(ResultSet resultSet, String columnName) throws SQLException {
225231
return DateColumn.of(columnName, resultSet.getObject(columnName, LocalDate.class));
226232
}

core/src/main/java/com/scalar/db/storage/jdbc/query/PreparedStatementBinder.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import com.scalar.db.io.TimestampTZColumn;
1818
import com.scalar.db.storage.jdbc.RdbEngineStrategy;
1919
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20+
import java.io.ByteArrayInputStream;
2021
import java.sql.PreparedStatement;
2122
import java.sql.SQLException;
23+
import java.util.Objects;
2224
import javax.annotation.concurrent.NotThreadSafe;
2325

2426
@NotThreadSafe
@@ -130,11 +132,7 @@ public void visit(BlobColumn column) {
130132
if (column.hasNullValue()) {
131133
preparedStatement.setNull(index++, getSqlType(column.getName()));
132134
} else {
133-
// ByteArrayInputStream bais = new
134-
// ByteArrayInputStream(column.getBlobValueAsBytes());
135-
// preparedStatement.setBinaryStream(index++, bais,
136-
// column.getBlobValueAsBytes().length);
137-
preparedStatement.setBytes(index++, column.getBlobValueAsBytes());
135+
rdbEngine.bindBlobColumnToPreparedStatement(preparedStatement, index++, column.getBlobValueAsBytes());
138136
}
139137
} catch (SQLException e) {
140138
sqlException = e;

0 commit comments

Comments
 (0)