Skip to content

Commit 3f57283

Browse files
xieandrewaaneja
authored andcommitted
Add support for symlink files in quick stats
1 parent 4098a9b commit 3f57283

File tree

4 files changed

+213
-20
lines changed

4 files changed

+213
-20
lines changed

presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.facebook.presto.hive.avro.PrestoAvroSerDe;
3131
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
3232
import com.facebook.presto.hive.metastore.Column;
33+
import com.facebook.presto.hive.metastore.Partition;
3334
import com.facebook.presto.hive.metastore.Storage;
3435
import com.facebook.presto.hive.metastore.Table;
3536
import com.facebook.presto.hive.pagefile.PageInputFormat;
@@ -92,10 +93,12 @@
9293

9394
import javax.annotation.Nullable;
9495

96+
import java.io.BufferedReader;
9597
import java.io.ByteArrayInputStream;
9698
import java.io.ByteArrayOutputStream;
9799
import java.io.IOException;
98100
import java.io.InputStream;
101+
import java.io.InputStreamReader;
99102
import java.io.UncheckedIOException;
100103
import java.lang.annotation.Annotation;
101104
import java.lang.reflect.Field;
@@ -171,6 +174,7 @@
171174
import static com.google.common.collect.Iterables.filter;
172175
import static com.google.common.collect.Lists.newArrayList;
173176
import static com.google.common.collect.Lists.transform;
177+
import static com.google.common.io.CharStreams.readLines;
174178
import static java.lang.Byte.parseByte;
175179
import static java.lang.Double.parseDouble;
176180
import static java.lang.Float.floatToRawIntBits;
@@ -1343,9 +1347,25 @@ public static Map<String, String> buildDirectoryContextProperties(ConnectorSessi
13431347
return directoryContextProperties.build();
13441348
}
13451349

