
Commit 9380d8e

PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion (#2273)
1 parent 6686bab commit 9380d8e
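For context on the failure mode named in the title: each HBase client Connection owns its own thread pools, so server-side code that opens a fresh connection per metadata call, or lets each Table fall back to a default per-instance executor, can strand an unbounded number of OS threads. Below is a minimal, illustrative Java sketch of the two patterns, assuming the stock HBase 2.x client API; the class and method names are hypothetical, not Phoenix code.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class ConnectionChurnSketch {

  // Anti-pattern: a fresh Connection per request. Each Connection owns its
  // own thread pools, so N unclosed connections strand N pools of threads.
  static Table tablePerRequest(Configuration conf, String name) throws IOException {
    Connection conn = ConnectionFactory.createConnection(conf); // new pools every call
    return conn.getTable(TableName.valueOf(name));              // conn is never closed
  }

  // The pattern the fix relies on: one cached Connection whose pools back
  // every Table, closed exactly once at shutdown.
  private static volatile Connection cached;

  static synchronized Table tableFromSingleton(Configuration conf, String name)
    throws IOException {
    if (cached == null) {
      cached = ConnectionFactory.createConnection(conf);
    }
    return cached.getTable(TableName.valueOf(name)); // reuses the cached pools
  }

  static synchronized void shutdown() throws IOException {
    if (cached != null) {
      cached.close(); // releases the shared pools
      cached = null;
    }
  }
}

The diffs below move the region-server side fully onto the second pattern and tear the cached connections down when the coprocessor stops.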

File tree: 6 files changed, +193 -21 lines


phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java

Lines changed: 3 additions & 3 deletions

@@ -4745,9 +4745,9 @@ public MutationState addColumn(PTable table, List<ColumnDef> origColumnDefs,
   /**
    * To check if TTL is defined at any of the child below we are checking it at
    * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)}
-   * level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
-   * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[],
-   * byte[], List, int)} we are already traversing through allDescendantViews.
+   * level where in function
+   * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)}
+   * we are already traversing through allDescendantViews.
    */
  }

phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java

Lines changed: 3 additions & 4 deletions

@@ -33,7 +33,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;

@@ -237,9 +236,9 @@ public static int write(Tuple result, DataOutput out) throws IOException {
    * @throws SQLException If any SQL operation fails.
    */
   @SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE",
-      justification = "Tge statement object needs to be kept open for the returned RS to be "
-          + "valid, however this is acceptable as not callingPhoenixStatement.close() "
-          + "causes no resource leak")
+    justification = "Tge statement object needs to be kept open for the returned RS to be "
+        + "valid, however this is acceptable as not callingPhoenixStatement.close() "
+        + "causes no resource leak")
   public static ResultSet getResultSet(Tuple toProject, TableName tableName, Connection conn,
     boolean withPrefetch) throws SQLException {
     if (tableName == null) {

phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java

Lines changed: 3 additions & 0 deletions

@@ -41,6 +41,7 @@
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -66,10 +67,12 @@ public void start(CoprocessorEnvironment env) throws IOException {

   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
+    RegionServerCoprocessor.super.stop(env);
     if (uncoveredIndexThreadPool != null) {
       uncoveredIndexThreadPool
         .stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool.");
     }
+    ServerUtil.ConnectionFactory.shutdown();
   }

   @Override
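A hedged sketch of the teardown ordering this hunk establishes, assuming HBase 2.x coprocessor APIs; the cached-connection field below is a stand-in for ServerUtil.ConnectionFactory, not the real implementation.

import java.io.IOException;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;

public class ShutdownAwareEndpoint implements RegionServerCoprocessor {
  // Stand-in for the connection cache that ServerUtil.ConnectionFactory keeps.
  private static volatile Connection cachedConnection;

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // Run the interface's default teardown first, mirroring
    // RegionServerCoprocessor.super.stop(env) in the hunk above.
    RegionServerCoprocessor.super.stop(env);
    // Then close the cached connection so its pool threads can exit;
    // skipping this leaves pools alive across coprocessor reloads.
    Connection conn = cachedConnection;
    if (conn != null) {
      conn.close();
      cachedConnection = null;
    }
  }
}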

phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java

Lines changed: 1 addition & 12 deletions

@@ -45,22 +45,16 @@
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 public class ServerUtil {
-  private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerUtil.class);
   private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";

-  private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) {
-    return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS);
-  }
-
   public static boolean hasCoprocessor(RegionCoprocessorEnvironment env,
     String CoprocessorClassName) {
     Collection<CoprocessorDescriptor> coprocessors =

@@ -99,17 +93,11 @@ private static Table getTableFromSingletonPool(RegionCoprocessorEnvironment env,

   public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment env,
     Table writerTable) throws IOException {
-    if (coprocessorScanWorks(env)) {
-      return writerTable;
-    }
     return getTableFromSingletonPool(env, writerTable.getName());
   }

   public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment env,
     TableName tableName) throws IOException {
-    if (coprocessorScanWorks(env)) {
-      return env.getConnection().getTable(tableName);
-    }
     return getTableFromSingletonPool(env, tableName);
   }

@@ -222,6 +210,7 @@ public static Configuration getTypeSpecificConfiguration(ConnectionType connecti

   public static void shutdown() {
     synchronized (ConnectionFactory.class) {
+      LOGGER.info("Closing ServerUtil.ConnectionFactory connections");
       for (Connection connection : connections.values()) {
         try {
           connection.close();
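With the long-obsolete 0.98.6 version check gone, both getHTableForCoprocessorScan overloads always route through getTableFromSingletonPool. A rough sketch of why that matters, assuming the HBase 2.x client; the shared-executor wiring below is illustrative rather than ServerUtil's actual internals. Tables handed a shared ExecutorService reuse it, while Tables built without one may fall back to a per-instance "htable-pool" executor, which is exactly the thread growth the new integration test asserts against.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class SingletonPoolTables {
  private static volatile Connection conn;
  // One executor shared by every Table handed out below.
  private static final ExecutorService SHARED_POOL = Executors.newFixedThreadPool(8);

  // Hand out Tables that all borrow the same pool, mirroring what
  // getTableFromSingletonPool achieves for coprocessor scans.
  static synchronized Table getTable(Configuration conf, TableName name) throws IOException {
    if (conn == null) {
      conn = ConnectionFactory.createConnection(conf);
    }
    return conn.getTable(name, SHARED_POOL); // no per-Table executor is created
  }
}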

phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java

Lines changed: 181 additions & 0 deletions

@@ -18,24 +18,45 @@
 package org.apache.phoenix.end2end;

 import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_CORE_SIZE;
+import static org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_MAX_SIZE;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;

 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.BeforeClass;

@@ -64,6 +85,105 @@ public static synchronized void doSetup() throws Exception {
   }

   public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+    private RegionCoprocessorEnvironment env;
+
+    public static void setTestCreateView(boolean testCreateView) {
+      TestMetaDataEndpointImpl.testCreateView = testCreateView;
+    }
+
+    private static volatile boolean testCreateView = false;
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+      super.start(env);
+      if (env instanceof RegionCoprocessorEnvironment) {
+        this.env = (RegionCoprocessorEnvironment) env;
+      } else {
+        throw new CoprocessorException("Must be loaded on a table region!");
+      }
+    }
+
+    @Override
+    public void createTable(RpcController controller, MetaDataProtos.CreateTableRequest request,
+      RpcCallback<MetaDataProtos.MetaDataResponse> done) {
+      // Invoke the actual create table routine
+      super.createTable(controller, request, done);
+
+      byte[][] rowKeyMetaData = new byte[3][];
+      byte[] schemaName = null;
+      byte[] tableName = null;
+      String fullTableName = null;
+
+      // Get the singleton connection for testing
+      org.apache.hadoop.hbase.client.Connection conn = ServerUtil.ConnectionFactory
+        .getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env);
+      try {
+        // Get the current table creation details
+        List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+        schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+        tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+
+        ThreadPoolExecutor ctpe = null;
+        ThreadPoolExecutor htpe = null;
+
+        // Get the thread pool executor from the connection.
+        if (conn instanceof ConnectionImplementation) {
+          ConnectionImplementation connImpl = ((ConnectionImplementation) conn);
+          Field props = null;
+          props = ConnectionImplementation.class.getDeclaredField("batchPool");
+          props.setAccessible(true);
+          ctpe = (ThreadPoolExecutor) props.get(connImpl);
+          LOGGER.debug("ConnectionImplementation Thread pool info :" + ctpe.toString());
+
+        }
+
+        // Get the thread pool executor from the HTable.
+        Table hTable =
+          ServerUtil.getHTableForCoprocessorScan(env, TableName.valueOf(fullTableName));
+        if (hTable instanceof HTable) {
+          HTable testTable = (HTable) hTable;
+          Field props = testTable.getClass().getDeclaredField("pool");
+          props.setAccessible(true);
+          htpe = ((ThreadPoolExecutor) props.get(hTable));
+          LOGGER.debug("HTable Thread pool info :" + htpe.toString());
+          // Assert the HTable thread pool config match the Connection pool configs.
+          // Since we are not overriding any defaults, it should match the defaults.
+          assertEquals(htpe.getMaximumPoolSize(), DEFAULT_HCONNECTION_POOL_MAX_SIZE);
+          assertEquals(htpe.getCorePoolSize(), DEFAULT_HCONNECTION_POOL_CORE_SIZE);
+          LOGGER.debug("HTable threadpool info {}, {}, {}, {}", htpe.getCorePoolSize(),
+            htpe.getMaximumPoolSize(), htpe.getQueue().remainingCapacity(),
+            htpe.getKeepAliveTime(TimeUnit.SECONDS));
+
+          int count = Thread.activeCount();
+          Thread[] th = new Thread[count];
+          // returns the number of threads put into the array
+          Thread.enumerate(th);
+          long hTablePoolCount =
+            Arrays.stream(th).filter(s -> s.getName().equals("htable-pool-0")).count();
+          // Assert no default HTable threadpools are created.
+          assertEquals(0, hTablePoolCount);
+          LOGGER.debug("htable-pool-0 threads {}", hTablePoolCount);
+        }
+        // Assert that the threadpool from Connection and HTable are the same.
+        assertEquals(ctpe, htpe);
+      } catch (RuntimeException | NoSuchFieldException | IllegalAccessException | IOException t) {
+        // handle cases that an IOE is wrapped inside a RuntimeException
+        // like HTableInterface#createHTableInterface
+        MetaDataProtos.MetaDataResponse.Builder builder =
+          MetaDataProtos.MetaDataResponse.newBuilder();
+
+        LOGGER.error("This is unexpected");
+        ProtobufUtil.setControllerException(controller,
+          ClientUtil.createIOException(SchemaUtil
+            .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+            .toString(), new DoNotRetryIOException("Not allowed")));
+        done.run(builder.build());
+
+      }
+
+    }

     @Override
     public void getVersion(RpcController controller, MetaDataProtos.GetVersionRequest request,

@@ -81,6 +201,67 @@ public void getVersion(RpcController controller, MetaDataProtos.GetVersionReques
     }
   }

+  @Test
+  public void testViewCreationAndServerConnections() throws Throwable {
+    final String tableName = generateUniqueName();
+    final String view01 = "v01_" + tableName;
+    final String view02 = "v02_" + tableName;
+    final String index_view01 = "idx_v01_" + tableName;
+    final String index_view02 = "idx_v02_" + tableName;
+    final String index_view03 = "idx_v03_" + tableName;
+    final String index_view04 = "idx_v04_" + tableName;
+    final int NUM_VIEWS = 50;
+
+    TestMetaDataEndpointImpl.setTestCreateView(true);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+      TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+
+      final Statement stmt = conn.createStatement();
+
+      stmt.execute("CREATE TABLE " + tableName
+        + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 VARCHAR,"
+        + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+        + " UPDATE_CACHE_FREQUENCY=ALWAYS, MULTI_TENANT=true");
+      conn.commit();
+
+      for (int i = 0; i < NUM_VIEWS; i++) {
+        Properties props = new Properties();
+        String viewTenantId = String.format("00T%012d", i);
+        props.setProperty(TENANT_ID_ATTRIB, viewTenantId);
+        // Create multilevel tenant views
+        try (Connection tConn = DriverManager.getConnection(getUrl(), props)) {
+          final Statement viewStmt = tConn.createStatement();
+          viewStmt
+            .execute("CREATE VIEW " + view01 + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM "
+              + tableName + " WHERE COL2 = 'col2'");
+
+          viewStmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 VARCHAR)"
+            + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 'vcol1'");
+          tConn.commit();
+
+          // Create multilevel tenant indexes
+          final Statement indexStmt = tConn.createStatement();
+          indexStmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + " (COL5) INCLUDE "
+            + "(COL1, COL2, COL3)");
+          indexStmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + " (COL6) INCLUDE "
+            + "(COL1, COL2, COL3)");
+          indexStmt.execute(
+            "CREATE INDEX " + index_view03 + " ON " + view01 + " (COL5) INCLUDE " + "(COL2, COL1)");
+          indexStmt.execute(
+            "CREATE INDEX " + index_view04 + " ON " + view02 + " (COL6) INCLUDE " + "(COL2, COL1)");
+
+          tConn.commit();
+
+        }
+
+      }
+
+      TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+      TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+    }
+  }
+
   @Test
   public void testConnectionFromMetadataServer() throws Throwable {
     final String tableName = generateUniqueName();
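The test above reads the private batchPool and pool executor fields through reflection before asserting on them. A self-contained, JDK-only sketch of that technique, with hypothetical names since the HBase fields it targets are internal and version-dependent:

import java.lang.reflect.Field;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class PoolIntrospection {
  static class Holder {
    private final ThreadPoolExecutor pool =
      (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
  }

  public static void main(String[] args) throws Exception {
    Holder holder = new Holder();
    // Same pattern as the test: getDeclaredField + setAccessible(true)
    // to read a private executor and inspect its configuration.
    Field f = Holder.class.getDeclaredField("pool");
    f.setAccessible(true);
    ThreadPoolExecutor tpe = (ThreadPoolExecutor) f.get(holder);
    System.out.println("core=" + tpe.getCorePoolSize()
      + " max=" + tpe.getMaximumPoolSize());
    tpe.shutdown();
  }
}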

phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java

Lines changed: 2 additions & 2 deletions

@@ -125,8 +125,8 @@ public void testPlainUpsertWithReturning() throws Exception {
     assertEquals("a", rs.getString(1));
     assertEquals("b", rs.getString(2));
     assertFalse(rs.next());
-    stmt = conn.prepareStatement("UPSERT INTO " + tableName
-      + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())");
+    stmt = conn.prepareStatement(
+      "UPSERT INTO " + tableName + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())");
     stmt.setString(1, "a");
     stmt.execute();
     rs = stmt.getResultSet();
