Skip to content

Commit 2adcb33

Browse files
committed
WIP
1 parent 6dfd7ff commit 2adcb33

File tree

7 files changed

+151
-32
lines changed

7 files changed

+151
-32
lines changed

presto-clp/pom.xml

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,16 +153,20 @@
153153
<scope>test</scope>
154154
</dependency>
155155

156-
<dependency>
157-
<groupId>org.apache.pinot</groupId>
158-
<artifactId>pinot-jdbc-client</artifactId>
159-
<version>1.3.0</version>
160-
</dependency>
161-
162156
<dependency>
163157
<groupId>commons-io</groupId>
164158
<artifactId>commons-io</artifactId>
165159
<scope>test</scope>
166160
</dependency>
167161
</dependencies>
162+
163+
<dependencyManagement>
164+
<dependencies>
165+
<dependency>
166+
<groupId>org.yaml</groupId>
167+
<artifactId>snakeyaml</artifactId>
168+
<version>2.1</version> <!-- upper bound -->
169+
</dependency>
170+
</dependencies>
171+
</dependencyManagement>
168172
</project>

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ public enum SplitFilterProviderType
214214

215215
public enum SplitProviderType
216216
{
217-
MYSQL
217+
MYSQL,
218+
PINOT
218219
}
219220
}

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
1919
import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider;
2020
import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
21+
import com.facebook.presto.plugin.clp.split.ClpPinotSplitProvider;
2122
import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
2223
import com.facebook.presto.plugin.clp.split.filter.ClpMySqlSplitFilterProvider;
2324
import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider;
@@ -67,6 +68,9 @@ else if (config.getMetadataProviderType() == MetadataProviderType.YAML) {
6768
if (config.getSplitProviderType() == SplitProviderType.MYSQL) {
6869
binder.bind(ClpSplitProvider.class).to(ClpMySqlSplitProvider.class).in(Scopes.SINGLETON);
6970
}
71+
else if (config.getSplitProviderType() == SplitProviderType.PINOT) {
72+
binder.bind(ClpSplitProvider.class).to(ClpPinotSplitProvider.class).in(Scopes.SINGLETON);
73+
}
7074
else {
7175
throw new PrestoException(CLP_UNSUPPORTED_SPLIT_SOURCE, "Unsupported split provider type: " + config.getSplitProviderType());
7276
}

presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpYamlMetadataProvider.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
2525
import com.google.common.collect.ImmutableList;
2626
import com.google.common.collect.ImmutableMap;
27-
import org.apache.commons.lang3.tuple.Pair;
27+
28+
import javax.inject.Inject;
2829

2930
import java.io.File;
3031
import java.io.IOException;
3132
import java.nio.file.Path;
3233
import java.nio.file.Paths;
33-
import java.util.*;
34+
import java.util.Collections;
35+
import java.util.HashMap;
36+
import java.util.List;
37+
import java.util.Map;
3438

3539
import static com.facebook.presto.plugin.clp.ClpConnectorFactory.CONNECTOR_NAME;
3640
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_TABLE_SCHEMA_YAML;
@@ -43,6 +47,7 @@ public class ClpYamlMetadataProvider
4347
private final ClpConfig config;
4448
private Map<SchemaTableName, String> tableSchemaYamlMap;
4549

50+
@Inject
4651
public ClpYamlMetadataProvider(ClpConfig config)
4752
{
4853
this.config = config;
@@ -54,21 +59,24 @@ public List<ClpColumnHandle> listColumnHandles(SchemaTableName schemaTableName)
5459
Path tableSchemaPath = Paths.get(tableSchemaYamlMap.get(schemaTableName));
5560
ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled());
5661
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
57-
mapper.findAndRegisterModules();
5862

5963
try {
6064
Map<String, Object> root = mapper.readValue(
6165
new File(tableSchemaPath.toString()),
6266
new TypeReference<HashMap<String, Object>>() {});
63-
ImmutableList.Builder<Pair<String, Byte>> typesBuilder = ImmutableList.builder();
64-
collectTypes(root, "", typesBuilder);
65-
for (Pair<String, Byte> entry : typesBuilder.build()) {
66-
schemaTree.addColumn(entry.getLeft(), entry.getRight());
67+
ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
68+
ImmutableList.Builder<Byte> typesBuilder = ImmutableList.builder();
69+
collectTypes(root, "", namesBuilder, typesBuilder);
70+
ImmutableList<String> names = namesBuilder.build();
71+
ImmutableList<Byte> types = typesBuilder.build();
72+
// The names and types should have same sizes
73+
for (int i = 0; i < names.size(); i++) {
74+
schemaTree.addColumn(names.get(i), types.get(i));
6775
}
6876
return schemaTree.collectColumnHandles();
6977
}
7078
catch (IOException e) {
71-
log.error(format("Failed to parse table schema file %s, error: %s", tableSchemaPath, e.getMessage()), e);
79+
log.error(format("Failed to parse table schema file %s, error: %s", tableSchemaPath, e.getMessage()), e);
7280
}
7381
return Collections.emptyList();
7482
}
@@ -78,7 +86,6 @@ public List<ClpTableHandle> listTableHandles(String schemaName)
7886
{
7987
Path tablesSchemaPath = Paths.get(config.getMetadataYamlPath());
8088
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
81-
mapper.findAndRegisterModules();
8289

8390
try {
8491
Map<String, Object> root = mapper.readValue(new File(tablesSchemaPath.toString()),
@@ -100,32 +107,34 @@ public List<ClpTableHandle> listTableHandles(String schemaName)
100107
tableSchemaYamlMapBuilder.put(schemaTableName, tableSchemaYamlPath);
101108
}
102109
this.tableSchemaYamlMap = tableSchemaYamlMapBuilder.build();
103-
return tableHandlesBuilder.build();
110+
return tableHandlesBuilder.build();
104111
}
105112
catch (IOException e) {
106113
log.error(format("Failed to parse metadata file: %s, error: %s", config.getMetadataYamlPath(), e.getMessage()), e);
107114
}
108115
return Collections.emptyList();
109116
}
110117

111-
private void collectTypes(Object node, String prefix, ImmutableList.Builder<Pair<String, Byte>> fullNamesBuilder)
118+
private void collectTypes(Object node, String prefix, ImmutableList.Builder<String> namesBuilder, ImmutableList.Builder<Byte> typesBuilder)
112119
{
113120
if (node instanceof Number) {
114-
fullNamesBuilder.add(Pair.of(prefix, ((Number) node).byteValue()));
121+
namesBuilder.add(prefix);
122+
typesBuilder.add(((Number) node).byteValue());
115123
return;
116124
}
117125
if (node instanceof List) {
118126
for (Number type : (List<Number>) node) {
119-
fullNamesBuilder.add(Pair.of(prefix, (type.byteValue())));
127+
namesBuilder.add(prefix);
128+
typesBuilder.add(type.byteValue());
120129
}
121130
return;
122131
}
123132
for (Map.Entry<String, Object> entry : ((Map<String, Object>) node).entrySet()) {
124133
if (!prefix.isEmpty()) {
125-
collectTypes(entry.getValue(), format("%s.%s", prefix, entry.getKey()), fullNamesBuilder);
134+
collectTypes(entry.getValue(), format("%s.%s", prefix, entry.getKey()), namesBuilder, typesBuilder);
126135
continue;
127136
}
128-
collectTypes(entry.getValue(), entry.getKey(), fullNamesBuilder);
137+
collectTypes(entry.getValue(), entry.getKey(), namesBuilder, typesBuilder);
129138
}
130139
}
131140
}

presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpPinotSplitProvider.java

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,100 @@
1313
*/
1414
package com.facebook.presto.plugin.clp.split;
1515

16+
import com.facebook.airlift.log.Logger;
17+
import com.facebook.presto.plugin.clp.ClpConfig;
1618
import com.facebook.presto.plugin.clp.ClpSplit;
19+
import com.facebook.presto.plugin.clp.ClpTableHandle;
1720
import com.facebook.presto.plugin.clp.ClpTableLayoutHandle;
21+
import com.fasterxml.jackson.databind.JsonNode;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.google.common.collect.ImmutableList;
1824

25+
import javax.inject.Inject;
26+
27+
import java.io.IOException;
28+
import java.io.InputStream;
29+
import java.io.OutputStream;
30+
import java.net.HttpURLConnection;
31+
import java.net.URL;
32+
import java.nio.charset.StandardCharsets;
1933
import java.util.Collections;
34+
import java.util.Iterator;
2035
import java.util.List;
36+
import java.util.Optional;
37+
38+
import static com.facebook.presto.plugin.clp.ClpSplit.SplitType;
39+
import static com.facebook.presto.plugin.clp.ClpSplit.SplitType.ARCHIVE;
40+
import static com.facebook.presto.plugin.clp.ClpSplit.SplitType.IR;
41+
import static java.lang.String.format;
42+
import static java.util.concurrent.TimeUnit.SECONDS;
2143

