Skip to content

Commit ed55acc

Browse files
committed
Fixed several issues and refactored the test files
1 parent f6f09cc commit ed55acc

File tree

11 files changed

+326
-312
lines changed

11 files changed

+326
-312
lines changed

presto-clp/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,5 +124,11 @@
124124
<artifactId>commons-math3</artifactId>
125125
<scope>test</scope>
126126
</dependency>
127+
128+
<dependency>
129+
<groupId>commons-io</groupId>
130+
<artifactId>commons-io</artifactId>
131+
<scope>test</scope>
132+
</dependency>
127133
</dependencies>
128134
</project>

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ public boolean isNullable()
7575

7676
public ColumnMetadata getColumnMetadata()
7777
{
78-
ColumnMetadata.Builder builder = ColumnMetadata.builder()
78+
return ColumnMetadata.builder()
7979
.setName(columnName)
8080
.setType(columnType)
81-
.setNullable(nullable);
82-
return builder.build();
81+
.setNullable(nullable)
82+
.build();
8383
}
8484

8585
@Override

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpExpression.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
public class ClpMetadata
4747
implements ConnectorMetadata
4848
{
49-
private static final String DEFAULT_SCHEMA_NAME = "default";
49+
public static final String DEFAULT_SCHEMA_NAME = "default";
5050
private final ClpMetadataProvider clpMetadataProvider;
5151
private final LoadingCache<SchemaTableName, List<ClpColumnHandle>> columnHandleCache;
5252
private final LoadingCache<String, List<ClpTableHandle>> tableHandleCache;
@@ -63,7 +63,7 @@ public ClpMetadata(ClpConfig clpConfig, ClpMetadataProvider clpMetadataProvider)
6363
.refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS)
6464
.build(CacheLoader.from(this::loadTableHandles));
6565

66-
this.clpMetadataProvider = clpMetadataProvider;
66+
this.clpMetadataProvider = requireNonNull(clpMetadataProvider, "ClpMetadataProvider is null");
6767
}
6868

6969
private List<ClpColumnHandle> loadColumnHandles(SchemaTableName schemaTableName)
@@ -120,14 +120,14 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
120120
}
121121

122122
@Override
123-
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
124-
ConnectorTableHandle table,
125-
Constraint<ColumnHandle> constraint,
126-
Optional<Set<ColumnHandle>> desiredColumns)
123+
public ConnectorTableLayoutResult getTableLayoutForConstraint(ConnectorSession session,
124+
ConnectorTableHandle table,
125+
Constraint<ColumnHandle> constraint,
126+
Optional<Set<ColumnHandle>> desiredColumns)
127127
{
128128
ClpTableHandle tableHandle = (ClpTableHandle) table;
129129
ConnectorTableLayout layout = new ConnectorTableLayout(new ClpTableLayoutHandle(tableHandle, Optional.empty()));
130-
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
130+
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
131131
}
132132

133133
@Override
@@ -163,7 +163,13 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
163163
schemaTableNames = listTables(session, Optional.ofNullable(prefix.getSchemaName()));
164164
}
165165
else {
166-
schemaTableNames = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
166+
SchemaTableName table = new SchemaTableName(schemaName, prefix.getTableName());
167+
if (listTables(session, Optional.ofNullable(schemaName)).contains(table)) {
168+
schemaTableNames = ImmutableList.of(table);
169+
}
170+
else {
171+
schemaTableNames = ImmutableList.of();
172+
}
167173
}
168174

169175
return schemaTableNames.stream()

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525

2626
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
27+
import static java.util.Objects.requireNonNull;
2728

2829
public class ClpSplit
2930
implements ConnectorSplit
@@ -33,7 +34,7 @@ public class ClpSplit
3334
@JsonCreator
3435
public ClpSplit(@JsonProperty("path") String path)
3536
{
36-
this.path = path;
37+
this.path = requireNonNull(path, "Split path is null");
3738
}
3839

3940
@JsonProperty

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public StorageType getStorageType()
6666
@Override
6767
public int hashCode()
6868
{
69-
return Objects.hash(schemaTableName, storageType);
69+
return Objects.hash(schemaTableName, tablePath, storageType);
7070
}
7171

7272
@Override

presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.facebook.presto.spi.PrestoException;
2626

