Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,29 @@
*/
package org.apache.phoenix.jdbc;

import org.apache.hadoop.hbase.client.Consistency;
import org.apache.phoenix.exception.FailoverSQLException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
Expand All @@ -41,7 +54,9 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.text.Format;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -372,6 +387,32 @@ public PreparedStatement prepareStatement(String sql) throws SQLException {
return wrapActionDuringFailover(() -> connection.prepareStatement(sql));
}

/**
 * Returns the wrapped connection's date pattern, routed through the
 * failover wrapper like every other delegated call in this class.
 * Note: {@code connection} is intentionally read inside the lambda so the
 * current delegate is used on each (re)invocation.
 */
@Override
public String getDatePattern() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getDatePattern();
    });
}

/**
 * Looks up a table on the wrapped connection by tenant, name and optional
 * timestamp, executed via the failover wrapper. The {@code connection} field
 * is re-read inside the lambda so a swapped delegate is picked up on retry.
 */
@Override
public PTable getTable(@Nullable String tenantId, String fullTableName, @Nullable Long timestamp) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTable(tenantId, fullTableName, timestamp);
    });
}

/** Delegates {@code isRunningUpgrade()} to the wrapped connection through the failover wrapper. */
@Override
public boolean isRunningUpgrade() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.isRunningUpgrade();
    });
}

/** Delegates {@code getURL()} to the wrapped connection through the failover wrapper. */
@Override
public String getURL() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getURL();
    });
}

/** Delegates {@code getLogLevel()} to the wrapped connection through the failover wrapper. */
@Override
public LogLevel getLogLevel() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getLogLevel();
    });
}


@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return wrapActionDuringFailover(() -> connection.prepareCall(sql));
Expand Down Expand Up @@ -623,6 +664,88 @@ public int getNetworkTimeout() throws SQLException {
return wrapActionDuringFailover(() -> connection.getNetworkTimeout());
}

/** Delegates {@code getQueryServices()} to the wrapped connection through the failover wrapper. */
@Override
public ConnectionQueryServices getQueryServices() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getQueryServices();
    });
}

/** Delegates the {@link PTableKey} table lookup to the wrapped connection via the failover wrapper. */
@Override
public PTable getTable(PTableKey key) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTable(key);
    });
}

/** Delegates the by-name table lookup to the wrapped connection via the failover wrapper. */
@Override
public PTable getTable(String name) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTable(name);
    });
}

/** Delegates the cache-bypassing table lookup to the wrapped connection via the failover wrapper. */
@Override
public PTable getTableNoCache(String name) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTableNoCache(name);
    });
}

/** Delegates {@code getConsistency()} to the wrapped connection through the failover wrapper. */
@Override
public Consistency getConsistency() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getConsistency();
    });
}

/**
 * Delegates {@code getTenantId()} to the wrapped connection through the
 * failover wrapper; may be {@code null}, mirroring the delegate's contract.
 */
@Override
@Nullable
public PName getTenantId() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTenantId();
    });
}

/** Delegates {@code getMutationState()} to the wrapped connection through the failover wrapper. */
@Override
public MutationState getMutationState() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getMutationState();
    });
}

/** Delegates {@code getMetaDataCache()} to the wrapped connection through the failover wrapper. */
@Override
public PMetaData getMetaDataCache() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getMetaDataCache();
    });
}

/** Delegates {@code getMutateBatchSize()} to the wrapped connection through the failover wrapper. */
@Override
public int getMutateBatchSize() throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getMutateBatchSize();
    });
}

/**
 * Delegates {@code executeStatements} to the wrapped connection via the
 * failover wrapper.
 *
 * Fix: the original wrapped the delegate's checked {@link IOException} in a
 * bare {@code RuntimeException}, so callers catching the declared
 * {@code IOException} never saw it. We tunnel it through
 * {@link java.io.UncheckedIOException} (the lambda cannot throw checked
 * exceptions) and unwrap it back to the original IOException here.
 */
