Skip to content

Commit aaff449

Browse files
authored
Pipe: aggregate tablets with different measurements under the same table before write into tsfile (apache#15372)
1 parent 8f29bcd commit aaff449

File tree

1 file changed

+79
-3
lines changed

1 file changed

+79
-3
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,20 @@
2525
import org.apache.tsfile.exception.write.WriteProcessException;
2626
import org.apache.tsfile.file.metadata.IDeviceID;
2727
import org.apache.tsfile.file.metadata.TableSchema;
28+
import org.apache.tsfile.utils.BitMap;
2829
import org.apache.tsfile.utils.Pair;
2930
import org.apache.tsfile.utils.WriteUtils;
3031
import org.apache.tsfile.write.TsFileWriter;
3132
import org.apache.tsfile.write.record.Tablet;
33+
import org.apache.tsfile.write.record.Tablet.ColumnCategory;
3234
import org.apache.tsfile.write.schema.IMeasurementSchema;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
3537

3638
import java.io.File;
3739
import java.io.IOException;
3840
import java.util.ArrayList;
41+
import java.util.Arrays;
3942
import java.util.Collections;
4043
import java.util.Comparator;
4144
import java.util.HashMap;
@@ -48,6 +51,8 @@
4851
import java.util.Objects;
4952
import java.util.Set;
5053
import java.util.concurrent.atomic.AtomicLong;
54+
import java.util.stream.Collectors;
55+
import java.util.stream.IntStream;
5156

5257
public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
5358

@@ -201,6 +206,76 @@ List<Pair<String, File>> writeTableModelTabletsToTsFiles(
201206
return sealedFiles;
202207
}
203208

209+
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>> T tryBestToAggregateTablets(
210+
final LinkedList<T> tablets) {
211+
if (tablets.isEmpty()) {
212+
return null;
213+
}
214+
215+
// Retrieve the first tablet to serve as the basis for the aggregation
216+
final Pair<Tablet, List<Pair<IDeviceID, Integer>>> firstPair = tablets.peekFirst();
217+
final Tablet firstTablet = firstPair.left;
218+
final List<Pair<IDeviceID, Integer>> aggregationDeviceTimestampIndexList = firstPair.right;
219+
final String aggregationTableName = firstTablet.getTableName();
220+
final long[] aggregationTimestamps = firstTablet.getTimestamps();
221+
final int aggregationRow = firstTablet.getRowSize();
222+
final int aggregationMaxRow = firstTablet.getMaxRowNumber();
223+
224+
// Prepare lists to accumulate schemas, columnCategories, values, and bitMaps
225+
final List<IMeasurementSchema> aggregatedSchemas = new ArrayList<>();
226+
final List<ColumnCategory> aggregatedColumnCategories = new ArrayList<>();
227+
final List<Object> aggregatedValues = new ArrayList<>();
228+
final List<BitMap> aggregatedBitMaps = new ArrayList<>();
229+
230+
// Iterate and poll tablets from the head that satisfy the aggregation criteria
231+
while (!tablets.isEmpty()) {
232+
final Pair<Tablet, List<Pair<IDeviceID, Integer>>> pair = tablets.peekFirst();
233+
final Tablet tablet = pair.left;
234+
final List<Pair<IDeviceID, Integer>> deviceTimestampIndexList = pair.right;
235+
if (Objects.equals(deviceTimestampIndexList, aggregationDeviceTimestampIndexList)
236+
&& Objects.equals(firstTablet.getTableName(), aggregationTableName)
237+
&& Arrays.equals(tablet.getTimestamps(), aggregationTimestamps)
238+
&& tablet.getRowSize() == aggregationRow
239+
&& tablet.getMaxRowNumber() == aggregationMaxRow) {
240+
// Aggregate the current tablet's data
241+
aggregatedSchemas.addAll(tablet.getSchemas());
242+
aggregatedColumnCategories.addAll(tablet.getColumnTypes());
243+
aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
244+
aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps()));
245+
// Remove the aggregated tablet
246+
tablets.pollFirst();
247+
} else {
248+
// Stop aggregating once a tablet does not meet the criteria
249+
break;
250+
}
251+
}
252+
253+
// Remove duplicates from aggregatedSchemas, record the index of the first occurrence, and
254+
// filter out the corresponding values in aggregatedValues and aggregatedBitMaps based on that
255+
// index
256+
final Set<IMeasurementSchema> seen = new HashSet<>();
257+
final List<Integer> distinctIndices =
258+
IntStream.range(0, aggregatedSchemas.size())
259+
.filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the first occurrence index
260+
.boxed()
261+
.collect(Collectors.toList());
262+
263+
// Construct a new aggregated Tablet using the deduplicated data
264+
return (T)
265+
new Pair<>(
266+
new Tablet(
267+
aggregationTableName,
268+
distinctIndices.stream().map(aggregatedSchemas::get).collect(Collectors.toList()),
269+
distinctIndices.stream()
270+
.map(aggregatedColumnCategories::get)
271+
.collect(Collectors.toList()),
272+
aggregationTimestamps,
273+
distinctIndices.stream().map(aggregatedValues::get).toArray(),
274+
distinctIndices.stream().map(aggregatedBitMaps::get).toArray(BitMap[]::new),
275+
aggregationRow),
276+
aggregationDeviceTimestampIndexList);
277+
}
278+
204279
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
205280
void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>> device2TabletsLinkedList)
206281
throws IOException {
@@ -218,7 +293,7 @@ void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>> device2TabletsLin
218293
final Set<String> columnNames = new HashSet<>();
219294

220295
while (!tablets.isEmpty()) {
221-
final T pair = tablets.peekFirst();
296+
final T pair = tryBestToAggregateTablets(tablets);
222297
if (timestampsAreNonOverlapping(
223298
(Pair<Tablet, List<Pair<IDeviceID, Integer>>>) pair, deviceLastTimestampMap)) {
224299
final Tablet tablet = pair.left;
@@ -237,10 +312,11 @@ void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>> device2TabletsLin
237312
}
238313

239314
tabletsToWrite.add(pair);
315+
continue;
316+
} else {
240317
// NOTE: mutating a LinkedList that lives inside a Set violates the contract that the
241318
// element’s hashCode must remain stable while it’s in the set
242-
tablets.pollFirst();
243-
continue;
319+
tablets.addFirst(pair);
244320
}
245321
break;
246322
}

0 commit comments

Comments
 (0)