Skip to content

Commit 20b4a3b

Browse files
[lance] Do not replace oss scheme to s3 scheme (#6764)
1 parent 708ea8f commit 20b4a3b

File tree

4 files changed

+260
-16
lines changed

4 files changed

+260
-16
lines changed

paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,24 @@ public class LanceUtils {
4141
private static final Class<?> jindoFileIOKlass;
4242
private static final Class<?> hadoopFileIOKlass;
4343

44+
// OSS configuration keys
45+
public static final String FS_OSS_ENDPOINT = "fs.oss.endpoint";
46+
public static final String FS_OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
47+
public static final String FS_OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
48+
public static final String FS_OSS_SECURITY_TOKEN = "fs.oss.securityToken";
49+
private static final String FS_PREFIX = "fs.";
50+
51+
// Storage options keys for Lance
52+
public static final String STORAGE_OPTION_ENDPOINT = "endpoint";
53+
public static final String STORAGE_OPTION_ACCESS_KEY_ID = "access_key_id";
54+
public static final String STORAGE_OPTION_SECRET_ACCESS_KEY = "secret_access_key";
55+
public static final String STORAGE_OPTION_SESSION_TOKEN = "session_token";
56+
public static final String STORAGE_OPTION_VIRTUAL_HOSTED_STYLE = "virtual_hosted_style_request";
57+
public static final String STORAGE_OPTION_OSS_ACCESS_KEY_ID = "oss_access_key_id";
58+
public static final String STORAGE_OPTION_OSS_SECRET_ACCESS_KEY = "oss_secret_access_key";
59+
public static final String STORAGE_OPTION_OSS_SESSION_TOKEN = "oss_session_token";
60+
public static final String STORAGE_OPTION_OSS_ENDPOINT = "oss_endpoint";
61+
4462
static {
4563
Class<?> klass;
4664
try {
@@ -107,19 +125,38 @@ public static Pair<Path, Map<String, String>> toLanceSpecified(FileIO fileIO, Pa
107125
converted = new Path(uriString.replace("traceable:/", "file:/"));
108126
}
109127
} else if ("oss".equals(schema)) {
110-
assert originOptions.containsKey("fs.oss.endpoint");
111-
assert originOptions.containsKey("fs.oss.accessKeyId");
112-
assert originOptions.containsKey("fs.oss.accessKeySecret");
128+
assert originOptions.containsKey(FS_OSS_ENDPOINT);
129+
assert originOptions.containsKey(FS_OSS_ACCESS_KEY_ID);
130+
assert originOptions.containsKey(FS_OSS_ACCESS_KEY_SECRET);
131+
132+
for (String key : originOptions.keySet()) {
133+
if (key.startsWith(FS_PREFIX)) {
134+
storageOptions.put(key, originOptions.get(key));
135+
}
136+
}
137+
138+
storageOptions.put(
139+
STORAGE_OPTION_ENDPOINT,
140+
"https://" + uri.getHost() + "." + originOptions.get(FS_OSS_ENDPOINT));
113141
storageOptions.put(
114-
"endpoint",
115-
"https://" + uri.getHost() + "." + originOptions.get("fs.oss.endpoint"));
116-
storageOptions.put("access_key_id", originOptions.get("fs.oss.accessKeyId"));
117-
storageOptions.put("secret_access_key", originOptions.get("fs.oss.accessKeySecret"));
118-
storageOptions.put("virtual_hosted_style_request", "true");
119-
if (originOptions.containsKey("fs.oss.securityToken")) {
120-
storageOptions.put("session_token", originOptions.get("fs.oss.securityToken"));
142+
STORAGE_OPTION_ACCESS_KEY_ID, originOptions.get(FS_OSS_ACCESS_KEY_ID));
143+
storageOptions.put(
144+
STORAGE_OPTION_OSS_ACCESS_KEY_ID, originOptions.get(FS_OSS_ACCESS_KEY_ID));
145+
storageOptions.put(
146+
STORAGE_OPTION_SECRET_ACCESS_KEY, originOptions.get(FS_OSS_ACCESS_KEY_SECRET));
147+
storageOptions.put(
148+
STORAGE_OPTION_OSS_SECRET_ACCESS_KEY,
149+
originOptions.get(FS_OSS_ACCESS_KEY_SECRET));
150+
storageOptions.put(STORAGE_OPTION_VIRTUAL_HOSTED_STYLE, "true");
151+
if (originOptions.containsKey(FS_OSS_SECURITY_TOKEN)) {
152+
storageOptions.put(
153+
STORAGE_OPTION_SESSION_TOKEN, originOptions.get(FS_OSS_SECURITY_TOKEN));
154+
storageOptions.put(
155+
STORAGE_OPTION_OSS_SESSION_TOKEN, originOptions.get(FS_OSS_SECURITY_TOKEN));
156+
}
157+
if (originOptions.containsKey(FS_OSS_ENDPOINT)) {
158+
storageOptions.put(STORAGE_OPTION_OSS_ENDPOINT, originOptions.get(FS_OSS_ENDPOINT));
121159
}
122-
converted = new Path(uri.toString().replace("oss://", "s3://"));
123160
}
124161

125162
return Pair.of(converted, storageOptions);
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.format.lance;
20+
21+
import org.apache.paimon.fs.FileIO;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.fs.PluginFileIO;
24+
import org.apache.paimon.options.Options;
25+
import org.apache.paimon.utils.Pair;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.Map;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertTrue;
33+
34+
class LanceUtilsTest {
35+
36+
private static class TestFileIO extends PluginFileIO {
37+
private static final long serialVersionUID = 1L;
38+
39+
@Override
40+
public boolean isObjectStore() {
41+
return true;
42+
}
43+
44+
@Override
45+
protected FileIO createFileIO(Path path) {
46+
throw new UnsupportedOperationException("Not used in tests");
47+
}
48+
49+
@Override
50+
protected ClassLoader pluginClassLoader() {
51+
return Thread.currentThread().getContextClassLoader();
52+
}
53+
54+
void setOptions(Options opts) {
55+
this.options = opts;
56+
}
57+
}
58+
59+
@Test
60+
void testOssUrlConversion() {
61+
Path path = new Path("oss://test-bucket/db-name.db/table-name/bucket-0/data.lance");
62+
Options options = new Options();
63+
options.set(LanceUtils.FS_OSS_ENDPOINT, "oss-example-region.example.com");
64+
options.set(LanceUtils.FS_OSS_ACCESS_KEY_ID, "test-key");
65+
options.set(LanceUtils.FS_OSS_ACCESS_KEY_SECRET, "test-secret");
66+
67+
TestFileIO fileIO = new TestFileIO();
68+
fileIO.setOptions(options);
69+
70+
Pair<Path, Map<String, String>> result = LanceUtils.toLanceSpecified(fileIO, path);
71+
72+
assertTrue(result.getKey().toString().startsWith("oss://test-bucket/"));
73+
74+
Map<String, String> storageOptions = result.getValue();
75+
assertEquals(
76+
"https://test-bucket.oss-example-region.example.com",
77+
storageOptions.get(LanceUtils.STORAGE_OPTION_ENDPOINT));
78+
assertEquals("test-key", storageOptions.get(LanceUtils.STORAGE_OPTION_ACCESS_KEY_ID));
79+
assertEquals(
80+
"test-secret", storageOptions.get(LanceUtils.STORAGE_OPTION_SECRET_ACCESS_KEY));
81+
assertEquals("true", storageOptions.get(LanceUtils.STORAGE_OPTION_VIRTUAL_HOSTED_STYLE));
82+
83+
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ENDPOINT));
84+
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ACCESS_KEY_ID));
85+
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ACCESS_KEY_SECRET));
86+
}
87+
88+
@Test
89+
void testOssUrlWithSecurityToken() {
90+
Path path = new Path("oss://my-bucket/path/to/file.lance");
91+
Options options = new Options();
92+
options.set(LanceUtils.FS_OSS_ENDPOINT, "oss-example-region.example.com");
93+
options.set(LanceUtils.FS_OSS_ACCESS_KEY_ID, "test-access-key");
94+
options.set(LanceUtils.FS_OSS_ACCESS_KEY_SECRET, "test-secret-key");
95+
options.set(LanceUtils.FS_OSS_SECURITY_TOKEN, "test-token");
96+
97+
TestFileIO fileIO = new TestFileIO();
98+
fileIO.setOptions(options);
99+
100+
Pair<Path, Map<String, String>> result = LanceUtils.toLanceSpecified(fileIO, path);
101+
102+
Map<String, String> storageOptions = result.getValue();
103+
assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_SESSION_TOKEN));
104+
assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_OSS_SESSION_TOKEN));
105+
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_SECURITY_TOKEN));
106+
}
107+
}

