diff --git a/src/main/java/com/amazon/redshift/Driver.java b/src/main/java/com/amazon/redshift/Driver.java
index fb99bd1..7db3546 100644
--- a/src/main/java/com/amazon/redshift/Driver.java
+++ b/src/main/java/com/amazon/redshift/Driver.java
@@ -5,29 +5,9 @@
 package com.amazon.redshift;
 
-import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
-import com.amazon.redshift.logger.LogLevel;
-import com.amazon.redshift.logger.RedshiftLogger;
-import com.amazon.redshift.util.DriverInfo;
-import com.amazon.redshift.util.ExpressionProperties;
-import com.amazon.redshift.util.GT;
-import com.amazon.redshift.util.HostSpec;
-import com.amazon.redshift.util.IniFile;
-import com.amazon.redshift.util.RedshiftException;
-import com.amazon.redshift.util.RedshiftState;
-import com.amazon.redshift.util.SharedTimer;
-import com.amazon.redshift.util.URLCoder;
-import com.amazon.redshift.util.RedshiftProperties;
-
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.directory.Attribute;
-import javax.naming.directory.Attributes;
-import javax.naming.directory.InitialDirContext;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetAddress;
 import java.net.URL;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
@@ -41,11 +21,31 @@
 import java.util.Enumeration;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.naming.Context;
+import javax.naming.directory.Attribute;
+import javax.naming.directory.Attributes;
+import javax.naming.directory.InitialDirContext;
+
+import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
+import com.amazon.redshift.jdbc.ResourceLock;
+import com.amazon.redshift.logger.LogLevel;
+import com.amazon.redshift.logger.RedshiftLogger;
+import com.amazon.redshift.util.DriverInfo;
+import com.amazon.redshift.util.ExpressionProperties;
+import com.amazon.redshift.util.GT;
+import com.amazon.redshift.util.HostSpec;
+import com.amazon.redshift.util.IniFile;
+import com.amazon.redshift.util.RedshiftException;
+import com.amazon.redshift.util.RedshiftProperties;
+import com.amazon.redshift.util.RedshiftState;
+import com.amazon.redshift.util.SharedTimer;
+import com.amazon.redshift.util.URLCoder;
+
 /**
  * The Java SQL framework allows for multiple database drivers. Each driver should supply a class
  * that implements the Driver interface
  *
@@ -97,29 +97,32 @@ public class Driver implements java.sql.Driver {
       throw new ExceptionInInitializerError(e);
     }
   }
+  private final ResourceLock lock = new ResourceLock();
 
   // Helper to retrieve default properties from classloader resource
   // properties files.
   private Properties defaultProperties;
 
-  private synchronized Properties getDefaultProperties() throws IOException {
-    if (defaultProperties != null) {
-      return defaultProperties;
-    }
-
-    // Make sure we load properties with the maximum possible privileges.
-    try {
-      defaultProperties =
-          AccessController.doPrivileged(new PrivilegedExceptionAction<Properties>() {
-            public Properties run() throws IOException {
-              return loadDefaultProperties();
-            }
-          });
-    } catch (PrivilegedActionException e) {
-      throw (IOException) e.getException();
-    }
-
-    return defaultProperties;
+  private Properties getDefaultProperties() throws IOException {
+    try (ResourceLock ignore = lock.obtain()) {
+      if (defaultProperties != null) {
+        return defaultProperties;
+      }
+
+      // Make sure we load properties with the maximum possible privileges.
+      try {
+        defaultProperties =
+            AccessController.doPrivileged(new PrivilegedExceptionAction<Properties>() {
+              public Properties run() throws IOException {
+                return loadDefaultProperties();
+              }
+            });
+      } catch (PrivilegedActionException e) {
+        throw (IOException) e.getException();
+      }
+
+      return defaultProperties;
+    }
   }
 
   private Properties loadDefaultProperties() throws IOException {
@@ -376,6 +379,9 @@ private RedshiftLogger getLogger(final Properties props) {
    * while enforcing a login timeout.
    */
   private static class ConnectThread implements Runnable {
+    private final ResourceLock lock = new ResourceLock();
+    private final Condition lockCondition = lock.newCondition();
+
     ConnectThread(String url, RedshiftProperties props, RedshiftLogger connLogger) {
       this.url = url;
       this.props = props;
@@ -394,7 +400,7 @@ public void run() {
         error = t;
       }
 
-      synchronized (this) {
+      try (ResourceLock ignore = lock.obtain()) {
         if (abandoned) {
           if (conn != null) {
             try {
@@ -405,7 +411,7 @@ public void run() {
         } else {
           result = conn;
           resultException = error;
-          notify();
+          lockCondition.signal();
         }
       }
     }
@@ -420,7 +426,7 @@ public void run() {
      */
     public Connection getResult(long timeout) throws SQLException {
       long expiry = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + timeout;
-      synchronized (this) {
+      try (ResourceLock ignore = lock.obtain()) {
         while (true) {
           if (result != null) {
             return result;
@@ -446,7 +452,7 @@ public Connection getResult(long timeout) throws SQLException {
           }
 
           try {
-            wait(delay);
+            lockCondition.await(delay, TimeUnit.MILLISECONDS);
          } catch (InterruptedException ie) {
 
             // reset the interrupt flag
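Note on the helper type: every file in this patch leans on com.amazon.redshift.jdbc.ResourceLock, acquired with lock.obtain() in a try-with-resources header and released by close(). The class itself is not part of this diff; a minimal sketch consistent with the call sites above (obtain(), close(), newCondition()) would look like the following — treat the body as an assumption, not the shipped source:

    package com.amazon.redshift.jdbc;

    import java.util.concurrent.locks.ReentrantLock;

    // Sketch only: an AutoCloseable ReentrantLock, so a critical section is
    // exactly one try-with-resources scope and unlock() cannot be forgotten.
    public final class ResourceLock extends ReentrantLock implements AutoCloseable {

      // Acquire the lock and return this, so it fits in a try (...) header.
      public ResourceLock obtain() {
        lock();
        return this;
      }

      // Invoked implicitly when the try block exits; releases the lock.
      @Override
      public void close() {
        unlock();
      }
    }

Because it extends ReentrantLock, lock.newCondition() works unchanged, which is why lockCondition.signal()/await(...) can replace notify()/wait(...) one-for-one in the ConnectThread hunks above.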
diff --git a/src/main/java/com/amazon/redshift/core/ByteOptimizedUTF8Encoder.java b/src/main/java/com/amazon/redshift/core/ByteOptimizedUTF8Encoder.java
index 6aeb33f..c9c749c 100644
--- a/src/main/java/com/amazon/redshift/core/ByteOptimizedUTF8Encoder.java
+++ b/src/main/java/com/amazon/redshift/core/ByteOptimizedUTF8Encoder.java
@@ -8,6 +8,8 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 
+import com.amazon.redshift.jdbc.ResourceLock;
+
 /**
  * UTF-8 encoder which validates input and is optimized for jdk 9+ where {@code String} objects are backed by
  * {@code byte[]}.
@@ -40,12 +42,15 @@ public String decode(byte[] encodedString, int offset, int length) throws IOExce
   * Decodes to {@code char[]} in presence of non-ascii values after first copying all known ascii chars directly
   * from {@code byte[]} to {@code char[]}.
   */
-  private synchronized String slowDecode(byte[] encodedString, int offset, int length, int curIdx) throws IOException {
-    final char[] chars = getCharArray(length);
-    int out = 0;
-    for (int i = offset; i < curIdx; ++i) {
-      chars[out++] = (char) encodedString[i];
-    }
-    return decodeToChars(encodedString, curIdx, length - (curIdx - offset), chars, out);
-  }
+  private String slowDecode(byte[] encodedString, int offset, int length, int curIdx) throws IOException {
+    // Use the lock shared with OptimizedUTF8Encoder.charDecode(); both guard decoderArray.
+    try (ResourceLock ignore = lock.obtain()) {
+      final char[] chars = getCharArray(length);
+      int out = 0;
+      for (int i = offset; i < curIdx; ++i) {
+        chars[out++] = (char) encodedString[i];
+      }
+      return decodeToChars(encodedString, curIdx, length - (curIdx - offset), chars, out);
+    }
+  }
 }
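The slowDecode() hunk above reuses the encoder's shared lock rather than the originally submitted try (ResourceLock lock = new ResourceLock()) { lock.obtain(); ... } form, and the IamHelper methods below get one static lock for the same reason: a lock constructed inside the method body is a fresh object per call, so no two calls ever contend and the old synchronized semantics are silently lost. A tiny JDK-only demo of the difference (hypothetical names):

    import java.util.concurrent.locks.ReentrantLock;

    class PerCallLockDemo {
      private static int counter = 0;

      // Broken: each call locks its own new lock, so increments still race.
      static void brokenIncrement() {
        ReentrantLock lock = new ReentrantLock(); // fresh lock per call
        lock.lock();
        try {
          counter++;
        } finally {
          lock.unlock();
        }
      }

      // Fixed: all calls contend on one shared lock.
      private static final ReentrantLock SHARED = new ReentrantLock();

      static void fixedIncrement() {
        SHARED.lock();
        try {
          counter++;
        } finally {
          SHARED.unlock();
        }
      }
    }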
diff --git a/src/main/java/com/amazon/redshift/core/IamHelper.java b/src/main/java/com/amazon/redshift/core/IamHelper.java
index eaea08e..b8ebe3c 100755
--- a/src/main/java/com/amazon/redshift/core/IamHelper.java
+++ b/src/main/java/com/amazon/redshift/core/IamHelper.java
@@ -26,12 +26,12 @@
 import com.amazonaws.services.redshift.model.DescribeCustomDomainAssociationsRequest;
 import com.amazonaws.services.redshift.model.DescribeCustomDomainAssociationsResult;
 import com.amazonaws.services.redshift.AmazonRedshiftClient;
-import com.amazonaws.services.redshift.AmazonRedshiftClientBuilder;
 import com.amazonaws.util.StringUtils;
 import com.amazon.redshift.CredentialsHolder;
 import com.amazon.redshift.IPlugin;
 import com.amazon.redshift.RedshiftProperty;
 import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.logger.LogLevel;
 import com.amazon.redshift.logger.RedshiftLogger;
 import com.amazon.redshift.plugin.utils.RequestUtils;
@@ -46,7 +46,6 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -790,66 +789,68 @@ static void callDescribeCustomDomainNameAssociationsAPIForV1(RedshiftJDBCSetting
     }
   }
 
-  private static synchronized GetClusterCredentialsResult getClusterCredentialsResult(RedshiftJDBCSettings settings,
+  // Shared by both getClusterCredentials* methods, preserving the old
+  // static-synchronized (one-monitor-per-class) semantics.
+  private static final ResourceLock lock = new ResourceLock();
+
+  private static GetClusterCredentialsResult getClusterCredentialsResult(RedshiftJDBCSettings settings,
       AmazonRedshift client, RedshiftLogger log, CredentialProviderType providerType,
       boolean idpCredentialsRefresh)
       throws AmazonClientException {
-
-    String key = null;
-    GetClusterCredentialsResult credentials = null;
-
-    if (!settings.m_iamDisableCache) {
-      key = getCredentialsCacheKey(settings, providerType, false);
-      credentials = credentialsCache.get(key);
-    }
-
-    if (credentials == null || (providerType == CredentialProviderType.PLUGIN && idpCredentialsRefresh)
-        || RequestUtils.isCredentialExpired(credentials.getExpiration())) {
-      if (RedshiftLogger.isEnable())
-        log.logInfo("GetClusterCredentials NOT from cache");
-
-      if (!settings.m_iamDisableCache)
-        credentialsCache.remove(key);
-
-      if(settings.m_isCname)
-      {
-        // construct request packet with cname
-        GetClusterCredentialsRequest request = constructRequestForGetClusterCredentials(settings, true, log);
-
-        try
-        {
-          // make api call with cname
-          credentials = makeGetClusterCredentialsAPICall(request, credentials, client, log);
-        }
-        catch(AmazonClientException ace)
-        {
-          // if api call with cname fails, recreate request packet with clusterid and re-make api call
-
-          if(RedshiftLogger.isEnable())
-          {
-            log.logInfo("GetClusterCredentials API call failed with CNAME request. Retrying with ClusterID.");
-          }
-
-          request = constructRequestForGetClusterCredentials(settings, false, log);
-          credentials = makeGetClusterCredentialsAPICall(request, credentials, client, log);
-        }
-      }
-      else
-      {
-        // construct request packet with clusterid and make api call
-        GetClusterCredentialsRequest request = constructRequestForGetClusterCredentials(settings, false, log);
-        credentials = makeGetClusterCredentialsAPICall(request, credentials, client, log);
-      }
-
-      if (!settings.m_iamDisableCache)
-        credentialsCache.put(key, credentials);
-    }
-    else
-    {
-      if (RedshiftLogger.isEnable())
-        log.logInfo("GetClusterCredentials from cache");
-    }
-
-    return credentials;
+    try (ResourceLock ignore = lock.obtain()) {
+      String key = null;
+      GetClusterCredentialsResult credentials = null;
+
+      if (!settings.m_iamDisableCache) {
+        key = getCredentialsCacheKey(settings, providerType, false);
+        credentials = credentialsCache.get(key);
+      }
+
+      if (credentials == null || (providerType == CredentialProviderType.PLUGIN && idpCredentialsRefresh)
+          || RequestUtils.isCredentialExpired(credentials.getExpiration())) {
+        if (RedshiftLogger.isEnable())
+          log.logInfo("GetClusterCredentials NOT from cache");
+
+        if (!settings.m_iamDisableCache)
+          credentialsCache.remove(key);
+
+        if (settings.m_isCname)
+        {
+          // construct request packet with cname
+          GetClusterCredentialsRequest request = constructRequestForGetClusterCredentials(settings, true, log);
+
+          try
+          {
+            // make api call with cname
+            credentials = makeGetClusterCredentialsAPICall(request, credentials, client, log);
+          }
+          catch (AmazonClientException ace)
+          {
+            // if api call with cname fails, recreate request packet with clusterid and re-make api call
+            if (RedshiftLogger.isEnable())
+            {
+              log.logInfo("GetClusterCredentials API call failed with CNAME request. Retrying with ClusterID.");
+            }
+
+            request = constructRequestForGetClusterCredentials(settings, false, log);
+            credentials = makeGetClusterCredentialsAPICall(request, credentials, client, log);
+          }
+        }
+        else
+        {
+          // construct request packet with clusterid and make api call
+          GetClusterCredentialsRequest request = constructRequestForGetClusterCredentials(settings, false, log);
+          credentials = makeGetClusterCredentialsAPICall(request, credentials, client, log);
+        }
+
+        if (!settings.m_iamDisableCache)
+          credentialsCache.put(key, credentials);
+      }
+      else
+      {
+        if (RedshiftLogger.isEnable())
+          log.logInfo("GetClusterCredentials from cache");
+      }
+
+      return credentials;
+    }
   }
@@ -922,69 +923,72 @@ static void checkForApiCallRateExceedError(AmazonClientException ace, int i, Str
     }
   }
 
-  private static synchronized GetClusterCredentialsWithIAMResult getClusterCredentialsResultV2(
+  private static GetClusterCredentialsWithIAMResult getClusterCredentialsResultV2(
       RedshiftJDBCSettings settings, AmazonRedshiftClient client, RedshiftLogger log,
       CredentialProviderType providerType, boolean idpCredentialsRefresh,
       AWSCredentialsProvider provider,
       int getClusterCredentialApiType)
       throws AmazonClientException {
-    String key = null;
-    GetClusterCredentialsWithIAMResult credentials = null;
-
-    if (!settings.m_iamDisableCache)
-    {
-      key = getCredentialsV2CacheKey(settings, providerType, provider, getClusterCredentialApiType, false);
-      credentials = credentialsV2Cache.get(key);
-    }
-
-    if (credentials == null || (providerType == CredentialProviderType.PLUGIN && settings.m_idpToken != null)
-        || RequestUtils.isCredentialExpired(credentials.getExpiration()))
-    {
-      if (RedshiftLogger.isEnable())
-        log.logInfo("GetClusterCredentialsV2 NOT from cache");
-
-      if (!settings.m_iamDisableCache)
-        credentialsV2Cache.remove(key);
-
-      if(settings.m_isCname)
-      {
-        // construct request packet with cname
-        GetClusterCredentialsWithIAMRequest request = constructRequestForGetClusterCredentialsWithIAM(settings, true, log);
-
-        try
-        {
-          // make api call with cname
-          credentials = makeGetClusterCredentialsWithIAMAPICall(request, credentials, client, log);
-        }
-        catch (AmazonClientException ce)
-        {
-          // if api call with cname fails, recreate request packet with clusterid and re-make api call
-
-          if(RedshiftLogger.isEnable())
-          {
-            log.logInfo("GetClusterCredentials API call failed with CNAME request. Retrying with ClusterID.");
-          }
-
-          request = constructRequestForGetClusterCredentialsWithIAM(settings, false, log);
-          credentials = makeGetClusterCredentialsWithIAMAPICall(request, credentials, client, log);
-        }
-      }
-      else
-      {
-        // construct request packet with clusterid and make api call
-        GetClusterCredentialsWithIAMRequest request = constructRequestForGetClusterCredentialsWithIAM(settings, false, log);
-        credentials = makeGetClusterCredentialsWithIAMAPICall(request, credentials, client, log);
-      }
-
-      if (!settings.m_iamDisableCache)
-        credentialsV2Cache.put(key, credentials);
-    }
-    else
-    {
-      if (RedshiftLogger.isEnable())
-        log.logInfo("GetClusterCredentialsV2 from cache");
-    }
-
-    return credentials;
+    try (ResourceLock ignore = lock.obtain()) {
+      String key = null;
+      GetClusterCredentialsWithIAMResult credentials = null;
+
+      if (!settings.m_iamDisableCache)
+      {
+        key = getCredentialsV2CacheKey(settings, providerType, provider, getClusterCredentialApiType, false);
+        credentials = credentialsV2Cache.get(key);
+      }
+
+      if (credentials == null || (providerType == CredentialProviderType.PLUGIN && settings.m_idpToken != null)
+          || RequestUtils.isCredentialExpired(credentials.getExpiration()))
+      {
+        if (RedshiftLogger.isEnable())
+          log.logInfo("GetClusterCredentialsV2 NOT from cache");
+
+        if (!settings.m_iamDisableCache)
+          credentialsV2Cache.remove(key);
+
+        if (settings.m_isCname)
+        {
+          // construct request packet with cname
+          GetClusterCredentialsWithIAMRequest request = constructRequestForGetClusterCredentialsWithIAM(settings, true, log);
+
+          try
+          {
+            // make api call with cname
+            credentials = makeGetClusterCredentialsWithIAMAPICall(request, credentials, client, log);
+          }
+          catch (AmazonClientException ce)
+          {
+            // if api call with cname fails, recreate request packet with clusterid and re-make api call
+            if (RedshiftLogger.isEnable())
+            {
+              log.logInfo("GetClusterCredentials API call failed with CNAME request. Retrying with ClusterID.");
+            }
+
+            request = constructRequestForGetClusterCredentialsWithIAM(settings, false, log);
+            credentials = makeGetClusterCredentialsWithIAMAPICall(request, credentials, client, log);
+          }
+        }
+        else
+        {
+          // construct request packet with clusterid and make api call
+          GetClusterCredentialsWithIAMRequest request = constructRequestForGetClusterCredentialsWithIAM(settings, false, log);
+          credentials = makeGetClusterCredentialsWithIAMAPICall(request, credentials, client, log);
+        }
+
+        if (!settings.m_iamDisableCache)
+          credentialsV2Cache.put(key, credentials);
+      }
+      else
+      {
+        if (RedshiftLogger.isEnable())
+          log.logInfo("GetClusterCredentialsV2 from cache");
+      }
+
+      return credentials;
+    }
   }
 
   /**
diff --git a/src/main/java/com/amazon/redshift/core/OptimizedUTF8Encoder.java b/src/main/java/com/amazon/redshift/core/OptimizedUTF8Encoder.java
index 03b5ab0..eaf2a70 100644
--- a/src/main/java/com/amazon/redshift/core/OptimizedUTF8Encoder.java
+++ b/src/main/java/com/amazon/redshift/core/OptimizedUTF8Encoder.java
@@ -5,6 +5,7 @@
 
 package com.amazon.redshift.core;
 
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.logger.RedshiftLogger;
 import com.amazon.redshift.util.GT;
 
@@ -27,7 +28,8 @@ abstract class OptimizedUTF8Encoder extends Encoding {
   private final int thresholdSize = 8 * 1024;
 
   private char[] decoderArray;
-
+  // Visible to ByteOptimizedUTF8Encoder so slowDecode() and charDecode()
+  // guard the shared decoderArray with the same lock.
+  protected final ResourceLock lock = new ResourceLock();
+
   OptimizedUTF8Encoder() {
     super(UTF_8_CHARSET, true, RedshiftLogger.getDriverLogger());
     decoderArray = new char[1024];
@@ -58,7 +60,8 @@ char[] getCharArray(int size) {
   /**
    * Decodes binary content to {@code String} by first converting to {@code char[]}.
    */
-  synchronized String charDecode(byte[] encodedString, int offset, int length) throws IOException {
+  String charDecode(byte[] encodedString, int offset, int length) throws IOException {
+    try (ResourceLock ignore = lock.obtain()) {
     final char[] chars = getCharArray(length);
     int out = 0;
     for (int i = offset, j = offset + length; i < j; ++i) {
@@ -70,6 +73,7 @@ synchronized String charDecode(byte[] encodedString, int offset, int length) thr
       }
     }
     return new String(chars, 0, out);
+    }
   }
 
   /**
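QueryExecutorBase below pairs the new lock with a Condition, and the QueryExecutorImpl hunks further down (unlock()/waitOnLock()) signal and await through that pair. Distilled to a standalone shape with hypothetical names — the real methods also translate InterruptedException into RedshiftException:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class LockedFlag {
      // Stand-ins for QueryExecutorBase.lock / lockCondition (assumed semantics).
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition changed = lock.newCondition();
      private Object lockedFor; // null means "unlocked"

      void waitOnLock() throws InterruptedException {
        lock.lock();
        try {
          while (lockedFor != null) { // loop guards against spurious wakeups
            changed.await();          // atomically releases and reacquires lock
          }
        } finally {
          lock.unlock();
        }
      }

      void unlock() {
        lock.lock();
        try {
          lockedFor = null;
          changed.signal(); // wake one waiter, matching the old notify()
        } finally {
          lock.unlock();
        }
      }
    }

Condition.await() throws IllegalMonitorStateException unless the calling thread holds the owning lock, which is why every await/signal site in this patch must sit inside a try (lock.obtain()) scope on that same lock object.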
diff --git a/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java b/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java
index a3db2e8..95732c4 100644
--- a/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java
+++ b/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java
@@ -11,6 +11,7 @@
 import com.amazon.redshift.jdbc.AutoSave;
 import com.amazon.redshift.jdbc.EscapeSyntaxCallMode;
 import com.amazon.redshift.jdbc.PreferQueryMode;
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.logger.LogLevel;
 import com.amazon.redshift.logger.RedshiftLogger;
 import com.amazon.redshift.util.HostSpec;
@@ -27,9 +28,12 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
 
 public abstract class QueryExecutorBase implements QueryExecutor {
-
+  // Public so the v3 copy classes can share the monitor that
+  // synchronized (queryExecutor) used to provide.
+  public final ResourceLock lock = new ResourceLock();
+  public final Condition lockCondition = lock.newCondition();
+
   protected RedshiftLogger logger;
   protected final RedshiftStream pgStream;
   private final String user;
@@ -229,34 +233,42 @@ public void sendQueryCancel() throws SQLException {
     }
   }
 
-  public synchronized void addWarning(SQLWarning newWarning) {
+  public void addWarning(SQLWarning newWarning) {
+    try (ResourceLock ignore = lock.obtain()) {
     if (warnings == null) {
       warnings = newWarning;
     } else {
       warnings.setNextWarning(newWarning);
    }
+    }
   }
 
   public void setCrossDatasharingEnabled(boolean isCrossDatasharingEnabled) {
     this.isCrossDatasharingEnabled = isCrossDatasharingEnabled;
   }
 
-  public synchronized void addNotification(RedshiftNotification notification) {
+  public void addNotification(RedshiftNotification notification) {
+    try (ResourceLock ignore = lock.obtain()) {
     notifications.add(notification);
+    }
   }
 
   @Override
-  public synchronized RedshiftNotification[] getNotifications() throws SQLException {
+  public RedshiftNotification[] getNotifications() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     RedshiftNotification[] array = notifications.toArray(new RedshiftNotification[0]);
     notifications.clear();
     return array;
+    }
   }
 
   @Override
-  public synchronized SQLWarning getWarnings() {
+  public SQLWarning getWarnings() {
+    try (ResourceLock ignore = lock.obtain()) {
     SQLWarning chain = warnings;
     warnings = null;
     return chain;
+    }
   }
 
   @Override
@@ -300,22 +312,30 @@ public void setServerVersionNum(int serverVersionNum) {
     this.serverVersionNum = serverVersionNum;
   }
 
-  public synchronized void setTransactionState(TransactionState state) {
+  public void setTransactionState(TransactionState state) {
+    try (ResourceLock ignore = lock.obtain()) {
     transactionState = state;
+    }
   }
 
-  public synchronized void setStandardConformingStrings(boolean value) {
+  public void setStandardConformingStrings(boolean value) {
+    try (ResourceLock ignore = lock.obtain()) {
     standardConformingStrings = value;
+    }
   }
 
   @Override
-  public synchronized boolean getStandardConformingStrings() {
+  public boolean getStandardConformingStrings() {
+    try (ResourceLock ignore = lock.obtain()) {
    return standardConformingStrings;
+    }
   }
 
   @Override
-  public synchronized TransactionState getTransactionState() {
+  public TransactionState getTransactionState() {
+    try (ResourceLock ignore = lock.obtain()) {
    return transactionState;
+    }
   }
 
   public void setEncoding(Encoding encoding) throws IOException {
diff --git a/src/main/java/com/amazon/redshift/core/ServerlessIamHelper.java b/src/main/java/com/amazon/redshift/core/ServerlessIamHelper.java
index b0f6600..8974c4e 100644
--- a/src/main/java/com/amazon/redshift/core/ServerlessIamHelper.java
+++ b/src/main/java/com/amazon/redshift/core/ServerlessIamHelper.java
@@ -5,6 +5,7 @@
 import java.util.Map;
 
 import com.amazon.redshift.core.IamHelper.CredentialProviderType;
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.logger.RedshiftLogger;
 import com.amazon.redshift.plugin.utils.RequestUtils;
 import com.amazonaws.AmazonClientException;
@@ -21,7 +22,7 @@
 // If user specify group_federation with serverless,
 // it will call Provision V2 API.
 public final class ServerlessIamHelper {
-
+  private final ResourceLock lock = new ResourceLock();
   private RedshiftLogger log;
   private AWSRedshiftServerlessClient client;
 
@@ -39,9 +40,9 @@ public final class ServerlessIamHelper {
     client = (AWSRedshiftServerlessClient) builder.withCredentials(credProvider).build();
   }
 
-  synchronized void describeConfiguration(RedshiftJDBCSettings settings) {
+  void describeConfiguration(RedshiftJDBCSettings settings) {
     com.amazonaws.services.redshiftserverless.model.GetWorkgroupRequest req = new GetWorkgroupRequest();
-
+    try (ResourceLock ignore = lock.obtain()) {
       if (settings.m_workGroup != null && settings.m_workGroup.length() > 0) {
         // Set workgroup in the request
         req.setWorkgroupName(settings.m_workGroup);
@@ -62,12 +63,14 @@ synchronized void describeConfiguration(RedshiftJDBCSettings settings) {
 
       settings.m_host = endpoint.getAddress();
       settings.m_port = endpoint.getPort();
+    }
   }
 
-  synchronized void getCredentialsResult(RedshiftJDBCSettings settings,
+  void getCredentialsResult(RedshiftJDBCSettings settings,
       CredentialProviderType providerType,
       boolean idpCredentialsRefresh
       ) throws AmazonClientException {
+    try (ResourceLock ignore = lock.obtain()) {
       String key = null;
       GetCredentialsResult credentials = null;
 
@@ -140,4 +143,5 @@ synchronized void getCredentialsResult(RedshiftJDBCSettings settings,
         log.logInfo(now + ": Using GetCredentialsResultV2 with TimeToRefresh " + credentials.getNextRefreshTime());
     }
   }
+  }
 }
diff --git a/src/main/java/com/amazon/redshift/core/v3/CopyOperationImpl.java b/src/main/java/com/amazon/redshift/core/v3/CopyOperationImpl.java
index 6650c06..04c6ffd 100644
--- a/src/main/java/com/amazon/redshift/core/v3/CopyOperationImpl.java
+++ b/src/main/java/com/amazon/redshift/core/v3/CopyOperationImpl.java
@@ -5,18 +5,20 @@
 
 package com.amazon.redshift.core.v3;
 
+import java.sql.SQLException;
+
 import com.amazon.redshift.copy.CopyOperation;
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.util.GT;
 import com.amazon.redshift.util.RedshiftException;
 import com.amazon.redshift.util.RedshiftState;
 
-import java.sql.SQLException;
-
 public abstract class CopyOperationImpl implements CopyOperation {
   QueryExecutorImpl queryExecutor;
   int rowFormat;
   int[] fieldFormats;
   long handledRowCount = -1;
 
   void init(QueryExecutorImpl q, int fmt, int[] fmts) {
     queryExecutor = q;
@@ -41,7 +43,7 @@ public int getFormat() {
   }
 
   public boolean isActive() {
-    synchronized (queryExecutor) {
+    // Take the executor's own lock, as the old synchronized block did.
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       return queryExecutor.hasLock(this);
     }
   }
diff --git a/src/main/java/com/amazon/redshift/core/v3/CopyQueryExecutor.java b/src/main/java/com/amazon/redshift/core/v3/CopyQueryExecutor.java
index f97c800..695dd64 100644
--- a/src/main/java/com/amazon/redshift/core/v3/CopyQueryExecutor.java
+++ b/src/main/java/com/amazon/redshift/core/v3/CopyQueryExecutor.java
@@ -9,6 +9,7 @@
 import com.amazon.redshift.copy.CopyOut;
 import com.amazon.redshift.core.RedshiftStream;
 import com.amazon.redshift.core.Utils;
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.logger.LogLevel;
 import com.amazon.redshift.logger.RedshiftLogger;
 import com.amazon.redshift.util.ByteStreamWriter;
@@ -23,13 +24,11 @@
 
 class CopyQueryExecutor {
 
   private QueryExecutorImpl queryExecutor;
-  RedshiftLogger logger;
-  final RedshiftStream pgStream;
-
-
-  CopyQueryExecutor(QueryExecutorImpl queryExecutor,
-      RedshiftLogger logger,
-      RedshiftStream pgStream) {
+  RedshiftLogger logger;
+  final RedshiftStream pgStream;
+
+  CopyQueryExecutor(QueryExecutorImpl queryExecutor, RedshiftLogger logger, RedshiftStream pgStream) {
     this.queryExecutor = queryExecutor;
     this.logger = logger;
     this.pgStream = pgStream;
@@ -49,7 +48,7 @@ CopyOperation startCopy(String sql, boolean suppressBegin)
     // Shouldn't call from synchronized method, which can cause dead-lock.
     queryExecutor.waitForRingBufferThreadToFinish(false, false, false, null, null);
 
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       queryExecutor.waitOnLock();
       if (!suppressBegin) {
@@ -87,7 +86,7 @@ void cancelCopy(CopyOperationImpl op) throws SQLException {
     try {
       if (op instanceof CopyIn) {
-        synchronized (queryExecutor) {
+        try (ResourceLock ignore = queryExecutor.lock.obtain()) {
           if (RedshiftLogger.isEnable())
           	logger.log(LogLevel.DEBUG, "FE => CopyFail");
           final byte[] msg = Utils.encodeUTF8("Copy cancel requested");
@@ -125,7 +124,7 @@ void cancelCopy(CopyOperationImpl op) throws SQLException {
         // future operations, rather than failing due to the
         // broken connection, will simply hang waiting for this
         // lock.
-        synchronized (queryExecutor) {
+        try (ResourceLock ignore = queryExecutor.lock.obtain()) {
           if (queryExecutor.hasLock(op)) {
             queryExecutor.unlock(op);
           }
@@ -405,7 +404,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
    * @throws IOException on database connection failure
    */
   void initCopy(CopyOperationImpl op) throws SQLException, IOException {
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       pgStream.receiveInteger4(); // length not used
       int rowFormat = pgStream.receiveChar();
       int numFields = pgStream.receiveInteger2();
@@ -428,7 +427,7 @@ void initCopy(CopyOperationImpl op) throws SQLException, IOException {
    * @throws SQLException on failure
    */
   long endCopy(CopyOperationImpl op) throws SQLException {
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       if (!queryExecutor.hasLock(op)) {
         throw new RedshiftException(GT.tr("Tried to end inactive copy"), RedshiftState.OBJECT_NOT_IN_STATE);
       }
@@ -464,7 +463,7 @@ long endCopy(CopyOperationImpl op) throws SQLException {
    */
   void writeToCopy(CopyOperationImpl op, byte[] data, int off, int siz)
       throws SQLException {
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       if (!queryExecutor.hasLock(op)) {
         throw new RedshiftException(GT.tr("Tried to write to an inactive copy operation"),
             RedshiftState.OBJECT_NOT_IN_STATE);
@@ -494,7 +493,7 @@ void writeToCopy(CopyOperationImpl op, byte[] data, int off, int siz)
    */
   public void writeToCopy(CopyOperationImpl op, ByteStreamWriter from)
       throws SQLException {
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       if (!queryExecutor.hasLock(op)) {
         throw new RedshiftException(GT.tr("Tried to write to an inactive copy operation"),
             RedshiftState.OBJECT_NOT_IN_STATE);
@@ -516,7 +515,7 @@ public void writeToCopy(CopyOperationImpl op, ByteStreamWriter from)
   }
 
   public void flushCopy(CopyOperationImpl op) throws SQLException {
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       if (!queryExecutor.hasLock(op)) {
         throw new RedshiftException(GT.tr("Tried to write to an inactive copy operation"),
             RedshiftState.OBJECT_NOT_IN_STATE);
@@ -540,7 +539,7 @@ public void flushCopy(CopyOperationImpl op) throws SQLException {
    * @throws SQLException on any failure
    */
   void readFromCopy(CopyOperationImpl op, boolean block) throws SQLException {
-    synchronized(queryExecutor) {
+    try (ResourceLock ignore = queryExecutor.lock.obtain()) {
       if (!queryExecutor.hasLock(op)) {
         throw new RedshiftException(GT.tr("Tried to read from inactive copy"),
             RedshiftState.OBJECT_NOT_IN_STATE);
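A review note on lock identity, which is why the hunks above take queryExecutor.lock rather than a CopyQueryExecutor-private field (as originally submitted): synchronized (queryExecutor) in the old code meant copy operations and query execution excluded each other through one monitor, and queryExecutor.waitOnLock() awaits a Condition owned by that same lock. Two separate ResourceLock fields would not provide that — a thread holding one lock does not block a thread taking the other, as this small demo (hypothetical names) shows:

    import java.util.concurrent.locks.ReentrantLock;

    class TwoLocksDemo {
      static final ReentrantLock copyLock = new ReentrantLock();
      static final ReentrantLock execLock = new ReentrantLock();

      public static void main(String[] args) throws InterruptedException {
        copyLock.lock();
        try {
          Thread other = new Thread(() -> {
            execLock.lock(); // does NOT block: it is a different lock object
            try {
              System.out.println("other thread entered while copyLock was held");
            } finally {
              execLock.unlock();
            }
          });
          other.start();
          other.join();
        } finally {
          copyLock.unlock();
        }
      }
    }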
diff --git a/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java b/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java
index 5b52ea5..64bdb06 100644
--- a/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java
+++ b/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java
@@ -6,23 +6,46 @@
 
 package com.amazon.redshift.core.v3;
 
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
 import com.amazon.redshift.RedshiftProperty;
-import com.amazon.redshift.copy.CopyIn;
 import com.amazon.redshift.copy.CopyOperation;
-import com.amazon.redshift.copy.CopyOut;
 import com.amazon.redshift.core.CommandCompleteParser;
 import com.amazon.redshift.core.Encoding;
 import com.amazon.redshift.core.EncodingPredictor;
 import com.amazon.redshift.core.Field;
 import com.amazon.redshift.core.NativeQuery;
 import com.amazon.redshift.core.Oid;
-import com.amazon.redshift.core.RedshiftBindException;
-import com.amazon.redshift.core.RedshiftStream;
 import com.amazon.redshift.core.ParameterList;
 import com.amazon.redshift.core.Parser;
 import com.amazon.redshift.core.Query;
 import com.amazon.redshift.core.QueryExecutor;
 import com.amazon.redshift.core.QueryExecutorBase;
+import com.amazon.redshift.core.RedshiftBindException;
+import com.amazon.redshift.core.RedshiftStream;
 import com.amazon.redshift.core.ReplicationProtocol;
 import com.amazon.redshift.core.ResultCursor;
 import com.amazon.redshift.core.ResultHandler;
@@ -37,44 +60,18 @@
 import com.amazon.redshift.jdbc.AutoSave;
 import com.amazon.redshift.jdbc.BatchResultHandler;
 import com.amazon.redshift.jdbc.FieldMetadata;
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.jdbc.TimestampUtils;
 import com.amazon.redshift.logger.LogLevel;
 import com.amazon.redshift.logger.RedshiftLogger;
-
-import com.amazon.redshift.util.QuerySanitizer;
 import com.amazon.redshift.util.ByteStreamWriter;
 import com.amazon.redshift.util.GT;
+import com.amazon.redshift.util.QuerySanitizer;
 import com.amazon.redshift.util.RedshiftException;
 import com.amazon.redshift.util.RedshiftPropertyMaxResultBufferParser;
 import com.amazon.redshift.util.RedshiftState;
 import com.amazon.redshift.util.RedshiftWarning;
-
 import com.amazon.redshift.util.ServerErrorMessage;
-import java.io.IOException;
-import java.lang.ref.PhantomReference;
-import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
 
 /**
  * QueryExecutor implementation for the V3 protocol.
@@ -84,7 +81,7 @@ public class QueryExecutorImpl extends QueryExecutorBase {
   private static final Pattern ROLLBACK_PATTERN = Pattern.compile("\\brollback\\b", Pattern.CASE_INSENSITIVE);
   private static final Pattern COMMIT_PATTERN = Pattern.compile("\\bcommit\\b", Pattern.CASE_INSENSITIVE);
   private static final Pattern PREPARE_PATTERN = Pattern.compile("\\bprepare ++transaction\\b", Pattern.CASE_INSENSITIVE);
-
+
   private static boolean looksLikeCommit(String sql) {
     if ("COMMIT".equalsIgnoreCase(sql)) {
       return true;
@@ -235,7 +232,7 @@ void unlock(Object holder) throws RedshiftException {
           RedshiftState.OBJECT_NOT_IN_STATE);
     }
     lockedFor = null;
-    this.notify();
+    lockCondition.signal();
   }
 
   /**
@@ -245,7 +242,7 @@ void unlock(Object holder) throws RedshiftException {
   void waitOnLock() throws RedshiftException {
     while (lockedFor != null) {
       try {
-        this.wait();
+        lockCondition.await();
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
         throw new RedshiftException(
@@ -330,7 +327,7 @@ public void execute(Query query, ParameterList parameters, ResultHandler handler
     // Shouldn't call from synchronized method, which can cause dead-lock.
     waitForRingBufferThreadToFinish(false, false, false, null, null);
 
-    synchronized(this) {
+    try (ResourceLock ignore = lock.obtain()) {
       waitOnLock();
       try {
         m_executingLock.lock();
@@ -420,7 +417,7 @@ public void execute(Query query, ParameterList parameters, ResultHandler handler
       finally {
         m_executingLock.unlock();
       }
-    } // synchronized
+    }
   }
 
   private boolean sendAutomaticSavepoint(Query query, int flags) throws IOException {
@@ -542,7 +539,7 @@ public void execute(Query[] queries, ParameterList[] parameterLists,
     // Shouldn't call from synchronized method, which can cause dead-lock.
     waitForRingBufferThreadToFinish(false, false, false, null, null);
 
-    synchronized(this) {
+    try (ResourceLock ignore = lock.obtain()) {
       waitOnLock();
       try {
         m_executingLock.lock();
@@ -624,7 +621,7 @@ public void execute(Query[] queries, ParameterList[] parameterLists,
       finally {
         m_executingLock.unlock();
       }
-    } // synchronized
+    }
   }
 
   private ResultHandler sendQueryPreamble(final ResultHandler delegateHandler, int flags)
@@ -811,8 +808,10 @@ private void sendFastpathCall(int fnid, SimpleParameterList params)
    */
   // Just for API compatibility with previous versions.
-  public synchronized void processNotifies() throws SQLException {
+  public void processNotifies() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     processNotifies(-1);
+    }
   }
 
   /**
@@ -820,7 +819,8 @@ public synchronized void processNotifies() throws SQLException {
    * when =0, block forever
    * when < 0, don't block
    */
-  public synchronized void processNotifies(int timeoutMillis) throws SQLException {
+  public void processNotifies(int timeoutMillis) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     waitOnLock();
     // Asynchronous notifies only arrive when we are not in a transaction
     if (getTransactionState() != TransactionState.IDLE) {
@@ -889,6 +889,7 @@ public synchronized void processNotifies(int timeoutMillis) throws SQLException
         setSocketTimeout(oldTimeout);
       }
     }
+    }
   }
 
   private void setSocketTimeout(int millis) throws RedshiftException {
@@ -1002,8 +1003,10 @@ public void cancelCopy(CopyOperationImpl op) throws SQLException {
    * @return number of rows updated for server versions 8.2 or newer
    * @throws SQLException on failure
    */
-  public synchronized long endCopy(CopyOperationImpl op) throws SQLException {
+  public long endCopy(CopyOperationImpl op) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     return copyQueryExecutor.endCopy(op);
+    }
   }
 
   /**
@@ -1016,9 +1019,11 @@ public synchronized long endCopy(CopyOperationImpl op) throws SQLException {
    * @param siz number of bytes to send (usually data.length)
    * @throws SQLException on failure
    */
-  public synchronized void writeToCopy(CopyOperationImpl op, byte[] data, int off, int siz)
+  public void writeToCopy(CopyOperationImpl op, byte[] data, int off, int siz)
       throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     copyQueryExecutor.writeToCopy(op, data, off, siz);
+    }
   }
 
   /**
@@ -1029,13 +1034,17 @@ public synchronized void writeToCopy(CopyOperationImpl op, byte[] data, int off,
    * @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
    * @throws SQLException on failure
    */
-  public synchronized void writeToCopy(CopyOperationImpl op, ByteStreamWriter from)
+  public void writeToCopy(CopyOperationImpl op, ByteStreamWriter from)
      throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     copyQueryExecutor.writeToCopy(op, from);
+    }
   }
 
-  public synchronized void flushCopy(CopyOperationImpl op) throws SQLException {
+  public void flushCopy(CopyOperationImpl op) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     copyQueryExecutor.flushCopy(op);
+    }
   }
 
   /**
@@ -1046,8 +1055,10 @@ public synchronized void flushCopy(CopyOperationImpl op) throws SQLException {
    * @param block whether to block waiting for input
    * @throws SQLException on any failure
    */
-  synchronized void readFromCopy(CopyOperationImpl op, boolean block) throws SQLException {
+  void readFromCopy(CopyOperationImpl op, boolean block) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     copyQueryExecutor.readFromCopy(op, block);
+    }
   }
 
   /*
@@ -1727,7 +1738,7 @@ private void registerParsedQuery(SimpleQuery query, String statementName) {
   }
 
   public void closeStatementAndPortal() {
-    synchronized(this) {
+    try (ResourceLock ignore = lock.obtain()) {
       // First, send CloseStatements for finalized SimpleQueries that had statement names assigned.
       try {
         processDeadParsedQueries();
@@ -1750,7 +1761,7 @@ public void closeStatementAndPortal() {
         logger.logError(sqe);
       }
     }
-    } // synchronized
+    }
   }
 
   private void processDeadParsedQueries() throws IOException {
@@ -2511,7 +2522,7 @@ public void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize, in
     // Shouldn't call from synchronized method, which can cause dead-lock.
     waitForRingBufferThreadToFinish(false, false, false, null, null);
 
-    synchronized(this) {
+    try (ResourceLock ignore = lock.obtain()) {
      waitOnLock();
      try {
        m_executingLock.lock();
@@ -2551,7 +2562,7 @@ public void handleCommandStatus(String status, long updateCount, long insertOID)
      finally {
        m_executingLock.unlock();
      }
-    } // synchronized
+    }
   }
 
   /*
@@ -2959,7 +2970,7 @@ public void waitForRingBufferThreadToFinish(boolean calledFromConnectionClose,
                                               RedshiftRowsBlockingQueue queueRows,
                                               Thread ringBufferThread) {
-    synchronized(m_ringBufferThreadLock) {
+    try (ResourceLock ignore = lock.obtain()) {
       try {
         m_executingLock.lock();
         // Wait for full read of any executing command
diff --git a/src/main/java/com/amazon/redshift/hostchooser/GlobalHostStatusTracker.java b/src/main/java/com/amazon/redshift/hostchooser/GlobalHostStatusTracker.java
index ad12e2b..4018c2f 100644
--- a/src/main/java/com/amazon/redshift/hostchooser/GlobalHostStatusTracker.java
+++ b/src/main/java/com/amazon/redshift/hostchooser/GlobalHostStatusTracker.java
@@ -5,6 +5,7 @@
 
 package com.amazon.redshift.hostchooser;
 
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.util.HostSpec;
 
 import java.util.ArrayList;
@@ -18,7 +19,7 @@ public class GlobalHostStatusTracker {
   private static final Map<HostSpec, HostSpecStatus> hostStatusMap =
       new HashMap<HostSpec, HostSpecStatus>();
-
+  private static final ResourceLock lock = new ResourceLock();
 
   /**
    * Store the actual observed host status.
   *
    * @param hostSpec The host whose status is known.
    * @param hostStatus Latest known status for the host.
@@ -27,7 +28,7 @@ public class GlobalHostStatusTracker {
    */
   public static void reportHostStatus(HostSpec hostSpec, HostStatus hostStatus) {
     long now = System.nanoTime() / 1000000;
-    synchronized (hostStatusMap) {
+    try (ResourceLock ignore = lock.obtain()) {
       HostSpecStatus hostSpecStatus = hostStatusMap.get(hostSpec);
       if (hostSpecStatus == null) {
         hostSpecStatus = new HostSpecStatus(hostSpec);
@@ -50,7 +51,7 @@ static List<HostSpec> getCandidateHosts(HostSpec[] hostSpecs,
       HostRequirement targetServerType, long hostRecheckMillis) {
     List<HostSpec> candidates = new ArrayList<HostSpec>(hostSpecs.length);
     long latestAllowedUpdate = System.nanoTime() / 1000000 - hostRecheckMillis;
-    synchronized (hostStatusMap) {
+    try (ResourceLock ignore = lock.obtain()) {
       for (HostSpec hostSpec : hostSpecs) {
         HostSpecStatus hostInfo = hostStatusMap.get(hostSpec);
         // candidates are nodes we do not know about and the nodes with correct type
diff --git a/src/main/java/com/amazon/redshift/jdbc/AbstractBlobClob.java b/src/main/java/com/amazon/redshift/jdbc/AbstractBlobClob.java
index ed28ba1..c83f0ac 100644
--- a/src/main/java/com/amazon/redshift/jdbc/AbstractBlobClob.java
+++ b/src/main/java/com/amazon/redshift/jdbc/AbstractBlobClob.java
@@ -26,7 +26,7 @@
  */
 public abstract class AbstractBlobClob {
   protected BaseConnection conn;
-
+  private final ResourceLock lock = new ResourceLock();
   private LargeObject currentLo;
   private boolean currentLoIsWriteable;
   private boolean support64bit;
@@ -50,7 +50,8 @@ public AbstractBlobClob(BaseConnection conn, long oid) throws SQLException {
     subLOs = new ArrayList<LargeObject>();
   }
 
-  public synchronized void free() throws SQLException {
+  public void free() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     if (currentLo != null) {
       currentLo.close();
       currentLo = null;
@@ -60,6 +61,7 @@ public synchronized void free() throws SQLException {
       subLO.close();
     }
     subLOs = null;
+    }
   }
 
   /**
@@ -70,7 +72,8 @@ public synchronized void free() throws SQLException {
    * @param len maximum length
    * @throws SQLException if operation fails
    */
-  public synchronized void truncate(long len) throws SQLException {
+  public void truncate(long len) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     if (!conn.haveMinimumServerVersion(ServerVersion.v8_3)) {
       throw new RedshiftException(
@@ -92,37 +95,46 @@ public synchronized void truncate(long len) throws SQLException {
     } else {
       getLo(true).truncate((int) len);
     }
+    }
   }
 
-  public synchronized long length() throws SQLException {
+  public long length() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
    if (support64bit) {
      return getLo(false).size64();
    } else {
      return getLo(false).size();
    }
+    }
   }
 
-  public synchronized byte[] getBytes(long pos, int length) throws SQLException {
+  public byte[] getBytes(long pos, int length) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     assertPosition(pos);
     getLo(false).seek((int) (pos - 1), LargeObject.SEEK_SET);
     return getLo(false).read(length);
+    }
   }
 
-  public synchronized InputStream getBinaryStream() throws SQLException {
+  public InputStream getBinaryStream() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     LargeObject subLO = getLo(false).copy();
     addSubLO(subLO);
     subLO.seek(0, LargeObject.SEEK_SET);
     return subLO.getInputStream();
+    }
   }
 
-  public synchronized OutputStream setBinaryStream(long pos) throws SQLException {
+  public OutputStream setBinaryStream(long pos) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     assertPosition(pos);
     LargeObject subLO = getLo(true).copy();
     addSubLO(subLO);
     subLO.seek((int) (pos - 1));
     return subLO.getOutputStream();
+    }
   }
 
   /**
@@ -133,7 +145,8 @@ public synchronized OutputStream setBinaryStream(long pos) throws SQLException {
    * @return position of the specified pattern
    * @throws SQLException if something wrong happens
    */
-  public synchronized long position(byte[] pattern, long start) throws SQLException {
+  public long position(byte[] pattern, long start) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     assertPosition(start, pattern.length);
 
     int position = 1;
@@ -158,6 +171,7 @@ public synchronized long position(byte[] pattern, long start) throws SQLExceptio
     }
 
     return result;
+    }
   }
 
   /**
@@ -198,8 +212,10 @@ private byte next() {
    * @return position of given pattern
    * @throws SQLException if something goes wrong
    */
-  public synchronized long position(Blob pattern, long start) throws SQLException {
+  public long position(Blob pattern, long start) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     return position(pattern.getBytes(1, (int) pattern.length()), start);
+    }
   }
 
   /**
@@ -245,7 +261,8 @@ protected void checkFreed() throws SQLException {
     }
   }
 
-  protected synchronized LargeObject getLo(boolean forWrite) throws SQLException {
+  protected LargeObject getLo(boolean forWrite) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     if (this.currentLo != null) {
       if (forWrite && !currentLoIsWriteable) {
         // Reopen the stream in read-write, at the same pos.
@@ -267,6 +284,7 @@ protected synchronized LargeObject getLo(boolean forWrite) throws SQLException {
     currentLo = lom.open(oid, forWrite ? LargeObjectManager.READWRITE : LargeObjectManager.READ);
     currentLoIsWriteable = forWrite;
     return currentLo;
+    }
   }
 
   protected void addSubLO(LargeObject subLO) {
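RedshiftArray.buildArrayList() below, like Driver.getDefaultProperties() and RedshiftConnectionImpl.getTimer() elsewhere in this patch, is compute-once memoization moved under the new lock: check the cached field, compute it at most once, return the cached value on every later call. The shape, reduced to a standalone sketch (hypothetical names):

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    final class LazyValue<T> {
      private final ReentrantLock lock = new ReentrantLock();
      private T value; // guarded by lock; computed at most once

      T get(Supplier<T> compute) {
        lock.lock();
        try {
          if (value == null) {      // first caller computes...
            value = compute.get();
          }
          return value;             // ...later callers reuse the cached result
        } finally {
          lock.unlock();
        }
      }
    }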
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftArray.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftArray.java
index 40de7b5..6782d45 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftArray.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftArray.java
@@ -42,7 +42,7 @@
  * @see ResultSet#getArray
  */
 public class RedshiftArray implements java.sql.Array {
-
+  private final ResourceLock lock = new ResourceLock();
   static {
     ArrayAssistantRegistry.register(Oid.UUID, new UUIDArrayAssistant());
     ArrayAssistantRegistry.register(Oid.UUID_ARRAY, new UUIDArrayAssistant());
@@ -420,7 +420,8 @@ private Class elementOidToClass(int oid) throws SQLException {
    * {@link #arrayList} is build. Method can be called many times in order to make sure that array
    * list is ready to use, however {@link #arrayList} will be set only once during first call.
    */
-  private synchronized void buildArrayList() throws SQLException {
+  private void buildArrayList() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     if (arrayList != null) {
       return;
     }
@@ -537,6 +538,7 @@ private synchronized void buildArrayList() throws SQLException {
         }
       }
     }
+    }
   }
 
   /**
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftBlob.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftBlob.java
index 3211a2b..c49b49a 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftBlob.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftBlob.java
@@ -10,13 +10,14 @@
 import java.sql.SQLException;
 
 public class RedshiftBlob extends AbstractBlobClob implements java.sql.Blob {
-
+  private final ResourceLock lock = new ResourceLock();
   public RedshiftBlob(com.amazon.redshift.core.BaseConnection conn, long oid) throws SQLException {
     super(conn, oid);
   }
 
-  public synchronized java.io.InputStream getBinaryStream(long pos, long length)
+  public java.io.InputStream getBinaryStream(long pos, long length)
       throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     LargeObject subLO = getLo(false).copy();
     addSubLO(subLO);
@@ -26,17 +27,22 @@ public synchronized java.io.InputStream getBinaryStream(long pos, long length)
       subLO.seek((int) pos - 1, LargeObject.SEEK_SET);
     }
     return subLO.getInputStream(length);
+    }
   }
 
-  public synchronized int setBytes(long pos, byte[] bytes) throws SQLException {
+  public int setBytes(long pos, byte[] bytes) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     return setBytes(pos, bytes, 0, bytes.length);
+    }
   }
 
-  public synchronized int setBytes(long pos, byte[] bytes, int offset, int len)
+  public int setBytes(long pos, byte[] bytes, int offset, int len)
      throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     assertPosition(pos);
     getLo(true).seek((int) (pos - 1));
     getLo(true).write(bytes, offset, len);
     return len;
+    }
   }
 }
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java
index f3cc110..3a52cdd 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java
@@ -46,7 +46,8 @@ public class RedshiftCallableStatement extends RedshiftPreparedStatement impleme
   private boolean haveProcessedResults = false;
   protected Object[] callResult;
   private int lastIndex = 0;
-
+  private final ResourceLock lock = new ResourceLock();
+
   RedshiftCallableStatement(RedshiftConnectionImpl connection, String sql, int rsType, int rsConcurrency,
       int rsHoldability) throws SQLException {
     super(connection, connection.borrowCallableQuery(sql), rsType, rsConcurrency, rsHoldability);
@@ -940,7 +941,7 @@ private void getResults() throws SQLException {
     haveProcessedResults = true;
     ResultSet rs;
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       checkClosed();
       if (null == result) {
         throw new RedshiftException(
@@ -1027,7 +1028,7 @@ else if (columnType == Types.REF_CURSOR && functionReturnType[j] == Types.OTHER)
     }
     rs.close();
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       result = null;
     }
   }
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftClob.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftClob.java
index 03e742d..a33554a 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftClob.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftClob.java
@@ -13,64 +13,84 @@
 import java.sql.SQLException;
 
 public class RedshiftClob extends AbstractBlobClob implements java.sql.Clob {
-
+  private final ResourceLock lock = new ResourceLock();
   public RedshiftClob(com.amazon.redshift.core.BaseConnection conn, long oid) throws java.sql.SQLException {
     super(conn, oid);
   }
 
-  public synchronized Reader getCharacterStream(long pos, long length) throws SQLException {
+  public Reader getCharacterStream(long pos, long length) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "getCharacterStream(long, long)");
+    }
   }
 
-  public synchronized int setString(long pos, String str) throws SQLException {
+  public int setString(long pos, String str) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "setString(long,str)");
+    }
   }
 
-  public synchronized int setString(long pos, String str, int offset, int len) throws SQLException {
+  public int setString(long pos, String str, int offset, int len) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "setString(long,String,int,int)");
+    }
   }
 
-  public synchronized java.io.OutputStream setAsciiStream(long pos) throws SQLException {
+  public java.io.OutputStream setAsciiStream(long pos) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "setAsciiStream(long)");
+    }
   }
 
-  public synchronized java.io.Writer setCharacterStream(long pos) throws SQLException {
+  public java.io.Writer setCharacterStream(long pos) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "setCharacteStream(long)");
+    }
   }
 
-  public synchronized InputStream getAsciiStream() throws SQLException {
+  public InputStream getAsciiStream() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     return getBinaryStream();
+    }
   }
 
-  public synchronized Reader getCharacterStream() throws SQLException {
+  public Reader getCharacterStream() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     Charset connectionCharset = Charset.forName(conn.getEncoding().name());
     return new InputStreamReader(getBinaryStream(), connectionCharset);
+    }
   }
 
-  public synchronized String getSubString(long i, int j) throws SQLException {
+  public String getSubString(long i, int j) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     assertPosition(i, j);
     getLo(false).seek((int) i - 1);
     return new String(getLo(false).read(j));
+    }
   }
 
   /**
    * For now, this is not implemented.
    */
-  public synchronized long position(String pattern, long start) throws SQLException {
+  public long position(String pattern, long start) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "position(String,long)");
+    }
   }
 
   /**
    * This should be simply passing the byte value of the pattern Blob.
    */
-  public synchronized long position(Clob pattern, long start) throws SQLException {
+  public long position(Clob pattern, long start) throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkFreed();
     throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "position(Clob,start)");
+    }
   }
 }
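One behavioral nuance in the Clob/Blob classes above: getAsciiStream() takes RedshiftClob's lock and then calls getBinaryStream(), which takes AbstractBlobClob's separate lock. Under synchronized, both methods shared the single monitor on this, so the nested call was a reentrant no-op; with two lock fields the call now acquires two locks in a fixed outer-then-inner order, which is harmless as long as no code path acquires them in the opposite order. Sketch of the shape (hypothetical names):

    import java.util.concurrent.locks.ReentrantLock;

    class NestedLocks {
      private final ReentrantLock outer = new ReentrantLock(); // e.g. the subclass lock
      private final ReentrantLock inner = new ReentrantLock(); // e.g. the base-class lock

      // With one shared monitor this nesting was reentrant; with two locks
      // the outer-then-inner acquisition order becomes part of the contract.
      void outerThenInner() {
        outer.lock();
        try {
          inner.lock();
          try {
            // work guarded by both locks
          } finally {
            inner.unlock();
          }
        } finally {
          outer.unlock();
        }
      }
    }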
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
index 03a15da..239f9a7 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
@@ -91,7 +91,7 @@
 import java.util.concurrent.Executor;
 
 public class RedshiftConnectionImpl implements BaseConnection {
-
+  private final ResourceLock lock = new ResourceLock();
   private RedshiftLogger logger;
   private static final Set SUPPORTED_BINARY_OIDS = getSupportedBinaryOids();
   private static final SQLPermission SQL_PERMISSION_ABORT = new SQLPermission("callAbort");
@@ -955,7 +955,8 @@ public String nativeSQL(String sql) throws SQLException {
   }
 
   @Override
-  public synchronized SQLWarning getWarnings() throws SQLException {
+  public SQLWarning getWarnings() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkClosed();
     SQLWarning newWarnings = queryExecutor.getWarnings(); // NB: also clears them.
     if (firstWarning == null) {
@@ -965,13 +966,16 @@ public synchronized SQLWarning getWarnings() throws SQLException {
     }
 
     return firstWarning;
+    }
   }
 
   @Override
-  public synchronized void clearWarnings() throws SQLException {
+  public void clearWarnings() throws SQLException {
+    try (ResourceLock ignore = lock.obtain()) {
     checkClosed();
     queryExecutor.getWarnings(); // Clear and discard.
     firstWarning = null;
+    }
   }
 
   public void setDatabaseMetadataCurrentDbOnly(boolean databaseMetadataCurrentDbOnly) throws SQLException {
@@ -1475,18 +1479,22 @@ protected void abort() {
     queryExecutor.abort();
   }
 
-  private synchronized Timer getTimer() {
+  private Timer getTimer() {
+    try (ResourceLock ignore = lock.obtain()) {
     if (cancelTimer == null) {
       cancelTimer = Driver.getSharedTimer().getTimer();
     }
     return cancelTimer;
+    }
   }
 
-  private synchronized void releaseTimer() {
+  private void releaseTimer() {
+    try (ResourceLock ignore = lock.obtain()) {
     if (cancelTimer != null) {
       cancelTimer = null;
       Driver.getSharedTimer().releaseTimer();
     }
+    }
   }
 
   @Override
@@ -1776,7 +1784,7 @@ public boolean isValid(int timeout) throws SQLException {
     else {
       PreparedStatement checkConnectionQuery;
-      synchronized (this)
+      try (ResourceLock ignore = lock.obtain())
       {
         checkConnectionQuery = prepareStatement("");
       }
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftPreparedStatement.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftPreparedStatement.java
index 816a23a..201422c 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftPreparedStatement.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftPreparedStatement.java
@@ -74,7 +74,7 @@
 public class RedshiftPreparedStatement extends RedshiftStatementImpl implements PreparedStatement {
   protected final CachedQuery preparedQuery; // Query fragments for prepared statement.
   protected final ParameterList preparedParameters; // Parameter values for prepared statement.
- + private final ResourceLock lock = new ResourceLock(); private TimeZone defaultTimeZone; protected boolean enableGeneratedName; @@ -201,7 +201,7 @@ public boolean executeWithFlags(int flags) throws SQLException { execute(preparedQuery, preparedParameters, flags); - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); return (result != null && result.getResultSet() != null); } diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java index 9aeaaaf..8539c12 100644 --- a/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java +++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java @@ -85,7 +85,7 @@ import java.util.concurrent.TimeUnit; public class RedshiftResultSet implements ResultSet, com.amazon.redshift.RedshiftRefCursorResultSet { - + private final ResourceLock lock = new ResourceLock(); // needed for updateable result set support private boolean updateable = false; private boolean doingUpdates = false; @@ -989,7 +989,8 @@ public void setFetchDirection(int direction) throws SQLException { this.fetchdirection = direction; } - public synchronized void cancelRowUpdates() throws SQLException { + public void cancelRowUpdates() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); if (onInsertRow) { throw new RedshiftException(GT.tr("Cannot call cancelRowUpdates() when on the insert row."), @@ -1001,9 +1002,11 @@ public synchronized void cancelRowUpdates() throws SQLException { clearRowBuffer(true); } + } } - public synchronized void deleteRow() throws SQLException { + public void deleteRow() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkUpdateable(); if (onInsertRow) { @@ -1061,10 +1064,12 @@ public synchronized void deleteRow() throws SQLException { currentRow--; moveToCurrentRow(); } + } } @Override - public synchronized void insertRow() throws SQLException { + public void insertRow() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkUpdateable(); if (!onInsertRow) { @@ -1132,10 +1137,12 @@ public synchronized void insertRow() throws SQLException { // need to clear this in case of another insert clearRowBuffer(false); } + } } @Override - public synchronized void moveToCurrentRow() throws SQLException { + public void moveToCurrentRow() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkUpdateable(); if (currentRow < 0 || currentRow >= rows.size()) { @@ -1147,10 +1154,12 @@ public synchronized void moveToCurrentRow() throws SQLException { onInsertRow = false; doingUpdates = false; + } } @Override - public synchronized void moveToInsertRow() throws SQLException { + public void moveToInsertRow() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkUpdateable(); if (insertStatement != null) { @@ -1162,11 +1171,12 @@ public synchronized void moveToInsertRow() throws SQLException { onInsertRow = true; doingUpdates = false; + } } // rowBuffer is the temporary storage for the row - private synchronized void clearRowBuffer(boolean copyCurrentRow) throws SQLException { - + private void clearRowBuffer(boolean copyCurrentRow) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { // inserts want an empty array while updates want a copy of the current row if (copyCurrentRow) { rowBuffer = thisRow.updateableCopy(); @@ -1176,6 +1186,7 @@ private synchronized void clearRowBuffer(boolean copyCurrentRow) throws SQLExcep // clear the updateValues 
hash map for the next set of updates updateValues.clear(); + } } public boolean rowDeleted() throws SQLException { @@ -1193,8 +1204,9 @@ public boolean rowUpdated() throws SQLException { return false; } - public synchronized void updateAsciiStream(int columnIndex, java.io.InputStream x, int length) + public void updateAsciiStream(int columnIndex, java.io.InputStream x, int length) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (x == null) { updateNull(columnIndex); return; @@ -1223,15 +1235,19 @@ public synchronized void updateAsciiStream(int columnIndex, java.io.InputStream } catch (IOException ie) { throw new RedshiftException(GT.tr("Provided InputStream failed."), null, ie); } + } } - public synchronized void updateBigDecimal(int columnIndex, java.math.BigDecimal x) + public void updateBigDecimal(int columnIndex, java.math.BigDecimal x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateBinaryStream(int columnIndex, java.io.InputStream x, int length) + public void updateBinaryStream(int columnIndex, java.io.InputStream x, int length) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (x == null) { updateNull(columnIndex); return; @@ -1265,22 +1281,30 @@ public synchronized void updateBinaryStream(int columnIndex, java.io.InputStream System.arraycopy(data, 0, data2, 0, numRead); updateBytes(columnIndex, data2); } + } } - public synchronized void updateBoolean(int columnIndex, boolean x) throws SQLException { + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateByte(int columnIndex, byte x) throws SQLException { + public void updateByte(int columnIndex, byte x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, String.valueOf(x)); + } } - public synchronized void updateBytes(int columnIndex, byte[] x) throws SQLException { + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateCharacterStream(int columnIndex, java.io.Reader x, int length) + public void updateCharacterStream(int columnIndex, java.io.Reader x, int length) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (x == null) { updateNull(columnIndex); return; @@ -1305,40 +1329,57 @@ public synchronized void updateCharacterStream(int columnIndex, java.io.Reader x } catch (IOException ie) { throw new RedshiftException(GT.tr("Provided Reader failed."), null, ie); } + } } - public synchronized void updateDate(int columnIndex, java.sql.Date x) throws SQLException { + public void updateDate(int columnIndex, java.sql.Date x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateDouble(int columnIndex, double x) throws SQLException { + public void updateDouble(int columnIndex, double x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateFloat(int columnIndex, float x) throws SQLException { + public void updateFloat(int columnIndex, float x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateInt(int columnIndex, int x) throws SQLException 
{ + public void updateInt(int columnIndex, int x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateLong(int columnIndex, long x) throws SQLException { + public void updateLong(int columnIndex, long x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateNull(int columnIndex) throws SQLException { + public void updateNull(int columnIndex) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkColumnIndex(columnIndex); String columnTypeName = getRSType(columnIndex); updateValue(columnIndex, new NullObject(columnTypeName)); + } } - public synchronized void updateObject(int columnIndex, Object x) throws SQLException { + public void updateObject(int columnIndex, Object x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateObject(int columnIndex, Object x, int scale) throws SQLException { + public void updateObject(int columnIndex, Object x, int scale) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { this.updateObject(columnIndex, x); + } } @Override @@ -1410,7 +1451,8 @@ public void refreshRow() throws SQLException { } @Override - public synchronized void updateRow() throws SQLException { + public void updateRow() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkUpdateable(); if (onInsertRow) { @@ -1493,104 +1535,150 @@ public synchronized void updateRow() throws SQLException { connection.getLogger().log(LogLevel.DEBUG, "done updates"); updateValues.clear(); doingUpdates = false; + } } - public synchronized void updateShort(int columnIndex, short x) throws SQLException { + public void updateShort(int columnIndex, short x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateString(int columnIndex, String x) throws SQLException { + public void updateString(int columnIndex, String x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateTime(int columnIndex, Time x) throws SQLException { + public void updateTime(int columnIndex, Time x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); + } } - public synchronized void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateValue(columnIndex, x); - + } } - public synchronized void updateNull(String columnName) throws SQLException { + public void updateNull(String columnName) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateNull(findColumn(columnName)); + } } - public synchronized void updateBoolean(String columnName, boolean x) throws SQLException { + public void updateBoolean(String columnName, boolean x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateBoolean(findColumn(columnName), x); + } } - public synchronized void updateByte(String columnName, byte x) throws SQLException { + public void updateByte(String columnName, byte x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateByte(findColumn(columnName), x); + } } - public synchronized void updateShort(String columnName, short x) throws SQLException { + 
public void updateShort(String columnName, short x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateShort(findColumn(columnName), x); + } } - public synchronized void updateInt(String columnName, int x) throws SQLException { + public void updateInt(String columnName, int x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateInt(findColumn(columnName), x); + } } - public synchronized void updateLong(String columnName, long x) throws SQLException { + public void updateLong(String columnName, long x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateLong(findColumn(columnName), x); + } } - public synchronized void updateFloat(String columnName, float x) throws SQLException { + public void updateFloat(String columnName, float x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateFloat(findColumn(columnName), x); + } } - public synchronized void updateDouble(String columnName, double x) throws SQLException { + public void updateDouble(String columnName, double x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateDouble(findColumn(columnName), x); + } } - public synchronized void updateBigDecimal(String columnName, BigDecimal x) throws SQLException { + public void updateBigDecimal(String columnName, BigDecimal x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateBigDecimal(findColumn(columnName), x); + } } - public synchronized void updateString(String columnName, String x) throws SQLException { + public void updateString(String columnName, String x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateString(findColumn(columnName), x); + } } - public synchronized void updateBytes(String columnName, byte[] x) throws SQLException { + public void updateBytes(String columnName, byte[] x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateBytes(findColumn(columnName), x); + } } - public synchronized void updateDate(String columnName, java.sql.Date x) throws SQLException { + public void updateDate(String columnName, java.sql.Date x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateDate(findColumn(columnName), x); + } } - public synchronized void updateTime(String columnName, java.sql.Time x) throws SQLException { + public void updateTime(String columnName, java.sql.Time x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateTime(findColumn(columnName), x); + } } - public synchronized void updateTimestamp(String columnName, java.sql.Timestamp x) + public void updateTimestamp(String columnName, java.sql.Timestamp x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateTimestamp(findColumn(columnName), x); + } } - public synchronized void updateAsciiStream(String columnName, java.io.InputStream x, int length) + public void updateAsciiStream(String columnName, java.io.InputStream x, int length) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateAsciiStream(findColumn(columnName), x, length); + } } - public synchronized void updateBinaryStream(String columnName, java.io.InputStream x, int length) + public void updateBinaryStream(String columnName, java.io.InputStream x, int length) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateBinaryStream(findColumn(columnName), x, length); + } } - public synchronized void updateCharacterStream(String columnName, java.io.Reader reader, + public void 
updateCharacterStream(String columnName, java.io.Reader reader, int length) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateCharacterStream(findColumn(columnName), reader, length); + } } - public synchronized void updateObject(String columnName, Object x, int scale) + public void updateObject(String columnName, Object x, int scale) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateObject(findColumn(columnName), x); + } } - public synchronized void updateObject(String columnName, Object x) throws SQLException { + public void updateObject(String columnName, Object x) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { updateObject(findColumn(columnName), x); + } } /** @@ -2566,7 +2654,7 @@ public long getLong(int columnIndex) throws SQLException { // hold strong references to user objects (e.g. classes -> classloaders), thus it might lead to // OutOfMemory conditions. @Override - public synchronized Throwable fillInStackTrace() { + public Throwable fillInStackTrace() { return this; } }; diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftSQLXML.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftSQLXML.java index e903fff..e80c326 100644 --- a/src/main/java/com/amazon/redshift/jdbc/RedshiftSQLXML.java +++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftSQLXML.java @@ -51,7 +51,7 @@ import javax.xml.transform.stream.StreamSource; public class RedshiftSQLXML implements SQLXML { - + private final ResourceLock lock = new ResourceLock(); private final BaseConnection conn; private String data; // The actual data contained. private boolean initialized; // Has someone assigned the data for this object? @@ -79,13 +79,16 @@ private RedshiftSQLXML(BaseConnection conn, String data, boolean initialized) { } @Override - public synchronized void free() { - freed = true; - data = null; + public void free() { + try (ResourceLock ignore = lock.obtain()) { + freed = true; + data = null; + } } @Override - public synchronized InputStream getBinaryStream() throws SQLException { + public InputStream getBinaryStream() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); ensureInitialized(); @@ -102,10 +105,12 @@ public synchronized InputStream getBinaryStream() throws SQLException { // For this reason don't make it translatable. throw new RedshiftException("Failed to re-encode xml data.", RedshiftState.DATA_ERROR, ioe); } + } } @Override - public synchronized Reader getCharacterStream() throws SQLException { + public Reader getCharacterStream() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); ensureInitialized(); @@ -114,6 +119,7 @@ public synchronized Reader getCharacterStream() throws SQLException { } return new StringReader(data); + } } // We must implement this unsafely because that's what the @@ -123,7 +129,8 @@ public synchronized Reader getCharacterStream() throws SQLException { // ensure they are the same. 
// @Override - public synchronized T getSource(Class sourceClass) throws SQLException { + public T getSource(Class sourceClass) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); ensureInitialized(); @@ -164,35 +171,43 @@ public synchronized T getSource(Class sourceClass) throws throw new RedshiftException(GT.tr("Unknown XML Source class: {0}", sourceClass), RedshiftState.INVALID_PARAMETER_TYPE); + } } @Override - public synchronized String getString() throws SQLException { + public String getString() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); ensureInitialized(); return data; + } } @Override - public synchronized OutputStream setBinaryStream() throws SQLException { + public OutputStream setBinaryStream() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); initialize(); active = true; byteArrayOutputStream = new ByteArrayOutputStream(); return byteArrayOutputStream; + } } @Override - public synchronized Writer setCharacterStream() throws SQLException { + public Writer setCharacterStream() throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); initialize(); active = true; stringWriter = new StringWriter(); return stringWriter; + } } @Override - public synchronized T setResult(Class resultClass) throws SQLException { + public T setResult(Class resultClass) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); initialize(); @@ -238,13 +253,16 @@ public synchronized T setResult(Class resultClass) throws throw new RedshiftException(GT.tr("Unknown XML Result class: {0}", resultClass), RedshiftState.INVALID_PARAMETER_TYPE); + } } @Override - public synchronized void setString(String value) throws SQLException { + public void setString(String value) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { checkFreed(); initialize(); data = value; + } } private void checkFreed() throws SQLException { diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftStatementImpl.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftStatementImpl.java index ad48ef7..231d806 100644 --- a/src/main/java/com/amazon/redshift/jdbc/RedshiftStatementImpl.java +++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftStatementImpl.java @@ -38,7 +38,7 @@ public class RedshiftStatementImpl implements Statement, BaseStatement { private static final String[] NO_RETURNING_COLUMNS = new String[0]; - + private ResourceLock lock = new ResourceLock(); /** * Default state for use or not binary transfers. 
Can use only for testing purposes */ @@ -278,7 +278,7 @@ public ResultSet executeQuery(String sql) throws SQLException { } protected ResultSet getSingleResultSet() throws SQLException { - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); ResultSet firstResult = result.getResultSet(); @@ -310,7 +310,7 @@ public int executeUpdate(String sql) throws SQLException { } protected final void checkNoResultUpdate() throws SQLException { - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); if (this.autoGeneratedKeys == Statement.NO_GENERATED_KEYS) { @@ -323,7 +323,7 @@ protected final void checkNoResultUpdate() throws SQLException { iter = iter.getNext(); } } - } // Synchronized + } } @Override @@ -379,7 +379,7 @@ public boolean executeWithFlags(CachedQuery simpleQuery, int flags) throws SQLEx flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE; } execute(simpleQuery, null, flags); - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); return (result != null && result.getResultSet() != null); } @@ -397,7 +397,7 @@ protected void closeForNextExecution() throws SQLException { clearWarnings(); // Close any existing resultsets associated with this statement. - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { while (firstUnclosedResult != null) { RedshiftResultSet rs = (RedshiftResultSet)firstUnclosedResult.getResultSet(); if (rs != null) { @@ -511,7 +511,7 @@ private void executeInternal(CachedQuery cachedQuery, ParameterList queryParamet } StatementResultHandler handler = new StatementResultHandler(this); - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { result = null; } try { @@ -521,7 +521,7 @@ private void executeInternal(CachedQuery cachedQuery, ParameterList queryParamet } finally { killTimerTask(connection.getQueryExecutor().isRingBufferThreadRunning()); } - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); result = firstUnclosedResult = handler.getResults(); @@ -551,7 +551,7 @@ public void setCursorName(String name) throws SQLException { @Override public int getUpdateCount() throws SQLException { - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); if (result == null || result.getResultSet() != null) { return -1; @@ -563,7 +563,7 @@ public int getUpdateCount() throws SQLException { } public boolean getMoreResults() throws SQLException { - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); if (result == null) { return false; @@ -716,7 +716,7 @@ public void clearWarnings() throws SQLException { } public ResultSet getResultSet() throws SQLException { - synchronized (this) { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); if (result == null) { @@ -739,7 +739,7 @@ public final void close() throws SQLException connection.getLogger().logFunction(true); // closing an already closed Statement is a no-op. 
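One property worth calling out, since every hunk in RedshiftStatementImpl relies on it: leaving a try-with-resources block releases the lock on normal return and on exception alike, matching the semantics of leaving a synchronized block. A small standalone demonstration (editor-supplied, not driver code):

import java.util.concurrent.locks.ReentrantLock;

// Editor-supplied demonstration, not driver code: the lock is released
// by close() before control reaches the catch block.
public class ResourceLockDemo extends ReentrantLock implements AutoCloseable {
  ResourceLockDemo obtain() { lock(); return this; }
  @Override public void close() { unlock(); }

  public static void main(String[] args) {
    ResourceLockDemo lock = new ResourceLockDemo();
    try (ResourceLockDemo ignore = lock.obtain()) {
      System.out.println(lock.isHeldByCurrentThread()); // true
      throw new IllegalStateException("boom");
    } catch (IllegalStateException e) {
      System.out.println(lock.isHeldByCurrentThread()); // false
    }
  }
}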
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       if (isClosed) {
         if (RedshiftLogger.isEnable())
@@ -777,7 +777,7 @@ protected void closeImpl() throws SQLException {
    */
   public long getLastOID() throws SQLException {
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       checkClosed();
       if (result == null) {
         return 0;
@@ -974,7 +974,7 @@ private BatchResultHandler internalExecuteBatch() throws SQLException {
       }
     }
 
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       result = null;
     }
@@ -985,7 +985,7 @@ private BatchResultHandler internalExecuteBatch() throws SQLException {
     } finally {
       killTimerTask(connection.getQueryExecutor().isRingBufferThreadRunning());
       // There might be some rows generated even in case of failures
-      synchronized (this) {
+      try (ResourceLock ignore = lock.obtain()) {
         checkClosed();
         if (wantsGeneratedKeysAlways) {
           generatedKeys = new ResultWrapper(handler.getGeneratedKeys());
@@ -1042,7 +1042,7 @@ public void cancel() throws SQLException {
       return;
     }
-    // Synchronize on connection to avoid spinning in killTimerTask
-    synchronized (connection) {
+    // Take the statement lock to avoid spinning in killTimerTask
+    try (ResourceLock ignore = lock.obtain()) {
       try {
         connection.cancelQuery();
       } finally {
@@ -1185,7 +1185,7 @@ private void killTimerTask(boolean isRingBufferThreadRunning) {
     // "timeout error"
     // We wait till state becomes "cancelled"
     boolean interrupted = false;
-    synchronized (connection) {
-      // state check is performed under synchronized so it detects "cancelled" state faster
+    try (ResourceLock ignore = lock.obtain()) {
+      // state check is performed under the lock so it detects "cancelled" state faster
       // In other words, it prevents unnecessary ".wait()" call
       while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) {
@@ -1211,7 +1211,7 @@ protected boolean getForceBinaryTransfer() {
 //JCP! if mvn.project.property.redshift.jdbc.spec >= "JDBC4.2"
   @Override
   public long getLargeUpdateCount() throws SQLException {
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       checkClosed();
       if (result == null || result.getResultSet() != null) {
         return -1;
@@ -1388,7 +1388,7 @@ protected void checkCompletion() throws SQLException {
       return;
     }
 
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       ResultWrapper result = firstUnclosedResult;
       while (result != null) {
         if (result.getResultSet() != null && !result.getResultSet().isClosed()) {
@@ -1409,7 +1409,7 @@ protected void checkCompletion() throws SQLException {
   }
 
   public boolean getMoreResults(int current) throws SQLException {
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       checkClosed();
       // CLOSE_CURRENT_RESULT
       if (current == Statement.CLOSE_CURRENT_RESULT && result != null
@@ -1439,7 +1439,7 @@ public boolean getMoreResults(int current) throws SQLException {
   }
 
   public ResultSet getGeneratedKeys() throws SQLException {
-    synchronized (this) {
+    try (ResourceLock ignore = lock.obtain()) {
       checkClosed();
       if (generatedKeys == null || generatedKeys.getResultSet() == null) {
         return createDriverResultSet(new Field[0], new ArrayList());
diff --git a/src/main/java/com/amazon/redshift/jdbc/ResourceLock.java b/src/main/java/com/amazon/redshift/jdbc/ResourceLock.java
new file mode 100644
index 0000000..eb6353b
--- /dev/null
+++ b/src/main/java/com/amazon/redshift/jdbc/ResourceLock.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2005, PostgreSQL Global Development Group
+ * See the LICENSE file in the project root for more information.
+ */ + +package com.amazon.redshift.jdbc; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Extends a ReentrantLock for use in try-with-resources block. + * + *
+ * <h2>Example use</h2>
+ * <pre>{@code
+ *
+ *   try (ResourceLock ignore = lock.obtain()) {
+ *     // do something while holding the resource lock
+ *   }
+ *
+ * }</pre>
+ */ +public final class ResourceLock extends ReentrantLock implements AutoCloseable { + private static final long serialVersionUID = 8459051451899973878L; + + /** + * Obtain a lock and return the ResourceLock for use in try-with-resources block. + */ + public ResourceLock obtain() { + lock(); + return this; + } + + /** + * Unlock on exit of try-with-resources block. + */ + @Override + public void close() { + this.unlock(); + } +} \ No newline at end of file diff --git a/src/main/java/com/amazon/redshift/jdbc/TimestampUtils.java b/src/main/java/com/amazon/redshift/jdbc/TimestampUtils.java index 1dc6c49..25f788f 100644 --- a/src/main/java/com/amazon/redshift/jdbc/TimestampUtils.java +++ b/src/main/java/com/amazon/redshift/jdbc/TimestampUtils.java @@ -64,9 +64,9 @@ public class TimestampUtils { private static final LocalDateTime MIN_LOCAL_DATETIME = MIN_LOCAL_DATE.atStartOfDay(); private static final OffsetDateTime MIN_OFFSET_DATETIME = MIN_LOCAL_DATETIME.atOffset(ZoneOffset.UTC); //JCP! endif - + private final ResourceLock lock = new ResourceLock(); private static final Field DEFAULT_TIME_ZONE_FIELD; - + private TimeZone prevDefaultZoneFieldValue; private TimeZone defaultTimeZoneCache; @@ -383,7 +383,8 @@ private ParsedTimestamp parseBackendTimestamp(String str) throws SQLException { * @return null if s is null or a timestamp of the parsed string s. * @throws SQLException if there is a problem parsing s. */ - public synchronized Timestamp toTimestamp(Calendar cal, String s) throws SQLException { + public Timestamp toTimestamp(Calendar cal, String s) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (s == null) { return null; } @@ -442,6 +443,7 @@ public synchronized Timestamp toTimestamp(Calendar cal, String s) throws SQLExce result.setNanos(ts.nanos); return result; + } } //JCP! if mvn.project.property.redshift.jdbc.spec >= "JDBC4.2" @@ -584,7 +586,8 @@ public OffsetDateTime toOffsetDateTimeBin(byte[] bytes) throws RedshiftException //JCP! endif - public synchronized Time toTime(Calendar cal, String s) throws SQLException { + public Time toTime(Calendar cal, String s) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { // 1) Parse backend string if (s == null) { return null; @@ -627,9 +630,11 @@ public synchronized Time toTime(Calendar cal, String s) throws SQLException { // 2) Truncate date part so in given time zone the date would be formatted as 01/01/1970 Time timeObj = convertToTime(timeMillis, useCal.getTimeZone()); return (ts.nanos > 0) ? new RedshiftTime(timeObj, ts.nanos) : timeObj; + } } - public synchronized Date toDate(Calendar cal, String s) throws SQLException { + public Date toDate(Calendar cal, String s) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { // 1) Parse backend string Timestamp timestamp = toTimestamp(cal, s); @@ -640,6 +645,7 @@ public synchronized Date toDate(Calendar cal, String s) throws SQLException { // Note: infinite dates are handled in convertToDate // 2) Truncate date part so in given time zone the date would be formatted as 00:00 return convertToDate(timestamp.getTime(), cal == null ? 
null : cal.getTimeZone()); + } } private Calendar setupCalendar(Calendar cal) { @@ -672,12 +678,15 @@ private static boolean nanosExceed499(int nanos) { return nanos % 1000 > 499; } - public synchronized String toString(Calendar cal, Timestamp x) { + public String toString(Calendar cal, Timestamp x) { + try (ResourceLock ignore = lock.obtain()) { return toString(cal, x, true); + } } - public synchronized String toString(Calendar cal, Timestamp x, + public String toString(Calendar cal, Timestamp x, boolean withTimeZone) { + try (ResourceLock ignore = lock.obtain()) { if (x.getTime() == RedshiftStatement.DATE_POSITIVE_INFINITY) { return "infinity"; } else if (x.getTime() == RedshiftStatement.DATE_NEGATIVE_INFINITY) { @@ -711,14 +720,18 @@ public synchronized String toString(Calendar cal, Timestamp x, appendEra(sbuf, cal); return sbuf.toString(); + } } - public synchronized String toString(Calendar cal, Date x) { + public String toString(Calendar cal, Date x) { + try (ResourceLock ignore = lock.obtain()) { return toString(cal, x, true); + } } - public synchronized String toString(Calendar cal, Date x, + public String toString(Calendar cal, Date x, boolean withTimeZone) { + try (ResourceLock ignore = lock.obtain()) { if (x.getTime() == RedshiftStatement.DATE_POSITIVE_INFINITY) { return "infinity"; } else if (x.getTime() == RedshiftStatement.DATE_NEGATIVE_INFINITY) { @@ -738,14 +751,18 @@ public synchronized String toString(Calendar cal, Date x, } return sbuf.toString(); + } } - public synchronized String toString(Calendar cal, Time x) { + public String toString(Calendar cal, Time x) { + try (ResourceLock ignore = lock.obtain()) { return toString(cal, x, true); + } } - public synchronized String toString(Calendar cal, Time x, + public String toString(Calendar cal, Time x, boolean withTimeZone) { + try (ResourceLock ignore = lock.obtain()) { cal = setupCalendar(cal); cal.setTime(x); @@ -766,6 +783,7 @@ public synchronized String toString(Calendar cal, Time x, } return sbuf.toString(); + } } private static void appendDate(StringBuilder sb, Calendar cal) { @@ -877,7 +895,8 @@ private static void appendEra(StringBuilder sb, Calendar cal) { } //JCP! 
if mvn.project.property.redshift.jdbc.spec >= "JDBC4.2" - public synchronized String toString(LocalDate localDate) { + public String toString(LocalDate localDate) { + try (ResourceLock ignore = lock.obtain()) { if (LocalDate.MAX.equals(localDate)) { return "infinity"; } else if (localDate.isBefore(MIN_LOCAL_DATE)) { @@ -890,10 +909,11 @@ public synchronized String toString(LocalDate localDate) { appendEra(sbuf, localDate); return sbuf.toString(); + } } - public synchronized String toString(LocalTime localTime) { - + public String toString(LocalTime localTime) { + try (ResourceLock ignore = lock.obtain()) { sbuf.setLength(0); if (localTime.isAfter(MAX_TIME)) { @@ -909,9 +929,11 @@ public synchronized String toString(LocalTime localTime) { appendTime(sbuf, localTime); return sbuf.toString(); + } } - public synchronized String toString(OffsetDateTime offsetDateTime) { + public String toString(OffsetDateTime offsetDateTime) { + try (ResourceLock ignore = lock.obtain()) { if (offsetDateTime.isAfter(MAX_OFFSET_DATETIME)) { return "infinity"; } else if (offsetDateTime.isBefore(MIN_OFFSET_DATETIME)) { @@ -935,6 +957,7 @@ public synchronized String toString(OffsetDateTime offsetDateTime) { appendEra(sbuf, localDate); return sbuf.toString(); + } } /** @@ -943,7 +966,8 @@ public synchronized String toString(OffsetDateTime offsetDateTime) { * @param localDateTime The local date to format as a String * @return The formatted local date */ - public synchronized String toString(LocalDateTime localDateTime) { + public String toString(LocalDateTime localDateTime) { + try (ResourceLock ignore = lock.obtain()) { if (localDateTime.isAfter(MAX_LOCAL_DATETIME)) { return "infinity"; } else if (localDateTime.isBefore(MIN_LOCAL_DATETIME)) { @@ -953,6 +977,7 @@ public synchronized String toString(LocalDateTime localDateTime) { // LocalDateTime is always passed with time zone so backend can decide between timestamp and timestamptz ZonedDateTime zonedDateTime = localDateTime.atZone(getDefaultTz().toZoneId()); return toString(zonedDateTime.toOffsetDateTime()); + } } private static void appendDate(StringBuilder sb, LocalDate localDate) { diff --git a/src/main/java/com/amazon/redshift/jdbc/TypeInfoCache.java b/src/main/java/com/amazon/redshift/jdbc/TypeInfoCache.java index ef4441a..607a075 100644 --- a/src/main/java/com/amazon/redshift/jdbc/TypeInfoCache.java +++ b/src/main/java/com/amazon/redshift/jdbc/TypeInfoCache.java @@ -59,7 +59,8 @@ public class TypeInfoCache implements TypeInfo { private PreparedStatement getArrayDelimiterStatement; private PreparedStatement getTypeInfoStatement; private PreparedStatement getAllTypeInfoStatement; - + private final ResourceLock lock = new ResourceLock(); + // Geometry public static final String GEOMETRY_NAME = "geometry"; public static final int GEOMETRYOID = Oid.GEOMETRY; @@ -183,8 +184,9 @@ public TypeInfoCache(BaseConnection conn, int unknownLength) { rsNameToJavaClass.put("hstore", Map.class.getName()); } - public synchronized void addCoreType(String rsTypeName, Integer oid, Integer sqlType, + public void addCoreType(String rsTypeName, Integer oid, Integer sqlType, String javaClass, Integer arrayOid) { + try (ResourceLock ignore = lock.obtain()) { rsNameToJavaClass.put(rsTypeName, javaClass); rsNameToOid.put(rsTypeName, oid); oidToRsName.put(oid, rsTypeName); @@ -210,12 +212,15 @@ public synchronized void addCoreType(String rsTypeName, Integer oid, Integer sql rsNameToOid.put(pgArrayTypeName, arrayOid); oidToRsName.put(arrayOid, pgArrayTypeName); } + } } - public synchronized 
void addDataType(String type, Class klass) + public void addDataType(String type, Class klass) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { rsNameToRsObject.put(type, klass); rsNameToJavaClass.put(type, klass.getName()); + } } public Iterator getRSTypeNamesWithSQLTypes() { @@ -308,7 +313,8 @@ public int getSQLType(int oid) throws SQLException { return getSQLType(getRSType(oid)); } - public synchronized int getSQLType(String pgTypeName) throws SQLException { + public int getSQLType(String pgTypeName) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (pgTypeName.endsWith("[]")) { return Types.ARRAY; } @@ -343,6 +349,7 @@ public synchronized int getSQLType(String pgTypeName) throws SQLException { rsNameToSQLType.put(pgTypeName, type); return type; + } } private PreparedStatement getOidStatement(String pgTypeName) throws SQLException { @@ -452,7 +459,8 @@ private PreparedStatement getOidStatement(String pgTypeName) throws SQLException return oidStatementComplex; } - public synchronized int getRSType(String pgTypeName) throws SQLException { + public int getRSType(String pgTypeName) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { Integer oid = rsNameToOid.get(pgTypeName); if (oid != null) { return oid; @@ -481,9 +489,11 @@ public synchronized int getRSType(String pgTypeName) throws SQLException { rs.close(); return oid; + } } - public synchronized String getRSType(int oid) throws SQLException { + public String getRSType(int oid) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (oid == Oid.UNSPECIFIED) { return null; } @@ -536,6 +546,7 @@ public synchronized String getRSType(int oid) throws SQLException { rs.close(); return rsTypeName; + } } public int getRSArrayType(String elementTypeName) throws SQLException { @@ -552,15 +563,18 @@ public int getRSArrayType(String elementTypeName) throws SQLException { * @param oid input oid * @return oid of the array's base element or the provided oid (if not array) */ - protected synchronized int convertArrayToBaseOid(int oid) { + protected int convertArrayToBaseOid(int oid) { + try (ResourceLock ignore = lock.obtain()) { Integer i = rsArrayToRsType.get(oid); if (i == null) { return oid; } return i; + } } - public synchronized char getArrayDelimiter(int oid) throws SQLException { + public char getArrayDelimiter(int oid) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (oid == Oid.UNSPECIFIED) { return ','; } @@ -601,9 +615,11 @@ public synchronized char getArrayDelimiter(int oid) throws SQLException { rs.close(); return delim; + } } - public synchronized int getRSArrayElement(int oid) throws SQLException { + public int getRSArrayElement(int oid) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { if (oid == Oid.UNSPECIFIED) { return Oid.UNSPECIFIED; } @@ -656,13 +672,17 @@ public synchronized int getRSArrayElement(int oid) throws SQLException { rs.close(); return rsType; + } } - public synchronized Class getRSobject(String type) { - return rsNameToRsObject.get(type); - } + public Class getRSobject(String type) { + try (ResourceLock ignore = lock.obtain()) { + return rsNameToRsObject.get(type); + } + } - public synchronized String getJavaClass(int oid) throws SQLException { + public String getJavaClass(int oid) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { String pgTypeName = getRSType(oid); String result = rsNameToJavaClass.get(pgTypeName); @@ -679,6 +699,7 @@ public synchronized String getJavaClass(int oid) throws 
SQLException { } return result; + } } public String getTypeForAlias(String alias) { diff --git a/src/main/java/com/amazon/redshift/largeobject/BlobInputStream.java b/src/main/java/com/amazon/redshift/largeobject/BlobInputStream.java index 90d28cc..3531b88 100644 --- a/src/main/java/com/amazon/redshift/largeobject/BlobInputStream.java +++ b/src/main/java/com/amazon/redshift/largeobject/BlobInputStream.java @@ -9,6 +9,8 @@ import java.io.InputStream; import java.sql.SQLException; +import com.amazon.redshift.jdbc.ResourceLock; + /** * This is an implementation of an InputStream from a large object. */ @@ -47,7 +49,7 @@ public class BlobInputStream extends InputStream { * The limit. */ private long limit = -1; - + private final ResourceLock lock = new ResourceLock(); /** * @param lo LargeObject to read from */ @@ -150,8 +152,10 @@ public void close() throws IOException { * invalid. * @see java.io.InputStream#reset() */ - public synchronized void mark(int readlimit) { - mpos = apos; + public void mark(int readlimit) { + try (ResourceLock ignore = lock.obtain()) { + mpos = apos; + } } /** @@ -161,7 +165,8 @@ public synchronized void mark(int readlimit) { * @see java.io.InputStream#mark(int) * @see java.io.IOException */ - public synchronized void reset() throws IOException { + public void reset() throws IOException { + try (ResourceLock ignore = lock.obtain()) { checkClosed(); try { if (mpos <= Integer.MAX_VALUE) { @@ -174,6 +179,7 @@ public synchronized void reset() throws IOException { } catch (SQLException se) { throw new IOException(se.toString()); } + } } /** diff --git a/src/main/java/com/amazon/redshift/logger/LogConsoleHandler.java b/src/main/java/com/amazon/redshift/logger/LogConsoleHandler.java index af7ca88..69f8c21 100644 --- a/src/main/java/com/amazon/redshift/logger/LogConsoleHandler.java +++ b/src/main/java/com/amazon/redshift/logger/LogConsoleHandler.java @@ -2,26 +2,34 @@ import java.io.PrintWriter; +import com.amazon.redshift.jdbc.ResourceLock; + public class LogConsoleHandler implements LogHandler { - + private final ResourceLock lock = new ResourceLock(); private final PrintWriter writer = new PrintWriter(System.out); @Override - public synchronized void write(String message) throws Exception + public void write(String message) throws Exception { + try (ResourceLock ignore = lock.obtain()) { writer.println(message); writer.flush(); + } } @Override - public synchronized void close() throws Exception { + public void close() throws Exception { + try (ResourceLock ignore = lock.obtain()) { // Do nothing as Writer is on the stdout. 
+ } } @Override - public synchronized void flush() { + public void flush() { + try (ResourceLock ignore = lock.obtain()) { if (writer != null) { writer.flush(); } + } } } diff --git a/src/main/java/com/amazon/redshift/logger/LogFileHandler.java b/src/main/java/com/amazon/redshift/logger/LogFileHandler.java index 9706c60..8b037ef 100644 --- a/src/main/java/com/amazon/redshift/logger/LogFileHandler.java +++ b/src/main/java/com/amazon/redshift/logger/LogFileHandler.java @@ -10,6 +10,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.amazon.redshift.jdbc.ResourceLock; import com.amazon.redshift.util.GT; import com.amazon.redshift.util.RedshiftException; import com.amazon.redshift.util.RedshiftState; @@ -48,7 +49,7 @@ public class LogFileHandler implements LogHandler { private PrintWriter writer = null; private boolean flushAfterWrite; - + private final ResourceLock lock = new ResourceLock(); public LogFileHandler(String filename, boolean flushAfterWrite, String maxLogFileSize, @@ -69,8 +70,9 @@ public LogFileHandler(String filename, } @Override - public synchronized void write(String message) throws Exception + public void write(String message) throws Exception { + try (ResourceLock ignore = lock.obtain()) { writer.println(message); if (flushAfterWrite) writer.flush(); @@ -84,20 +86,25 @@ public synchronized void write(String message) throws Exception openFile(); } } + } } @Override - public synchronized void close() throws Exception { + public void close() throws Exception { + try (ResourceLock ignore = lock.obtain()) { if (writer != null) { writer.close(); } + } } @Override - public synchronized void flush() { + public void flush() { + try (ResourceLock ignore = lock.obtain()) { if (writer != null) { writer.flush(); } + } } private void createDirectory() throws RedshiftException diff --git a/src/main/java/com/amazon/redshift/logger/LogWriterHandler.java b/src/main/java/com/amazon/redshift/logger/LogWriterHandler.java index 1c645dc..ad52f91 100644 --- a/src/main/java/com/amazon/redshift/logger/LogWriterHandler.java +++ b/src/main/java/com/amazon/redshift/logger/LogWriterHandler.java @@ -3,35 +3,43 @@ import java.io.IOException; import java.io.Writer; +import com.amazon.redshift.jdbc.ResourceLock; + public class LogWriterHandler implements LogHandler { - - private final Writer writer; - - public LogWriterHandler(Writer inWriter) throws Exception { - writer = inWriter; - } - - @Override - public synchronized void write(String message) throws Exception - { - writer.write(message); - writer.flush(); - } - - @Override - public synchronized void close() throws Exception { - // Do nothing as Writer is not created by the JDBC driver. - } - - @Override - public synchronized void flush() { - if (writer != null) { - try { - writer.flush(); - } catch (IOException e) { - // Ignore + + private final Writer writer; + private final ResourceLock lock = new ResourceLock(); + + public LogWriterHandler(Writer inWriter) throws Exception { + writer = inWriter; + } + + @Override + public void write(String message) throws Exception { + try (ResourceLock ignore = lock.obtain()) { + writer.write(message); + writer.flush(); + } + } + + @Override + public void close() throws Exception { + try (ResourceLock ignore = lock.obtain()) { + // Do nothing as Writer is not created by the JDBC driver. 
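The logger handlers all follow the same shape: write, flush, and close take the same lock, so a close can never interleave with an in-flight write. A reduced, self-contained version of that shape (editor-supplied, illustrative names):

import java.io.PrintWriter;
import java.util.concurrent.locks.ReentrantLock;

// Editor-supplied sketch of the handler pattern above; not driver code.
final class GuardedLogSink {
  private final ReentrantLock lock = new ReentrantLock();
  private final PrintWriter writer = new PrintWriter(System.out);

  void write(String message) {
    lock.lock();
    try {
      writer.println(message); // the whole line is emitted under the lock
      writer.flush();
    } finally {
      lock.unlock();
    }
  }

  void close() {
    lock.lock();
    try {
      writer.close(); // cannot run while a write() is in progress
    } finally {
      lock.unlock();
    }
  }
}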
+ } + } + + @Override + public void flush() { + try (ResourceLock ignore = lock.obtain()) { + if (writer != null) { + try { + writer.flush(); + } catch (IOException e) { + // Ignore + } } - } - } - + } + } + } diff --git a/src/main/java/com/amazon/redshift/plugin/BrowserOktaSAMLCredentialsProvider.java b/src/main/java/com/amazon/redshift/plugin/BrowserOktaSAMLCredentialsProvider.java index 6a0fbea..19ef03e 100644 --- a/src/main/java/com/amazon/redshift/plugin/BrowserOktaSAMLCredentialsProvider.java +++ b/src/main/java/com/amazon/redshift/plugin/BrowserOktaSAMLCredentialsProvider.java @@ -2,7 +2,7 @@ import com.amazon.redshift.INativePlugin; import com.amazon.redshift.NativeTokenHolder; -import com.amazon.redshift.core.IamHelper; +import com.amazon.redshift.jdbc.ResourceLock; import com.amazon.redshift.logger.LogLevel; import com.amazon.redshift.logger.RedshiftLogger; import com.amazon.redshift.plugin.httpserver.RequestHandler; @@ -69,7 +69,7 @@ public class BrowserOktaSAMLCredentialsProvider extends IdpCredentialsProvider i private NativeTokenHolder m_lastRefreshCredentials; // Used when cache is disable. protected Boolean m_disableCache = false; - + private ResourceLock lock = new ResourceLock(); /** * The custom log factory class. */ @@ -218,7 +218,7 @@ public NativeTokenHolder getCredentials() throws RedshiftException { if(RedshiftLogger.isEnable()) m_log.logInfo("SAML getCredentials NOT from cache"); - synchronized(this) { + try(ResourceLock ignore = lock.obtain()) { refresh(); if(m_disableCache) { diff --git a/src/main/java/com/amazon/redshift/plugin/CommonCredentialsProvider.java b/src/main/java/com/amazon/redshift/plugin/CommonCredentialsProvider.java index f7da6b1..c9a144e 100644 --- a/src/main/java/com/amazon/redshift/plugin/CommonCredentialsProvider.java +++ b/src/main/java/com/amazon/redshift/plugin/CommonCredentialsProvider.java @@ -16,6 +16,7 @@ import com.amazon.redshift.INativePlugin; import com.amazon.redshift.NativeTokenHolder; +import com.amazon.redshift.jdbc.ResourceLock; import com.amazon.redshift.logger.LogLevel; import com.amazon.redshift.logger.RedshiftLogger; import com.amazon.redshift.util.RedshiftException; @@ -34,7 +35,7 @@ public abstract class CommonCredentialsProvider extends IdpCredentialsProvider i private static final Map m_cache = new HashMap(); protected Boolean m_disableCache = true; private NativeTokenHolder m_lastRefreshCredentials; // Used when cache is disabled - + private ResourceLock lock = new ResourceLock(); /** * Log properties file name. 
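The credentials providers share a check-then-refresh structure: consult the cache without the lock, and take the lock only around refresh() so concurrent callers do not trigger duplicate identity-provider round trips. A generic sketch of that structure (editor-supplied; the real providers key their static caches differently and also honor m_disableCache):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Editor-supplied sketch; names are illustrative, not the driver's.
interface TokenLoader<K, V> {
  V refresh(K key) throws Exception;
}

final class TokenCache<K, V> {
  private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
  private final ReentrantLock lock = new ReentrantLock();

  V get(K key, TokenLoader<K, V> loader) throws Exception {
    V token = cache.get(key);
    if (token != null) {
      return token; // fast path: served from cache without taking the lock
    }
    lock.lock();
    try {
      token = cache.get(key); // re-check: another thread may have refreshed
      if (token == null) {
        token = loader.refresh(key);
        cache.put(key, token);
      }
      return token;
    } finally {
      lock.unlock();
    }
  }
}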
*/ @@ -103,7 +104,7 @@ public NativeTokenHolder getCredentials() throws RedshiftException { } } - synchronized (this) { + try(ResourceLock ignore = lock.obtain()) { refresh(); if (m_disableCache) { diff --git a/src/main/java/com/amazon/redshift/plugin/JwtCredentialsProvider.java b/src/main/java/com/amazon/redshift/plugin/JwtCredentialsProvider.java index 1e74e34..35cc2c9 100755 --- a/src/main/java/com/amazon/redshift/plugin/JwtCredentialsProvider.java +++ b/src/main/java/com/amazon/redshift/plugin/JwtCredentialsProvider.java @@ -1,12 +1,5 @@ package com.amazon.redshift.plugin; -import com.amazon.redshift.RedshiftProperty; -import com.amazon.redshift.INativePlugin; -import com.amazon.redshift.NativeTokenHolder; -import com.amazon.redshift.core.IamHelper; -import com.amazon.redshift.logger.RedshiftLogger; -import com.amazon.redshift.util.RedshiftException; - import java.io.IOException; import java.net.URL; import java.util.Collections; @@ -17,6 +10,13 @@ import org.apache.commons.logging.LogFactory; +import com.amazon.redshift.INativePlugin; +import com.amazon.redshift.NativeTokenHolder; +import com.amazon.redshift.RedshiftProperty; +import com.amazon.redshift.jdbc.ResourceLock; +import com.amazon.redshift.logger.RedshiftLogger; +import com.amazon.redshift.util.RedshiftException; + public abstract class JwtCredentialsProvider extends IdpCredentialsProvider implements INativePlugin { @@ -28,7 +28,7 @@ public abstract class JwtCredentialsProvider extends IdpCredentialsProvider impl private static Map m_cache = new HashMap(); private NativeTokenHolder m_lastRefreshCredentials; // Used when cache is disable. - + private final ResourceLock lock = new ResourceLock(); /** * The custom log factory class. */ @@ -132,7 +132,7 @@ public NativeTokenHolder getCredentials() throws RedshiftException if(RedshiftLogger.isEnable()) m_log.logInfo("JWT getCredentials NOT from cache"); - synchronized(this) { + try(ResourceLock ignore = lock.obtain()) { refresh(); if(m_disableCache) { diff --git a/src/main/java/com/amazon/redshift/plugin/SamlCredentialsProvider.java b/src/main/java/com/amazon/redshift/plugin/SamlCredentialsProvider.java index 85c6c38..b7fff19 100755 --- a/src/main/java/com/amazon/redshift/plugin/SamlCredentialsProvider.java +++ b/src/main/java/com/amazon/redshift/plugin/SamlCredentialsProvider.java @@ -1,32 +1,7 @@ package com.amazon.redshift.plugin; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.SdkClientException; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.AnonymousAWSCredentials; -import com.amazonaws.auth.BasicSessionCredentials; -import com.amazonaws.services.securitytoken.AWSSecurityTokenService; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; -import com.amazonaws.services.securitytoken.model.AssumeRoleWithSAMLRequest; -import com.amazonaws.services.securitytoken.model.AssumeRoleWithSAMLResult; -import com.amazonaws.services.securitytoken.model.Credentials; -import com.amazonaws.util.StringUtils; -import com.amazon.redshift.CredentialsHolder; -import com.amazon.redshift.CredentialsHolder.IamMetadata; -import com.amazon.redshift.IPlugin; -import com.amazon.redshift.RedshiftProperty; -import com.amazon.redshift.core.IamHelper; -import com.amazon.redshift.httpclient.log.IamCustomLogFactory; -import com.amazon.redshift.logger.LogLevel; -import com.amazon.redshift.logger.RedshiftLogger; -import 
com.amazon.redshift.plugin.utils.RequestUtils; - import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +14,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; + import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; @@ -46,6 +22,7 @@ import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.LogFactory; import org.w3c.dom.Document; @@ -53,6 +30,29 @@ import org.w3c.dom.NodeList; import org.xml.sax.SAXException; +import com.amazon.redshift.CredentialsHolder; +import com.amazon.redshift.CredentialsHolder.IamMetadata; +import com.amazon.redshift.IPlugin; +import com.amazon.redshift.RedshiftProperty; +import com.amazon.redshift.core.IamHelper; +import com.amazon.redshift.httpclient.log.IamCustomLogFactory; +import com.amazon.redshift.jdbc.ResourceLock; +import com.amazon.redshift.logger.RedshiftLogger; +import com.amazon.redshift.plugin.utils.RequestUtils; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import com.amazonaws.services.securitytoken.model.AssumeRoleWithSAMLRequest; +import com.amazonaws.services.securitytoken.model.AssumeRoleWithSAMLResult; +import com.amazonaws.services.securitytoken.model.Credentials; +import com.amazonaws.util.StringUtils; + public abstract class SamlCredentialsProvider extends IdpCredentialsProvider implements IPlugin { protected static final String KEY_IDP_HOST = "idp_host"; @@ -75,6 +75,7 @@ public abstract class SamlCredentialsProvider extends IdpCredentialsProvider imp protected String m_region; protected Boolean m_disableCache = false; protected Boolean m_groupFederation = false; + private final ResourceLock lock = new ResourceLock(); private static Map m_cache = new HashMap(); @@ -235,7 +236,7 @@ public CredentialsHolder getCredentials() if(RedshiftLogger.isEnable()) m_log.logInfo("SAML getCredentials NOT from cache"); - synchronized(this) { + try(ResourceLock ignore = lock.obtain()) { refresh(); diff --git a/src/main/java/com/amazon/redshift/ssl/PKCS12KeyManager.java b/src/main/java/com/amazon/redshift/ssl/PKCS12KeyManager.java index e077b99..11b7ac8 100644 --- a/src/main/java/com/amazon/redshift/ssl/PKCS12KeyManager.java +++ b/src/main/java/com/amazon/redshift/ssl/PKCS12KeyManager.java @@ -5,10 +5,6 @@ package com.amazon.redshift.ssl; -import com.amazon.redshift.util.GT; -import com.amazon.redshift.util.RedshiftException; -import com.amazon.redshift.util.RedshiftState; - import java.io.File; import java.io.FileInputStream; import java.net.Socket; @@ -26,6 +22,11 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.x500.X500Principal; +import com.amazon.redshift.jdbc.ResourceLock; +import com.amazon.redshift.util.GT; +import 
com.amazon.redshift.util.RedshiftException; +import com.amazon.redshift.util.RedshiftState; + public class PKCS12KeyManager implements X509KeyManager { private final CallbackHandler cbh; @@ -33,6 +34,7 @@ public class PKCS12KeyManager implements X509KeyManager { private final String keyfile; private final KeyStore keyStore; boolean keystoreLoaded = false; + private final ResourceLock lock = new ResourceLock(); public PKCS12KeyManager(String pkcsFile, CallbackHandler cbh) throws RedshiftException { try { @@ -139,8 +141,8 @@ public PrivateKey getPrivateKey(String s) { return null; } - private synchronized void loadKeyStore() throws Exception { - + private void loadKeyStore() throws Exception { + try (ResourceLock ignore = lock.obtain()) { if (keystoreLoaded) { return; } @@ -167,5 +169,5 @@ private synchronized void loadKeyStore() throws Exception { keyStore.load(new FileInputStream(new File(keyfile)), pwdcb.getPassword()); keystoreLoaded = true; } - + } } diff --git a/src/main/java/com/amazon/redshift/util/LruCache.java b/src/main/java/com/amazon/redshift/util/LruCache.java index af65cab..649211d 100644 --- a/src/main/java/com/amazon/redshift/util/LruCache.java +++ b/src/main/java/com/amazon/redshift/util/LruCache.java @@ -10,6 +10,8 @@ import java.util.LinkedHashMap; import java.util.Map; +import com.amazon.redshift.jdbc.ResourceLock; + /** * Caches values in simple least-recently-accessed order. */ @@ -38,6 +40,7 @@ public interface CreateAction { private final long maxSizeBytes; private long currentSize; private final Map cache; + private final ResourceLock lock = new ResourceLock(); private class LimitedMap extends LinkedHashMap { LimitedMap(int initialCapacity, float loadFactor, boolean accessOrder) { @@ -98,8 +101,10 @@ public LruCache(int maxSizeEntries, long maxSizeBytes, boolean accessOrder, * @param key cache key * @return entry from cache or null if cache does not contain given key. */ - public synchronized Value get(Key key) { - return cache.get(key); + public Value get(Key key) { + try (ResourceLock ignore = lock.obtain()) { + return cache.get(key); + } } /** @@ -109,13 +114,15 @@ public synchronized Value get(Key key) { * @return entry from cache or newly created entry if cache does not contain given key. 
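loadKeyStore() above is the lazy-initialization variant of the pattern: the keystoreLoaded flag is read and written only while the lock is held, so the keystore is loaded at most once even under concurrent handshakes. Reduced to its essentials (editor-supplied, illustrative names):

import java.util.concurrent.locks.ReentrantLock;

// Editor-supplied sketch mirroring loadKeyStore(); not driver code.
final class LazyResource {
  private final ReentrantLock lock = new ReentrantLock();
  private boolean loaded;

  void ensureLoaded() {
    lock.lock();
    try {
      if (loaded) {
        return; // flag is only ever checked and set under the lock
      }
      // ... expensive one-time setup would go here ...
      loaded = true;
    } finally {
      lock.unlock();
    }
  }
}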
* @throws SQLException if entry creation fails */ - public synchronized Value borrow(Key key) throws SQLException { - Value value = cache.remove(key); - if (value == null) { - return createAction.create(key); + public Value borrow(Key key) throws SQLException { + try (ResourceLock ignore = lock.obtain()) { + Value value = cache.remove(key); + if (value == null) { + return createAction.create(key); + } + currentSize -= value.getSize(); + return value; } - currentSize -= value.getSize(); - return value; } /** @@ -124,24 +131,26 @@ public synchronized Value borrow(Key key) throws SQLException { * @param key key * @param value value */ - public synchronized void put(Key key, Value value) { - long valueSize = value.getSize(); - if (maxSizeBytes == 0 || maxSizeEntries == 0 || valueSize * 2 > maxSizeBytes) { - // Just destroy the value if cache is disabled or if entry would consume more than a half of - // the cache - evictValue(value); - return; - } - currentSize += valueSize; - Value prev = cache.put(key, value); - if (prev == null) { - return; - } - // This should be a rare case - currentSize -= prev.getSize(); - if (prev != value) { - evictValue(prev); - } + public void put(Key key, Value value) { + try (ResourceLock ignore = lock.obtain()) { + long valueSize = value.getSize(); + if (maxSizeBytes == 0 || maxSizeEntries == 0 || valueSize * 2 > maxSizeBytes) { + // Just destroy the value if cache is disabled or if entry would consume more than a half of + // the cache + evictValue(value); + return; + } + currentSize += valueSize; + Value prev = cache.put(key, value); + if (prev == null) { + return; + } + // This should be a rare case + currentSize -= prev.getSize(); + if (prev != value) { + evictValue(prev); + } + } } /** @@ -149,10 +158,12 @@ public synchronized void put(Key key, Value value) { * * @param m The map containing entries to put into the cache */ - public synchronized void putAll(Map m) { - for (Map.Entry entry : m.entrySet()) { - this.put(entry.getKey(), entry.getValue()); - } + public void putAll(Map m) { + try (ResourceLock ignore = lock.obtain()) { + for (Map.Entry entry : m.entrySet()) { + this.put(entry.getKey(), entry.getValue()); + } + } } public static final CreateAction NOOP_CREATE_ACTION = new CreateAction() { diff --git a/src/main/java/com/amazon/redshift/util/RedshiftProperties.java b/src/main/java/com/amazon/redshift/util/RedshiftProperties.java index 99bc255..7801851 100644 --- a/src/main/java/com/amazon/redshift/util/RedshiftProperties.java +++ b/src/main/java/com/amazon/redshift/util/RedshiftProperties.java @@ -1,6 +1,7 @@ package com.amazon.redshift.util; import com.amazon.redshift.RedshiftProperty; +import com.amazon.redshift.jdbc.ResourceLock; import java.util.Locale; import java.util.Properties; @@ -8,7 +9,7 @@ import java.util.Collections; public class RedshiftProperties extends Properties { - + private ResourceLock lock = new ResourceLock(); /** * Creates an empty property list with no default values. 
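LruCache is also the clearest reason ResourceLock extends ReentrantLock rather than wrapping a plain Lock: putAll() acquires the lock and then calls put(), which acquires it again on the same thread. synchronized is reentrant, so preserving its semantics requires a reentrant replacement; a non-reentrant lock would self-deadlock here. A short demonstration (editor-supplied):

import java.util.concurrent.locks.ReentrantLock;

// Editor-supplied demonstration of reentrancy; not driver code. A nested
// acquire by the owning thread just bumps the hold count, as in putAll -> put.
public class ReentrancyDemo {
  private static final ReentrantLock lock = new ReentrantLock();

  static void outer() {
    lock.lock();
    try {
      inner(); // nested acquire by the same thread succeeds immediately
    } finally {
      lock.unlock();
    }
  }

  static void inner() {
    lock.lock();
    try {
      System.out.println("hold count: " + lock.getHoldCount()); // prints 2
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    outer();
  }
}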
   */
@@ -70,9 +71,11 @@ public String getProperty(String key)
   }
 
   @Override
-  public synchronized Object setProperty(String key, String value)
+  public Object setProperty(String key, String value)
   {
-    return super.setProperty(key.toLowerCase(Locale.ENGLISH), value);
+    try (ResourceLock ignore = lock.obtain()) {
+      return super.setProperty(key.toLowerCase(Locale.ENGLISH), value);
+    }
   }
 
   public static void evaluateProperties(RedshiftProperties properties) throws RedshiftException
diff --git a/src/main/java/com/amazon/redshift/util/RedshiftTimestamp.java b/src/main/java/com/amazon/redshift/util/RedshiftTimestamp.java
index 7fbb463..4c9c80f 100644
--- a/src/main/java/com/amazon/redshift/util/RedshiftTimestamp.java
+++ b/src/main/java/com/amazon/redshift/util/RedshiftTimestamp.java
@@ -10,6 +10,8 @@
 import java.util.Arrays;
 import java.util.Calendar;
 
+import com.amazon.redshift.jdbc.ResourceLock;
+
 /**
 * This class augments the Java built-in Timestamp to allow for explicit setting of the time zone.
 */
@@ -221,9 +223,13 @@ private static long convertTimeMillis(long timeMillis, Calendar to, Calendar from)
   *
   * @return The adjusted Timestamp object.
   */
-  private synchronized Timestamp getAdjustedTimestamp()
+  private Timestamp getAdjustedTimestamp()
   {
-    return getTimestamp(this, Calendar.getInstance(), calendar);
+    try (ResourceLock ignore = lock.obtain()) {
+      return getTimestamp(this, Calendar.getInstance(), calendar);
+    }
   }
+
+  private final ResourceLock lock = new ResourceLock();
 
   /**
diff --git a/src/main/java/com/amazon/redshift/util/SharedTimer.java b/src/main/java/com/amazon/redshift/util/SharedTimer.java
index 8456010..8edac84 100644
--- a/src/main/java/com/amazon/redshift/util/SharedTimer.java
+++ b/src/main/java/com/amazon/redshift/util/SharedTimer.java
@@ -8,6 +8,7 @@
 import java.util.Timer;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.amazon.redshift.jdbc.ResourceLock;
 import com.amazon.redshift.logger.LogLevel;
 import com.amazon.redshift.logger.RedshiftLogger;
 
@@ -18,6 +19,7 @@ public class SharedTimer {
   private static final RedshiftLogger logger = RedshiftLogger.getDriverLogger();
   private volatile Timer timer = null;
   private final AtomicInteger refCount = new AtomicInteger(0);
+  private final ResourceLock lock = new ResourceLock();
 
   public SharedTimer() {
   }
@@ -26,50 +28,54 @@ public int getRefCount() {
     return refCount.get();
   }
 
-  public synchronized Timer getTimer() {
-    if (timer == null) {
-      int index = timerCount.incrementAndGet();
-
-      /*
-      Temporarily switch contextClassLoader to the one that loaded this driver to avoid TimerThread preventing current
-      contextClassLoader - which may be the ClassLoader of a web application - from being GC:ed.
-      */
-      final ClassLoader prevContextCL = Thread.currentThread().getContextClassLoader();
-      try {
-        /*
-        Scheduled tasks whould not need to use .getContextClassLoader, so we just reset it to null
-        */
-        Thread.currentThread().setContextClassLoader(null);
-
-        timer = new Timer("Redshift-JDBC-SharedTimer-" + index, true);
-      } finally {
-        Thread.currentThread().setContextClassLoader(prevContextCL);
-      }
-    }
-    refCount.incrementAndGet();
-    return timer;
+  public Timer getTimer() {
+    try (ResourceLock ignore = lock.obtain()) {
+      if (timer == null) {
+        int index = timerCount.incrementAndGet();
+
+        /*
+        Temporarily switch contextClassLoader to the one that loaded this driver to avoid TimerThread preventing current
+        contextClassLoader - which may be the ClassLoader of a web application - from being GC'ed.
+        */
+        final ClassLoader prevContextCL = Thread.currentThread().getContextClassLoader();
+        try {
+          /*
+          Scheduled tasks should not need to use .getContextClassLoader, so we just reset it to null
+          */
+          Thread.currentThread().setContextClassLoader(null);
+
+          timer = new Timer("Redshift-JDBC-SharedTimer-" + index, true);
+        } finally {
+          Thread.currentThread().setContextClassLoader(prevContextCL);
+        }
+      }
+      refCount.incrementAndGet();
+      return timer;
+    }
   }
 
-  public synchronized void releaseTimer() {
-    int count = refCount.decrementAndGet();
-    if (count > 0) {
-      // There are outstanding references to the timer so do nothing
-      if(RedshiftLogger.isEnable() && logger != null)
-        logger.log(LogLevel.DEBUG, "Outstanding references still exist so not closing shared Timer");
-    } else if (count == 0) {
-      // This is the last usage of the Timer so cancel it so it's resources can be release.
-      if(RedshiftLogger.isEnable() && logger != null)
-        logger.log(LogLevel.DEBUG, "No outstanding references to shared Timer, will cancel and close it");
-      if (timer != null) {
-        timer.cancel();
-        timer = null;
-      }
-    } else {
-      // Should not get here under normal circumstance, probably a bug in app code.
-      if(RedshiftLogger.isEnable() && logger != null)
-        logger.log(LogLevel.INFO,
-          "releaseTimer() called too many times; there is probably a bug in the calling code");
-      refCount.set(0);
-    }
+  public void releaseTimer() {
+    try (ResourceLock ignore = lock.obtain()) {
+      int count = refCount.decrementAndGet();
+      if (count > 0) {
+        // There are outstanding references to the timer so do nothing
+        if(RedshiftLogger.isEnable() && logger != null)
+          logger.log(LogLevel.DEBUG, "Outstanding references still exist so not closing shared Timer");
+      } else if (count == 0) {
+        // This is the last usage of the Timer so cancel it so its resources can be released.
+        if(RedshiftLogger.isEnable() && logger != null)
+          logger.log(LogLevel.DEBUG, "No outstanding references to shared Timer, will cancel and close it");
+        if (timer != null) {
+          timer.cancel();
+          timer = null;
+        }
+      } else {
+        // Should not get here under normal circumstances, probably a bug in app code.
+        if(RedshiftLogger.isEnable() && logger != null)
+          logger.log(LogLevel.INFO,
+              "releaseTimer() called too many times; there is probably a bug in the calling code");
+        refCount.set(0);
+      }
+    }
   }
 }
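Note: every class in this patch depends on com.amazon.redshift.jdbc.ResourceLock, but that file is not part of these hunks. From the way it is used above (obtain() as the try-with-resources resource, with the lock released when the block exits, and newCondition() available to callers), it is presumably an AutoCloseable wrapper around java.util.concurrent.locks.ReentrantLock. A minimal sketch under that assumption, not a confirmed copy of the shipped class:

package com.amazon.redshift.jdbc;

import java.util.concurrent.locks.ReentrantLock;

// Sketch only: assumes ResourceLock is an AutoCloseable ReentrantLock.
// Extending ReentrantLock keeps reentrancy and newCondition() available
// to callers that previously relied on synchronized plus wait()/notify().
public final class ResourceLock extends ReentrantLock implements AutoCloseable {

  // Acquires the lock and returns this, so the acquisition itself can be
  // declared as the resource of a try-with-resources statement.
  public ResourceLock obtain() {
    lock();
    return this;
  }

  // Runs automatically when the try block exits, releasing the lock on
  // every path (normal return or exception), like a finally { unlock(); }.
  @Override
  public void close() {
    unlock();
  }
}

Two properties of this pattern matter for the hunks above. Reentrancy: LruCache.putAll() takes the lock and then calls put(), which re-acquires it on the same thread, so a non-reentrant lock would deadlock there. And unlike synchronized blocks, a ReentrantLock does not pin virtual threads to their carrier under the initial JDK 21 Loom implementation, which is the usual motivation for this kind of conversion.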