Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ public abstract class LoadUnloadConfig extends BaseSnowflakeConfig {


public LoadUnloadConfig(String accountName, String database,
String schemaName, String username, String password,
String schemaName, String tableName, String username, String password,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableName is not required here!

@Nullable Boolean keyPairEnabled, @Nullable String path,
@Nullable String passphrase, @Nullable Boolean oauth2Enabled, @Nullable String clientId,
@Nullable String clientSecret, @Nullable String refreshToken,
@Nullable String connectionArguments) {
super(accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase, oauth2Enabled,
super(accountName, database, schemaName, tableName, username, password, keyPairEnabled, path, passphrase,
oauth2Enabled,
clientId, clientSecret, refreshToken, connectionArguments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ public class LoadActionConfig extends LoadUnloadConfig {
@Nullable
private String pattern;

public LoadActionConfig(String accountName, String database, String schemaName, String username, String password,
public LoadActionConfig(String accountName, String database, String schemaName, String tableName,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableName not required here as well

String username, String password,
@Nullable Boolean keyPairEnabled, @Nullable String path, @Nullable String passphrase,
@Nullable Boolean oauth2Enabled, @Nullable String clientId, @Nullable String clientSecret,
@Nullable String refreshToken, @Nullable String connectionArguments) {
super(accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase, oauth2Enabled,
super(accountName, database, tableName, schemaName, username, password, keyPairEnabled, path, passphrase,
oauth2Enabled,
clientId, clientSecret, refreshToken, connectionArguments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ public class UnloadActionConfig extends LoadUnloadConfig {
private Boolean includeHeader;


public UnloadActionConfig(String accountName, String database, String schemaName, String username, String password,
public UnloadActionConfig(String accountName, String database, String schemaName, String tableName, String username,
String password,
@Nullable Boolean keyPairEnabled, @Nullable String path, @Nullable String passphrase,
@Nullable Boolean oauth2Enabled, @Nullable String clientId, @Nullable String clientSecret,
@Nullable String refreshToken, @Nullable String connectionArguments) {
super(accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase, oauth2Enabled,
super(accountName, database, schemaName, tableName, username, password, keyPairEnabled, path,
passphrase, oauth2Enabled,
clientId, clientSecret, refreshToken, connectionArguments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class RunSQLConfig extends BaseSnowflakeConfig {
@Macro
private String query;

public RunSQLConfig(String accountName, String database, String schemaName, String username, String password,
public RunSQLConfig(String accountName, String database, String schemaName, String tableName, String username,
String password,
@Nullable Boolean keyPairEnabled, @Nullable String path, @Nullable String passphrase,
@Nullable Boolean oauth2Enabled, @Nullable String clientId, @Nullable String clientSecret,
@Nullable String refreshToken, @Nullable String connectionArguments) {
super(accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase, oauth2Enabled,
super(accountName, database, schemaName, tableName, username, password, keyPairEnabled, path, passphrase,
oauth2Enabled,
clientId, clientSecret, refreshToken, connectionArguments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class BaseSnowflakeConfig extends PluginConfig {
public static final String PROPERTY_ACCOUNT_NAME = "accountName";
public static final String PROPERTY_DATABASE = "database";
public static final String PROPERTY_SCHEMA_NAME = "schemaName";
public static final String PROPERTY_TABLE_NAME = "TableName";
public static final String PROPERTY_WAREHOUSE = "warehouse";
public static final String PROPERTY_ROLE = "role";
public static final String PROPERTY_USERNAME = "username";
Expand Down Expand Up @@ -63,6 +64,13 @@ public class BaseSnowflakeConfig extends PluginConfig {
@Macro
private String schemaName;

@Name(PROPERTY_TABLE_NAME)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Table Name should be moved under Basic section using widgets.json

@Description("Name of the table to import data from. If specified, importQuery will be ignored.")
@Macro
@Nullable
private String tableName;


@Nullable
@Name(PROPERTY_WAREHOUSE)
@Description("Warehouse to connect to. If not specified default warehouse is used.")
Expand All @@ -87,6 +95,7 @@ public class BaseSnowflakeConfig extends PluginConfig {
@Nullable
private String password;


@Name(PROPERTY_KEY_PAIR_ENABLED)
@Description("If true, plugin will perform Key Pair authentication.")
@Nullable
Expand Down Expand Up @@ -136,6 +145,7 @@ public class BaseSnowflakeConfig extends PluginConfig {
public BaseSnowflakeConfig(String accountName,
String database,
String schemaName,
String tableName,
String username,
String password,
@Nullable Boolean keyPairEnabled,
Expand All @@ -150,6 +160,7 @@ public BaseSnowflakeConfig(String accountName,
this.database = database;
this.schemaName = schemaName;
this.username = username;
this.tableName = tableName;
this.password = password;
this.keyPairEnabled = keyPairEnabled;
this.privateKey = privateKey;
Expand All @@ -161,6 +172,7 @@ public BaseSnowflakeConfig(String accountName,
this.connectionArguments = connectionArguments;
}


public String getAccountName() {
return accountName;
}
Expand All @@ -173,6 +185,11 @@ public String getSchemaName() {
return schemaName;
}

@Nullable
public String getTableName() {
return tableName;
}

@Nullable
public String getWarehouse() {
return warehouse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,24 @@
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
//import java.sql.*;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comments

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* A class which accesses Snowflake API.
* Establishes a connection to Snowflake using BaseSnowflakeConfig.
* Initializes and configures SnowflakeBasicDataSource.
* Sets application name (CDAP) and row limit (LIMIT_ROWS).
*/

public class SnowflakeAccessor {
private static final String APPLICATION_NAME = "CDAP";
private static final int LIMIT_ROWS = 1;
Expand All @@ -61,6 +67,10 @@ public SnowflakeAccessor(BaseSnowflakeConfig config) {
initDataSource(dataSource, config);
}

/**
* A class which will help in connection
*/

public void runSQL(String query) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove javadoc

try (Connection connection = dataSource.getConnection();
PreparedStatement populateStmt = connection.prepareStatement(query);) {
Expand Down Expand Up @@ -105,6 +115,32 @@ public List<SnowflakeFieldDescriptor> describeQuery(String query) throws IOExcep
return fieldDescriptors;
}

/**
* Returns field descriptors for specified tableName.
*
* @return List of field descriptors.
* @throws IOException thrown if there are any issue with the I/O operations.
*/

public List<SnowflakeFieldDescriptor> describeTable(String schemaName, String tableName) throws SQLException {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Javadoc for public methods

List<SnowflakeFieldDescriptor> fieldDescriptors = new ArrayList<>();

try (Connection connection = dataSource.getConnection()) {
DatabaseMetaData dbMetaData = connection.getMetaData();

try (ResultSet columns = dbMetaData.getColumns(null, schemaName, tableName, null)) {
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
int columnType = columns.getInt("DATA_TYPE");
boolean nullable = columns.getInt("NULLABLE") == DatabaseMetaData.columnNullable;

fieldDescriptors.add(new SnowflakeFieldDescriptor(columnName, columnType, nullable));
}
}
}
return fieldDescriptors;
}

private void initDataSource(SnowflakeBasicDataSource dataSource, BaseSnowflakeConfig config) {
dataSource.setDatabaseName(config.getDatabase());
dataSource.setSchema(config.getSchemaName());
Expand Down Expand Up @@ -193,4 +229,8 @@ private static String writeTextToTmpFile(String text) {
throw new RuntimeException("Cannot write key to temporary file", e);
}
}

public String getSchema() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadoc here as well

return config.getSchemaName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -64,18 +65,19 @@ public static Schema getSchema(SnowflakeBatchSourceConfig config, FailureCollect
}

SnowflakeSourceAccessor snowflakeSourceAccessor =
new SnowflakeSourceAccessor(config, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
return getSchema(snowflakeSourceAccessor, config.getSchema(), collector, config.getImportQuery());
new SnowflakeSourceAccessor(config, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
return getSchema(snowflakeSourceAccessor, config.getSchema(), collector, config.getTableName(),
config.getImportQuery());
}

public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String schema,
FailureCollector collector, String importQuery) {
FailureCollector collector, String tableName, String importQuery) {
try {
if (!Strings.isNullOrEmpty(schema)) {
return getParsedSchema(schema);
}
return Strings.isNullOrEmpty(importQuery) ? null : getSchema(snowflakeAccessor, importQuery);
} catch (SchemaParseException e) {
return getSchema(snowflakeAccessor, tableName, importQuery);
} catch (SchemaParseException | IllegalArgumentException e) {
collector.addFailure(String.format("Unable to retrieve output schema. Reason: '%s'", e.getMessage()),
null)
.withStacktrace(e.getStackTrace())
Expand All @@ -95,15 +97,26 @@ private static Schema getParsedSchema(String schema) {
}
}

public static Schema getSchema(SnowflakeAccessor snowflakeAccessor, String importQuery) {
public static Schema getSchema(SnowflakeAccessor snowflakeAccessor,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getSchema method used within the SchemaHelper class only should be made private

String tableName, String importQuery) {
try {
List<SnowflakeFieldDescriptor> result = snowflakeAccessor.describeQuery(importQuery);
List<SnowflakeFieldDescriptor> result;
// If tableName is provided, describe the table
if (!Strings.isNullOrEmpty(tableName)) {
result = snowflakeAccessor.describeTable(snowflakeAccessor.getSchema(), tableName);
} else {
result = snowflakeAccessor.describeQuery(importQuery);
}

List<Schema.Field> fields = result.stream()
.map(fieldDescriptor -> Schema.Field.of(fieldDescriptor.getName(), getSchema(fieldDescriptor)))
.collect(Collectors.toList());
.map(fieldDescriptor -> Schema.Field.of(fieldDescriptor.getName(),
getSchema(fieldDescriptor)))
.collect(Collectors.toList());
return Schema.recordOf("data", fields);
} catch (IOException e) {
} catch (SQLException e) {
throw new SchemaParseException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public class SnowflakeSinkConfig extends BaseSnowflakeConfig {
private String copyOptions;

public SnowflakeSinkConfig(String referenceName, String accountName, String database,
String schemaName, String username, String password,
String schemaName, String tableName, String username, String password,
@Nullable Boolean keyPairEnabled, @Nullable String path,
@Nullable String passphrase, @Nullable Boolean oauth2Enabled,
@Nullable String clientId, @Nullable String clientSecret,
@Nullable String refreshToken, @Nullable String connectionArguments) {
super(accountName, database, schemaName, username, password,
super(accountName, database, schemaName, tableName, username, password,
keyPairEnabled, path, passphrase, oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments);
this.referenceName = referenceName;
}
Expand Down Expand Up @@ -104,8 +104,8 @@ private void validateInputSchema(Schema schema, FailureCollector failureCollecto
}

SnowflakeAccessor snowflakeAccessor = new SnowflakeAccessor(this);
Schema expectedSchema = SchemaHelper.getSchema(snowflakeAccessor, String.format(GET_FIELDS_QUERY, tableName));

// Schema expectedSchema = SchemaHelper.getSchema(snowflakeAccessor, String.format(GET_FIELDS_QUERY, tableName));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comments

Schema expectedSchema = SchemaHelper.getSchema(snowflakeAccessor, tableName, null);
try {
SchemaHelper.checkCompatibility(expectedSchema, schema);
} catch (IllegalArgumentException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.snowflake.source.batch;

import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -34,6 +35,7 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
public static final String PROPERTY_IMPORT_QUERY = "importQuery";
public static final String PROPERTY_MAX_SPLIT_SIZE = "maxSplitSize";
public static final String PROPERTY_SCHEMA = "schema";
// public static final String PROPERTY_TABLE_NAME = "tableName";

@Name(PROPERTY_REFERENCE_NAME)
@Description("This will be used to uniquely identify this source/sink for lineage, annotating metadata, etc.")
Expand All @@ -42,8 +44,10 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
@Name(PROPERTY_IMPORT_QUERY)
@Description("Query for import data.")
@Macro
@Nullable
private String importQuery;


@Name(PROPERTY_MAX_SPLIT_SIZE)
@Description("Maximum split size specified in bytes.")
@Macro
Expand All @@ -55,17 +59,20 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
@Macro
private String schema;


public SnowflakeBatchSourceConfig(String referenceName, String accountName, String database,
String schemaName, String importQuery, String username, String password,
String schemaName, @Nullable String importQuery, @Nullable String tableName,
String username, String password,
@Nullable Boolean keyPairEnabled, @Nullable String path,
@Nullable String passphrase, @Nullable Boolean oauth2Enabled,
@Nullable String clientId, @Nullable String clientSecret,
@Nullable String refreshToken, Long maxSplitSize,
@Nullable String connectionArguments, @Nullable String schema) {
super(accountName, database, schemaName, username, password,
super(accountName, database, schemaName, tableName, username, password,
keyPairEnabled, path, passphrase, oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments);
this.referenceName = referenceName;
this.importQuery = importQuery;
// this.tableName = tableName;
this.maxSplitSize = maxSplitSize;
this.schema = schema;
}
Expand All @@ -74,6 +81,11 @@ public String getImportQuery() {
return importQuery;
}

// @Nullable
// public String getTableName() {
// return tableName;
// }

public Long getMaxSplitSize() {
return maxSplitSize;
}
Expand All @@ -89,11 +101,14 @@ public String getSchema() {

public void validate(FailureCollector collector) {
super.validate(collector);

if (!containsMacro(PROPERTY_MAX_SPLIT_SIZE) && Objects.nonNull(maxSplitSize)
&& maxSplitSize < 0) {
collector.addFailure("Maximum Slit Size cannot be a negative number.", null)
.withConfigProperty(PROPERTY_MAX_SPLIT_SIZE);
if (!containsMacro(PROPERTY_IMPORT_QUERY) && !containsMacro(PROPERTY_TABLE_NAME)) {
if (Strings.isNullOrEmpty(getTableName()) && Strings.isNullOrEmpty(importQuery)) {
collector.addFailure("Both importQuery and tableName cannot be NULL at the same time.",
"Provide either an importQuery or a tableName.")
.withConfigProperty(PROPERTY_IMPORT_QUERY)
.withConfigProperty(PROPERTY_TABLE_NAME);
}
}

}
}
Loading