Skip to content

Commit 6dfd7ff

Browse files
committed
WIP
1 parent 971443a commit 6dfd7ff

File tree

7 files changed

+314
-1
lines changed

7 files changed

+314
-1
lines changed

presto-clp/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@
7575
<artifactId>jackson-databind</artifactId>
7676
</dependency>
7777

78+
<dependency>
79+
<groupId>com.fasterxml.jackson.dataformat</groupId>
80+
<artifactId>jackson-dataformat-yaml</artifactId>
81+
</dependency>
82+
7883
<dependency>
7984
<groupId>com.facebook.presto</groupId>
8085
<artifactId>presto-spi</artifactId>
@@ -148,6 +153,12 @@
148153
<scope>test</scope>
149154
</dependency>
150155

156+
<dependency>
157+
<groupId>org.apache.pinot</groupId>
158+
<artifactId>pinot-jdbc-client</artifactId>
159+
<version>1.3.0</version>
160+
</dependency>
161+
151162
<dependency>
152163
<groupId>commons-io</groupId>
153164
<artifactId>commons-io</artifactId>

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class ClpConfig
3333
private long metadataRefreshInterval = 60;
3434
private long metadataExpireInterval = 600;
3535

36+
private String metadataYamlPath;
37+
3638
private String splitFilterConfig;
3739
private SplitFilterProviderType splitFilterProviderType = SplitFilterProviderType.MYSQL;
3840
private SplitProviderType splitProviderType = SplitProviderType.MYSQL;
@@ -151,6 +153,18 @@ public ClpConfig setMetadataExpireInterval(long metadataExpireInterval)
151153
return this;
152154
}
153155

156+
public String getMetadataYamlPath()
157+
{
158+
return metadataYamlPath;
159+
}
160+
161+
@Config("clp.metadata-yaml-path")
162+
public ClpConfig setMetadataYamlPath(String metadataYamlPath)
163+
{
164+
this.metadataYamlPath = metadataYamlPath;
165+
return this;
166+
}
167+
154168
public String getSplitFilterConfig()
155169
{
156170
return splitFilterConfig;
@@ -189,7 +203,8 @@ public ClpConfig setSplitProviderType(SplitProviderType splitProviderType)
189203

190204
public enum MetadataProviderType
191205
{
192-
MYSQL
206+
MYSQL,
207+
YAML
193208
}
194209

195210
public enum SplitFilterProviderType

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public enum ClpErrorCode
2828
CLP_UNSUPPORTED_SPLIT_SOURCE(2, EXTERNAL),
2929
CLP_UNSUPPORTED_TYPE(3, EXTERNAL),
3030
CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL),
31+
CLP_UNSUPPORTED_TABLE_SCHEMA_YAML(5, EXTERNAL),
3132