@Override
public int executeStatements(Reader reader, List<Object> binds, PrintStream out) throws IOException, SQLException {
    try {
        return wrapActionDuringFailover(() -> {
            try {
                return connection.executeStatements(reader, binds, out);
            } catch (IOException e) {
                throw new java.io.UncheckedIOException(e);
            }
        });
    } catch (java.io.UncheckedIOException e) {
        // Restore the checked exception declared by this method's signature.
        throw e.getCause();
    }
}

/** Delegates {@code getFormatter(type)} to the wrapped connection through the failover wrapper. */
@Override
public Format getFormatter(PDataType type) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getFormatter(type);
    });
}

/** Forwards the running-upgrade flag to the wrapped connection via the failover wrapper. */
@Override
public void setRunningUpgrade(boolean isRunningUpgrade) throws SQLException {
    wrapActionDuringFailover(() -> {
        connection.setRunningUpgrade(isRunningUpgrade);
    });
}

/** Delegates the (tenantId, fullTableName) table lookup to the wrapped connection via the failover wrapper. */
@Override
public PTable getTable(String tenantId, String fullTableName) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTable(tenantId, fullTableName);
    });
}

/** Delegates the (tenantId, name) cache-bypassing lookup to the wrapped connection via the failover wrapper. */
@Override
public PTable getTableNoCache(PName tenantId, String name) throws SQLException {
    return wrapActionDuringFailover(() -> {
        return connection.getTableNoCache(tenantId, name);
    });
}

/** Forwards the is-closing flag to the wrapped connection via the failover wrapper. */
@Override
public void setIsClosing(boolean imitateIsClosing) throws SQLException {
    wrapActionDuringFailover(() -> {
        connection.setIsClosing(imitateIsClosing);
    });
}