2244
public class ClpPinotSplitProvider
2345
implements ClpSplitProvider
2446
{
47+
private static final Logger log = Logger.get(ClpPinotSplitProvider.class);
48+
private static final String SQL_SELECT_SPLITS_TEMPLATE = "SELECT tpath FROM %s WHERE 1 = 1 AND (%s) LIMIT 999999";
49+
private final ClpConfig config;
50+
51+
@Inject
52+
public ClpPinotSplitProvider(ClpConfig config)
53+
{
54+
this.config = config;
55+
}
56+
2557
@Override
26-
public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle) {
27-
String url = "jdbc:pinot://localhost:8099";
58+
public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle)
59+
{
60+
ImmutableList.Builder<ClpSplit> splits = new ImmutableList.Builder<>();
61+
ClpTableHandle clpTableHandle = clpTableLayoutHandle.getTable();
62+
String tableName = clpTableHandle.getSchemaTableName().getTableName();
63+
try {
64+
URL url = new URL(config.getMetadataDbUrl() + "/query/sql");
65+
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
66+
conn.setRequestMethod("POST");
67+
conn.setRequestProperty("Content-Type", "application/json");
68+
conn.setRequestProperty("Accept", "application/json");
69+
conn.setDoOutput(true);
70+
conn.setConnectTimeout((int) SECONDS.toMillis(5));
71+
conn.setReadTimeout((int) SECONDS.toMillis(30));
72+
73+
String query = format(SQL_SELECT_SPLITS_TEMPLATE, tableName, clpTableLayoutHandle.getMetadataSql().orElse("1 = 1"));
74+
log.info("Pinot query: %s", query);
75+
ObjectMapper mapper = new ObjectMapper();
76+
String body = format("{\"sql\": %s }", mapper.writeValueAsString(query));
77+
try (OutputStream os = conn.getOutputStream()) {
78+
os.write(body.getBytes(StandardCharsets.UTF_8));
79+
}
80+
81+
int code = conn.getResponseCode();
82+
InputStream is = (code >= 200 && code < 300) ? conn.getInputStream() : conn.getErrorStream();
83+
if (is == null) {
84+
throw new IOException("Pinot HTTP " + code + " with empty body");
85+
}
86+
87+
JsonNode root;
88+
try (InputStream in = is) {
89+
root = mapper.readTree(in);
90+
}
91+
JsonNode resultTable = root.get("resultTable");
92+
if (resultTable == null) {
93+
throw new RuntimeException("No \"resultTable\" field found");
94+
}
95+
JsonNode rows = resultTable.get("rows");
96+
if (rows == null) {
97+
throw new RuntimeException("No \"rows\" field found");
98+
}
99+
for (Iterator<JsonNode> it = rows.elements(); it.hasNext(); ) {
100+
JsonNode row = it.next();
101+
String splitPath = row.elements().next().asText();
102+
SplitType splitType = splitPath.endsWith(".clp.zst") ? IR : ARCHIVE;
103+
splits.add(new ClpSplit(splitPath, splitType, Optional.empty()));
104+
}
105+
return splits.build();
106+
}
107+
catch (Exception e) {
108+
log.error(e);
109+
}
28110

29111
return Collections.emptyList();
30112
}

presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpYamlMetadata.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@
1313
*/
1414
package com.facebook.presto.plugin.clp;
1515

16-
import com.facebook.presto.common.type.ArrayType;
17-
import com.facebook.presto.common.type.RowType;
1816
import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
1917
import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider;
20-
import com.facebook.presto.spi.ColumnHandle;
18+
import com.facebook.presto.plugin.clp.split.ClpPinotSplitProvider;
19+
import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
2120
import com.facebook.presto.spi.ColumnMetadata;
2221
import com.facebook.presto.spi.ConnectorTableMetadata;
2322
import com.facebook.presto.spi.SchemaTableName;
@@ -27,23 +26,21 @@
2726
import org.testng.annotations.Test;
2827

2928
import java.util.HashSet;
29+
import java.util.List;
3030
import java.util.Optional;
3131

32-
import static com.facebook.presto.common.type.BigintType.BIGINT;
33-
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
34-
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
35-
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
3632
import static com.facebook.presto.plugin.clp.ClpConfig.MetadataProviderType.YAML;
3733
import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME;
3834
import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
3935
import static org.testng.Assert.assertEquals;
4036

4137
public class TestClpYamlMetadata
4238
{
43-
private static final String PINOT_BROKER_URL = "jdbc:pinot://localhost:8099";
39+
private static final String PINOT_BROKER_URL = "http://localhost:8099";
4440
private static final String METADATA_YAML_PATH = "/home/xiaochong-dev/presto-e2e/pinot/tables-schema.yaml";
4541
private static final String TABLE_NAME = "cockroachdb";
4642
private ClpMetadata metadata;
43+
private ClpSplitProvider clpSplitProvider;
4744

4845
@BeforeTest
4946
public void setUp()
@@ -55,6 +52,7 @@ public void setUp()
5552
.setMetadataYamlPath(METADATA_YAML_PATH);
5653
ClpMetadataProvider metadataProvider = new ClpYamlMetadataProvider(config);
5754
metadata = new ClpMetadata(config, metadataProvider);
55+
clpSplitProvider = new ClpPinotSplitProvider(config);
5856
}
5957

6058
@Test
@@ -71,6 +69,17 @@ public void testListTables()
7169
assertEquals(new HashSet<>(metadata.listTables(SESSION, Optional.empty())), builder.build());
7270
}
7371

72+
@Test
73+
public void testListSplits()
74+
{
75+
ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(
76+
new ClpTableHandle(new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME), ""),
77+
Optional.empty(),
78+
Optional.empty());
79+
List<ClpSplit> result = clpSplitProvider.listSplits(layoutHandle);
80+
System.out.println("Hello world");
81+
}
82+
7483
@Test
7584
public void testGetTableMetadata()
7685
{

presto-native-execution/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,16 @@
267267
</dependency>
268268
</dependencies>
269269

270+
<dependencyManagement>
271+
<dependencies>
272+
<dependency>
273+
<groupId>org.yaml</groupId>
274+
<artifactId>snakeyaml</artifactId>
275+
<version>2.1</version> <!-- upper bound -->
276+
</dependency>
277+
</dependencies>
278+
</dependencyManagement>
279+
270280
<build>
271281
<plugins>
272282
<!-- Disable git-commit-id-plugin plugin to allow for running tests without

0 commit comments

Comments
 (0)