|
20 | 20 | package org.apache.druid.server.compaction; |
21 | 21 |
|
22 | 22 | import com.google.common.annotations.VisibleForTesting; |
| 23 | +import com.google.common.base.Function; |
23 | 24 | import com.google.common.base.Preconditions; |
| 25 | +import com.google.common.collect.Iterators; |
24 | 26 | import com.google.common.collect.Lists; |
25 | 27 | import org.apache.druid.java.util.common.DateTimes; |
26 | 28 | import org.apache.druid.java.util.common.Intervals; |
|
33 | 35 | import org.apache.druid.timeline.Partitions; |
34 | 36 | import org.apache.druid.timeline.SegmentTimeline; |
35 | 37 | import org.apache.druid.timeline.TimelineObjectHolder; |
| 38 | +import org.apache.druid.timeline.VersionedIntervalTimeline; |
36 | 39 | import org.apache.druid.timeline.partition.NumberedPartitionChunk; |
37 | 40 | import org.apache.druid.timeline.partition.NumberedShardSpec; |
38 | 41 | import org.apache.druid.timeline.partition.PartitionChunk; |
@@ -137,18 +140,29 @@ private void populateQueue(SegmentTimeline timeline, List<Interval> skipInterval |
137 | 140 | final String temporaryVersion = DateTimes.nowUtc().toString(); |
138 | 141 | for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) { |
139 | 142 | Interval interval = partitionsPerInterval.getKey(); |
140 | | - int partitionNum = 0; |
141 | 143 | Set<DataSegment> segmentSet = partitionsPerInterval.getValue(); |
142 | 144 | int partitions = segmentSet.size(); |
143 | | - for (DataSegment segment : segmentSet) { |
144 | | - DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions)); |
145 | | - timelineWithConfiguredSegmentGranularity.add( |
146 | | - interval, |
147 | | - temporaryVersion, |
148 | | - NumberedPartitionChunk.make(partitionNum, partitions, segmentsForCompact) |
149 | | - ); |
150 | | - partitionNum += 1; |
151 | | - } |
| 145 | + timelineWithConfiguredSegmentGranularity.addAll( |
| 146 | + Iterators.transform( |
| 147 | + segmentSet.iterator(), |
| 148 | + new Function<>() |
| 149 | + { |
| 150 | + int partitionNum = 0; |
| 151 | + |
| 152 | + @Override |
| 153 | + public VersionedIntervalTimeline.PartitionChunkEntry<String, DataSegment> apply(DataSegment segment) |
| 154 | + { |
| 155 | + final DataSegment segmentForCompact = |
| 156 | + segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions)); |
| 157 | + return new VersionedIntervalTimeline.PartitionChunkEntry<>( |
| 158 | + interval, |
| 159 | + temporaryVersion, |
| 160 | + NumberedPartitionChunk.make(partitionNum++, partitions, segmentForCompact) |
| 161 | + ); |
| 162 | + } |
| 163 | + } |
| 164 | + ) |
| 165 | + ); |
152 | 166 | } |
153 | 167 | // PartitionHolder can only holds chunks of one partition space |
154 | 168 | // However, partition in the new timeline (timelineWithConfiguredSegmentGranularity) can be hold multiple |
|
0 commit comments