Skip to content

Commit 3df7063

Browse files
HTHoushuwenwei
authored andcommitted
Two Implementation of Object File Path (#16858)
* add base32 Object Path * add plainObjectPath and configuration * change default configuration * fix UT errors * reduce bytes copy * implement unchangeable config * Add IT * add config * replace region id for object binary * fix ut * spotless * spotless * fix rebase * fix rebase * Fix review --------- Co-authored-by: shuwenwei <[email protected]>
1 parent 3539ec0 commit 3df7063

File tree

36 files changed

+1195
-282
lines changed

36 files changed

+1195
-282
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,12 @@ public CommonConfig setAuditableOperationResult(String auditableOperationResult)
651651
return this;
652652
}
653653

654+
@Override
655+
public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) {
656+
setProperty("restrict_object_limit", String.valueOf(restrictObjectLimit));
657+
return this;
658+
}
659+
654660
// For part of the log directory
655661
public String getClusterConfigStr() {
656662
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,4 +684,11 @@ public CommonConfig setAuditableOperationResult(String auditableOperationResult)
684684
cnConfig.setAuditableOperationResult(auditableOperationResult);
685685
return this;
686686
}
687+
688+
@Override
689+
public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) {
690+
cnConfig.setRestrictObjectLimit(restrictObjectLimit);
691+
dnConfig.setRestrictObjectLimit(restrictObjectLimit);
692+
return this;
693+
}
687694
}

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ public String getSystemDir() {
169169
return getDataNodeDir() + File.separator + "system";
170170
}
171171

