Skip to content

Commit 8e5fcba

Browse files
authored
Fix cross partition write after alter data type (#17082)
* Fi * spotless
1 parent 6952ebf commit 8e5fcba

File tree

5 files changed

+85
-11
lines changed

5 files changed

+85
-11
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ public void stopForcibly() {
572572
Thread.currentThread().interrupt();
573573
logger.error("Waiting node to shutdown error.", e);
574574
}
575-
logger.info("In test {} {} started forcibly.", getTestLogDirName(), getId());
575+
logger.info("In test {} {} stopped forcibly.", getTestLogDirName(), getId());
576576
}
577577

578578
@Override

integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.it.schema;
2121

22+
import org.apache.iotdb.commons.conf.CommonConfig;
2223
import org.apache.iotdb.commons.utils.MetadataUtils;
2324
import org.apache.iotdb.db.utils.SchemaUtils;
2425
import org.apache.iotdb.isession.ISession;
@@ -2749,4 +2750,39 @@ public void testAlterIllegalDataType() {
27492750
throw new RuntimeException(e);
27502751
}
27512752
}
2753+
2754+
@Test
2755+
public void testCrossPartitionWrite()
2756+
throws IoTDBConnectionException, StatementExecutionException {
2757+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
2758+
session.executeNonQueryStatement("CREATE DATABASE root.cross_partition");
2759+
session.executeNonQueryStatement(
2760+
"CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH DATATYPE=INT32,ENCODING=RLE");
2761+
2762+
// Insert data into two partitions
2763+
Tablet tablet =
2764+
new Tablet(
2765+
"root.cross_partition.device1",
2766+
Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE)));
2767+
tablet.addTimestamp(0, 0);
2768+
tablet.addValue("sensor1", 0, 0);
2769+
tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
2770+
tablet.addValue("sensor1", 1, 1);
2771+
session.insertTablet(tablet);
2772+
2773+
session.executeNonQueryStatement(
2774+
"ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE INT64");
2775+
2776+
// Insert data with altered type
2777+
tablet =
2778+
new Tablet(
2779+
"root.cross_partition.device1",
2780+
Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.RLE)));
2781+
tablet.addTimestamp(0, 0);
2782+
tablet.addValue("sensor1", 0, 0L);
2783+
tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
2784+
tablet.addValue("sensor1", 1, 1L);
2785+
session.insertTablet(tablet);
2786+
}
2787+
}
27522788
}

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,48 @@ public void testNoPermission() throws Exception {
233233
}
234234
}
235235

236+
@Test
237+
public void testSourcePermissionRestart() throws SQLException {
238+
try (final Connection connection = senderEnv.getConnection();
239+
final Statement statement = connection.createStatement()) {
240+
TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", connection);
241+
TestUtils.executeNonQueries(
242+
senderEnv, Collections.singletonList("grant READ on root.** to user thulab"));
243+
244+
statement.execute(
245+
String.format(
246+
"create pipe a2b"
247+
+ " with source ("
248+
+ "'user'='thulab'"
249+
+ ", 'password'='passwD@123456')"
250+
+ " with sink ("
251+
+ "'node-urls'='%s')",
252+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
253+
254+
TestUtils.executeNonQueries(
255+
senderEnv,
256+
Arrays.asList(
257+
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)"));
258+
TestUtils.executeNonQueries(
259+
receiverEnv,
260+
Arrays.asList(
261+
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)"));
262+
263+
TestUtils.executeNonQueries(senderEnv, Collections.singletonList("start pipe a2b"));
264+
265+
TestUtils.executeNonQueries(
266+
senderEnv,
267+
Arrays.asList(
268+
"insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"));
269+
270+
TestUtils.assertDataEventuallyOnEnv(
271+
receiverEnv,
272+
"select count(pressure) from root.vehicle.plane",
273+
"count(root.vehicle.plane.pressure),",
274+
Collections.singleton("1,"));
275+
}
276+
}
277+
236278
@Test
237279
public void testSourcePermission() {
238280
TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", null);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,15 +1487,10 @@ private boolean insertTabletToTsFileProcessor(
14871487
registerToTsFile(insertTabletNode, tsFileProcessor);
14881488
tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
14891489
} catch (DataTypeInconsistentException e) {
1490-
// flush both MemTables so that the new type can be inserted into a new MemTable
1491-
TsFileProcessor workSequenceProcessor = workSequenceTsFileProcessors.get(timePartitionId);
1492-
if (workSequenceProcessor != null) {
1493-
fileFlushPolicy.apply(this, workSequenceProcessor, workSequenceProcessor.isSequence());
1494-
}
1495-
TsFileProcessor workUnsequenceProcessor = workUnsequenceTsFileProcessors.get(timePartitionId);
1496-
if (workUnsequenceProcessor != null) {
1497-
fileFlushPolicy.apply(this, workUnsequenceProcessor, workUnsequenceProcessor.isSequence());
1498-
}
1490+
// flush all MemTables so that the new type can be inserted into a new MemTable
1491+
// cannot just flush the current TsFileProcessor, because the new type may be inserted into
1492+
// other TsFileProcessors of this region
1493+
asyncCloseAllWorkingTsFileProcessors();
14991494
throw e;
15001495
} catch (WriteProcessRejectException e) {
15011496
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class CommonConfig {
5555
public static final String SYSTEM_CONFIG_NAME = "iotdb-system.properties";
5656
public static final String SYSTEM_CONFIG_TEMPLATE_NAME = "iotdb-system.properties.template";
5757
private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class);
58+
public static final long DEFAULT_TIME_PARTITION_INTERVAL = 604_800_000L;
5859

5960
// Open ID Secret
6061
private String openIdProviderUrl = "";
@@ -184,7 +185,7 @@ public class CommonConfig {
184185
private long timePartitionOrigin = 0;
185186

186187
/** Time partition interval in milliseconds. */
187-
private long timePartitionInterval = 604_800_000;
188+
private long timePartitionInterval = DEFAULT_TIME_PARTITION_INTERVAL;
188189

189190
/** This variable set timestamp precision as millisecond, microsecond or nanosecond. */
190191
private String timestampPrecision = "ms";

0 commit comments

Comments
 (0)