Skip to content

Commit 88664e3

Browse files
committed
Add rollback action for destination table for CTAS statements
While CREATE TABLE is an atomic operation, CREATE TABLE AS SELECT statement happens in two phases for JDBC connectors from Trino: - creating the table - inserting content into the table If the insertion of the data fails, the CTAS may leave residual empty tables behind. The JDBC connectors, in dealing with CREATE TABLE AS SELECT scenarios, remove the destination table in case of dealing with exceptions while inserting the content.
1 parent a8025f0 commit 88664e3

File tree

12 files changed

+34
-22
lines changed

12 files changed

+34
-22
lines changed

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -829,12 +829,13 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
829829
}
830830

831831
@Override
832-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
832+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
833833
{
834834
try {
835835
if (shouldUseFaultTolerantExecution(session)) {
836836
// Create the target table
837-
createTable(session, tableMetadata);
837+
JdbcOutputTableHandle destinationTableHandle = createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
838+
rollbackActionConsumer.accept(() -> rollbackCreateDestinationTable(session, destinationTableHandle));
838839
// Create the temporary table
839840
ColumnMetadata pageSinkIdColumn = getPageSinkIdColumn(
840841
tableMetadata.getColumns().stream().map(ColumnMetadata::getName).toList());
@@ -849,6 +850,11 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
849850
}
850851
}
851852

853+
protected void rollbackCreateDestinationTable(ConnectorSession session, JdbcOutputTableHandle handle)
854+
{
855+
dropTable(session, handle.getRemoteTableName(), false);
856+
}
857+
852858
protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String targetTableName)
853859
throws SQLException
854860
{

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,9 +600,9 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
600600
}
601601

602602
@Override
603-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
603+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
604604
{
605-
return delegate.beginCreateTable(session, tableMetadata);
605+
return delegate.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
606606
}
607607

608608
@Override

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public class DefaultJdbcMetadata
137137
private final boolean precalculateStatisticsForPushdown;
138138
private final Set<JdbcQueryEventListener> jdbcQueryEventListeners;
139139

140-
private final List<Runnable> rollbackActions = new ArrayList<>();
140+
protected final List<Runnable> rollbackActions = new ArrayList<>();
141141

142142
public DefaultJdbcMetadata(
143143
JdbcClient jdbcClient,
@@ -1199,7 +1199,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
11991199
if (replace) {
12001200
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
12011201
}
1202-
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata);
1202+
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata, rollbackActions::add);
12031203
rollbackActions.add(() -> jdbcClient.rollbackCreateTable(session, handle));
12041204
return handle;
12051205
}

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ public Optional<PreparedQuery> legacyImplementJoin(
265265
}
266266

267267
@Override
268-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
268+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
269269
{
270-
return delegate().beginCreateTable(session, tableMetadata);
270+
return delegate().beginCreateTable(session, tableMetadata, rollbackActionConsumer);
271271
}
272272

273273
@Override

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ default void setTableProperties(ConnectorSession session, JdbcTableHandle handle
210210

211211
void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
212212

213-
JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
213+
JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer);
214214

215215
void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds);
216216

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,10 +368,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
368368
}
369369

370370
@Override
371-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
371+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
372372
{
373373
// no retrying as it could be not idempotent operation
374-
return delegate.beginCreateTable(session, tableMetadata);
374+
return delegate.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
375375
}
376376

377377
@Override

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,9 +353,9 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
353353
}
354354

355355
@Override
356-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
356+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
357357
{
358-
return stats.getBeginCreateTable().wrap(() -> delegate().beginCreateTable(session, tableMetadata));
358+
return stats.getBeginCreateTable().wrap(() -> delegate().beginCreateTable(session, tableMetadata, rollbackActionConsumer));
359359
}
360360

361361
@Override

plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.util.Set;
7575
import java.util.TimeZone;
7676
import java.util.function.BiFunction;
77+
import java.util.function.Consumer;
7778

7879
import static com.google.common.base.Preconditions.checkArgument;
7980
import static com.google.common.collect.Iterables.getOnlyElement;
@@ -476,7 +477,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
476477
}
477478

478479
@Override
479-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
480+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
480481
{
481482
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
482483
}

plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.OptionalLong;
5757
import java.util.Set;
5858
import java.util.function.BiFunction;
59+
import java.util.function.Consumer;
5960

6061
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
6162
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
@@ -314,7 +315,7 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
314315
}
315316

316317
@Override
317-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
318+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
318319
{
319320
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
320321
}

plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.Optional;
8484
import java.util.Set;
8585
import java.util.function.BiFunction;
86+
import java.util.function.Consumer;
8687

8788
import static com.google.common.base.Preconditions.checkArgument;
8889
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -381,7 +382,7 @@ private static void checkDateValue(long epochDay)
381382
}
382383

383384
@Override
384-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
385+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
385386
{
386387
if (tableMetadata.getComment().isPresent()) {
387388
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
@@ -417,13 +418,15 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
417418

418419
try (Connection connection = connectionFactory.openConnection(session)) {
419420
execute(session, connection, sql);
420-
421-
return new IgniteOutputTableHandle(
421+
IgniteOutputTableHandle destinationTableHandle = new IgniteOutputTableHandle(
422422
new RemoteTableName(Optional.empty(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.getTableName()),
423423
columnNames,
424424
columnTypes.build(),
425425
Optional.empty(),
426426
primaryKeys.isEmpty() ? Optional.of(IGNITE_DUMMY_ID) : Optional.empty());
427+
rollbackActionConsumer.accept(() -> rollbackCreateDestinationTable(session, destinationTableHandle));
428+
return destinationTableHandle;
429+
427430
}
428431
catch (SQLException e) {
429432
throw new TrinoException(JDBC_ERROR, e);

0 commit comments

Comments
 (0)