Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -829,12 +829,13 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
try {
if (shouldUseFaultTolerantExecution(session)) {
// Create the target table
createTable(session, tableMetadata);
JdbcOutputTableHandle destinationTableHandle = createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
rollbackActionConsumer.accept(() -> rollbackCreateDestinationTable(session, destinationTableHandle.getRemoteTableName()));
// Create the temporary table
ColumnMetadata pageSinkIdColumn = getPageSinkIdColumn(
tableMetadata.getColumns().stream().map(ColumnMetadata::getName).toList());
Expand All @@ -849,6 +850,11 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
}
}

protected void rollbackCreateDestinationTable(ConnectorSession session, RemoteTableName remoteTableName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollbackTableCreation / rollbackCreateTable

{
dropTable(session, remoteTableName, false);
}

protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String targetTableName)
throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
return delegate.beginCreateTable(session, tableMetadata);
return delegate.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public class DefaultJdbcMetadata
private final boolean precalculateStatisticsForPushdown;
private final Set<JdbcQueryEventListener> jdbcQueryEventListeners;

private final List<Runnable> rollbackActions = new ArrayList<>();
protected final List<Runnable> rollbackActions = new ArrayList<>();

public DefaultJdbcMetadata(
JdbcClient jdbcClient,
Expand Down Expand Up @@ -1199,7 +1199,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata);
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata, rollbackActions::add);
rollbackActions.add(() -> jdbcClient.rollbackCreateTable(session, handle));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we had rollback here. do we rollback twice now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rollbackCreateTable is mainly for the clean ups of the temporary tables

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the responsibilities are now blurred, aren't they?
maybe we can clarify them with better naming or something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the responsibilities are now blurred, aren't they?

now, yes. But I think the name is ok since we are inside the "createTable" method, that's is fair to have a rollback method like "rollbackCreateTable", it just our implementation only do cleanups of temporary table. Ideally if we could do the cleanup of target table inside this method as well

@findinpath #27848 the method is going to split such cleanups into two methods(we already do), but finally, I think that's more elegant that we should only has one rollback methods for "createTable", caller don't care or needs to know the cleanups table is "temporary" or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While rolling back the creation of the destination table is characteristic only for CTAS statements, rolling back the creation of a temporary table can occur on any DML statement.
I see a good rationale on having two separate methods for doing rollback for table creation.

return handle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ public Optional<PreparedQuery> legacyImplementJoin(
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
return delegate().beginCreateTable(session, tableMetadata);
return delegate().beginCreateTable(session, tableMetadata, rollbackActionConsumer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ default void setTableProperties(ConnectorSession session, JdbcTableHandle handle

void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);

JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollbackActionCollector would be a better name

more importantly, this is a rather odd abstraction and doesn't seem necessary.
so far it was JdbcMetadata who was respomnsible for instantiating and remembering rollback actions. let's not change that, if we don't have to

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollbackActionCollector would be a better name

+1

so far it was JdbcMetadata who was respomnsible for instantiating and remembering rollback actions

We are entering a situation where it is unclear when it is safe to roll back the target table during CTAS. For example, in BaseJdbcClient#beginCreateTable, when FTE is enabled, both the target table and a temporary table are created in the method, during rollback in DefaultJdbcMetadata, we cannot reliably determine which tables should be cleaned up

let's not change that, if we don't have to

For now, this works as a solution, but I believe we could evolve the API to shift the responsibility back to DefaultJdbcMetadata. For example, we could change the method's return value so that DefaultJdbcMetadata knows exactly which tables need to be cleaned up

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollbackActionCollector would be a better name

+1

Included in

however, the name may still be imperfect -- #27702 (comment)

in BaseJdbcClient#beginCreateTable, when FTE is enabled, both the target table and a temporary table are created in the method

I made the same mistake. It's not FTE-related despite the code making it look so.

during rollback in DefaultJdbcMetadata, we cannot reliably determine which tables should be cleaned up

It's unclear why destination table is created there. I think it might be possible to defer that statement until later.

let's not change that, if we don't have to

For now, this works as a solution, but I believe we could evolve the API to shift the responsibility back to DefaultJdbcMetadata. For example, we could change the method's return value so that DefaultJdbcMetadata knows exactly which tables need to be cleaned up

That was also my intention.
I stopped short from execution on it in

when i saw the beginMerge flow. We create high number of tables there, and someone needs to clean them up if we fail mid-way.


void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
// no retrying as it could be not idempotent operation
return delegate.beginCreateTable(session, tableMetadata);
return delegate.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
return stats.getBeginCreateTable().wrap(() -> delegate().beginCreateTable(session, tableMetadata));
return stats.getBeginCreateTable().wrap(() -> delegate().beginCreateTable(session, tableMetadata, rollbackActionConsumer));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand Down Expand Up @@ -476,7 +477,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
Expand Down Expand Up @@ -314,7 +315,7 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
Expand Down Expand Up @@ -381,7 +382,7 @@ private static void checkDateValue(long epochDay)
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
if (tableMetadata.getComment().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
Expand Down Expand Up @@ -417,13 +418,14 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto

try (Connection connection = connectionFactory.openConnection(session)) {
execute(session, connection, sql);

return new IgniteOutputTableHandle(
IgniteOutputTableHandle destinationTableHandle = new IgniteOutputTableHandle(
new RemoteTableName(Optional.empty(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.getTableName()),
columnNames,
columnTypes.build(),
Optional.empty(),
primaryKeys.isEmpty() ? Optional.of(IGNITE_DUMMY_ID) : Optional.empty());
rollbackActionConsumer.accept(() -> rollbackCreateDestinationTable(session, destinationTableHandle.getRemoteTableName()));
return destinationTableHandle;
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
if (saveMode == REPLACE) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
igniteClient.beginCreateTable(session, tableMetadata);
igniteClient.beginCreateTable(session, tableMetadata, _ -> {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document why it's ok to ignore a rollback action
(it should be actually safer not to ignore -- this is CREATE TABLE flow. if nothing follows CREATE TABLE, nothing fails later and no rollback will take place)

}

@Override
Expand All @@ -232,7 +232,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
return igniteClient.beginCreateTable(session, tableMetadata);
return igniteClient.beginCreateTable(session, tableMetadata, rollbackActions::add);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
Expand Down Expand Up @@ -363,9 +364,9 @@ protected void dropSchema(ConnectorSession session, Connection connection, Strin
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
{
JdbcOutputTableHandle table = super.beginCreateTable(session, tableMetadata);
JdbcOutputTableHandle table = super.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
enableTableLockOnBulkLoadTableOption(session, table);
return table;
}
Expand Down
Loading