diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java index e98ff7e7c144..64c18db11a8c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java @@ -781,8 +781,12 @@ public Ref getRef(int columnIndex) { } @Override - public Blob getBlob(int columnIndex) { - throw new UnsupportedOperationException(); + public Blob getBlob(int columnIndex) throws SQLException { + RequestDelegate delegate = createLocalRequestDelegate(); + for (ResultSet rs : resultSets) { + delegate.addRequest(() -> rs.getBlob(columnIndex)); + } + return delegate.requestAllAndCompare(); } @Override @@ -806,8 +810,12 @@ public Ref getRef(String columnLabel) { } @Override - public Blob getBlob(String columnLabel) { - throw new UnsupportedOperationException(); + public Blob getBlob(String columnLabel) throws SQLException { + RequestDelegate delegate = createLocalRequestDelegate(); + for (ResultSet rs : resultSets) { + delegate.addRequest(() -> rs.getBlob(columnLabel)); + } + return delegate.requestAllAndCompare(); } @Override diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java new file mode 100644 index 000000000000..4fad2d2696b4 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.object; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.utils.Binary; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectQueryIT { + + private static final String DATABASE_NAME = "test_db"; + + private static final String TIME_ZONE = "+00:00"; + + private static final String[] createSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE t1(device_id STRING TAG, o1 OBJECT, b1 BLOB, s1 STRING)", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(1, 'd1', X'cafebabe01', to_object(true, 0, X'cafebabe01'), 'cafebabe01')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(2, 'd1', X'cafebabe0202', to_object(true, 0, X'cafebabe02'), 'cafebabe02')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(3, 'd1', X'cafebabe0303', to_object(true, 0, X'cafebabe03'), 'cafebabe03')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(4, 'd1', X'cafebabe04', to_object(true, 0, X'cafebabe04'), 'cafebabe04')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(1, 'd2', X'cafebade01', to_object(true, 0, X'cafebade01'), 'cafebade01')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(2, 'd2', X'cafebade0202', to_object(true, 0, X'cafebade02'), 'cafebade02')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(3, 'd2', X'cafebade0302', to_object(true, 0, X'cafebade03'), 'cafebade03')", + "INSERT INTO t1(time, device_id, b1, o1, s1) VALUES(4, 'd2', X'cafebade04', to_object(true, 0, X'cafebade04'), 'cafebade04')", + "FLUSH", + }; + + @BeforeClass + public static void classSetUp() { + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(createSqls); + } + + @AfterClass + public static void classTearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void jdbcTest() { + try (Connection connection = + EnvFactory.getEnv() + .getConnection( + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TABLE_SQL_DIALECT)) { + connection.setClientInfo("time_zone", TIME_ZONE); + try (Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT time, b1, o1, s1 FROM t1 WHERE device_id = 'd1' ORDER BY time")) { + int cnt = 0; + while (resultSet.next()) { + cnt++; + Blob blob = resultSet.getBlob(3); + byte[] bytes = resultSet.getBytes("o1"); + assertArrayEquals(blob.getBytes(1, (int) blob.length()), bytes); + assertTrue(new String(bytes).endsWith(String.format("%d.bin", cnt))); + String s = resultSet.getString(3); + assertEquals("(Object) 5 B", s); + } + assertEquals(4, cnt); + } + + try (ResultSet resultSet = + statement.executeQuery( + "SELECT time, b1, READ_OBJECT(o1), s1 FROM t1 WHERE device_id = 'd2' AND READ_OBJECT(o1)=b1 ORDER BY time")) { + int cnt = 0; + String[] ans = {"0xcafebade01", "0xcafebade04"}; + while (resultSet.next()) { + String s = resultSet.getString(3); + assertEquals(ans[cnt], s); + cnt++; + } + assertEquals(2, cnt); + } + } + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void sessionTest() { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE " + DATABASE_NAME); + + // SessionDataSet + try (SessionDataSet dataSet = + session.executeQueryStatement( + "SELECT time, b1, o1, s1 FROM t1 WHERE device_id = 'd1' ORDER BY time")) { + int cnt = 0; + while (dataSet.hasNext()) { + cnt++; + RowRecord rowRecord = dataSet.next(); + Field field = rowRecord.getField(2); + String s = field.getStringValue(); + assertEquals("(Object) 5 B", s); + Object blob = field.getObjectValue(TSDataType.OBJECT); + assertTrue(blob instanceof Binary); + assertTrue( + new String(((Binary) blob).getValues()).endsWith(String.format("%d.bin", cnt))); + + Binary binary = field.getBinaryV(); + assertArrayEquals(binary.getValues(), ((Binary) blob).getValues()); + assertTrue(new String(binary.getValues()).endsWith(String.format("%d.bin", cnt))); + } + assertEquals(4, cnt); + } + + // SessionDataSet.DataIterator + try (SessionDataSet dataSet = + session.executeQueryStatement( + "SELECT time, b1, o1, s1 FROM t1 WHERE device_id = 'd2' ORDER BY time")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + int cnt = 0; + while (iterator.next()) { + cnt++; + Object o = iterator.getObject(3); + assertTrue(o instanceof String); + assertEquals("(Object) 5 B", o); + String s = iterator.getString("o1"); + assertEquals("(Object) 5 B", s); + Binary blob = iterator.getBlob(3); + assertTrue(new String(blob.getValues()).endsWith(String.format("%d.bin", cnt))); + } + assertEquals(4, cnt); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + fail(e.getMessage()); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java index 54881fb8293d..c1c421ce57fa 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java @@ -150,6 +150,22 @@ public void testManageTable() { assertEquals(tableNames.length, cnt); } + // Test unsupported, to be deleted + try { + statement.execute("alter table test1.table1 rename to tableN"); + } catch (final SQLException e) { + assertEquals("701: The renaming for base table is currently unsupported", e.getMessage()); + } + + // Test unsupported, to be deleted + try { + statement.execute( + "alter table if exists test_db.table1 rename column if exists model to modelType"); + } catch (final SQLException e) { + assertEquals( + "701: The renaming for base table column is currently unsupported", e.getMessage()); + } + // Alter table properties statement.execute("alter table test1.table1 set properties ttl=1000000"); ttls = new String[] {"1000000"}; diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java index 77ae98d48876..ff00e3b35dfe 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java @@ -380,7 +380,7 @@ public byte[] getBytes(String columnName) throws SQLException { return null; } - if (dataType.equals(TSDataType.BLOB)) { + if (dataType.equals(TSDataType.BLOB) || dataType.equals(TSDataType.OBJECT)) { Binary binary = ioTDBRpcDataSet.getBinary(columnName); return binary == null ? null : binary.getValues(); } else { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java index 83c03ca9cd0e..882c33cb6573 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java @@ -191,7 +191,8 @@ private void doTransfer( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeConfigRegionWritePlanEvent.toString()); + pipeConfigRegionWritePlanEvent.toString(), + true); } } @@ -252,7 +253,8 @@ private void doTransfer( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeConfigRegionSnapshotEvent.toString()); + pipeConfigRegionSnapshotEvent.toString(), + true); } else { LOGGER.info("Successfully transferred config region snapshot {}.", snapshot); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java index 5866839af17b..5ce03f025980 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java @@ -188,7 +188,8 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri String.format( "Transfer config region write plan %s error, result status %s.", pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status), - pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString()); + pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString(), + true); } if (LOGGER.isDebugEnabled()) { @@ -279,7 +280,8 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent) String.format( "Seal config region snapshot file %s error, result status %s.", snapshotFile, resp.getStatus()), - snapshotFile.toString()); + snapshotFile.toString(), + true); } LOGGER.info("Successfully transferred config region snapshot {}.", snapshotFile); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 4d4bf7b39268..bedd7fbe96a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -847,10 +847,10 @@ private TSStatus executeStatementAndClassifyExceptions( } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Exception encountered while executing statement %s: ", receiverId.get(), - statement.getPipeLoggingString(), - e); + statement.getPipeLoggingString()); return statement.accept(STATEMENT_EXCEPTION_VISITOR, e); } finally { if (Objects.nonNull(allocatedMemoryBlock)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java index 5a55f435b957..5b57b2403146 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java @@ -39,6 +39,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement; import org.apache.iotdb.rpc.TSStatusCode; +import java.util.Objects; + /** * This visitor translated some exceptions to pipe related {@link TSStatus} to help sender classify * them and apply different error handling tactics. Please DO NOT modify the exceptions returned by @@ -130,7 +132,11 @@ public TSStatus visitBatchActivateTemplate( private TSStatus visitGeneralActivateTemplate( final Statement activateTemplateStatement, final Exception context) { if (context instanceof MetadataException || context instanceof StatementAnalyzeException) { - return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + return (Objects.nonNull(context.getMessage()) + && context.getMessage().contains("has not been set any template") + ? new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + : new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())) .setMessage(context.getMessage()); } else if (isAutoCreateConflict(context)) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 198e8edb6771..7d1b08b12389 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -270,7 +270,8 @@ private TSStatus visitGeneralActivateTemplate( if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() && context.isSetMessage() && context.getMessage().contains("has not been set any template")) { - return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage()); } return visitStatement(activateTemplateStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index 3634b2396a43..e4d39b49523e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -122,7 +122,8 @@ private void doTransfer( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeSchemaRegionWritePlanEvent.toString()); + pipeSchemaRegionWritePlanEvent.toString(), + true); } } @@ -187,7 +188,8 @@ private void doTransfer( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeSchemaRegionSnapshotEvent.toString()); + pipeSchemaRegionSnapshotEvent.toString(), + true); } else { LOGGER.info( "Successfully transferred schema region snapshot {}, {} and {}.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java index 6f4e93ba445b..2f39ffab39d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java @@ -79,7 +79,7 @@ public void onComplete(TPipeConsensusTransferResp response) { // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - connector.statusHandler().handle(status, status.getMessage(), event.toString()); + connector.statusHandler().handle(status, status.getMessage(), event.toString(), true); } event.decreaseReferenceCount(PipeConsensusDeleteEventHandler.class.getName(), true); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 27ad636db386..016f787afaa2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -253,7 +253,8 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent) String.format( "Transfer deletion %s error, result status %s.", pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), status), - pipeDeleteDataNodeEvent.getDeletionResource().toString()); + pipeDeleteDataNodeEvent.getDeletionResource().toString(), + true); } if (LOGGER.isDebugEnabled()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java index e1e1e7868d70..e001367a4c06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java @@ -127,7 +127,8 @@ private void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWri String.format( "Transfer data node write plan %s error, result status %s.", pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status), - pipeSchemaRegionWritePlanEvent.getPlanNode().toString()); + pipeSchemaRegionWritePlanEvent.getPlanNode().toString(), + true); } if (LOGGER.isDebugEnabled()) { @@ -222,7 +223,8 @@ private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent) String.format( "Seal file %s, %s and %s error, result status %s.", mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, resp.getStatus()), - snapshotEvent.toString()); + snapshotEvent.toString(), + true); } LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java index af46988c55ec..b6fb47d601fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java @@ -294,6 +294,8 @@ private LinkedList partition(TsBlock tsBlock) { partitionExecutors.addLast(partitionExecutor); partitionStartInCurrentBlock = partitionEndInCurrentBlock; + // Reset cross-TsBlock tracking after partition completion + startIndexInFirstBlock = -1; } else { // Last partition of TsBlock // The beginning of next TsBlock may have rows in this partition diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java index 0378c80f042b..8f2c82d6f1cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java @@ -243,7 +243,6 @@ import static org.apache.tsfile.read.common.type.FloatType.FLOAT; import static org.apache.tsfile.read.common.type.IntType.INT32; import static org.apache.tsfile.read.common.type.LongType.INT64; -import static org.apache.tsfile.read.common.type.ObjectType.OBJECT; import static org.apache.tsfile.read.common.type.StringType.STRING; public class ColumnTransformerBuilder @@ -1456,10 +1455,10 @@ private ColumnTransformer getFunctionColumnTransformer( .equalsIgnoreCase(functionName)) { ColumnTransformer first = this.process(children.get(0), context); if (children.size() == 1) { - return new ReadObjectColumnTransformer(OBJECT, first, context.fragmentInstanceContext); + return new ReadObjectColumnTransformer(BLOB, first, context.fragmentInstanceContext); } else if (children.size() == 2) { return new ReadObjectColumnTransformer( - OBJECT, + BLOB, ((LongLiteral) children.get(1)).getParsedValue(), first, context.fragmentInstanceContext); @@ -1468,7 +1467,7 @@ private ColumnTransformer getFunctionColumnTransformer( long length = ((LongLiteral) children.get(2)).getParsedValue(); checkArgument(offset >= 0 && length >= 0); return new ReadObjectColumnTransformer( - OBJECT, + BLOB, ((LongLiteral) children.get(1)).getParsedValue(), ((LongLiteral) children.get(2)).getParsedValue(), first, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java index 0bcc8d5bbaed..d0fbf1ed3200 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; +import org.apache.iotdb.db.exception.sql.SemanticException; + import com.google.common.collect.ImmutableList; import java.util.List; @@ -52,8 +54,7 @@ public RenameColumn( this.columnIfNotExists = columnIfNotExists; this.view = view; if (!view) { - throw new UnsupportedOperationException( - "The renaming for base table column is currently unsupported"); + throw new SemanticException("The renaming for base table column is currently unsupported"); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java index 69c2b18e96da..4a181f99bba0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; +import org.apache.iotdb.db.exception.sql.SemanticException; + import com.google.common.collect.ImmutableList; import java.util.List; @@ -46,8 +48,7 @@ public RenameTable( this.tableIfExists = tableIfExists; this.view = view; if (!view) { - throw new UnsupportedOperationException( - "The renaming for base table is currently unsupported"); + throw new SemanticException("The renaming for base table is currently unsupported"); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperatorTest.java index ed0ec405a9c4..9c40227661d0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperatorTest.java @@ -192,6 +192,57 @@ public void testMixedPartition() { } } + @Test + public void testMixedPartition2() { + long[][] timeArray = + new long[][] { + {1, 2, 3}, + {4, 5}, + {6}, + }; + String[][] deviceIdArray = + new String[][] { + {"d1", "d1", "d2"}, + {"d2", "d3"}, + {"d3"}, + }; + int[][] valueArray = + new int[][] { + {1, 2, 3}, + {4, 5}, + {6}, + }; + + long[] expectColumn1 = new long[] {1, 2, 3, 4, 5, 6}; + String[] expectColumn2 = new String[] {"d1", "d1", "d2", "d2", "d3", "d3"}; + int[] expectColumn4 = new int[] {1, 2, 3, 4, 5, 6}; + long[] expectColumn5 = new long[] {1, 2, 1, 2, 1, 2}; + + int count = 0; + try (TableWindowOperator windowOperator = + genWindowOperator(timeArray, deviceIdArray, valueArray)) { + ListenableFuture listenableFuture = windowOperator.isBlocked(); + listenableFuture.get(); + while (!windowOperator.isFinished() && windowOperator.hasNext()) { + TsBlock tsBlock = windowOperator.next(); + if (tsBlock != null && !tsBlock.isEmpty()) { + for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) { + assertEquals(expectColumn1[count], tsBlock.getColumn(0).getLong(i)); + assertEquals( + expectColumn2[count], + tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET)); + assertEquals(expectColumn4[count], tsBlock.getColumn(2).getInt(i)); + assertEquals(expectColumn5[count], tsBlock.getColumn(3).getLong(i)); + } + } + } + assertEquals(6, count); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + static class ChildOperator implements Operator { private int index; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 9fe87b22fd73..886d387f60c8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.utils.RetryUtils; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -44,7 +45,7 @@ public class PipeReceiverStatusHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class); + private static Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class); private static final String NO_PERMISSION = "No permission"; private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception"; private static final String NO_PERMISSION_STR = "No permissions for this operation"; @@ -86,6 +87,11 @@ public PipeReceiverStatusHandler( this.skipIfNoPrivileges = skipIfNoPrivileges; } + public void handle( + final TSStatus status, final String exceptionMessage, final String recordMessage) { + handle(status, exceptionMessage, recordMessage, false); + } + /** * Handle {@link TSStatus} returned by receiver. Do nothing if ignore the {@link Event}, and throw * exception if retry the {@link Event}. Upper class must ensure that the method is invoked only @@ -99,7 +105,10 @@ public PipeReceiverStatusHandler( * put any time-related info here */ public void handle( - final TSStatus status, final String exceptionMessage, final String recordMessage) { + final TSStatus status, + final String exceptionMessage, + final String recordMessage, + final boolean log4NoPrivileges) { if (RetryUtils.needRetryForWrite(status.getCode())) { LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status); @@ -184,17 +193,28 @@ public void handle( case 803: // NO_PERMISSION if (skipIfNoPrivileges) { + if (log4NoPrivileges && LOGGER.isWarnEnabled()) { + LOGGER.warn( + "{}: Skip if no privileges. will be ignored. event: {}. status: {}", + getNoPermission(true), + shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", + status); + } return; } handleOtherExceptions(status, exceptionMessage, recordMessage, true); break; - case 305: - handleOtherExceptions(status, exceptionMessage, recordMessage, false); - break; default: // Some auth error may be wrapped in other codes if (exceptionMessage.contains(NO_PERMISSION_STR)) { if (skipIfNoPrivileges) { + if (log4NoPrivileges && LOGGER.isWarnEnabled()) { + LOGGER.warn( + "{}: Skip if no privileges. will be ignored. event: {}. status: {}", + getNoPermission(true), + shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", + status); + } return; } handleOtherExceptions(status, exceptionMessage, recordMessage, true); @@ -314,4 +334,9 @@ public static TSStatus getPriorStatus(final List givenStatusList) { resultStatus.setSubStatus(givenStatusList); return resultStatus; } + + @TestOnly + public static void setLogger(final Logger logger) { + LOGGER = logger; + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java new file mode 100644 index 000000000000..38d2e015165d --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.datastructure; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; + +public class PipeReceiverStatusHandlerTest { + @Test + public void testAuthLogger() { + final PipeReceiverStatusHandler handler = + new PipeReceiverStatusHandler( + CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE.equals("retry"), + CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE, + CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE, + CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE, + CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE, + true); + PipeReceiverStatusHandler.setLogger( + new Logger() { + @Override + public String getName() { + return null; + } + + @Override + public boolean isTraceEnabled() { + return false; + } + + @Override + public void trace(String msg) {} + + @Override + public void trace(String format, Object arg) {} + + @Override + public void trace(String format, Object arg1, Object arg2) {} + + @Override + public void trace(String format, Object... arguments) {} + + @Override + public void trace(String msg, Throwable t) {} + + @Override + public boolean isTraceEnabled(Marker marker) { + return false; + } + + @Override + public void trace(Marker marker, String msg) {} + + @Override + public void trace(Marker marker, String format, Object arg) {} + + @Override + public void trace(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void trace(Marker marker, String format, Object... argArray) {} + + @Override + public void trace(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isDebugEnabled() { + return false; + } + + @Override + public void debug(String msg) {} + + @Override + public void debug(String format, Object arg) {} + + @Override + public void debug(String format, Object arg1, Object arg2) {} + + @Override + public void debug(String format, Object... arguments) {} + + @Override + public void debug(String msg, Throwable t) {} + + @Override + public boolean isDebugEnabled(Marker marker) { + return false; + } + + @Override + public void debug(Marker marker, String msg) {} + + @Override + public void debug(Marker marker, String format, Object arg) {} + + @Override + public void debug(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void debug(Marker marker, String format, Object... arguments) {} + + @Override + public void debug(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled() { + return false; + } + + @Override + public void info(String msg) {} + + @Override + public void info(String format, Object arg) {} + + @Override + public void info(String format, Object arg1, Object arg2) {} + + @Override + public void info(String format, Object... arguments) {} + + @Override + public void info(String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled(Marker marker) { + return false; + } + + @Override + public void info(Marker marker, String msg) {} + + @Override + public void info(Marker marker, String format, Object arg) {} + + @Override + public void info(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void info(Marker marker, String format, Object... arguments) {} + + @Override + public void info(Marker marker, String msg, Throwable t) {} + + // Warn + @Override + public boolean isWarnEnabled() { + return true; + } + + @Override + public void warn(String msg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String format, Object arg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String format, Object... arguments) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String format, Object arg1, Object arg2) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String msg, Throwable t) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWarnEnabled(Marker marker) { + return true; + } + + @Override + public void warn(Marker marker, String msg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String format, Object arg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String format, Object arg1, Object arg2) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String format, Object... arguments) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String msg, Throwable t) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isErrorEnabled() { + return false; + } + + @Override + public void error(String msg) {} + + @Override + public void error(String format, Object arg) {} + + @Override + public void error(String format, Object arg1, Object arg2) {} + + @Override + public void error(String format, Object... arguments) {} + + @Override + public void error(String msg, Throwable t) {} + + @Override + public boolean isErrorEnabled(Marker marker) { + return false; + } + + @Override + public void error(Marker marker, String msg) {} + + @Override + public void error(Marker marker, String format, Object arg) {} + + @Override + public void error(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void error(Marker marker, String format, Object... arguments) {} + + @Override + public void error(Marker marker, String msg, Throwable t) {} + }); + handler.handle( + new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()), + "", + ""); + handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), "", ""); + try { + handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), "", "", true); + Assert.fail(); + } catch (final UnsupportedOperationException e) { + // Expected + } + handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), "", ""); + try { + handler.handle( + new TSStatus(TSStatusCode.METADATA_ERROR.getStatusCode()) + .setMessage("No permissions for this operation, please add privilege WRITE_DATA"), + "", + "", + true); + Assert.fail(); + } catch (final UnsupportedOperationException e) { + // Expected + } + PipeReceiverStatusHandler.setLogger(LoggerFactory.getLogger(PipeReceiverStatusHandler.class)); + } +} diff --git a/pom.xml b/pom.xml index 79d231b3c39c..8942576d982b 100644 --- a/pom.xml +++ b/pom.xml @@ -134,7 +134,7 @@ 7.0.0 1.5.6 2.0.9 - 1.0.9 + 1.0.10