Skip to content

Commit 2cfeb31

Browse files
authored
SnowflakeIO: filter on database and schema when searching for existing table (#34486)
1 parent 8c7ed02 commit 2cfeb31

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ private void copyToTable(SnowflakeBatchServiceConfig config) throws SQLException
132132
files = files.replaceAll(stagingBucketDir, "");
133133
DataSource dataSource = dataSourceProviderFn.apply(null);
134134

135-
prepareTableAccordingCreateDisposition(dataSource, table, tableSchema, createDisposition);
135+
prepareTableAccordingCreateDisposition(
136+
dataSource, database, schema, table, tableSchema, createDisposition);
136137
prepareTableAccordingWriteDisposition(dataSource, table, writeDisposition);
137138

138139
if (!storageIntegrationName.isEmpty()) {
@@ -193,6 +194,8 @@ private static boolean checkIfTableIsEmpty(ResultSet resultSet, int columnId)
193194

194195
private void prepareTableAccordingCreateDisposition(
195196
DataSource dataSource,
197+
String database,
198+
String schema,
196199
String table,
197200
SnowflakeTableSchema tableSchema,
198201
CreateDisposition createDisposition)
@@ -201,7 +204,7 @@ private void prepareTableAccordingCreateDisposition(
201204
case CREATE_NEVER:
202205
break;
203206
case CREATE_IF_NEEDED:
204-
createTableIfNotExists(dataSource, table, tableSchema);
207+
createTableIfNotExists(dataSource, database, schema, table, tableSchema);
205208
break;
206209
}
207210
}
@@ -222,11 +225,16 @@ private void prepareTableAccordingWriteDisposition(
222225
}
223226

224227
private void createTableIfNotExists(
225-
DataSource dataSource, String table, SnowflakeTableSchema tableSchema) throws SQLException {
228+
DataSource dataSource,
229+
String database,
230+
String schema,
231+
String table,
232+
SnowflakeTableSchema tableSchema)
233+
throws SQLException {
226234
String query =
227235
String.format(
228-
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s');",
229-
table.toUpperCase());
236+
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s');",
237+
database.toUpperCase(), schema.toUpperCase(), table.toUpperCase());
230238

231239
runConnectionWithStatement(
232240
dataSource,

0 commit comments

Comments
 (0)