/**
* @return the currently wrapped connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ HAGroupInfo getGroupInfo() {
return info;
}

Properties getProperties() {
public Properties getProperties() {
return properties;
}

Expand All @@ -748,7 +748,7 @@ public ClusterRoleRecord getRoleRecord() {
* The lifecycle management is confined to this class because an HA group is a shared resource.
* Someone calling close on this would make it unusable, since the state would become closed.
*/
void close() {
public void close() {
roleManagerExecutor.shutdownNow();
try {
// TODO: Parameterize and set in future work item for pluggable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,28 @@
*/
package org.apache.phoenix.jdbc;

import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
Expand All @@ -41,6 +54,7 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.text.Format;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -679,4 +693,105 @@ public void clearMetrics() {
}
context.resetMetrics();
}

@Override
public ConnectionQueryServices getQueryServices() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    // NOTE(review): SQLFeatureNotSupportedException (a SQLException) may suit
    // JDBC callers better than an unchecked exception — TODO confirm intent.
    throw new UnsupportedOperationException();
}

@Override
public PTable getTable(PTableKey key) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public PTable getTable(String name) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public PTable getTableNoCache(String name) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public Consistency getConsistency() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
@Nullable
public PName getTenantId() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public MutationState getMutationState() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public PMetaData getMetaDataCache() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public int getMutateBatchSize() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public int executeStatements(Reader reader, List<Object> binds, PrintStream out) throws IOException, SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public Format getFormatter(PDataType type) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public void setRunningUpgrade(boolean isRunningUpgrade) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public PTable getTable(String tenantId, String fullTableName) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public PTable getTableNoCache(PName tenantId, String name) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public void setIsClosing(boolean imitateIsClosing) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public String getDatePattern() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public PTable getTable(@Nullable String tenantId, String fullTableName, @Nullable Long timestamp) throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public boolean isRunningUpgrade() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public String getURL() throws SQLException {
    // Intentionally unsupported on this connection implementation.
    throw new UnsupportedOperationException();
}

@Override
public LogLevel getLogLevel() throws SQLException {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use runOnConnections() for these methods, we are not using parallel policy for tests but if we want to then it will be helpful?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,26 @@

package org.apache.phoenix.jdbc;

import org.apache.hadoop.hbase.client.Consistency;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.types.PDataType;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.Format;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -47,5 +64,29 @@ public interface PhoenixMonitoredConnection extends Connection {
* metrics for individual DML.
*/
void clearMetrics();
/** Returns the query services backing this connection. */
ConnectionQueryServices getQueryServices() throws SQLException;
/** Resolves a table by its {@link PTableKey}. */
PTable getTable(PTableKey key) throws SQLException;
/** Resolves a table by name. */
PTable getTable(String name) throws SQLException;
/** Resolves a table by name, bypassing the metadata cache. */
PTable getTableNoCache(String name) throws SQLException;
/** Returns the HBase read {@link Consistency} for this connection. */
Consistency getConsistency() throws SQLException;
/** Returns the tenant id, presumably null for global connections — TODO confirm. */
PName getTenantId() throws SQLException;
/** Returns the connection's current {@link MutationState}. */
MutationState getMutationState() throws SQLException;
/** Returns the connection's metadata cache. */
PMetaData getMetaDataCache() throws SQLException;
/** Returns the batch size used when flushing mutations. */
int getMutateBatchSize() throws SQLException;
/** Executes statements read from {@code reader}, writing output to {@code out}. */
int executeStatements(Reader reader, List<Object> binds,
PrintStream out) throws IOException, SQLException;
/** Returns the display {@link Format} for the given data type. */
Format getFormatter(PDataType type) throws SQLException;
/** Sets whether a system upgrade is in progress on this connection. */
void setRunningUpgrade(boolean isRunningUpgrade) throws SQLException;
/** Resolves a table for the given tenant and full table name. */
PTable getTable(String tenantId, String fullTableName)
throws SQLException;
/** Resolves a table for the given tenant, bypassing the metadata cache. */
PTable getTableNoCache(PName tenantId, String name) throws SQLException;
/** Marks the connection as (imitating) closing. */
void setIsClosing(boolean imitateIsClosing) throws SQLException;
// NOTE(review): re-declares java.sql.Connection#prepareStatement; redundant
// but harmless — kept for explicitness in this interface.
PreparedStatement prepareStatement(String sql) throws SQLException;
/** Returns the date format pattern used by this connection. */
String getDatePattern() throws SQLException;
/** Resolves a table by tenant, name and optional timestamp. */
PTable getTable(@Nullable String tenantId, String fullTableName,
@Nullable Long timestamp) throws SQLException;
/** Returns whether an upgrade is currently running. */
boolean isRunningUpgrade() throws SQLException;
/** Returns the JDBC URL of this connection. */
String getURL() throws SQLException;
/** Returns the configured {@link LogLevel}. */
LogLevel getLogLevel() throws SQLException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ public Pair<Integer, ResultSet> call() throws SQLException {
final long startExecuteMutationTime = EnvironmentEdgeManager.currentTimeMillis();
clearResultSet();
try {
PhoenixConnection conn = getConnection();
PhoenixMonitoredConnection conn = getConnection();
if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade()
&& stmt.getOperation() != Operation.UPGRADE) {
throw new UpgradeRequiredException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3795,7 +3795,7 @@ private void copyDataFromPhoenixTTLtoTTL(PhoenixConnection oldMetaConnection) th

}

private void moveTTLFromHBaseLevelTTLToPhoenixLevelTTL(PhoenixConnection oldMetaConnection) throws IOException {
private void moveTTLFromHBaseLevelTTLToPhoenixLevelTTL(PhoenixConnection oldMetaConnection) throws IOException, SQLException {
// Increase the timeouts so that the scan queries during Copy Data does not timeout
// on large SYSCAT Tables
Map<String, String> options = new HashMap<>();
Expand Down
Loading