3233
CLP_SPLIT_FILTER_CONFIG_NOT_FOUND(10, USER_ERROR),
3334
CLP_MANDATORY_SPLIT_FILTER_NOT_VALID(11, USER_ERROR),

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
1717
import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
1818
import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
19+
import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider;
1920
import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
2021
import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
2122
import com.facebook.presto.plugin.clp.split.filter.ClpMySqlSplitFilterProvider;
@@ -56,6 +57,9 @@ protected void setup(Binder binder)
5657
if (config.getMetadataProviderType() == MetadataProviderType.MYSQL) {
5758
binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON);
5859
}
60+
else if (config.getMetadataProviderType() == MetadataProviderType.YAML) {
61+
binder.bind(ClpMetadataProvider.class).to(ClpYamlMetadataProvider.class).in(Scopes.SINGLETON);
62+
}
5963
else {
6064
throw new PrestoException(CLP_UNSUPPORTED_METADATA_SOURCE, "Unsupported metadata provider type: " + config.getMetadataProviderType());
6165
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.plugin.clp.metadata;
15+
16+
import com.facebook.airlift.log.Logger;
17+
import com.facebook.presto.plugin.clp.ClpColumnHandle;
18+
import com.facebook.presto.plugin.clp.ClpConfig;
19+
import com.facebook.presto.plugin.clp.ClpTableHandle;
20+
import com.facebook.presto.spi.PrestoException;
21+
import com.facebook.presto.spi.SchemaTableName;
22+
import com.fasterxml.jackson.core.type.TypeReference;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
25+
import com.google.common.collect.ImmutableList;
26+
import com.google.common.collect.ImmutableMap;
27+
import org.apache.commons.lang3.tuple.Pair;
28+
29+
import java.io.File;
30+
import java.io.IOException;
31+
import java.nio.file.Path;
32+
import java.nio.file.Paths;
33+
import java.util.*;
34+
35+
import static com.facebook.presto.plugin.clp.ClpConnectorFactory.CONNECTOR_NAME;
36+
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_TABLE_SCHEMA_YAML;
37+
import static java.lang.String.format;
38+
39+
public class ClpYamlMetadataProvider
40+
implements ClpMetadataProvider
41+
{
42+
private static final Logger log = Logger.get(ClpYamlMetadataProvider.class);
43+
private final ClpConfig config;
44+
private Map<SchemaTableName, String> tableSchemaYamlMap;
45+
46+
public ClpYamlMetadataProvider(ClpConfig config)
47+
{
48+
this.config = config;
49+
}
50+
51+
@Override
52+
public List<ClpColumnHandle> listColumnHandles(SchemaTableName schemaTableName)
53+
{
54+
Path tableSchemaPath = Paths.get(tableSchemaYamlMap.get(schemaTableName));
55+
ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled());
56+
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
57+
mapper.findAndRegisterModules();
58+
59+
try {
60+
Map<String, Object> root = mapper.readValue(
61+
new File(tableSchemaPath.toString()),
62+
new TypeReference<HashMap<String, Object>>() {});
63+
ImmutableList.Builder<Pair<String, Byte>> typesBuilder = ImmutableList.builder();
64+
collectTypes(root, "", typesBuilder);
65+
for (Pair<String, Byte> entry : typesBuilder.build()) {
66+
schemaTree.addColumn(entry.getLeft(), entry.getRight());
67+
}
68+
return schemaTree.collectColumnHandles();
69+
}
70+
catch (IOException e) {
71+
log.error(format("Failed to parse table schema file %s, error: %s", tableSchemaPath, e.getMessage()), e);
72+
}
73+
return Collections.emptyList();
74+
}
75+
76+
@Override
77+
public List<ClpTableHandle> listTableHandles(String schemaName)
78+
{
79+
Path tablesSchemaPath = Paths.get(config.getMetadataYamlPath());
80+
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
81+
mapper.findAndRegisterModules();
82+
83+
try {
84+
Map<String, Object> root = mapper.readValue(new File(tablesSchemaPath.toString()),
85+
new TypeReference<HashMap<String, Object>>() {});
86+
87+
Object catalogObj = root.get(CONNECTOR_NAME);
88+
if (!(catalogObj instanceof Map)) {
89+
throw new PrestoException(CLP_UNSUPPORTED_TABLE_SCHEMA_YAML, format("The table schema does not contain field: %s", CONNECTOR_NAME));
90+
}
91+
Object schemaObj = ((Map<String, Object>) catalogObj).get(schemaName);
92+
ImmutableList.Builder<ClpTableHandle> tableHandlesBuilder = new ImmutableList.Builder<>();
93+
ImmutableMap.Builder<SchemaTableName, String> tableSchemaYamlMapBuilder = new ImmutableMap.Builder<>();
94+
for (Map.Entry<String, Object> schemaEntry : ((Map<String, Object>) schemaObj).entrySet()) {
95+
String tableName = schemaEntry.getKey();
96+
String tableSchemaYamlPath = schemaEntry.getValue().toString();
97+
// The splits' absolute paths will be stored in Pinot metadata database
98+
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
99+
tableHandlesBuilder.add(new ClpTableHandle(schemaTableName, ""));
100+
tableSchemaYamlMapBuilder.put(schemaTableName, tableSchemaYamlPath);
101+
}
102+
this.tableSchemaYamlMap = tableSchemaYamlMapBuilder.build();
103+
return tableHandlesBuilder.build();
104+
}
105+
catch (IOException e) {
106+
log.error(format("Failed to parse metadata file: %s, error: %s", config.getMetadataYamlPath(), e.getMessage()), e);
107+
}
108+
return Collections.emptyList();
109+
}
110+
111+
private void collectTypes(Object node, String prefix, ImmutableList.Builder<Pair<String, Byte>> fullNamesBuilder)
112+
{
113+
if (node instanceof Number) {
114+
fullNamesBuilder.add(Pair.of(prefix, ((Number) node).byteValue()));
115+
return;
116+
}
117+
if (node instanceof List) {
118+
for (Number type : (List<Number>) node) {
119+
fullNamesBuilder.add(Pair.of(prefix, (type.byteValue())));
120+
}
121+
return;
122+
}
123+
for (Map.Entry<String, Object> entry : ((Map<String, Object>) node).entrySet()) {
124+
if (!prefix.isEmpty()) {
125+
collectTypes(entry.getValue(), format("%s.%s", prefix, entry.getKey()), fullNamesBuilder);
126+
continue;
127+
}
128+
collectTypes(entry.getValue(), entry.getKey(), fullNamesBuilder);
129+
}
130+
}
131+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.plugin.clp.split;
15+
16+
import com.facebook.presto.plugin.clp.ClpSplit;
17+
import com.facebook.presto.plugin.clp.ClpTableLayoutHandle;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
22+
public class ClpPinotSplitProvider
23+
implements ClpSplitProvider
24+
{
25+
@Override
26+
public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle) {
27+
String url = "jdbc:pinot://localhost:8099";
28+
29+
return Collections.emptyList();
30+
}
31+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.plugin.clp;
15+
16+
import com.facebook.presto.common.type.ArrayType;
17+
import com.facebook.presto.common.type.RowType;
18+
import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
19+
import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider;
20+
import com.facebook.presto.spi.ColumnHandle;
21+
import com.facebook.presto.spi.ColumnMetadata;
22+
import com.facebook.presto.spi.ConnectorTableMetadata;
23+
import com.facebook.presto.spi.SchemaTableName;
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.ImmutableSet;
26+
import org.testng.annotations.BeforeTest;
27+
import org.testng.annotations.Test;
28+
29+
import java.util.HashSet;
30+
import java.util.Optional;
31+
32+
import static com.facebook.presto.common.type.BigintType.BIGINT;
33+
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
34+
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
35+
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
36+
import static com.facebook.presto.plugin.clp.ClpConfig.MetadataProviderType.YAML;
37+
import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME;
38+
import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
39+
import static org.testng.Assert.assertEquals;
40+
41+
public class TestClpYamlMetadata
42+
{
43+
private static final String PINOT_BROKER_URL = "jdbc:pinot://localhost:8099";
44+
private static final String METADATA_YAML_PATH = "/home/xiaochong-dev/presto-e2e/pinot/tables-schema.yaml";
45+
private static final String TABLE_NAME = "cockroachdb";
46+
private ClpMetadata metadata;
47+
48+
@BeforeTest
49+
public void setUp()
50+
{
51+
ClpConfig config = new ClpConfig()
52+
.setPolymorphicTypeEnabled(true)
53+
.setMetadataDbUrl(PINOT_BROKER_URL)
54+
.setMetadataProviderType(YAML)
55+
.setMetadataYamlPath(METADATA_YAML_PATH);
56+
ClpMetadataProvider metadataProvider = new ClpYamlMetadataProvider(config);
57+
metadata = new ClpMetadata(config, metadataProvider);
58+
}
59+
60+
@Test
61+
public void testListSchemaNames()
62+
{
63+
assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of(DEFAULT_SCHEMA_NAME));
64+
}
65+
66+
@Test
67+
public void testListTables()
68+
{
69+
ImmutableSet.Builder<SchemaTableName> builder = ImmutableSet.builder();
70+
builder.add(new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME));
71+
assertEquals(new HashSet<>(metadata.listTables(SESSION, Optional.empty())), builder.build());
72+
}
73+
74+
@Test
75+
public void testGetTableMetadata()
76+
{
77+
ClpTableHandle clpTableHandle = (ClpTableHandle) metadata.getTableHandle(SESSION, new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME));
78+
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, clpTableHandle);
79+
// ImmutableSet<ColumnMetadata> columnMetadata = ImmutableSet.<ColumnMetadata>builder()
80+
// .add(ColumnMetadata.builder()
81+
// .setName("a_bigint")
82+
// .setType(BIGINT)
83+
// .setNullable(true)
84+
// .build())
85+
// .add(ColumnMetadata.builder()
86+
// .setName("a_varchar")
87+
// .setType(VARCHAR)
88+
// .setNullable(true)
89+
// .build())
90+
// .add(ColumnMetadata.builder()
91+
// .setName("b_double")
92+
// .setType(DOUBLE)
93+
// .setNullable(true)
94+
// .build())
95+
// .add(ColumnMetadata.builder()
96+
// .setName("b_varchar")
97+
// .setType(VARCHAR)
98+
// .setNullable(true)
99+
// .build())
100+
// .add(ColumnMetadata.builder()
101+
// .setName("c")
102+
// .setType(RowType.from(ImmutableList.of(
103+
// RowType.field("d", BOOLEAN),
104+
// RowType.field("e", VARCHAR))))
105+
// .setNullable(true)
106+
// .build())
107+
// .add(ColumnMetadata.builder()
108+
// .setName("f")
109+
// .setType(RowType.from(ImmutableList.of(
110+
// RowType.field("g",
111+
// RowType.from(ImmutableList.of(
112+
// RowType.field("h", new ArrayType(VARCHAR))))))))
113+
// .setNullable(true)
114+
// .build())
115+
// .build();
116+
// assertEquals(columnMetadata, ImmutableSet.copyOf(tableMetadata.getColumns()));
117+
ImmutableSet<ColumnMetadata> actual = ImmutableSet.copyOf(tableMetadata.getColumns());
118+
System.out.println("Hello world");
119+
}
120+
}

0 commit comments

Comments
 (0)