|
22 | 22 | import java.io.DataInput; |
23 | 23 | import java.io.DataOutput; |
24 | 24 | import java.io.IOException; |
25 | | -import java.util.Arrays; |
26 | | -import java.util.Set; |
27 | 25 | import org.apache.hadoop.conf.Configuration; |
28 | | -import org.apache.hadoop.fs.BlockLocation; |
29 | | -import org.apache.hadoop.fs.FileSystem; |
30 | | -import org.apache.hadoop.fs.Path; |
31 | 26 | import org.apache.hadoop.mapreduce.InputSplit; |
32 | 27 | import org.apache.iceberg.FileScanTask; |
33 | 28 | import org.apache.iceberg.ScanTaskGroup; |
| 29 | +import org.apache.iceberg.hadoop.Util; |
34 | 30 | import org.apache.iceberg.mr.InputFormatConfig; |
35 | | -import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
36 | 31 | import org.apache.iceberg.util.SerializationUtil; |
37 | | -import org.slf4j.Logger; |
38 | | -import org.slf4j.LoggerFactory; |
39 | 32 |
|
40 | 33 | // Since this class extends `mapreduce.InputSplit` and implements `mapred.InputSplit`, it can be returned by both MR v1 |
41 | 34 | // and v2 file formats. |
42 | 35 | public class IcebergSplit extends InputSplit implements IcebergSplitContainer { |
43 | | - private static final Logger LOG = LoggerFactory.getLogger(IcebergSplit.class); |
44 | 36 |
|
45 | 37 | public static final String[] ANYWHERE = new String[]{"*"}; |
46 | 38 |
|
@@ -78,33 +70,14 @@ public String[] getLocations() { |
78 | 70 | // getLocations() won't be accurate when called on worker nodes and will always return "*" |
79 | 71 | if (locations == null && conf != null) { |
80 | 72 | boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY, false); |
81 | | - locations = localityPreferred ? blockLocations(taskGroup, conf) : ANYWHERE; |
| 73 | + locations = localityPreferred ? Util.blockLocations(taskGroup, conf) : ANYWHERE; |
82 | 74 | } else { |
83 | 75 | locations = ANYWHERE; |
84 | 76 | } |
85 | 77 |
|
86 | 78 | return locations; |
87 | 79 | } |
88 | 80 |
|
89 | | - // We should move to Util.blockLocations once the following PR is merged and shipped |
90 | | - // https://github.com/apache/iceberg/pull/11053 |
91 | | - private static String[] blockLocations(ScanTaskGroup<FileScanTask> task, Configuration conf) { |
92 | | - final Set<String> locationSets = Sets.newHashSet(); |
93 | | - task.tasks().forEach(fileScanTask -> { |
94 | | - final Path path = new Path(fileScanTask.file().path().toString()); |
95 | | - try { |
96 | | - final FileSystem fs = path.getFileSystem(conf); |
97 | | - for (BlockLocation location : fs.getFileBlockLocations(path, fileScanTask.start(), fileScanTask.length())) { |
98 | | - locationSets.addAll(Arrays.asList(location.getHosts())); |
99 | | - } |
100 | | - } catch (IOException e) { |
101 | | - LOG.warn("Failed to get block locations for path {}", path, e); |
102 | | - } |
103 | | - }); |
104 | | - |
105 | | - return locationSets.toArray(new String[0]); |
106 | | - } |
107 | | - |
108 | 81 | @Override |
109 | 82 | public void write(DataOutput out) throws IOException { |
110 | 83 | byte[] data = SerializationUtil.serializeToBytes(this.taskGroup); |
|
0 commit comments