Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ private void copyToTable(SnowflakeBatchServiceConfig config) throws SQLException
files = files.replaceAll(stagingBucketDir, "");
DataSource dataSource = dataSourceProviderFn.apply(null);

prepareTableAccordingCreateDisposition(dataSource, table, tableSchema, createDisposition);
prepareTableAccordingCreateDisposition(
dataSource, database, schema, table, tableSchema, createDisposition);
prepareTableAccordingWriteDisposition(dataSource, table, writeDisposition);

if (!storageIntegrationName.isEmpty()) {
Expand Down Expand Up @@ -193,6 +194,8 @@ private static boolean checkIfTableIsEmpty(ResultSet resultSet, int columnId)

private void prepareTableAccordingCreateDisposition(
DataSource dataSource,
String database,
String schema,
String table,
SnowflakeTableSchema tableSchema,
CreateDisposition createDisposition)
Expand All @@ -201,7 +204,7 @@ private void prepareTableAccordingCreateDisposition(
case CREATE_NEVER:
break;
case CREATE_IF_NEEDED:
createTableIfNotExists(dataSource, table, tableSchema);
createTableIfNotExists(dataSource, database, schema, table, tableSchema);
break;
}
}
Expand All @@ -222,11 +225,16 @@ private void prepareTableAccordingWriteDisposition(
}

private void createTableIfNotExists(
DataSource dataSource, String table, SnowflakeTableSchema tableSchema) throws SQLException {
DataSource dataSource,
String database,
String schema,
String table,
SnowflakeTableSchema tableSchema)
throws SQLException {
String query =
String.format(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s');",
table.toUpperCase());
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s');",
database.toUpperCase(), schema.toUpperCase(), table.toUpperCase());

runConnectionWithStatement(
dataSource,
Expand Down
Loading