Skip to content

Commit ca85e80

Browse files
authored
Pipe: Add TsFile parsing with Mods function (apache#16540)
1 parent bbe7882 commit ca85e80

18 files changed

+2065
-149
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,39 @@ public static void insertData(
115115
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
116116
}
117117

118+
public static void insertData(
119+
final String dataBaseName,
120+
final String tableName,
121+
final int deviceStartIndex,
122+
final int deviceEndIndex,
123+
final int startInclusive,
124+
final int endExclusive,
125+
final BaseEnv baseEnv) {
126+
List<String> list = new ArrayList<>(endExclusive - startInclusive + 1);
127+
for (int deviceIndex = deviceStartIndex; deviceIndex < deviceEndIndex; ++deviceIndex) {
128+
for (int i = startInclusive; i < endExclusive; ++i) {
129+
list.add(
130+
String.format(
131+
"insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10, s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)",
132+
tableName,
133+
deviceIndex,
134+
deviceIndex,
135+
deviceIndex,
136+
deviceIndex,
137+
i,
138+
i,
139+
i,
140+
i,
141+
i,
142+
i,
143+
getDateStr(i),
144+
i,
145+
i));
146+
}
147+
}
148+
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
149+
}
150+
118151
public static void insertData(
119152
final String dataBaseName,
120153
final String tableName,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
21+
22+
import org.apache.iotdb.db.it.utils.TestUtils;
23+
import org.apache.iotdb.isession.SessionConfig;
24+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
25+
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
26+
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
27+
import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
28+
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import java.util.Collections;
34+
import java.util.HashSet;
35+
36+
import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
37+
38+
@RunWith(IoTDBTestRunner.class)
39+
@Category({MultiClusterIT2DualTableManualBasic.class})
40+
public class IoTDBPipeTsFileDecompositionWithModsIT extends AbstractPipeTableModelDualManualIT {
41+
42+
/**
43+
* Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in table
44+
* model
45+
*
46+
* <p>Test scenario: 1. Create two storage groups sg1 and sg2, each containing table1 2. Insert
47+
* small amount of data in sg1 (1-6 rows), insert large amount of data in sg2 (110 batches, 100
48+
* rows per batch) 3. Execute FLUSH operation to persist data to TsFile 4. Execute multiple DELETE
49+
* operations on sg1, deleting data in time ranges 2-4 and 3-5 5. Execute multiple DELETE
50+
* operations on sg2, deleting data matching specific conditions (s0-s3 field values) 6. Execute
51+
* FLUSH operation again 7. Create pipe with mods enabled, synchronize data to receiver 8. Verify
52+
* correctness of receiver data: - sg1 only retains time=1 data, time=2-4 data is correctly
53+
* deleted - sg2 DELETE operation results meet expectations (t10 retains 1000 rows, t11 all
54+
* deleted, t12 retains 5900 rows, etc.)
55+
*
56+
* <p>Test purpose: Verify that IoTDB pipe can correctly handle Mods (modification operations) in
57+
* TsFile, ensuring DELETE operations can be correctly synchronized to the receiver and data
58+
* consistency is guaranteed.
59+
*/
60+
@Test
61+
public void testTsFileDecompositionWithMods() {
62+
TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg1");
63+
TableModelUtils.createDataBaseAndTable(receiverEnv, "table1", "sg1");
64+
65+
TableModelUtils.insertData("sg1", "table1", 1, 6, senderEnv);
66+
67+
TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg2");
68+
for (int i = 1; i <= 110; i++) {
69+
TableModelUtils.insertData("sg2", "table1", 10, 15, (i - 1) * 100, i * 100, senderEnv);
70+
}
71+
72+
executeNonQueryWithRetry(senderEnv, "FLUSH");
73+
74+
executeNonQueryWithRetry(
75+
senderEnv,
76+
"DELETE FROM table1 WHERE time >= 2 AND time <= 4",
77+
SessionConfig.DEFAULT_USER,
78+
SessionConfig.DEFAULT_PASSWORD,
79+
"sg1",
80+
"table");
81+
82+
executeNonQueryWithRetry(
83+
senderEnv,
84+
"DELETE FROM table1 WHERE time >= 3 AND time <= 5",
85+
SessionConfig.DEFAULT_USER,
86+
SessionConfig.DEFAULT_PASSWORD,
87+
"sg1",
88+
"table");
89+
90+
executeNonQueryWithRetry(
91+
senderEnv,
92+
"DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t10' AND s1='t10' AND s2='t10' AND s3='t10'",
93+
SessionConfig.DEFAULT_USER,
94+
SessionConfig.DEFAULT_PASSWORD,
95+
"sg2",
96+
"table");
97+
98+
executeNonQueryWithRetry(
99+
senderEnv,
100+
"DELETE FROM table1 WHERE time >= 0 AND time <= 11000 AND s0 ='t11' AND s1='t11' AND s2='t11' AND s3='t11'",
101+
SessionConfig.DEFAULT_USER,
102+
SessionConfig.DEFAULT_PASSWORD,
103+
"sg2",
104+
"table");
105+
106+
executeNonQueryWithRetry(
107+
senderEnv,
108+
"DELETE FROM table1 WHERE time >= 5000 AND time < 10100 AND s0 ='t12' AND s1='t12' AND s2='t12' AND s3='t12'",
109+
SessionConfig.DEFAULT_USER,
110+
SessionConfig.DEFAULT_PASSWORD,
111+
"sg2",
112+
"table");
113+
114+
executeNonQueryWithRetry(
115+
senderEnv,
116+
"DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t13' AND s1='t13' AND s2='t13' AND s3='t13'",
117+
SessionConfig.DEFAULT_USER,
118+
SessionConfig.DEFAULT_PASSWORD,
119+
"sg2",
120+
"table");
121+
122+
executeNonQueryWithRetry(
123+
senderEnv,
124+
"DELETE FROM table1 WHERE time >= 10000 AND time <= 11000 AND s0 ='t14' AND s1='t14' AND s2='t14' AND s3='t14'",
125+
SessionConfig.DEFAULT_USER,
126+
SessionConfig.DEFAULT_PASSWORD,
127+
"sg2",
128+
"table");
129+
130+
executeNonQueryWithRetry(senderEnv, "FLUSH");
131+
132+
executeNonQueryWithRetry(
133+
senderEnv,
134+
String.format(
135+
"CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')",
136+
receiverEnv.getDataNodeWrapperList().get(0).getIp(),
137+
receiverEnv.getDataNodeWrapperList().get(0).getPort()));
138+
139+
HashSet<String> expectedResults = new HashSet<>();
140+
expectedResults.add(
141+
"t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");
142+
143+
TestUtils.assertDataEventuallyOnEnv(
144+
receiverEnv,
145+
TableModelUtils.getQuerySql("table1"),
146+
TableModelUtils.generateHeaderResults(),
147+
expectedResults,
148+
"sg1");
149+
150+
TestUtils.assertDataEventuallyOnEnv(
151+
receiverEnv,
152+
"SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4",
153+
"s4,",
154+
Collections.emptySet(),
155+
"sg1");
156+
157+
TestUtils.assertDataEventuallyOnEnv(
158+
receiverEnv,
159+
"SELECT COUNT(*) as count FROM table1 WHERE s0 ='t10' AND s1='t10' AND s2='t10' AND s3='t10'",
160+
"count,",
161+
Collections.singleton("1000,"),
162+
"sg2");
163+
164+
TestUtils.assertDataEventuallyOnEnv(
165+
receiverEnv,
166+
"SELECT COUNT(*) as count FROM table1 WHERE s0 ='t11' AND s1='t11' AND s2='t11' AND s3='t11'",
167+
"count,",
168+
Collections.singleton("0,"),
169+
"sg2");
170+
171+
TestUtils.assertDataEventuallyOnEnv(
172+
receiverEnv,
173+
"SELECT COUNT(*) as count FROM table1 WHERE s0 ='t12' AND s1='t12' AND s2='t12' AND s3='t12'",
174+
"count,",
175+
Collections.singleton("5900,"),
176+
"sg2");
177+
178+
TestUtils.assertDataEventuallyOnEnv(
179+
receiverEnv,
180+
"SELECT COUNT(*) as count FROM table1 WHERE s0 ='t13' AND s1='t13' AND s2='t13' AND s3='t13'",
181+
"count,",
182+
Collections.singleton("1000,"),
183+
"sg2");
184+
185+
TestUtils.assertDataEventuallyOnEnv(
186+
receiverEnv,
187+
"SELECT COUNT(*) as count FROM table1 WHERE s0 ='t14' AND s1='t14' AND s2='t14' AND s3='t14'",
188+
"count,",
189+
Collections.singleton("10000,"),
190+
"sg2");
191+
}
192+
}

0 commit comments

Comments
 (0)