paimon-python/pypaimon/read/reader/lance_utils.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import os
2020
from typing import Dict, Optional, Tuple
21+
from urllib.parse import urlparse
2122

2223
from pypaimon.common.file_io import FileIO
2324
from pypaimon.common.options.config import OssOptions
@@ -38,21 +39,35 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[D
3839
if scheme == 'oss':
3940
storage_options = {}
4041
if hasattr(file_io, 'properties'):
42+
for key, value in file_io.properties.data.items():
43+
if str(key).startswith('fs.'):
44+
storage_options[key] = value
45+
46+
parsed = urlparse(file_path)
47+
bucket = parsed.netloc
48+
path = parsed.path.lstrip('/')
49+
4150
endpoint = file_io.properties.get(OssOptions.OSS_ENDPOINT)
4251
if endpoint:
43-
if not endpoint.startswith('http://') and not endpoint.startswith('https://'):
44-
storage_options['endpoint'] = f"https://{endpoint}"
45-
else:
46-
storage_options['endpoint'] = endpoint
52+
endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
53+
storage_options['endpoint'] = f"https://{bucket}.{endpoint_clean}"
4754

4855
if file_io.properties.contains(OssOptions.OSS_ACCESS_KEY_ID):
4956
storage_options['access_key_id'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_ID)
57+
storage_options['oss_access_key_id'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_ID)
5058
if file_io.properties.contains(OssOptions.OSS_ACCESS_KEY_SECRET):
5159
storage_options['secret_access_key'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET)
60+
storage_options['oss_secret_access_key'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET)
5261
if file_io.properties.contains(OssOptions.OSS_SECURITY_TOKEN):
5362
storage_options['session_token'] = file_io.properties.get(OssOptions.OSS_SECURITY_TOKEN)
63+
storage_options['oss_session_token'] = file_io.properties.get(OssOptions.OSS_SECURITY_TOKEN)
64+
if file_io.properties.contains(OssOptions.OSS_ENDPOINT):
65+
storage_options['oss_endpoint'] = file_io.properties.get(OssOptions.OSS_ENDPOINT)
5466
storage_options['virtual_hosted_style_request'] = 'true'
5567

56-
file_path_for_lance = file_path.replace('oss://', 's3://')
68+
if bucket and path:
69+
file_path_for_lance = f"oss://{bucket}/{path}"
70+
elif bucket:
71+
file_path_for_lance = f"oss://{bucket}"
5772

5873
return file_path_for_lance, storage_options
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
import unittest
20+
21+
from pypaimon.common.options import Options
22+
from pypaimon.common.options.config import OssOptions
23+
from pypaimon.common.file_io import FileIO
24+
from pypaimon.read.reader.lance_utils import to_lance_specified
25+
26+
27+
class LanceUtilsTest(unittest.TestCase):
28+
29+
def test_oss_url_bucket_extraction_correctness(self):
30+
file_path = "oss://test-bucket/db-name.db/table-name/bucket-0/data.lance"
31+
32+
properties = Options({
33+
OssOptions.OSS_ENDPOINT.key(): "oss-example-region.example.com",
34+
OssOptions.OSS_ACCESS_KEY_ID.key(): "test-key",
35+
OssOptions.OSS_ACCESS_KEY_SECRET.key(): "test-secret",
36+
})
37+
38+
file_io = FileIO(file_path, properties)
39+
file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)
40+
41+
self.assertEqual(
42+
storage_options['endpoint'],
43+
"https://test-bucket.oss-example-region.example.com"
44+
)
45+
46+
self.assertTrue(file_path_for_lance.startswith("oss://test-bucket/"))
47+
48+
self.assertEqual(storage_options.get('virtual_hosted_style_request'), 'true')
49+
50+
self.assertTrue('fs.oss.endpoint' in storage_options)
51+
self.assertTrue('fs.oss.accessKeyId' in storage_options)
52+
self.assertTrue('fs.oss.accessKeySecret' in storage_options)
53+
54+
def test_oss_url_with_security_token(self):
55+
file_path = "oss://my-bucket/path/to/file.lance"
56+
57+
properties = Options({
58+
OssOptions.OSS_ENDPOINT.key(): "oss-example-region.example.com",
59+
OssOptions.OSS_ACCESS_KEY_ID.key(): "test-access-key",
60+
OssOptions.OSS_ACCESS_KEY_SECRET.key(): "test-secret-key",
61+
OssOptions.OSS_SECURITY_TOKEN.key(): "test-token",
62+
})
63+
64+
file_io = FileIO(file_path, properties)
65+
file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)
66+
67+
self.assertEqual(file_path_for_lance, "oss://my-bucket/path/to/file.lance")
68+
69+
self.assertEqual(
70+
storage_options['endpoint'],
71+
"https://my-bucket.oss-example-region.example.com"
72+
)
73+
74+
self.assertEqual(storage_options.get('virtual_hosted_style_request'), 'true')
75+
76+
self.assertEqual(storage_options.get('access_key_id'), "test-access-key")
77+
self.assertEqual(storage_options.get('secret_access_key'), "test-secret-key")
78+
self.assertEqual(storage_options.get('session_token'), "test-token")
79+
self.assertEqual(storage_options.get('oss_session_token'), "test-token")
80+
81+
self.assertTrue('fs.oss.securityToken' in storage_options)
82+
83+
84+
if __name__ == '__main__':
85+
unittest.main()

0 commit comments

Comments
 (0)