Skip to content

Commit 540828e

Browse files
committed
Add rollback action for destination table for CTAS statements
While CREATE TABLE is an atomic operation, CREATE TABLE AS SELECT statement happens in two phases for JDBC connectors from Trino: - creating the table - inserting content into the table If the insertion of the data fails, the CTAS may leave residual empty tables behind. The JDBC connectors, in dealing with CREATE TABLE AS SELECT scenarios, remove the destination table in case of dealing with exceptions while inserting the content.
1 parent a8025f0 commit 540828e

File tree

14 files changed

+49
-37
lines changed

14 files changed

+49
-37
lines changed

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -818,23 +818,24 @@ protected PreparedQuery applyQueryTransformations(JdbcTableHandle tableHandle, P
818818
}
819819

820820
@Override
821-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
821+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
822822
{
823823
try {
824-
createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
824+
return createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
825825
}
826826
catch (SQLException e) {
827827
throw new TrinoException(JDBC_ERROR, e);
828828
}
829829
}
830830

831831
@Override
832-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
832+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
833833
{
834834
try {
835835
if (shouldUseFaultTolerantExecution(session)) {
836836
// Create the target table
837-
createTable(session, tableMetadata);
837+
JdbcOutputTableHandle destinationTableHandle = createTable(session, tableMetadata);
838+
rollbackActionConsumer.accept(() -> rollbackCreateDestinationTable(session, destinationTableHandle));
838839
// Create the temporary table
839840
ColumnMetadata pageSinkIdColumn = getPageSinkIdColumn(
840841
tableMetadata.getColumns().stream().map(ColumnMetadata::getName).toList());
@@ -849,6 +850,11 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
849850
}
850851
}
851852

853+
protected void rollbackCreateDestinationTable(ConnectorSession session, JdbcOutputTableHandle handle)
854+
{
855+
dropTable(session, handle.getRemoteTableName(), false);
856+
}
857+
852858
protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String targetTableName)
853859
throws SQLException
854860
{

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,16 +593,17 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
593593
}
594594

595595
@Override
596-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
596+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
597597
{
598-
delegate.createTable(session, tableMetadata);
598+
JdbcOutputTableHandle outputTableHandle = delegate.createTable(session, tableMetadata);
599599
invalidateTableCaches(tableMetadata.getTable());
600+
return outputTableHandle;
600601
}
601602

602603
@Override
603-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
604+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
604605
{
605-
return delegate.beginCreateTable(session, tableMetadata);
606+
return delegate.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
606607
}
607608

608609
@Override

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public class DefaultJdbcMetadata
137137
private final boolean precalculateStatisticsForPushdown;
138138
private final Set<JdbcQueryEventListener> jdbcQueryEventListeners;
139139

140-
private final List<Runnable> rollbackActions = new ArrayList<>();
140+
protected final List<Runnable> rollbackActions = new ArrayList<>();
141141

142142
public DefaultJdbcMetadata(
143143
JdbcClient jdbcClient,
@@ -1199,7 +1199,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
11991199
if (replace) {
12001200
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
12011201
}
1202-
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata);
1202+
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata, rollbackActions::add);
12031203
rollbackActions.add(() -> jdbcClient.rollbackCreateTable(session, handle));
12041204
return handle;
12051205
}

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ public Optional<PreparedQuery> legacyImplementJoin(
265265
}
266266

267267
@Override
268-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
268+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
269269
{
270-
return delegate().beginCreateTable(session, tableMetadata);
270+
return delegate().beginCreateTable(session, tableMetadata, rollbackActionConsumer);
271271
}
272272

273273
@Override
@@ -448,9 +448,9 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
448448
}
449449

450450
@Override
451-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
451+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
452452
{
453-
delegate().createTable(session, tableMetadata);
453+
return delegate().createTable(session, tableMetadata);
454454
}
455455

456456
@Override

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@ default void setTableProperties(ConnectorSession session, JdbcTableHandle handle
208208
throw new TrinoException(NOT_SUPPORTED, "This connector does not support setting table properties");
209209
}
210210

211-
void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
211+
JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
212212

213-
JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
213+
JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer);
214214

215215
void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds);
216216

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,17 +361,17 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
361361
}
362362

363363
@Override
364-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
364+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
365365
{
366366
// no retrying as it could be not idempotent operation
367-
delegate.createTable(session, tableMetadata);
367+
return delegate.createTable(session, tableMetadata);
368368
}
369369

370370
@Override
371-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
371+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
372372
{
373373
// no retrying as it could be not idempotent operation
374-
return delegate.beginCreateTable(session, tableMetadata);
374+
return delegate.beginCreateTable(session, tableMetadata, rollbackActionConsumer);
375375
}
376376

377377
@Override

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,15 +347,15 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
347347
}
348348

349349
@Override
350-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
350+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
351351
{
352-
stats.getCreateTable().wrap(() -> delegate().createTable(session, tableMetadata));
352+
return stats.getCreateTable().wrap(() -> delegate().createTable(session, tableMetadata));
353353
}
354354

355355
@Override
356-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
356+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
357357
{
358-
return stats.getBeginCreateTable().wrap(() -> delegate().beginCreateTable(session, tableMetadata));
358+
return stats.getBeginCreateTable().wrap(() -> delegate().beginCreateTable(session, tableMetadata, rollbackActionConsumer));
359359
}
360360

361361
@Override

plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.util.Set;
7575
import java.util.TimeZone;
7676
import java.util.function.BiFunction;
77+
import java.util.function.Consumer;
7778

7879
import static com.google.common.base.Preconditions.checkArgument;
7980
import static com.google.common.collect.Iterables.getOnlyElement;
@@ -470,13 +471,13 @@ public OptionalLong update(ConnectorSession session, JdbcTableHandle handle)
470471
}
471472

472473
@Override
473-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
474+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
474475
{
475476
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables");
476477
}
477478

478479
@Override
479-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
480+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
480481
{
481482
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
482483
}

plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.OptionalLong;
5757
import java.util.Set;
5858
import java.util.function.BiFunction;
59+
import java.util.function.Consumer;
5960

6061
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
6162
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
@@ -117,7 +118,7 @@ public void truncateTable(ConnectorSession session, JdbcTableHandle handle)
117118
}
118119

119120
@Override
120-
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
121+
public JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
121122
{
122123
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables");
123124
}
@@ -314,7 +315,7 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
314315
}
315316

316317
@Override
317-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
318+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
318319
{
319320
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
320321
}

plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.Optional;
8484
import java.util.Set;
8585
import java.util.function.BiFunction;
86+
import java.util.function.Consumer;
8687

8788
import static com.google.common.base.Preconditions.checkArgument;
8889
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -381,7 +382,7 @@ private static void checkDateValue(long epochDay)
381382
}
382383

383384
@Override
384-
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
385+
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Consumer<Runnable> rollbackActionConsumer)
385386
{
386387
if (tableMetadata.getComment().isPresent()) {
387388
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");

0 commit comments

Comments
 (0)