2727
import java.util.ArrayList;
28+
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.HashSet;
3031
import java.util.List;
@@ -188,9 +189,11 @@ private String getTypeSuffix(Type type)
188189
private Type buildRowType(ClpNode node)
189190
{
190191
List<RowType.Field> fields = new ArrayList<>();
191-
for (Map.Entry<String, ClpNode> entry : node.children.entrySet()) {
192-
String name = entry.getKey();
193-
ClpNode child = entry.getValue();
192+
List<String> sortedKeys = new ArrayList<>(node.children.keySet());
193+
Collections.sort(sortedKeys);
194+
195+
for (String name : sortedKeys) {
196+
ClpNode child = node.children.get(name);
194197
Type fieldType = child.isLeaf() ? child.type : buildRowType(child);
195198
fields.add(new RowType.Field(Optional.of(name), fieldType));
196199
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.plugin.clp;
15+
16+
import com.facebook.airlift.log.Logger;
17+
import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
18+
import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
19+
import com.facebook.presto.plugin.clp.metadata.ClpNodeType;
20+
import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
21+
import org.apache.commons.io.FileUtils;
22+
import org.apache.commons.math3.util.Pair;
23+
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.sql.Connection;
27+
import java.sql.DriverManager;
28+
import java.sql.PreparedStatement;
29+
import java.sql.SQLException;
30+
import java.sql.Statement;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.UUID;
34+
35+
import static org.testng.Assert.fail;
36+
37+
public final class ClpMetadataDbSetUp
38+
{
39+
private static final Logger log = Logger.get(ClpMetadataDbSetUp.class);
40+
41+
public static final String metadataDbUrlTemplate =
42+
"jdbc:h2:file:%s;MODE=MySQL;DATABASE_TO_UPPER=FALSE";
43+
public static final String metadataDbTablePrefix = "clp_";
44+
public static final String metadataDbUser = "sa";
45+
public static final String metadataDbPassword = "";
46+
private static final String datasetsTableName = metadataDbTablePrefix + "datasets";
47+
48+
public static final class DbHandle
49+
{
50+
DbHandle(String dbPath)
51+
{
52+
this.dbPath = dbPath;
53+
}
54+
public String dbPath;
55+
}
56+
57+
private ClpMetadataDbSetUp()
58+
{
59+
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
60+
}
61+
62+
public static DbHandle getDbHandle(String dbName)
63+
{
64+
return new DbHandle(String.format("/tmp/presto-clp-test-%s/%s", UUID.randomUUID(), dbName));
65+
}
66+
67+
public static ClpMetadata setupMetadata(DbHandle dbHandle, Map<String, List<Pair<String, ClpNodeType>>> clpFields)
68+
{
69+
final String metadataDbUrl = String.format(metadataDbUrlTemplate, dbHandle.dbPath);
70+
final String columnMetadataTableSuffix = "_column_metadata";
71+
72+
try (Connection conn = DriverManager.getConnection(metadataDbUrl, metadataDbUser, metadataDbPassword);
73+
Statement stmt = conn.createStatement()) {
74+
createDatasetsTable(stmt);
75+
76+
for (Map.Entry<String, List<Pair<String, ClpNodeType>>> entry : clpFields.entrySet()) {
77+
String tableName = entry.getKey();
78+
String columnMetadataTableName = metadataDbTablePrefix + tableName + columnMetadataTableSuffix;
79+
String createColumnMetadataSQL = String.format(
80+
"CREATE TABLE IF NOT EXISTS %s (" +
81+
" name VARCHAR(512) NOT NULL," +
82+
" type TINYINT NOT NULL," +
83+
" PRIMARY KEY (name, type))", columnMetadataTableName);
84+
String insertColumnMetadataSQL = String.format(
85+
"INSERT INTO %s (name, type) VALUES (?, ?)", columnMetadataTableName);
86+
stmt.execute(createColumnMetadataSQL);
87+
88+
updateDatasetsTable(conn, tableName);
89+
90+
try (PreparedStatement pstmt = conn.prepareStatement(insertColumnMetadataSQL)) {
91+
for (Pair<String, ClpNodeType> record : entry.getValue()) {
92+
pstmt.setString(1, record.getFirst());
93+
pstmt.setByte(2, record.getSecond().getType());
94+
pstmt.addBatch();
95+
}
96+
pstmt.executeBatch();
97+
}
98+
}
99+
}
100+
catch (SQLException e) {
101+
fail(e.getMessage());
102+
}
103+
104+
ClpConfig config = new ClpConfig().setPolymorphicTypeEnabled(true)
105+
.setMetadataDbUrl(metadataDbUrl)
106+
.setMetadataDbUser(metadataDbUser)
107+
.setMetadataDbPassword(metadataDbPassword)
108+
.setMetadataTablePrefix(metadataDbTablePrefix);
109+
ClpMetadataProvider metadataProvider = new ClpMySqlMetadataProvider(config);
110+
return new ClpMetadata(config, metadataProvider);
111+
}
112+
113+
public static ClpMySqlSplitProvider setupSplit(DbHandle dbHandle, Map<String, List<String>> splits)
114+
{
115+
final String metadataDbUrl = String.format(metadataDbUrlTemplate, dbHandle.dbPath);
116+
final String archiveTableSuffix = "_archives";
117+
final String archiveTableFormat = metadataDbTablePrefix + "%s" + archiveTableSuffix;
118+
119+
try (Connection conn = DriverManager.getConnection(metadataDbUrl, metadataDbUser, metadataDbPassword);
120+
Statement stmt = conn.createStatement()) {
121+
createDatasetsTable(stmt);
122+
123+
// Create and populate archive tables
124+
for (Map.Entry<String, List<String>> tableSplits : splits.entrySet()) {
125+
String tableName = tableSplits.getKey();
126+
updateDatasetsTable(conn, tableName);
127+
128+
String archiveTableName = String.format(archiveTableFormat, tableSplits.getKey());
129+
String createArchiveTableSQL = String.format(
130+
"CREATE TABLE IF NOT EXISTS %s (" +
131+
"pagination_id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, " +
132+
"id VARCHAR(64) NOT NULL" +
133+
")",
134+
archiveTableName);
135+
stmt.execute(createArchiveTableSQL);
136+
137+
String insertArchiveTableSQL = String.format("INSERT INTO %s (id) VALUES (?)", archiveTableName);
138+
try (PreparedStatement pstmt = conn.prepareStatement(insertArchiveTableSQL)) {
139+
for (String splitPath : tableSplits.getValue()) {
140+
pstmt.setString(1, splitPath);
141+
pstmt.addBatch();
142+
}
143+
pstmt.executeBatch();
144+
}
145+
}
146+
}
147+
catch (SQLException e) {
148+
fail(e.getMessage());
149+
}
150+
151+
return new ClpMySqlSplitProvider(
152+
new ClpConfig()
153+
.setPolymorphicTypeEnabled(true)
154+
.setMetadataDbUrl(metadataDbUrl)
155+
.setMetadataDbUser(metadataDbUser)
156+
.setMetadataDbPassword(metadataDbPassword)
157+
.setMetadataTablePrefix(metadataDbTablePrefix));
158+
}
159+
160+
public static void tearDown(DbHandle dbHandle)
161+
{
162+
File dir = new File(dbHandle.dbPath).getParentFile();
163+
if (dir.exists()) {
164+
try {
165+
FileUtils.deleteDirectory(dir);
166+
log.info("Deleted database dir" + dir.getAbsolutePath());
167+
}
168+
catch (IOException e) {
169+
log.warn("Failed to delete directory " + dir + ": " + e.getMessage());
170+
}
171+
}
172+
}
173+
174+
private static void createDatasetsTable(Statement stmt) throws SQLException
175+
{
176+
final String createDatasetTableSQL = String.format(
177+
"CREATE TABLE IF NOT EXISTS %s (" +
178+
" name VARCHAR(255) PRIMARY KEY," +
179+
" archive_storage_type VARCHAR(4096) NOT NULL," +
180+
" archive_storage_directory VARCHAR(4096) NOT NULL)", datasetsTableName);
181+
stmt.execute(createDatasetTableSQL);
182+
}
183+
184+
private static void updateDatasetsTable(Connection conn, String tableName) throws SQLException
185+
{
186+
final String insertDatasetTableSQL = String.format(
187+
"INSERT INTO %s (name, archive_storage_type, archive_storage_directory) VALUES (?, ?, ?)", datasetsTableName);
188+
try (PreparedStatement pstmt = conn.prepareStatement(insertDatasetTableSQL)) {
189+
pstmt.setString(1, tableName);
190+
pstmt.setString(2, "fs");
191+
pstmt.setString(3, "/tmp/archives/" + tableName);
192+
pstmt.executeUpdate();
193+
}
194+
}
195+
}

0 commit comments

Comments
 (0)