Skip to content

Commit d5a3b97

Browse files
authored
Merge pull request #43 from cloudsufi/snowFlakeError
[PLUGIN-1832] Error Management Snowflake Source and Sink, fix sonar issues, and added new Validation and fix bugs for maximum split size and NPE issue handled
2 parents 1d9cacd + fcd56b5 commit d5a3b97

18 files changed

+560
-94
lines changed

src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -230,33 +230,10 @@ public String getConnectionArguments() {
230230
}
231231

232232
public void validate(FailureCollector collector) {
233-
if (getOauth2Enabled()) {
234-
if (!containsMacro(PROPERTY_CLIENT_ID)
235-
&& Strings.isNullOrEmpty(getClientId())) {
236-
collector.addFailure("Client ID is not set.", null)
237-
.withConfigProperty(PROPERTY_CLIENT_ID);
238-
}
239-
if (!containsMacro(PROPERTY_CLIENT_SECRET)
240-
&& Strings.isNullOrEmpty(getClientSecret())) {
241-
collector.addFailure("Client Secret is not set.", null)
242-
.withConfigProperty(PROPERTY_CLIENT_SECRET);
243-
}
244-
if (!containsMacro(PROPERTY_REFRESH_TOKEN)
245-
&& Strings.isNullOrEmpty(getRefreshToken())) {
246-
collector.addFailure("Refresh Token is not set.", null)
247-
.withConfigProperty(PROPERTY_REFRESH_TOKEN);
248-
}
249-
} else if (getKeyPairEnabled()) {
250-
if (!containsMacro(PROPERTY_USERNAME)
251-
&& Strings.isNullOrEmpty(getUsername())) {
252-
collector.addFailure("Username is not set.", null)
253-
.withConfigProperty(PROPERTY_USERNAME);
254-
}
255-
if (!containsMacro(PROPERTY_PRIVATE_KEY)
256-
&& Strings.isNullOrEmpty(getPrivateKey())) {
257-
collector.addFailure("Private Key is not set.", null)
258-
.withConfigProperty(PROPERTY_PRIVATE_KEY);
259-
}
233+
if (Boolean.TRUE.equals(getOauth2Enabled())) {
234+
validateWhenOath2Enabled(collector);
235+
} else if (Boolean.TRUE.equals(getKeyPairEnabled())) {
236+
validateWhenKeyPairEnabled(collector);
260237
} else {
261238
if (!containsMacro(PROPERTY_USERNAME)
262239
&& Strings.isNullOrEmpty(getUsername())) {
@@ -272,6 +249,37 @@ public void validate(FailureCollector collector) {
272249
validateConnection(collector);
273250
}
274251

252+
private void validateWhenKeyPairEnabled(FailureCollector collector) {
253+
if (!containsMacro(PROPERTY_USERNAME)
254+
&& Strings.isNullOrEmpty(getUsername())) {
255+
collector.addFailure("Username is not set.", null)
256+
.withConfigProperty(PROPERTY_USERNAME);
257+
}
258+
if (!containsMacro(PROPERTY_PRIVATE_KEY)
259+
&& Strings.isNullOrEmpty(getPrivateKey())) {
260+
collector.addFailure("Private Key is not set.", null)
261+
.withConfigProperty(PROPERTY_PRIVATE_KEY);
262+
}
263+
}
264+
265+
private void validateWhenOath2Enabled(FailureCollector collector) {
266+
if (!containsMacro(PROPERTY_CLIENT_ID)
267+
&& Strings.isNullOrEmpty(getClientId())) {
268+
collector.addFailure("Client ID is not set.", null)
269+
.withConfigProperty(PROPERTY_CLIENT_ID);
270+
}
271+
if (!containsMacro(PROPERTY_CLIENT_SECRET)
272+
&& Strings.isNullOrEmpty(getClientSecret())) {
273+
collector.addFailure("Client Secret is not set.", null)
274+
.withConfigProperty(PROPERTY_CLIENT_SECRET);
275+
}
276+
if (!containsMacro(PROPERTY_REFRESH_TOKEN)
277+
&& Strings.isNullOrEmpty(getRefreshToken())) {
278+
collector.addFailure("Refresh Token is not set.", null)
279+
.withConfigProperty(PROPERTY_REFRESH_TOKEN);
280+
}
281+
}
282+
275283
public boolean canConnect() {
276284
return (!containsMacro(PROPERTY_DATABASE) && !containsMacro(PROPERTY_SCHEMA_NAME)
277285
&& !containsMacro(PROPERTY_ACCOUNT_NAME) && !containsMacro(PROPERTY_USERNAME)
@@ -299,7 +307,7 @@ protected void validateConnection(FailureCollector collector) {
299307
.withConfigProperty(PROPERTY_USERNAME);
300308

301309
// TODO: for oauth2
302-
if (keyPairEnabled) {
310+
if (Boolean.TRUE.equals(keyPairEnabled)) {
303311
failure.withConfigProperty(PROPERTY_PRIVATE_KEY);
304312
} else {
305313
failure.withConfigProperty(PROPERTY_PASSWORD);

src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
import com.google.gson.JsonElement;
2020
import com.google.gson.JsonParser;
2121
import com.google.gson.JsonSyntaxException;
22+
import io.cdap.cdap.api.exception.ErrorCategory;
23+
import io.cdap.cdap.api.exception.ErrorType;
24+
import io.cdap.cdap.api.exception.ErrorUtils;
2225
import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException;
2326
import org.apache.http.client.methods.CloseableHttpResponse;
2427
import org.apache.http.client.methods.HttpPost;
2528
import org.apache.http.client.utils.URIBuilder;
2629
import org.apache.http.entity.StringEntity;
2730
import org.apache.http.impl.client.CloseableHttpClient;
2831
import org.apache.http.util.EntityUtils;
32+
2933
import java.io.IOException;
3034
import java.net.URI;
3135
import java.net.URISyntaxException;
@@ -50,16 +54,23 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient
5054
httppost.setHeader("Content-type", "application/x-www-form-urlencoded");
5155

5256
// set grant type and refresh_token. It should be in body not url!
53-
StringEntity entity = new StringEntity(String.format("refresh_token=%s&grant_type=refresh_token",
54-
URLEncoder.encode(config.getRefreshToken(), "UTF-8")));
55-
httppost.setEntity(entity);
57+
try {
58+
StringEntity entity = new StringEntity(String.format("refresh_token=%s&grant_type=refresh_token",
59+
URLEncoder.encode(config.getRefreshToken(), "UTF-8")));
60+
httppost.setEntity(entity);
61+
} catch (NullPointerException e) {
62+
String errorMessage = String.format("Failed to encode URL due to missing Refresh Token with message: %s.",
63+
e.getMessage());
64+
String errorReason = "Error encoding URL due to missing Refresh Token.";
65+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
66+
errorReason, errorMessage, ErrorType.USER, true, e);
67+
}
5668

5769
// set 'Authorization' header
5870
String stringToEncode = config.getClientId() + ":" + config.getClientSecret();
5971
String encondedAuthorization = new String(Base64.getEncoder().encode(stringToEncode.getBytes()));
6072
httppost.setHeader("Authorization", String.format("Basic %s", encondedAuthorization));
6173

62-
6374
CloseableHttpResponse response = httpclient.execute(httppost);
6475
String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
6576

@@ -72,7 +83,13 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient
7283

7384
// if exception happened during parsing OR if json does not contain 'access_token' key.
7485
if (jsonElement == null) {
75-
throw new RuntimeException(String.format("Unexpected response '%s' from '%s'", responseString, uri.toString()));
86+
String errorReason = String.format("Failed to parse access token from response. Request %s returned response " +
87+
"code '%s' & reason: %s", uri.toString(), response.getStatusLine().getStatusCode(),
88+
response.getStatusLine().getReasonPhrase());
89+
String errorMessage = String.format("Failed to parse access token, request %s returned response %s " +
90+
"with code '%s'.", uri, responseString, response.getStatusLine().getStatusCode());
91+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
92+
errorReason, errorMessage, ErrorType.SYSTEM, true, new JsonSyntaxException(errorReason));
7693
}
7794

7895
return jsonElement.getAsString();
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.snowflake.common;
18+
19+
import com.google.common.base.Throwables;
20+
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
21+
import io.cdap.cdap.api.exception.ErrorCategory;
22+
import io.cdap.cdap.api.exception.ErrorType;
23+
import io.cdap.cdap.api.exception.ErrorUtils;
24+
import io.cdap.cdap.api.exception.ProgramFailureException;
25+
import io.cdap.cdap.etl.api.exception.ErrorContext;
26+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
27+
import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException;
28+
import io.cdap.plugin.snowflake.common.exception.SchemaParseException;
29+
30+
import java.util.List;
31+
32+
/**
33+
* Error details provided for the Snowflake
34+
**/
35+
public class SnowflakeErrorDetailsProvider implements ErrorDetailsProvider {
36+
37+
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";
38+
39+
@Override
40+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
41+
List<Throwable> causalChain = Throwables.getCausalChain(e);
42+
for (Throwable t : causalChain) {
43+
if (t instanceof ProgramFailureException) {
44+
// if causal chain already has program failure exception, return null to avoid double wrap.
45+
return null;
46+
}
47+
if (t instanceof IllegalArgumentException) {
48+
return getProgramFailureException((IllegalArgumentException) t, errorContext, ErrorType.USER);
49+
}
50+
if (t instanceof IllegalStateException) {
51+
return getProgramFailureException((IllegalStateException) t, errorContext, ErrorType.SYSTEM);
52+
}
53+
if (t instanceof SchemaParseException) {
54+
return getProgramFailureException((SchemaParseException) t, errorContext, ErrorType.USER);
55+
}
56+
if (t instanceof UnexpectedFormatException) {
57+
return getProgramFailureException((UnexpectedFormatException) t, errorContext, ErrorType.SYSTEM);
58+
}
59+
if (t instanceof ConnectionTimeoutException) {
60+
return getProgramFailureException((ConnectionTimeoutException) t, errorContext, ErrorType.SYSTEM);
61+
}
62+
}
63+
return null;
64+
}
65+
66+
/**
67+
* Get a ProgramFailureException with the given error
68+
* information from {@link Exception}.
69+
*
70+
* @param exception The Exception to get the error information from.
71+
* @return A ProgramFailureException with the given error information.
72+
*/
73+
private ProgramFailureException getProgramFailureException(Exception exception, ErrorContext errorContext,
74+
ErrorType errorType) {
75+
String errorMessage = exception.getMessage();
76+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
77+
errorMessage,
78+
String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType, false, exception);
79+
}
80+
}

0 commit comments

Comments
 (0)