Commit 4458113

S3 input source (#8903)
* add s3 input source for native batch ingestion
* add docs
* fixes
* checkstyle
* lazy splits
* fixes and hella tests
* fix it
* re-use better iterator
* use key
* javadoc and checkstyle
* exception
* oops
* refactor to use S3Coords instead of URI
* remove unused code, add retrying stream to handle s3 stream
* remove unused parameter
* update to latest master
* use list of objects instead of object
* serde test
* refactor and such
* now with the ability to compile
* fix signature and javadocs
* fix conflicts yet again, fix S3 uri stuffs
* more tests, enforce uri for bucket
* javadoc
* oops
* abstract class instead of interface
* null or empty
* better error
1 parent 282b838 commit 4458113

30 files changed: +1397 / -291 lines

core/src/main/java/org/apache/druid/data/input/InputSource.java

Lines changed: 1 addition & 5 deletions

@@ -75,9 +75,5 @@ public interface InputSource
    * @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true
    * @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
    */
-  InputSourceReader reader(
-      InputRowSchema inputRowSchema,
-      @Nullable InputFormat inputFormat,
-      File temporaryDirectory
-  );
+  InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
 }

core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java

Lines changed: 22 additions & 11 deletions

@@ -19,47 +19,58 @@
 
 package org.apache.druid.data.input;
 
-import com.google.common.base.Predicate;
 import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.utils.CompressionUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
 
-public interface RetryingInputEntity extends InputEntity
+public abstract class RetryingInputEntity implements InputEntity
 {
+  /**
+   * Open a {@link RetryingInputStream} wrapper for an underlying input stream, optionally decompressing the retrying
+   * stream if the file extension matches a known compression, otherwise passing through the retrying stream directly.
+   */
   @Override
-  default InputStream open() throws IOException
+  public InputStream open() throws IOException
   {
-    return new RetryingInputStream<>(
+    RetryingInputStream<?> retryingInputStream = new RetryingInputStream<>(
         this,
         new RetryingInputEntityOpenFunction(),
         getRetryCondition(),
         RetryUtils.DEFAULT_MAX_TRIES
     );
+    return CompressionUtils.decompress(retryingInputStream, getPath());
   }
 
   /**
-   * Directly opens an {@link InputStream} on the input entity.
+   * Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, and is
+   * handled by the default implementation of {@link #open}, so this should return the raw stream for the object.
    */
-  default InputStream readFromStart() throws IOException
+  protected InputStream readFromStart() throws IOException
   {
     return readFrom(0);
   }
 
   /**
-   * Directly opens an {@link InputStream} starting at the given offset on the input entity.
+   * Directly opens an {@link InputStream} starting at the given offset on the input entity. Decompression should be
+   * handled externally, and is handled by the default implementation of {@link #open}, so this should return the raw
+   * stream for the object.
    *
    * @param offset an offset to start reading from. A non-negative integer counting
    *               the number of bytes from the beginning of the entity
    */
-  InputStream readFrom(long offset) throws IOException;
+  protected abstract InputStream readFrom(long offset) throws IOException;
 
-  @Override
-  Predicate<Throwable> getRetryCondition();
+  /**
+   * Get the path name for this entity, used by the default implementation of {@link #open} to determine if the
+   * underlying stream needs to be decompressed, based on the file extension of the path.
+   */
+  protected abstract String getPath();
 
-  class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
+  private static class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
   {
     @Override
     public InputStream open(RetryingInputEntity object) throws IOException
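
For orientation, the following is a minimal sketch (not part of this commit) of what a concrete subclass of the new abstract class might look like. The LocalFileEntity name and its file-based details are invented, and any additional InputEntity methods the interface may require at this revision are elided; the point is that a subclass only supplies the raw, offset-aware stream and a path, while retrying and extension-based decompression come from the inherited open().

import com.google.common.base.Predicate;
import com.google.common.io.ByteStreams;
import org.apache.druid.data.input.RetryingInputEntity;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical example subclass: reads a local file, letting RetryingInputEntity
// handle retries and extension-based decompression in open().
public class LocalFileEntity extends RetryingInputEntity
{
  private final File file;

  public LocalFileEntity(File file)
  {
    this.file = file;
  }

  @Override
  protected InputStream readFrom(long offset) throws IOException
  {
    // Return the raw stream; the base class wraps it in a RetryingInputStream and decompresses it.
    FileInputStream in = new FileInputStream(file);
    ByteStreams.skipFully(in, offset);
    return in;
  }

  @Override
  protected String getPath()
  {
    // The extension of this path (e.g. ".gz") tells the base class whether to decompress.
    return file.getPath();
  }

  @Override
  public Predicate<Throwable> getRetryCondition()
  {
    return t -> t instanceof IOException;
  }
}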

core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java

Lines changed: 129 additions & 0 deletions

@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * Common type for 'bucket' and 'path' concept of cloud objects to allow code sharing between cloud specific
+ * implementations. {@link #bucket} and {@link #path} should NOT be URL encoded.
+ *
+ * The intention is that this is used as a common representation for storage objects as an alternative to dealing in
+ * {@link URI} directly, but still provide a mechanism to round-trip with a URI.
+ *
+ * In common clouds, bucket names must be dns compliant:
+ * https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
+ * https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
+ * https://cloud.google.com/storage/docs/naming
+ *
+ * The constructor ensures that bucket names are DNS compliant by checking that the URL encoded form of the bucket
+ * matches the supplied value. Technically it should probably confirm that the bucket is also all lower-case, but
+ * S3 has a legacy mode where buckets did not have to be compliant so we can't enforce that here unfortunately.
+ */
+public class CloudObjectLocation
+{
+  private final String bucket;
+  private final String path;
+
+  @JsonCreator
+  public CloudObjectLocation(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path)
+  {
+    this.bucket = Preconditions.checkNotNull(StringUtils.maybeRemoveTrailingSlash(bucket));
+    this.path = Preconditions.checkNotNull(StringUtils.maybeRemoveLeadingSlash(path));
+    Preconditions.checkArgument(
+        this.bucket.equals(StringUtils.urlEncode(this.bucket)),
+        "bucket must follow DNS-compliant naming conventions"
+    );
+  }
+
+  public CloudObjectLocation(URI uri)
+  {
+    this(uri.getHost(), uri.getPath());
+  }
+
+  /**
+   * Given a scheme, encode {@link #bucket} and {@link #path} into a {@link URI}.
+   *
+   * In all clouds bucket names must be dns compliant, so it does not require encoding
+   * There is no such restriction on object names, so they will be URL encoded when constructing the URI
+   */
+  public URI toUri(String scheme)
+  {
+    // Encode path, except leave '/' characters unencoded
+    return URI.create(
+        StringUtils.format(
+            "%s://%s/%s",
+            scheme,
+            bucket,
+            StringUtils.replace(StringUtils.urlEncode(path), "%2F", "/")
+        )
+    );
+  }
+
+  @JsonProperty
+  public String getBucket()
+  {
+    return bucket;
+  }
+
+  @JsonProperty
+  public String getPath()
+  {
+    return path;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CloudObjectLocation{" +
+           "bucket='" + bucket + '\'' +
+           ", path='" + path + '\'' +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final CloudObjectLocation that = (CloudObjectLocation) o;
+    return Objects.equals(bucket, that.bucket) &&
+           Objects.equals(path, that.path);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(bucket, path);
+  }
+}
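
As a rough usage sketch of the round-trip described in the class javadoc (not part of this commit; the bucket and object names are made up, and the printed values are expected output rather than verified):

import org.apache.druid.data.input.impl.CloudObjectLocation;

import java.net.URI;

public class CloudObjectLocationExample
{
  public static void main(String[] args)
  {
    // Bucket and path are stored unencoded; the leading '/' on the path is trimmed by the constructor.
    CloudObjectLocation loc = new CloudObjectLocation("my-bucket", "/foo/bar=baz.json");
    System.out.println(loc.getBucket());  // my-bucket
    System.out.println(loc.getPath());    // foo/bar=baz.json

    // Round-trip through a URI: the path is URL encoded except for '/' characters.
    URI uri = loc.toUri("s3");
    System.out.println(uri);              // expected: s3://my-bucket/foo/bar%3Dbaz.json

    // Parsing the URI back (host -> bucket, decoded path -> path) yields an equal location.
    System.out.println(new CloudObjectLocation(uri).equals(loc));  // true
  }
}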

core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java

Lines changed: 9 additions & 7 deletions

@@ -27,7 +27,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.PasswordProvider;
-import org.apache.druid.utils.CompressionUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -36,7 +35,7 @@
 import java.net.URLConnection;
 import java.util.Base64;
 
-public class HttpEntity implements RetryingInputEntity
+public class HttpEntity extends RetryingInputEntity
 {
   private static final Logger LOG = new Logger(HttpEntity.class);
 
@@ -64,12 +63,15 @@ public URI getUri()
   }
 
   @Override
-  public InputStream readFrom(long offset) throws IOException
+  protected InputStream readFrom(long offset) throws IOException
   {
-    return CompressionUtils.decompress(
-        openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset),
-        uri.toString()
-    );
+    return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset);
+  }
+
+  @Override
+  protected String getPath()
+  {
+    return uri.getPath();
   }
 
   @Override

core/src/main/java/org/apache/druid/java/util/common/StringUtils.java

Lines changed: 10 additions & 0 deletions

@@ -237,6 +237,16 @@ public static String urlDecode(String s)
     }
   }
 
+  public static String maybeRemoveLeadingSlash(String s)
+  {
+    return s != null && s.startsWith("/") ? s.substring(1) : s;
+  }
+
+  public static String maybeRemoveTrailingSlash(String s)
+  {
+    return s != null && s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
+  }
+
   /**
    * Removes all occurrences of the given char from the given string. This method is an optimal version of
    * {@link String#replace(CharSequence, CharSequence) s.replace("c", "")}.
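
A quick illustration of the two new helpers, which the CloudObjectLocation constructor shown earlier uses to normalize bucket and path values (a minimal sketch; the input strings are made-up examples):

import org.apache.druid.java.util.common.StringUtils;

public class SlashHelpersExample
{
  public static void main(String[] args)
  {
    // Trailing slash on a bucket name and leading slash on an object path are dropped.
    System.out.println(StringUtils.maybeRemoveTrailingSlash("my-bucket/"));     // my-bucket
    System.out.println(StringUtils.maybeRemoveLeadingSlash("/path/to/object")); // path/to/object

    // Nulls and already-normalized values pass through unchanged.
    System.out.println(StringUtils.maybeRemoveLeadingSlash("path/to/object"));  // path/to/object
    System.out.println(StringUtils.maybeRemoveTrailingSlash(null));             // null
  }
}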

core/src/main/java/org/apache/druid/utils/CollectionUtils.java

Lines changed: 7 additions & 0 deletions

@@ -23,11 +23,13 @@
 import com.google.common.collect.Maps;
 import org.apache.druid.java.util.common.ISE;
 
+import javax.annotation.Nullable;
 import java.util.AbstractCollection;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Spliterator;
 import java.util.TreeSet;
@@ -116,6 +118,11 @@ public static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMa
     return result;
   }
 
+  public static boolean isNullOrEmpty(@Nullable List<?> list)
+  {
+    return list == null || list.isEmpty();
+  }
+
   private CollectionUtils()
   {
   }
