Skip to content

Commit 5cb31e8

Browse files
luoluoyuyuJackieTien97
authored andcommitted
Pipe: Repair the table model construction TabletBatch process causing memory expansion (#16123)
* Pipe: Repair the table model construction TabletBatch process causing memory expansion * add ut * update PipeTabletEventPlainBatch * update PipeTabletEventPlainBatch (cherry picked from commit 256f3e7)
1 parent c17894e commit 5cb31e8

File tree

3 files changed

+251
-8
lines changed

3 files changed

+251
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,22 @@
2828
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
2929
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3030

31+
import org.apache.tsfile.enums.TSDataType;
32+
import org.apache.tsfile.utils.Binary;
33+
import org.apache.tsfile.utils.BitMap;
3134
import org.apache.tsfile.utils.Pair;
3235
import org.apache.tsfile.utils.PublicBAOS;
3336
import org.apache.tsfile.utils.RamUsageEstimator;
3437
import org.apache.tsfile.utils.ReadWriteIOUtils;
38+
import org.apache.tsfile.write.UnSupportedDataTypeException;
3539
import org.apache.tsfile.write.record.Tablet;
3640

3741
import java.io.DataOutputStream;
3842
import java.io.IOException;
3943
import java.nio.ByteBuffer;
44+
import java.time.LocalDate;
4045
import java.util.ArrayList;
46+
import java.util.Arrays;
4147
import java.util.HashMap;
4248
import java.util.List;
4349
import java.util.Map;
@@ -102,23 +108,28 @@ public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
102108
final String databaseName = insertTablets.getKey();
103109
for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
104110
insertTablets.getValue().entrySet()) {
105-
final List<Tablet> batchTablets = new ArrayList<>();
111+
// needCopyFlag and tablet
112+
final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>();
106113
for (final Tablet tablet : tabletEntry.getValue().getRight()) {
107114
boolean success = false;
108-
for (final Tablet batchTablet : batchTablets) {
109-
if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) {
115+
for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
116+
if (tabletPair.getLeft()) {
117+
tabletPair.setRight(copyTablet(tabletPair.getRight()));
118+
tabletPair.setLeft(Boolean.FALSE);
119+
}
120+
if (tabletPair.getRight().append(tablet, tabletEntry.getValue().getLeft())) {
110121
success = true;
111122
break;
112123
}
113124
}
114125
if (!success) {
115-
batchTablets.add(tablet);
126+
batchTablets.add(new Pair<>(Boolean.TRUE, tablet));
116127
}
117128
}
118-
for (final Tablet batchTablet : batchTablets) {
129+
for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
119130
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
120131
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
121-
batchTablet.serialize(outputStream);
132+
tabletPair.getRight().serialize(outputStream);
122133
ReadWriteIOUtils.write(true, outputStream);
123134
tabletBuffers.add(
124135
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()));
@@ -214,4 +225,89 @@ private long constructTabletBatch(final Tablet tablet, final String databaseName
214225
currentBatch.getRight().add(tablet);
215226
return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
216227
}
228+
229+
public static Tablet copyTablet(final Tablet tablet) {
230+
final Object[] copiedValues = new Object[tablet.getValues().length];
231+
for (int i = 0; i < tablet.getValues().length; i++) {
232+
if (tablet.getValues()[i] == null
233+
|| tablet.getSchemas() == null
234+
|| tablet.getSchemas().get(i) == null) {
235+
continue;
236+
}
237+
copiedValues[i] =
238+
copyValueList(
239+
tablet.getValues()[i], tablet.getSchemas().get(i).getType(), tablet.getRowSize());
240+
}
241+
242+
BitMap[] bitMaps = null;
243+
if (tablet.getBitMaps() != null) {
244+
bitMaps =
245+
Arrays.stream(tablet.getBitMaps())
246+
.map(
247+
bitMap -> {
248+
if (bitMap != null) {
249+
final byte[] data = bitMap.getByteArray();
250+
return new BitMap(bitMap.getSize(), Arrays.copyOf(data, data.length));
251+
}
252+
return null;
253+
})
254+
.toArray(BitMap[]::new);
255+
}
256+
257+
return new Tablet(
258+
tablet.getTableName(),
259+
new ArrayList<>(tablet.getSchemas()),
260+
new ArrayList<>(tablet.getColumnTypes()),
261+
Arrays.copyOf(tablet.getTimestamps(), tablet.getRowSize()),
262+
copiedValues,
263+
bitMaps,
264+
tablet.getRowSize());
265+
}
266+
267+
private static Object copyValueList(
268+
final Object valueList, final TSDataType dataType, final int rowSize) {
269+
switch (dataType) {
270+
case BOOLEAN:
271+
final boolean[] boolValues = (boolean[]) valueList;
272+
final boolean[] copiedBoolValues = new boolean[rowSize];
273+
System.arraycopy(boolValues, 0, copiedBoolValues, 0, rowSize);
274+
return copiedBoolValues;
275+
case INT32:
276+
final int[] intValues = (int[]) valueList;
277+
final int[] copiedIntValues = new int[rowSize];
278+
System.arraycopy(intValues, 0, copiedIntValues, 0, rowSize);
279+
return copiedIntValues;
280+
case DATE:
281+
final LocalDate[] dateValues = (LocalDate[]) valueList;
282+
final LocalDate[] copiedDateValues = new LocalDate[rowSize];
283+
System.arraycopy(dateValues, 0, copiedDateValues, 0, rowSize);
284+
return copiedDateValues;
285+
case INT64:
286+
case TIMESTAMP:
287+
final long[] longValues = (long[]) valueList;
288+
final long[] copiedLongValues = new long[rowSize];
289+
System.arraycopy(longValues, 0, copiedLongValues, 0, rowSize);
290+
return copiedLongValues;
291+
case FLOAT:
292+
final float[] floatValues = (float[]) valueList;
293+
final float[] copiedFloatValues = new float[rowSize];
294+
System.arraycopy(floatValues, 0, copiedFloatValues, 0, rowSize);
295+
return copiedFloatValues;
296+
case DOUBLE:
297+
final double[] doubleValues = (double[]) valueList;
298+
final double[] copiedDoubleValues = new double[rowSize];
299+
System.arraycopy(doubleValues, 0, copiedDoubleValues, 0, rowSize);
300+
return copiedDoubleValues;
301+
case TEXT:
302+
case BLOB:
303+
case STRING:
304+
final Binary[] binaryValues = (Binary[]) valueList;
305+
final Binary[] copiedBinaryValues = new Binary[rowSize];
306+
System.arraycopy(binaryValues, 0, copiedBinaryValues, 0, rowSize);
307+
return copiedBinaryValues;
308+
default:
309+
throw new UnSupportedDataTypeException(
310+
String.format("Data type %s is not supported.", dataType));
311+
}
312+
}
217313
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.sink;
21+
22+
import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
23+
24+
import org.apache.tsfile.write.record.Tablet;
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Random;
31+
32+
public class PipeTabletEventPlainBatchTest {
33+
34+
@Test
35+
public void constructTabletBatch() {
36+
Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
37+
38+
Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
39+
40+
Assert.assertNotSame(tablet, copyTablet);
41+
Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
42+
43+
for (int i = 0; i < tablet.getSchemas().size(); i++) {
44+
for (int j = 0; j < tablet.getRowSize(); j++) {
45+
Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
46+
}
47+
}
48+
}
49+
50+
@Test
51+
public void constructTabletBatch1() {
52+
Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
53+
tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
54+
Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
55+
56+
Assert.assertNotSame(tablet, copyTablet);
57+
Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
58+
59+
for (int i = 0; i < tablet.getSchemas().size(); i++) {
60+
for (int j = 0; j < tablet.getRowSize(); j++) {
61+
Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
62+
}
63+
}
64+
}
65+
66+
@Test
67+
public void constructTabletBatch2() {
68+
Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
69+
tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
70+
71+
tablet = PipeTabletEventPlainBatch.copyTablet(tablet);
72+
tablet.getBitMaps()[1].markAll();
73+
tablet.getValues()[1] = null;
74+
75+
Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
76+
77+
Assert.assertNotSame(tablet, copyTablet);
78+
Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
79+
80+
for (int i = 0; i < tablet.getSchemas().size(); i++) {
81+
if (i == 1) {
82+
Assert.assertTrue(tablet.getBitMaps()[i].isAllMarked());
83+
Assert.assertNull(copyTablet.getValues()[1]);
84+
continue;
85+
}
86+
87+
for (int j = 0; j < tablet.getRowSize(); j++) {
88+
Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
89+
}
90+
}
91+
}
92+
93+
@Test
94+
public void constructTabletBatch3() {
95+
Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
96+
tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
97+
98+
tablet = PipeTabletEventPlainBatch.copyTablet(tablet);
99+
tablet.getBitMaps()[1] = null;
100+
101+
Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
102+
103+
Assert.assertNotSame(tablet, copyTablet);
104+
Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
105+
106+
for (int i = 0; i < tablet.getSchemas().size(); i++) {
107+
if (i == 1) {
108+
Assert.assertNull(tablet.getBitMaps()[i]);
109+
continue;
110+
}
111+
112+
for (int j = 0; j < tablet.getRowSize(); j++) {
113+
Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
114+
}
115+
}
116+
}
117+
118+
@Test
119+
public void constructTabletBatch4() {
120+
Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
121+
tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
122+
123+
List<Integer> rowIndices = new ArrayList<>(tablet.getSchemas().size());
124+
Random random = new Random();
125+
for (int i = 0; i < tablet.getSchemas().size(); i++) {
126+
int r = random.nextInt(tablet.getRowSize());
127+
rowIndices.add(r);
128+
tablet.addValue(tablet.getSchemas().get(i).getMeasurementName(), r, null);
129+
}
130+
131+
Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
132+
133+
Assert.assertNotSame(tablet, copyTablet);
134+
Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
135+
136+
for (int i = 0; i < tablet.getSchemas().size(); i++) {
137+
for (int j = 0; j < tablet.getRowSize(); j++) {
138+
if (rowIndices.get(i) == j) {
139+
Assert.assertTrue(tablet.getBitMaps()[i].isMarked(j));
140+
Assert.assertNull(tablet.getValue(j, i));
141+
continue;
142+
}
143+
Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
144+
}
145+
}
146+
}
147+
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public void doTableModelTest1() {
277277
}
278278
}
279279

280-
private Tablet generateTablet(
280+
static Tablet generateTablet(
281281
final String tableName,
282282
final int deviceIDNum,
283283
final boolean hasDuplicates,
@@ -389,7 +389,7 @@ private Tablet generateTablet(
389389
return tablet;
390390
}
391391

392-
public LocalDate getDate(final int value) {
392+
public static LocalDate getDate(final int value) {
393393
Date date = new Date(value);
394394
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
395395
try {

0 commit comments

Comments
 (0)