Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-shade</artifactId>
<version>0.4-SNAPSHOT</version>
</dependency>
</dependencies>

<!-- Activate these profiles with -Ptrino-xxx to build and test against different Trino versions -->
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/apache/paimon/trino/TrinoConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
Expand All @@ -41,6 +42,7 @@ public class TrinoConnector implements Connector {
private final TrinoMetadataBase trinoMetadata;
private final TrinoSplitManagerBase trinoSplitManager;
private final TrinoPageSourceProvider trinoPageSourceProvider;
private final List<PropertyMetadata<?>> tableProperties;

public TrinoConnector(
TrinoMetadataBase trinoMetadata,
Expand All @@ -50,6 +52,8 @@ public TrinoConnector(
this.trinoSplitManager = requireNonNull(trinoSplitManager, "jmxSplitManager is null");
this.trinoPageSourceProvider =
requireNonNull(trinoPageSourceProvider, "jmxRecordSetProvider is null");
tableProperties = new TrinoTableOptions().getTableProperties().stream()
.collect(toImmutableList());;
}

@Override
Expand Down Expand Up @@ -89,4 +93,9 @@ public List<PropertyMetadata<?>> getSessionProperties() {
true)
);
}

@Override
public List<PropertyMetadata<?>> getTableProperties() {
return tableProperties;
}
}
152 changes: 151 additions & 1 deletion src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.paimon.trino;

import io.trino.spi.security.TrinoPrincipal;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.utils.InstantiationUtil;

import io.trino.spi.connector.Assignment;
Expand All @@ -41,6 +43,7 @@
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.predicate.TupleDomain;
import org.apache.paimon.utils.StringUtils;


import java.io.IOException;
Expand All @@ -54,9 +57,11 @@
import java.util.Optional;
import java.util.function.Function;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;

/** Trino {@link ConnectorMetadata}. */
public abstract class TrinoMetadataBase implements ConnectorMetadata {
Expand All @@ -76,6 +81,33 @@ private List<String> listSchemaNames() {
return catalog.listDatabases();
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(schemaName),
"schemaName cannot be null or empty");

try {
catalog.createDatabase(schemaName, true);
} catch (Catalog.DatabaseAlreadyExistException e) {
throw new RuntimeException(format("database already existed: '%s'", schemaName));
}
}

@Override
public void dropSchema(ConnectorSession session, String schemaName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(schemaName),
"schemaName cannot be null or empty");
try {
catalog.dropDatabase(schemaName, false, true);
} catch (Catalog.DatabaseNotEmptyException e) {
throw new RuntimeException(format("database is not empty: '%s'", schemaName));
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(format("database not exists: '%s'", schemaName));
}
}

@Override
public TrinoTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
return getTableHandle(tableName);
Expand Down Expand Up @@ -130,6 +162,83 @@ private List<SchemaTableName> listTables(String schema) {
}
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) {
SchemaTableName table = tableMetadata.getTable();
Identifier identifier = Identifier.create(table.getSchemaName(), table.getTableName());

try {
catalog.createTable(identifier, prepareSchema(tableMetadata), false);
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(format("database not exists: '%s'", table.getSchemaName()));
} catch (Catalog.TableAlreadyExistException e) {
throw new RuntimeException(format("table already existed: '%s'", table.getTableName()));
}
}

@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Identifier identifier = new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName());
List<SchemaChange> changes = new ArrayList<>();
Map<String, String> options = properties.entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> (String) e.getValue().get()));
options.forEach((key, value) -> changes.add(SchemaChange.setOption(key, value)));
// TODO: remove options, SET PROPERTIES x = DEFAULT
try {
catalog.alterTable(identifier,changes,false);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(format("table not exists: '%s'", trinoTableHandle.getTableName()));
}
}

private Schema prepareSchema(ConnectorTableMetadata tableMetadata) {
Map<String, Object> properties = new HashMap<>(tableMetadata.getProperties());
Schema.Builder builder =
Schema.newBuilder()
.primaryKey(TrinoTableOptions.getPrimaryKeys(properties))
.partitionKeys(TrinoTableOptions.getPartitionedKeys(properties));

for (ColumnMetadata column : tableMetadata.getColumns()) {
builder.column(
column.getName(),
TrinoTypeUtils.toPaimonType(column.getType()),
column.getComment());
}

TrinoTableOptionUtils.buildOptions(builder, properties);

return builder.build();
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName) {
TrinoTableHandle oldTableHandle = (TrinoTableHandle) tableHandle;
try {
catalog.renameTable(
new Identifier(oldTableHandle.getSchemaName(), oldTableHandle.getTableName()),
new Identifier(newTableName.getSchemaName(), newTableName.getTableName()),
false);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(format("table not exists: '%s'", oldTableHandle.getTableName()));
} catch (Catalog.TableAlreadyExistException e) {
throw new RuntimeException(format("table already existed: '%s'", newTableName.getTableName()));
}
}

@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
try {
catalog.dropTable(
new Identifier(
trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName()),
false);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(format("table not exists: '%s'", trinoTableHandle.getTableName()));
}
}

@Override
public Map<String, ColumnHandle> getColumnHandles(
ConnectorSession session, ConnectorTableHandle tableHandle) {
Expand Down Expand Up @@ -162,6 +271,47 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
table -> getTableHandle(session, table).columnMetadatas()));
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Identifier identifier = new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName());
List<SchemaChange> changes = new ArrayList<>();
changes.add(SchemaChange.addColumn(column.getName(),TrinoTypeUtils.toPaimonType(column.getType())));
try {
catalog.alterTable(identifier,changes,false);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(format("table not exists: '%s'", trinoTableHandle.getTableName()));
}
}

@Override
public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Identifier identifier = new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName());
TrinoColumnHandle trinoColumnHandle = (TrinoColumnHandle) source;
List<SchemaChange> changes = new ArrayList<>();
changes.add(SchemaChange.renameColumn(trinoColumnHandle.getColumnName(),target));
try {
catalog.alterTable(identifier,changes,false);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(format("table not exists: '%s'", trinoTableHandle.getTableName()));
}
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Identifier identifier = new Identifier(trinoTableHandle.getSchemaName(), trinoTableHandle.getTableName());
TrinoColumnHandle trinoColumnHandle = (TrinoColumnHandle) column;
List<SchemaChange> changes = new ArrayList<>();
changes.add(SchemaChange.dropColumn(trinoColumnHandle.getColumnName()));
try {
catalog.alterTable(identifier,changes,false);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(format("table not exists: '%s'", trinoTableHandle.getTableName()));
}
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import static org.apache.flink.shaded.guava30.com.google.common.base.Verify.verify;

/** Trino {@link ConnectorPageSource}. */
public abstract class TrinoPageSourceBase implements ConnectorPageSource {
public abstract class TrinoPageSourceBase implements ConnectorPageSource {

private final RecordReader<InternalRow> reader;
private final PageBuilder pageBuilder;
Expand Down
Loading