Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -781,8 +781,12 @@ public Ref getRef(int columnIndex) {
}

@Override
public Blob getBlob(int columnIndex) {
throw new UnsupportedOperationException();
public Blob getBlob(int columnIndex) throws SQLException {
RequestDelegate<Blob> delegate = createLocalRequestDelegate();
for (ResultSet rs : resultSets) {
delegate.addRequest(() -> rs.getBlob(columnIndex));
}
return delegate.requestAllAndCompare();
}

@Override
Expand All @@ -806,8 +810,12 @@ public Ref getRef(String columnLabel) {
}

@Override
public Blob getBlob(String columnLabel) {
throw new UnsupportedOperationException();
public Blob getBlob(String columnLabel) throws SQLException {
RequestDelegate<Blob> delegate = createLocalRequestDelegate();
for (ResultSet rs : resultSets) {
delegate.addRequest(() -> rs.getBlob(columnLabel));
}
return delegate.requestAllAndCompare();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.relational.it.query.object;

import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Blob;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
public class IoTDBObjectQueryIT {

private static final String DATABASE_NAME = "test_db";

private static final String TIME_ZONE = "+00:00";

private static final String[] createSqls =
new String[] {
"CREATE DATABASE " + DATABASE_NAME,
"USE " + DATABASE_NAME,
"CREATE TABLE t1(device_id STRING TAG, o1 OBJECT, b1 BLOB, s1 STRING)",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(1, 'd1', X'cafebabe01', to_object(true, 0, X'cafebabe01'), 'cafebabe01')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(2, 'd1', X'cafebabe0202', to_object(true, 0, X'cafebabe02'), 'cafebabe02')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(3, 'd1', X'cafebabe0303', to_object(true, 0, X'cafebabe03'), 'cafebabe03')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(4, 'd1', X'cafebabe04', to_object(true, 0, X'cafebabe04'), 'cafebabe04')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(1, 'd2', X'cafebade01', to_object(true, 0, X'cafebade01'), 'cafebade01')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(2, 'd2', X'cafebade0202', to_object(true, 0, X'cafebade02'), 'cafebade02')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(3, 'd2', X'cafebade0302', to_object(true, 0, X'cafebade03'), 'cafebade03')",
"INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(4, 'd2', X'cafebade04', to_object(true, 0, X'cafebade04'), 'cafebade04')",
"FLUSH",
};

@BeforeClass
public static void classSetUp() {
EnvFactory.getEnv().initClusterEnvironment();
prepareTableData(createSqls);
}

@AfterClass
public static void classTearDown() {
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void jdbcTest() {
try (Connection connection =
EnvFactory.getEnv()
.getConnection(
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
BaseEnv.TABLE_SQL_DIALECT)) {
connection.setClientInfo("time_zone", TIME_ZONE);
try (Statement statement = connection.createStatement()) {
statement.execute("use " + DATABASE_NAME);
try (ResultSet resultSet =
statement.executeQuery(
"SELECT time, b1, o1, s1 FROM t1 WHERE device_id = 'd1' ORDER BY time")) {
int cnt = 0;
while (resultSet.next()) {
cnt++;
Blob blob = resultSet.getBlob(3);
byte[] bytes = resultSet.getBytes("o1");
assertArrayEquals(blob.getBytes(1, (int) blob.length()), bytes);
assertTrue(new String(bytes).endsWith(String.format("%d.bin", cnt)));
String s = resultSet.getString(3);
assertEquals("(Object) 5 B", s);
}
assertEquals(4, cnt);
}

try (ResultSet resultSet =
statement.executeQuery(
"SELECT time, b1, READ_OBJECT(o1), s1 FROM t1 WHERE device_id = 'd2' AND READ_OBJECT(o1)=b1 ORDER BY time")) {
int cnt = 0;
String[] ans = {"0xcafebade01", "0xcafebade04"};
while (resultSet.next()) {
String s = resultSet.getString(3);
assertEquals(ans[cnt], s);
cnt++;
}
assertEquals(2, cnt);
}
}
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void sessionTest() {
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
session.executeNonQueryStatement("USE " + DATABASE_NAME);

// SessionDataSet
try (SessionDataSet dataSet =
session.executeQueryStatement(
"SELECT time, b1, o1, s1 FROM t1 WHERE device_id = 'd1' ORDER BY time")) {
int cnt = 0;
while (dataSet.hasNext()) {
cnt++;
RowRecord rowRecord = dataSet.next();
Field field = rowRecord.getField(2);
String s = field.getStringValue();
assertEquals("(Object) 5 B", s);
Object blob = field.getObjectValue(TSDataType.OBJECT);
assertTrue(blob instanceof Binary);
assertTrue(
new String(((Binary) blob).getValues()).endsWith(String.format("%d.bin", cnt)));

Binary binary = field.getBinaryV();
assertArrayEquals(binary.getValues(), ((Binary) blob).getValues());
assertTrue(new String(binary.getValues()).endsWith(String.format("%d.bin", cnt)));
}
assertEquals(4, cnt);
}

// SessionDataSet.DataIterator
try (SessionDataSet dataSet =
session.executeQueryStatement(
"SELECT time, b1, o1, s1 FROM t1 WHERE device_id = 'd2' ORDER BY time")) {
SessionDataSet.DataIterator iterator = dataSet.iterator();
int cnt = 0;
while (iterator.next()) {
cnt++;
Object o = iterator.getObject(3);
assertTrue(o instanceof String);
assertEquals("(Object) 5 B", o);
String s = iterator.getString("o1");
assertEquals("(Object) 5 B", s);
Binary blob = iterator.getBlob(3);
assertTrue(new String(blob.getValues()).endsWith(String.format("%d.bin", cnt)));
}
assertEquals(4, cnt);
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ public void testManageTable() {
assertEquals(tableNames.length, cnt);
}

// Test unsupported, to be deleted
try {
statement.execute("alter table test1.table1 rename to tableN");
} catch (final SQLException e) {
assertEquals("701: The renaming for base table is currently unsupported", e.getMessage());
}

// Test unsupported, to be deleted
try {
statement.execute(
"alter table if exists test_db.table1 rename column if exists model to modelType");
} catch (final SQLException e) {
assertEquals(
"701: The renaming for base table column is currently unsupported", e.getMessage());
}

// Alter table properties
statement.execute("alter table test1.table1 set properties ttl=1000000");
ttls = new String[] {"1000000"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public byte[] getBytes(String columnName) throws SQLException {
return null;
}

if (dataType.equals(TSDataType.BLOB)) {
if (dataType.equals(TSDataType.BLOB) || dataType.equals(TSDataType.OBJECT)) {
Binary binary = ioTDBRpcDataSet.getBinary(columnName);
return binary == null ? null : binary.getValues();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ private void doTransfer(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeConfigRegionWritePlanEvent.toString());
pipeConfigRegionWritePlanEvent.toString(),
true);
}
}

Expand Down Expand Up @@ -252,7 +253,8 @@ private void doTransfer(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeConfigRegionSnapshotEvent.toString());
pipeConfigRegionSnapshotEvent.toString(),
true);
} else {
LOGGER.info("Successfully transferred config region snapshot {}.", snapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri
String.format(
"Transfer config region write plan %s error, result status %s.",
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status),
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString());
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString(),
true);
}

if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -279,7 +280,8 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
String.format(
"Seal config region snapshot file %s error, result status %s.",
snapshotFile, resp.getStatus()),
snapshotFile.toString());
snapshotFile.toString(),
true);
}

LOGGER.info("Successfully transferred config region snapshot {}.", snapshotFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,10 @@ private TSStatus executeStatementAndClassifyExceptions(
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Exception encountered while executing statement %s: ",
receiverId.get(),
statement.getPipeLoggingString(),
e);
statement.getPipeLoggingString());
return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
} finally {
if (Objects.nonNull(allocatedMemoryBlock)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
import org.apache.iotdb.rpc.TSStatusCode;

import java.util.Objects;

/**
* This visitor translated some exceptions to pipe related {@link TSStatus} to help sender classify
* them and apply different error handling tactics. Please DO NOT modify the exceptions returned by
Expand Down Expand Up @@ -130,7 +132,11 @@ public TSStatus visitBatchActivateTemplate(
private TSStatus visitGeneralActivateTemplate(
final Statement activateTemplateStatement, final Exception context) {
if (context instanceof MetadataException || context instanceof StatementAnalyzeException) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
return (Objects.nonNull(context.getMessage())
&& context.getMessage().contains("has not been set any template")
? new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
: new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()))
.setMessage(context.getMessage());
} else if (isAutoCreateConflict(context)) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ private TSStatus visitGeneralActivateTemplate(
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
&& context.isSetMessage()
&& context.getMessage().contains("has not been set any template")) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitStatement(activateTemplateStatement, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ private void doTransfer(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeSchemaRegionWritePlanEvent.toString());
pipeSchemaRegionWritePlanEvent.toString(),
true);
}
}

Expand Down Expand Up @@ -187,7 +188,8 @@ private void doTransfer(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeSchemaRegionSnapshotEvent.toString());
pipeSchemaRegionSnapshotEvent.toString(),
true);
} else {
LOGGER.info(
"Successfully transferred schema region snapshot {}, {} and {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void onComplete(TPipeConsensusTransferResp response) {
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
connector.statusHandler().handle(status, status.getMessage(), event.toString());
connector.statusHandler().handle(status, status.getMessage(), event.toString(), true);
}
event.decreaseReferenceCount(PipeConsensusDeleteEventHandler.class.getName(), true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
String.format(
"Transfer deletion %s error, result status %s.",
pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), status),
pipeDeleteDataNodeEvent.getDeletionResource().toString());
pipeDeleteDataNodeEvent.getDeletionResource().toString(),
true);
}

if (LOGGER.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ private void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWri
String.format(
"Transfer data node write plan %s error, result status %s.",
pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status),
pipeSchemaRegionWritePlanEvent.getPlanNode().toString());
pipeSchemaRegionWritePlanEvent.getPlanNode().toString(),
true);
}

if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -222,7 +223,8 @@ private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent)
String.format(
"Seal file %s, %s and %s error, result status %s.",
mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, resp.getStatus()),
snapshotEvent.toString());
snapshotEvent.toString(),
true);
}

LOGGER.info(
Expand Down
Loading
Loading