50
50
import org .elasticsearch .threadpool .FixedExecutorBuilder ;
51
51
import org .elasticsearch .threadpool .TestThreadPool ;
52
52
import org .elasticsearch .threadpool .ThreadPool ;
53
+ import org .elasticsearch .threadpool .ThreadPoolStats ;
53
54
import org .elasticsearch .xcontent .NamedXContentRegistry ;
54
55
import org .elasticsearch .xcontent .ToXContent ;
55
56
import org .elasticsearch .xcontent .XContentBuilder ;
97
98
import java .util .Map ;
98
99
import java .util .Set ;
99
100
import java .util .concurrent .CompletableFuture ;
101
+ import java .util .concurrent .CyclicBarrier ;
100
102
import java .util .concurrent .ExecutionException ;
101
103
import java .util .concurrent .ExecutorService ;
102
104
import java .util .concurrent .Semaphore ;
105
107
import java .util .stream .Collectors ;
106
108
import java .util .stream .IntStream ;
107
109
110
+ import static junit .framework .TestCase .fail ;
108
111
import static org .elasticsearch .index .mapper .MapperService .SINGLE_MAPPING_NAME ;
109
112
import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_PRIMARY_TERM ;
110
113
import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
118
121
import static org .elasticsearch .xpack .core .security .index .RestrictedIndicesNames .SECURITY_MAIN_ALIAS ;
119
122
import static org .elasticsearch .xpack .security .Security .SECURITY_CRYPTO_THREAD_POOL_NAME ;
120
123
import static org .elasticsearch .xpack .security .authc .ApiKeyService .API_KEY_METADATA_KEY ;
124
+ import static org .hamcrest .MatcherAssert .assertThat ;
121
125
import static org .hamcrest .Matchers .anEmptyMap ;
122
126
import static org .hamcrest .Matchers .contains ;
123
127
import static org .hamcrest .Matchers .containsString ;
142
146
143
147
public class ApiKeyServiceTests extends ESTestCase {
144
148
149
+ private static final int TEST_THREADPOOL_QUEUE_SIZE = 1000 ;
150
+
145
151
private ThreadPool threadPool ;
146
152
private XPackLicenseState licenseState ;
147
153
private Client client ;
@@ -157,7 +163,7 @@ public void createThreadPool() {
157
163
Settings .EMPTY ,
158
164
SECURITY_CRYPTO_THREAD_POOL_NAME ,
159
165
1 ,
160
- 1000 ,
166
+ TEST_THREADPOOL_QUEUE_SIZE ,
161
167
"xpack.security.crypto.thread_pool" ,
162
168
false
163
169
)
@@ -180,6 +186,90 @@ public void setupMocks() {
180
186
this .cacheInvalidatorRegistry = mock (CacheInvalidatorRegistry .class );
181
187
}
182
188
189
+ public void testFloodThreadpool () throws Exception {
190
+ // We're going to be blocking the security-crypto threadpool so we need a new one for the client
191
+ ThreadPool clientThreadpool = new TestThreadPool (
192
+ this .getClass ().getName (),
193
+ new FixedExecutorBuilder (Settings .EMPTY , this .getClass ().getName (), 1 , 100 , "no_settings_used" , false )
194
+ );
195
+ try {
196
+ when (client .threadPool ()).thenReturn (clientThreadpool );
197
+
198
+ // setup copied from testAuthenticateWithApiKey
199
+ final Settings settings = Settings .builder ().put (XPackSettings .API_KEY_SERVICE_ENABLED_SETTING .getKey (), true ).build ();
200
+ final ApiKeyService service = createApiKeyService (settings );
201
+
202
+ final String id = randomAlphaOfLength (12 );
203
+ final String key = randomAlphaOfLength (16 );
204
+
205
+ final User user ;
206
+ if (randomBoolean ()) {
207
+ user = new User (
208
+ new User (
"hulk" ,
new String [] {
"superuser" },
"Bruce Banner" ,
"[email protected] " ,
org .
elasticsearch .
core .
Map .
of (),
true ),
209
+ new User ("authenticated_user" , new String [] { "other" })
210
+ );
211
+ } else {
212
+ user = new User (
213
+ "hulk" ,
214
+ new String [] { "superuser" },
215
+ "Bruce Banner" ,
216
+
217
+ org .elasticsearch .core .Map .of (),
218
+ true
219
+ );
220
+ }
221
+ final Map <String , Object > metadata = mockKeyDocument (service , id , key , user );
222
+
223
+ // Block the security crypto threadpool
224
+ CyclicBarrier barrier = new CyclicBarrier (2 );
225
+ threadPool .executor (SECURITY_CRYPTO_THREAD_POOL_NAME ).execute (() -> safeAwait (barrier ));
226
+ // Now fill it up while the one thread is blocked
227
+ for (int i = 0 ; i < TEST_THREADPOOL_QUEUE_SIZE ; i ++) {
228
+ threadPool .executor (SECURITY_CRYPTO_THREAD_POOL_NAME ).execute (() -> {});
229
+ }
230
+
231
+ // Check that it's full
232
+ for (ThreadPoolStats .Stats stat : threadPool .stats ()) {
233
+ if (stat .getName ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
234
+ assertThat (stat .getQueue (), equalTo (TEST_THREADPOOL_QUEUE_SIZE ));
235
+ assertThat (stat .getRejected (), equalTo (0L ));
236
+ }
237
+ }
238
+
239
+ // now try to auth with an API key
240
+ final AuthenticationResult auth = tryAuthenticate (service , id , key );
241
+ assertThat (auth .getStatus (), is (AuthenticationResult .Status .TERMINATE ));
242
+
243
+ // Make sure one was rejected and the queue is still full
244
+ for (ThreadPoolStats .Stats stat : threadPool .stats ()) {
245
+ if (stat .getName ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
246
+ assertThat (stat .getQueue (), equalTo (TEST_THREADPOOL_QUEUE_SIZE ));
247
+ assertThat (stat .getRejected (), equalTo (1L ));
248
+ }
249
+ }
250
+ ListenableFuture <CachedApiKeyHashResult > cachedValue = service .getApiKeyAuthCache ().get (id );
251
+ assertThat ("since the request was rejected, there should be no cache entry for this key" , cachedValue , nullValue ());
252
+
253
+ // unblock the threadpool
254
+ safeAwait (barrier );
255
+
256
+ // wait for the threadpool queue to drain & check that the stats as as expected
257
+ flushThreadPoolExecutor (threadPool , SECURITY_CRYPTO_THREAD_POOL_NAME );
258
+ for (ThreadPoolStats .Stats stat : threadPool .stats ()) {
259
+ if (stat .getName ().equals (SECURITY_CRYPTO_THREAD_POOL_NAME )) {
260
+ assertThat (stat .getRejected (), equalTo (1L ));
261
+ assertThat (stat .getQueue (), equalTo (0 ));
262
+ }
263
+ }
264
+
265
+ // try to authenticate again with the same key - if this hangs, check the future caching
266
+ final AuthenticationResult shouldSucceed = tryAuthenticate (service , id , key );
267
+ assertThat (shouldSucceed .getStatus (), is (AuthenticationResult .Status .SUCCESS ));
268
+ } finally {
269
+ terminate (clientThreadpool );
270
+ }
271
+ }
272
+
183
273
public void testCreateApiKeyWillUseBulkAction () throws Exception {
184
274
final Settings settings = Settings .builder ().put (XPackSettings .API_KEY_SERVICE_ENABLED_SETTING .getKey (), true ).build ();
185
275
final ApiKeyService service = createApiKeyService (settings );
@@ -1698,4 +1788,29 @@ private void checkAuthApiKeyMetadata(Object metadata, AuthenticationResult authR
1698
1788
);
1699
1789
}
1700
1790
}
1791
+
1792
+ private static void flushThreadPoolExecutor (ThreadPool threadPool , String executorName ) {
1793
+ final int maxThreads = threadPool .info (executorName ).getMax ();
1794
+ final CyclicBarrier barrier = new CyclicBarrier (maxThreads + 1 );
1795
+ final ExecutorService executor = threadPool .executor (executorName );
1796
+ for (int i = 0 ; i < maxThreads ; i ++) {
1797
+ executor .execute (new AbstractRunnable () {
1798
+ @ Override
1799
+ protected void doRun () {
1800
+ safeAwait (barrier );
1801
+ }
1802
+
1803
+ @ Override
1804
+ public void onFailure (Exception e ) {
1805
+ fail (e .toString ());
1806
+ }
1807
+
1808
+ @ Override
1809
+ public boolean isForceExecution () {
1810
+ return true ;
1811
+ }
1812
+ });
1813
+ }
1814
+ safeAwait (barrier );
1815
+ }
1701
1816
}
0 commit comments