1350+
public static List<Path> readSymlinkPaths(ExtendedFileSystem fileSystem, Iterator<HiveFileInfo> manifestFileInfos)
1351+
throws IOException
1352+
{
1353+
ImmutableList.Builder<Path> targets = ImmutableList.builder();
1354+
while (manifestFileInfos.hasNext()) {
1355+
HiveFileInfo symlink = manifestFileInfos.next();
1356+
1357+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(symlink.getPath())), UTF_8))) {
1358+
readLines(reader).stream()
1359+
.map(Path::new)
1360+
.forEach(targets::add);
1361+
}
1362+
}
1363+
return targets.build();
1364+
}
1365+
13461366
public static List<HiveFileInfo> getTargetPathsHiveFileInfos(
13471367
Path path,
1348-
HivePartitionMetadata partition,
1368+
Optional<Partition> partition,
13491369
Path targetParent,
13501370
List<Path> currentTargetPaths,
13511371
HiveDirectoryContext hiveDirectoryContext,
@@ -1402,7 +1422,7 @@ public static List<HiveFileInfo> getTargetPathsHiveFileInfos(
14021422
}
14031423

14041424
private static Map<String, HiveFileInfo> getTargetParentHiveFileInfoMap(
1405-
HivePartitionMetadata partition,
1425+
Optional<Partition> partition,
14061426
Path targetParent,
14071427
HiveDirectoryContext hiveDirectoryContext,
14081428
ExtendedFileSystem targetFilesystem,
@@ -1411,7 +1431,7 @@ private static Map<String, HiveFileInfo> getTargetParentHiveFileInfoMap(
14111431
NamenodeStats namenodeStats)
14121432
{
14131433
Map<String, HiveFileInfo> targetParentHiveFileInfos = new HashMap<>();
1414-
Iterator<HiveFileInfo> hiveFileInfoIterator = directoryLister.list(targetFilesystem, table, targetParent, partition.getPartition(), namenodeStats, hiveDirectoryContext);
1434+
Iterator<HiveFileInfo> hiveFileInfoIterator = directoryLister.list(targetFilesystem, table, targetParent, partition, namenodeStats, hiveDirectoryContext);
14151435

14161436
// We will use the path without the scheme and authority since the manifest file may contain entries both with and without them
14171437
hiveFileInfoIterator.forEachRemaining(hiveFileInfo -> targetParentHiveFileInfos.put(

presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.common.collect.ImmutableList;
3030
import com.google.common.collect.Iterators;
3131
import com.google.common.collect.ListMultimap;
32-
import com.google.common.io.CharStreams;
3332
import com.google.common.util.concurrent.ListenableFuture;
3433
import org.apache.hadoop.conf.Configuration;
3534
import org.apache.hadoop.fs.Path;
@@ -41,10 +40,7 @@
4140
import org.apache.hadoop.mapred.JobConf;
4241
import org.apache.hadoop.mapred.TextInputFormat;
4342

44-
import java.io.BufferedReader;
4543
import java.io.IOException;
46-
import java.io.InputStreamReader;
47-
import java.nio.charset.StandardCharsets;
4844
import java.util.ArrayList;
4945
import java.util.Comparator;
5046
import java.util.Deque;
@@ -78,6 +74,7 @@
7874
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
7975
import static com.facebook.presto.hive.HiveUtil.getTargetPathsHiveFileInfos;
8076
import static com.facebook.presto.hive.HiveUtil.isHudiParquetInputFormat;
77+
import static com.facebook.presto.hive.HiveUtil.readSymlinkPaths;
8178
import static com.facebook.presto.hive.HiveUtil.shouldUseFileSplitsFromInputFormat;
8279
import static com.facebook.presto.hive.HiveWriterFactory.getBucketNumber;
8380
import static com.facebook.presto.hive.NestedDirectoryPolicy.FAIL;
@@ -260,7 +257,7 @@ Iterator<InternalHiveSplit> getSymlinkIterator(
260257

261258
List<HiveFileInfo> targetPathsHiveFileInfos = getTargetPathsHiveFileInfos(
262259
path,
263-
partition,
260+
partition.getPartition(),
264261
targetParent,
265262
currentTargetPaths,
266263
hiveDirectoryContext,
@@ -730,24 +727,15 @@ private List<InternalHiveSplit> getVirtuallyBucketedSplits(Path path, ExtendedFi
730727
private List<Path> getTargetPathsFromSymlink(ExtendedFileSystem fileSystem, Path symlinkDir, Optional<Partition> partition)
731728
{
732729
try {
733-
List<Path> targets = new ArrayList<>();
734730
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
735731
IGNORED,
736732
isUseListDirectoryCache(session),
737733
isSkipEmptyFilesEnabled(session),
738734
hdfsContext.getIdentity(),
739735
buildDirectoryContextProperties(session),
740736
session.getRuntimeStats());
741-
List<HiveFileInfo> manifestFileInfos = ImmutableList.copyOf(directoryLister.list(fileSystem, table, symlinkDir, partition, namenodeStats, hiveDirectoryContext));
742-
743-
for (HiveFileInfo symlink : manifestFileInfos) {
744-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(symlink.getPath())), StandardCharsets.UTF_8))) {
745-
CharStreams.readLines(reader).stream()
746-
.map(Path::new)
747-
.forEach(targets::add);
748-
}
749-
}
750-
return targets;
737+
Iterator<HiveFileInfo> manifestFileInfos = directoryLister.list(fileSystem, table, symlinkDir, partition, namenodeStats, hiveDirectoryContext);
738+
return readSymlinkPaths(fileSystem, manifestFileInfos);
751739
}
752740
catch (IOException e) {
753741
throw new PrestoException(HIVE_BAD_DATA, "Error parsing symlinks from: " + symlinkDir, e);

presto-hive/src/main/java/com/facebook/presto/hive/statistics/QuickStatsProvider.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,19 @@
3232
import com.facebook.presto.hive.metastore.MetastoreContext;
3333
import com.facebook.presto.hive.metastore.Partition;
3434
import com.facebook.presto.hive.metastore.PartitionStatistics;
35+
import com.facebook.presto.hive.metastore.StorageFormat;
3536
import com.facebook.presto.hive.metastore.Table;
3637
import com.facebook.presto.spi.ConnectorSession;
38+
import com.facebook.presto.spi.PrestoException;
3739
import com.facebook.presto.spi.SchemaTableName;
3840
import com.google.common.base.Stopwatch;
3941
import com.google.common.cache.Cache;
4042
import com.google.common.cache.CacheBuilder;
4143
import com.google.common.collect.ImmutableList;
4244
import com.google.common.collect.ImmutableMap;
4345
import org.apache.hadoop.fs.Path;
46+
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
47+
import org.apache.hadoop.mapred.InputFormat;
4448
import org.weakref.jmx.Managed;
4549
import org.weakref.jmx.Nested;
4650

@@ -61,15 +65,20 @@
6165
import java.util.concurrent.TimeoutException;
6266
import java.util.concurrent.atomic.AtomicLong;
6367
import java.util.concurrent.atomic.AtomicReference;
68+
import java.util.stream.Collectors;
6469

6570
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
71+
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
6672
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
6773
import static com.facebook.presto.hive.HiveSessionProperties.getQuickStatsBackgroundBuildTimeout;
6874
import static com.facebook.presto.hive.HiveSessionProperties.getQuickStatsInlineBuildTimeout;
6975
import static com.facebook.presto.hive.HiveSessionProperties.isQuickStatsEnabled;
7076
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
7177
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
7278
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
79+
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
80+
import static com.facebook.presto.hive.HiveUtil.getTargetPathsHiveFileInfos;
81+
import static com.facebook.presto.hive.HiveUtil.readSymlinkPaths;
7382
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
7483
import static com.facebook.presto.hive.NestedDirectoryPolicy.RECURSE;
7584
import static com.facebook.presto.hive.metastore.PartitionStatistics.empty;
@@ -323,15 +332,18 @@ private PartitionStatistics buildQuickStats(String partitionKey, String partitio
323332
Table resolvedTable = metastore.getTable(metastoreContext, table.getSchemaName(), table.getTableName()).get();
324333
Optional<Partition> partition;
325334
Path path;
335+
StorageFormat storageFormat;
326336
if (UNPARTITIONED_ID.getPartitionName().equals(partitionId)) {
327337
partition = Optional.empty();
328338
path = new Path(resolvedTable.getStorage().getLocation());
339+
storageFormat = resolvedTable.getStorage().getStorageFormat();
329340
}
330341
else {
331342
partition = metastore.getPartitionsByNames(metastoreContext, table.getSchemaName(), table.getTableName(),
332343
ImmutableList.of(new PartitionNameWithVersion(partitionId, Optional.empty()))).get(partitionId);
333344
checkState(partition.isPresent(), "getPartitionsByNames returned no partitions for partition with name [%s]", partitionId);
334345
path = new Path(partition.get().getStorage().getLocation());
346+
storageFormat = partition.get().getStorage().getStorageFormat();
335347
}
336348

337349
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName(), partitionId, false);
@@ -347,6 +359,37 @@ private PartitionStatistics buildQuickStats(String partitionKey, String partitio
347359

348360
Iterator<HiveFileInfo> fileList = directoryLister.list(fs, resolvedTable, path, partition, nameNodeStats, hiveDirectoryContext);
349361

362+
InputFormat<?, ?> inputFormat = getInputFormat(hdfsEnvironment.getConfiguration(hdfsContext, path), storageFormat.getInputFormat(), storageFormat.getSerDe(), false);
363+
if (inputFormat instanceof SymlinkTextInputFormat) {
364+
// For symlinks, follow the paths in the manifest file and create a new iterator of the target files
365+
try {
366+
List<Path> targetPaths = readSymlinkPaths(fs, fileList);
367+
368+
Map<Path, List<Path>> parentToTargets = targetPaths.stream().collect(Collectors.groupingBy(Path::getParent));
369+
370+
ImmutableList.Builder<HiveFileInfo> targetFileInfoList = ImmutableList.builder();
371+
372+
for (Map.Entry<Path, List<Path>> entry : parentToTargets.entrySet()) {
373+
targetFileInfoList.addAll(getTargetPathsHiveFileInfos(
374+
path,
375+
partition,
376+
entry.getKey(),
377+
entry.getValue(),
378+
hiveDirectoryContext,
379+
fs,
380+
directoryLister,
381+
resolvedTable,
382+
nameNodeStats,
383+
session));
384+
}
385+
386+
fileList = targetFileInfoList.build().iterator();
387+
}
388+
catch (IOException e) {
389+
throw new PrestoException(HIVE_BAD_DATA, "Error parsing symlinks", e);
390+
}
391+
}
392+
350393
PartitionQuickStats partitionQuickStats = PartitionQuickStats.EMPTY;
351394
Stopwatch buildStopwatch = Stopwatch.createStarted();
352395
// Build quick stats one by one from statsBuilderStrategies. Do this until we get a non-empty PartitionQuickStats

0 commit comments

Comments
 (0)