Skip to content

Commit ccaef26

Browse files
kaka11chen authored and morningman committed
[opt](multi-catalog) Optimize file split size. (#59175)
1 parent 58c267e commit ccaef26

File tree

14 files changed

+691
-96
lines changed

14 files changed

+691
-96
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -63,12 +63,16 @@
6363
import java.util.Map;
6464
import java.util.Objects;
6565
import java.util.Optional;
66+
import java.util.Random;
6667
import java.util.Set;
6768
import java.util.concurrent.ExecutionException;
6869
import java.util.stream.Collectors;
6970

7071
public class FederationBackendPolicy {
7172
private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
73+
74+
private static final long FIXED_SHUFFLE_SEED = 123456789L;
75+
7276
protected final List<Backend> backends = Lists.newArrayList();
7377
private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
7478

@@ -220,6 +224,7 @@ public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) {
220224
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
221225
ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
222226

227+
Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED));
223228
List<Split> remainingSplits;
224229

225230
List<Backend> backends = new ArrayList<>();
@@ -228,8 +233,6 @@ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) t
228233
}
229234
ResettableRandomizedIterator<Backend> randomCandidates = new ResettableRandomizedIterator<>(backends);
230235

231-
boolean splitsToBeRedistributed = false;
232-
233236
// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain
234237
// locality information
235238
if (Config.split_assigner_optimized_local_scheduling) {
@@ -246,7 +249,6 @@ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) t
246249
assignment.put(selectedBackend, split);
247250
assignedWeightPerBackend.put(selectedBackend,
248251
assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
249-
splitsToBeRedistributed = true;
250252
continue;
251253
}
252254
}
@@ -276,7 +278,6 @@ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) t
276278
case CONSISTENT_HASHING: {
277279
candidateNodes = consistentHash.getNode(split,
278280
Config.split_assigner_min_consistent_hash_candidate_num);
279-
splitsToBeRedistributed = true;
280281
break;
281282
}
282283
default: {
@@ -302,7 +303,7 @@ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) t
302303
assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
303304
}
304305

305-
if (enableSplitsRedistribution && splitsToBeRedistributed) {
306+
if (enableSplitsRedistribution) {
306307
equateDistribution(assignment);
307308
}
308309
return assignment;
@@ -499,3 +500,4 @@ public void funnel(Split split, PrimitiveSink primitiveSink) {
499500
}
500501
}
501502
}
503+

fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java

Lines changed: 4 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
9494

9595
protected TableScanParams scanParams;
9696

97+
protected FileSplitter fileSplitter;
98+
9799
/**
98100
* External file scan node for Query hms table
99101
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -134,6 +136,8 @@ protected void doInitialize() throws UserException {
134136
}
135137
initBackendPolicy();
136138
initSchemaParams();
139+
fileSplitter = new FileSplitter(sessionVariable.maxInitialSplitSize, sessionVariable.maxSplitSize,
140+
sessionVariable.maxInitialSplitNum);
137141
}
138142

139143
// Init schema (Tuple/Slot) related params.
@@ -592,19 +596,4 @@ public TableScanParams getScanParams() {
592596
}
593597
return this.scanParams;
594598
}
595-
596-
/**
597-
* The real file split size is determined by:
598-
* 1. If user specify the split size in session variable `file_split_size`, use user specified value.
599-
* 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size.
600-
* @param blockSize, got from file system, eg, hdfs
601-
* @return the real file split size
602-
*/
603-
protected long getRealFileSplitSize(long blockSize) {
604-
long realSplitSize = sessionVariable.getFileSplitSize();
605-
if (realSplitSize <= 0) {
606-
realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
607-
}
608-
return realSplitSize;
609-
}
610599
}

fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -62,9 +62,6 @@
6262
* Base class for External File Scan, including external query and load.
6363
*/
6464
public abstract class FileScanNode extends ExternalScanNode {
65-
66-
public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
67-
6865
// For explain
6966
protected long totalFileSize = 0;
7067
protected long totalPartitionNum = 0;
@@ -115,12 +112,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
115112
}
116113

117114
output.append(prefix);
118-
boolean isBatch;
119-
try {
120-
isBatch = isBatchMode();
121-
} catch (UserException e) {
122-
throw new RuntimeException(e);
123-
}
115+
boolean isBatch = isBatchMode();
124116
if (isBatch) {
125117
output.append("(approximate)");
126118
}

0 commit comments

Comments
 (0)