172+
public String getDataNodeObjectDir() {
173+
return getDataNodeDir() + File.separator + "data" + File.separator + "object";
174+
}
175+
172176
@Override
173177
protected MppJVMConfig initVMConfig() {
174178
return MppJVMConfig.builder()

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,9 @@ public CommonConfig setAuditableOperationLevel(String auditableOperationLevel) {
477477
public CommonConfig setAuditableOperationResult(String auditableOperationResult) {
478478
return this;
479479
}
480+
481+
@Override
482+
public CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit) {
483+
return this;
484+
}
480485
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,6 @@ default CommonConfig setDefaultDatabaseLevel(int defaultDatabaseLevel) {
211211
CommonConfig setAuditableOperationLevel(String auditableOperationLevel);
212212

213213
CommonConfig setAuditableOperationResult(String auditableOperationResult);
214+
215+
CommonConfig setRestrictObjectLimit(boolean restrictObjectLimit);
214216
}
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iotdb.relational.it.session;
20+
21+
import org.apache.iotdb.isession.ITableSession;
22+
import org.apache.iotdb.isession.SessionDataSet;
23+
import org.apache.iotdb.it.env.EnvFactory;
24+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
25+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
26+
import org.apache.iotdb.itbase.category.TableClusterIT;
27+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
28+
import org.apache.iotdb.rpc.IoTDBConnectionException;
29+
import org.apache.iotdb.rpc.StatementExecutionException;
30+
import org.apache.iotdb.rpc.TSStatusCode;
31+
32+
import com.google.common.io.BaseEncoding;
33+
import org.apache.tsfile.enums.ColumnCategory;
34+
import org.apache.tsfile.enums.TSDataType;
35+
import org.apache.tsfile.utils.Binary;
36+
import org.apache.tsfile.utils.BytesUtils;
37+
import org.apache.tsfile.write.record.Tablet;
38+
import org.junit.After;
39+
import org.junit.AfterClass;
40+
import org.junit.Assert;
41+
import org.junit.Before;
42+
import org.junit.BeforeClass;
43+
import org.junit.Test;
44+
import org.junit.experimental.categories.Category;
45+
import org.junit.runner.RunWith;
46+
47+
import java.io.File;
48+
import java.io.IOException;
49+
import java.nio.charset.StandardCharsets;
50+
import java.nio.file.Files;
51+
import java.nio.file.Paths;
52+
import java.util.ArrayList;
53+
import java.util.Arrays;
54+
import java.util.List;
55+
56+
import static org.junit.Assert.assertNull;
57+
58+
@RunWith(IoTDBTestRunner.class)
59+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
60+
public class IoTDBObjectInsertIT {
61+
62+
@BeforeClass
63+
public static void classSetUp() throws Exception {
64+
EnvFactory.getEnv().initClusterEnvironment();
65+
}
66+
67+
@Before
68+
public void setUp() throws Exception {
69+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
70+
session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS db1");
71+
}
72+
}
73+
74+
@After
75+
public void tearDown() throws Exception {
76+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
77+
session.executeNonQueryStatement("DROP DATABASE IF EXISTS db1");
78+
}
79+
}
80+
81+
@AfterClass
82+
public static void classTearDown() {
83+
EnvFactory.getEnv().cleanClusterEnvironment();
84+
}
85+
86+
@Test
87+
public void insertObjectTest()
88+
throws IoTDBConnectionException, StatementExecutionException, IOException {
89+
String testObject =
90+
System.getProperty("user.dir")
91+
+ File.separator
92+
+ "target"
93+
+ File.separator
94+
+ "test-classes"
95+
+ File.separator
96+
+ "object-example.pt";
97+
File object = new File(testObject);
98+
99+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
100+
session.executeNonQueryStatement("USE \"db1\"");
101+
// insert table data by tablet
102+
List<String> columnNameList =
103+
Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file");
104+
List<TSDataType> dataTypeList =
105+
Arrays.asList(
106+
TSDataType.STRING,
107+
TSDataType.STRING,
108+
TSDataType.STRING,
109+
TSDataType.FLOAT,
110+
TSDataType.OBJECT);
111+
List<ColumnCategory> columnTypeList =
112+
new ArrayList<>(
113+
Arrays.asList(
114+
ColumnCategory.TAG,
115+
ColumnCategory.TAG,
116+
ColumnCategory.TAG,
117+
ColumnCategory.FIELD,
118+
ColumnCategory.FIELD));
119+
Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1);
120+
int rowIndex = tablet.getRowSize();
121+
tablet.addTimestamp(rowIndex, 1);
122+
tablet.addValue(rowIndex, 0, "1");
123+
tablet.addValue(rowIndex, 1, "5");
124+
tablet.addValue(rowIndex, 2, "3");
125+
tablet.addValue(rowIndex, 3, 37.6F);
126+
tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject)));
127+
session.insert(tablet);
128+
tablet.reset();
129+
130+
try (SessionDataSet dataSet =
131+
session.executeQueryStatement("select file from object_table where time = 1")) {
132+
SessionDataSet.DataIterator iterator = dataSet.iterator();
133+
while (iterator.next()) {
134+
Assert.assertEquals(
135+
BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())),
136+
iterator.getString(1));
137+
}
138+
}
139+
140+
try (SessionDataSet dataSet =
141+
session.executeQueryStatement(
142+
"select READ_OBJECT(file) from object_table where time = 1")) {
143+
SessionDataSet.DataIterator iterator = dataSet.iterator();
144+
while (iterator.next()) {
145+
Binary binary = iterator.getBlob(1);
146+
Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues());
147+
}
148+
}
149+
}
150+
// test object file path
151+
boolean success = false;
152+
for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) {
153+
String objectDirStr = dataNodeWrapper.getDataNodeObjectDir();
154+
File objectDir = new File(objectDirStr);
155+
if (objectDir.exists() && objectDir.isDirectory()) {
156+
File[] regionDirs = objectDir.listFiles();
157+
if (regionDirs != null) {
158+
for (File regionDir : regionDirs) {
159+
if (regionDir.isDirectory()) {
160+
File objectFile =
161+
new File(
162+
regionDir,
163+
convertPathString("object_table")
164+
+ File.separator
165+
+ convertPathString("1")
166+
+ File.separator
167+
+ convertPathString("5")
168+
+ File.separator
169+
+ convertPathString("3")
170+
+ File.separator
171+
+ convertPathString("file")
172+
+ File.separator
173+
+ "1.bin");
174+
if (objectFile.exists() && objectFile.isFile()) {
175+
success = true;
176+
}
177+
}
178+
}
179+
}
180+
}
181+
}
182+
Assert.assertTrue(success);
183+
}
184+
185+
@Test
186+
public void insertObjectSegmentsTest()
187+
throws IoTDBConnectionException, StatementExecutionException, IOException {
188+
String testObject =
189+
System.getProperty("user.dir")
190+
+ File.separator
191+
+ "target"
192+
+ File.separator
193+
+ "test-classes"
194+
+ File.separator
195+
+ "object-example.pt";
196+
byte[] objectBytes = Files.readAllBytes(Paths.get(testObject));
197+
List<byte[]> objectSegments = new ArrayList<>();
198+
for (int i = 0; i < objectBytes.length; i += 512) {
199+
objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length)));
200+
}
201+
202+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
203+
session.executeNonQueryStatement("USE \"db1\"");
204+
// insert table data by tablet
205+
List<String> columnNameList =
206+
Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file");
207+
List<TSDataType> dataTypeList =
208+
Arrays.asList(
209+
TSDataType.STRING,
210+
TSDataType.STRING,
211+
TSDataType.STRING,
212+
TSDataType.FLOAT,
213+
TSDataType.OBJECT);
214+
List<ColumnCategory> columnTypeList =
215+
new ArrayList<>(
216+
Arrays.asList(
217+
ColumnCategory.TAG,
218+
ColumnCategory.TAG,
219+
ColumnCategory.TAG,
220+
ColumnCategory.FIELD,
221+
ColumnCategory.FIELD));
222+
Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1);
223+
for (int i = 0; i < objectSegments.size() - 1; i++) {
224+
int rowIndex = tablet.getRowSize();
225+
tablet.addTimestamp(rowIndex, 1);
226+
tablet.addValue(rowIndex, 0, "1");
227+
tablet.addValue(rowIndex, 1, "5");
228+
tablet.addValue(rowIndex, 2, "3");
229+
tablet.addValue(rowIndex, 3, 37.6F);
230+
tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i));
231+
session.insert(tablet);
232+
tablet.reset();
233+
}
234+
235+
try (SessionDataSet dataSet =
236+
session.executeQueryStatement("select file from object_table where time = 1")) {
237+
SessionDataSet.DataIterator iterator = dataSet.iterator();
238+
while (iterator.next()) {
239+
assertNull(iterator.getString(1));
240+
}
241+
}
242+
243+
// insert segment with wrong offset
244+
try {
245+
int rowIndex = tablet.getRowSize();
246+
tablet.addTimestamp(rowIndex, 1);
247+
tablet.addValue(rowIndex, 0, "1");
248+
tablet.addValue(rowIndex, 1, "5");
249+
tablet.addValue(rowIndex, 2, "3");
250+
tablet.addValue(rowIndex, 3, 37.6F);
251+
tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1));
252+
session.insert(tablet);
253+
} catch (StatementExecutionException e) {
254+
Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getStatusCode());
255+
Assert.assertEquals(
256+
String.format(
257+
"741: The file length %d is not equal to the offset %d",
258+
((objectSegments.size() - 1) * 512), 512L),
259+
e.getMessage());
260+
} finally {
261+
tablet.reset();
262+
}
263+
264+
// last segment
265+
int rowIndex = tablet.getRowSize();
266+
tablet.addTimestamp(rowIndex, 1);
267+
tablet.addValue(rowIndex, 0, "1");
268+
tablet.addValue(rowIndex, 1, "5");
269+
tablet.addValue(rowIndex, 2, "3");
270+
tablet.addValue(rowIndex, 3, 37.6F);
271+
tablet.addValue(
272+
rowIndex,
273+
4,
274+
true,
275+
(objectSegments.size() - 1) * 512L,
276+
objectSegments.get(objectSegments.size() - 1));
277+
session.insert(tablet);
278+
tablet.reset();
279+
280+
try (SessionDataSet dataSet =
281+
session.executeQueryStatement(
282+
"select READ_OBJECT(file) from object_table where time = 1")) {
283+
SessionDataSet.DataIterator iterator = dataSet.iterator();
284+
while (iterator.next()) {
285+
Binary binary = iterator.getBlob(1);
286+
Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues());
287+
}
288+
}
289+
}
290+
291+
// test object file path
292+
boolean success = false;
293+
for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) {
294+
String objectDirStr = dataNodeWrapper.getDataNodeObjectDir();
295+
File objectDir = new File(objectDirStr);
296+
if (objectDir.exists() && objectDir.isDirectory()) {
297+
File[] regionDirs = objectDir.listFiles();
298+
if (regionDirs != null) {
299+
for (File regionDir : regionDirs) {
300+
if (regionDir.isDirectory()) {
301+
File objectFile =
302+
new File(
303+
regionDir,
304+
convertPathString("object_table")
305+
+ File.separator
306+
+ convertPathString("1")
307+
+ File.separator
308+
+ convertPathString("5")
309+
+ File.separator
310+
+ convertPathString("3")
311+
+ File.separator
312+
+ convertPathString("file")
313+
+ File.separator
314+
+ "1.bin");
315+
if (objectFile.exists() && objectFile.isFile()) {
316+
success = true;
317+
}
318+
}
319+
}
320+
}
321+
}
322+
}
323+
Assert.assertTrue(success);
324+
}
325+
326+
protected String convertPathString(String path) {
327+
return BaseEncoding.base32().omitPadding().encode(path.getBytes(StandardCharsets.UTF_8));
328+
}
329+
}

0 commit comments

Comments
 (0)