|
23 | 23 | import org.elasticsearch.action.index.IndexRequest; |
24 | 24 | import org.elasticsearch.action.support.ActionFilters; |
25 | 25 | import org.elasticsearch.action.support.AutoCreateIndex; |
| 26 | +import org.elasticsearch.action.support.SubscribableListener; |
26 | 27 | import org.elasticsearch.action.support.TransportActions; |
27 | 28 | import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; |
28 | 29 | import org.elasticsearch.client.internal.node.NodeClient; |
@@ -96,7 +97,10 @@ public TransportUpdateAction( |
96 | 97 |
|
97 | 98 | @Override |
98 | 99 | protected Executor executor(ShardId shardId) { |
99 | | - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); |
| 100 | + return executor(indicesService.indexServiceSafe(shardId.getIndex())); |
| 101 | + } |
| 102 | + |
| 103 | + private Executor executor(IndexService indexService) { |
100 | 104 | return threadPool.executor(indexService.getIndexSettings().getIndexMetadata().isSystem() ? Names.SYSTEM_WRITE : Names.WRITE); |
101 | 105 | } |
102 | 106 |
|
@@ -189,136 +193,148 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< |
189 | 193 | final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); |
190 | 194 | final IndexShard indexShard = indexService.getShard(shardId.getId()); |
191 | 195 | final MappingLookup mappingLookup = indexShard.mapperService().mappingLookup(); |
192 | | - final UpdateHelper.Result result = deleteInferenceResults( |
193 | | - request, |
194 | | - updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis), |
195 | | - indexService.getMetadata(), |
196 | | - mappingLookup |
197 | | - ); |
198 | | - |
199 | | - switch (result.getResponseResult()) { |
200 | | - case CREATED -> { |
201 | | - IndexRequest upsertRequest = result.action(); |
202 | | - // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request |
203 | | - final BytesReference upsertSourceBytes = upsertRequest.source(); |
204 | | - client.bulk( |
205 | | - toSingleItemBulkRequest(upsertRequest), |
206 | | - unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> { |
207 | | - UpdateResponse update = new UpdateResponse( |
208 | | - response.getShardInfo(), |
209 | | - response.getShardId(), |
210 | | - response.getId(), |
211 | | - response.getSeqNo(), |
212 | | - response.getPrimaryTerm(), |
213 | | - response.getVersion(), |
214 | | - response.getResult() |
215 | | - ); |
216 | | - if (request.fetchSource() != null && request.fetchSource().fetchSource()) { |
217 | | - Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap( |
218 | | - upsertSourceBytes, |
219 | | - true, |
220 | | - upsertRequest.getContentType() |
221 | | - ); |
222 | | - update.setGetResult( |
223 | | - UpdateHelper.extractGetResult( |
224 | | - request, |
225 | | - request.concreteIndex(), |
226 | | - mappingLookup, |
| 196 | + |
| 197 | + var executor = executor(indexService); |
| 198 | + assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE); |
| 199 | + |
| 200 | + SubscribableListener.newForked(indexShard::ensureMutable) |
| 201 | + // Make sure to fork back to a `write` thread pool if necessary |
| 202 | + .<UpdateHelper.Result>andThen(executor, threadPool.getThreadContext(), (l, unused) -> ActionListener.completeWith(l, () -> { |
| 203 | + assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE); |
| 204 | + return deleteInferenceResults( |
| 205 | + request, |
| 206 | + updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis), // Gets the doc using the engine |
| 207 | + indexService.getMetadata(), |
| 208 | + mappingLookup |
| 209 | + ); |
| 210 | + })) |
| 211 | + // Proceed with a single item bulk request |
| 212 | + .<UpdateResponse>andThen((l, result) -> { |
| 213 | + switch (result.getResponseResult()) { |
| 214 | + case CREATED -> { |
| 215 | + IndexRequest upsertRequest = result.action(); |
| 216 | + // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request |
| 217 | + final BytesReference upsertSourceBytes = upsertRequest.source(); |
| 218 | + client.bulk( |
| 219 | + toSingleItemBulkRequest(upsertRequest), |
| 220 | + unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> { |
| 221 | + UpdateResponse update = new UpdateResponse( |
| 222 | + response.getShardInfo(), |
| 223 | + response.getShardId(), |
| 224 | + response.getId(), |
227 | 225 | response.getSeqNo(), |
228 | 226 | response.getPrimaryTerm(), |
229 | 227 | response.getVersion(), |
230 | | - sourceAndContent.v2(), |
231 | | - sourceAndContent.v1(), |
232 | | - upsertSourceBytes |
233 | | - ) |
234 | | - ); |
235 | | - } else { |
236 | | - update.setGetResult(null); |
237 | | - } |
238 | | - update.setForcedRefresh(response.forcedRefresh()); |
239 | | - listener.onResponse(update); |
240 | | - }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) |
241 | | - ); |
242 | | - } |
243 | | - case UPDATED -> { |
244 | | - IndexRequest indexRequest = result.action(); |
245 | | - // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request |
246 | | - final BytesReference indexSourceBytes = indexRequest.source(); |
247 | | - client.bulk( |
248 | | - toSingleItemBulkRequest(indexRequest), |
249 | | - unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> { |
250 | | - UpdateResponse update = new UpdateResponse( |
251 | | - response.getShardInfo(), |
252 | | - response.getShardId(), |
253 | | - response.getId(), |
254 | | - response.getSeqNo(), |
255 | | - response.getPrimaryTerm(), |
256 | | - response.getVersion(), |
257 | | - response.getResult() |
258 | | - ); |
259 | | - update.setGetResult( |
260 | | - UpdateHelper.extractGetResult( |
261 | | - request, |
262 | | - request.concreteIndex(), |
263 | | - mappingLookup, |
264 | | - response.getSeqNo(), |
265 | | - response.getPrimaryTerm(), |
266 | | - response.getVersion(), |
267 | | - result.updatedSourceAsMap(), |
268 | | - result.updateSourceContentType(), |
269 | | - indexSourceBytes |
270 | | - ) |
| 228 | + response.getResult() |
| 229 | + ); |
| 230 | + if (request.fetchSource() != null && request.fetchSource().fetchSource()) { |
| 231 | + Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap( |
| 232 | + upsertSourceBytes, |
| 233 | + true, |
| 234 | + upsertRequest.getContentType() |
| 235 | + ); |
| 236 | + update.setGetResult( |
| 237 | + UpdateHelper.extractGetResult( |
| 238 | + request, |
| 239 | + request.concreteIndex(), |
| 240 | + mappingLookup, |
| 241 | + response.getSeqNo(), |
| 242 | + response.getPrimaryTerm(), |
| 243 | + response.getVersion(), |
| 244 | + sourceAndContent.v2(), |
| 245 | + sourceAndContent.v1(), |
| 246 | + upsertSourceBytes |
| 247 | + ) |
| 248 | + ); |
| 249 | + } else { |
| 250 | + update.setGetResult(null); |
| 251 | + } |
| 252 | + update.setForcedRefresh(response.forcedRefresh()); |
| 253 | + l.onResponse(update); |
| 254 | + }, exception -> handleUpdateFailureWithRetry(l, request, exception, retryCount))) |
271 | 255 | ); |
272 | | - update.setForcedRefresh(response.forcedRefresh()); |
273 | | - listener.onResponse(update); |
274 | | - }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) |
275 | | - ); |
276 | | - } |
277 | | - case DELETED -> { |
278 | | - DeleteRequest deleteRequest = result.action(); |
279 | | - client.bulk( |
280 | | - toSingleItemBulkRequest(deleteRequest), |
281 | | - unwrappingSingleItemBulkResponse(ActionListener.<DeleteResponse>wrap(response -> { |
282 | | - UpdateResponse update = new UpdateResponse( |
283 | | - response.getShardInfo(), |
284 | | - response.getShardId(), |
285 | | - response.getId(), |
286 | | - response.getSeqNo(), |
287 | | - response.getPrimaryTerm(), |
288 | | - response.getVersion(), |
289 | | - response.getResult() |
| 256 | + } |
| 257 | + case UPDATED -> { |
| 258 | + IndexRequest indexRequest = result.action(); |
| 259 | + // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request |
| 260 | + final BytesReference indexSourceBytes = indexRequest.source(); |
| 261 | + client.bulk( |
| 262 | + toSingleItemBulkRequest(indexRequest), |
| 263 | + unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> { |
| 264 | + UpdateResponse update = new UpdateResponse( |
| 265 | + response.getShardInfo(), |
| 266 | + response.getShardId(), |
| 267 | + response.getId(), |
| 268 | + response.getSeqNo(), |
| 269 | + response.getPrimaryTerm(), |
| 270 | + response.getVersion(), |
| 271 | + response.getResult() |
| 272 | + ); |
| 273 | + update.setGetResult( |
| 274 | + UpdateHelper.extractGetResult( |
| 275 | + request, |
| 276 | + request.concreteIndex(), |
| 277 | + mappingLookup, |
| 278 | + response.getSeqNo(), |
| 279 | + response.getPrimaryTerm(), |
| 280 | + response.getVersion(), |
| 281 | + result.updatedSourceAsMap(), |
| 282 | + result.updateSourceContentType(), |
| 283 | + indexSourceBytes |
| 284 | + ) |
| 285 | + ); |
| 286 | + update.setForcedRefresh(response.forcedRefresh()); |
| 287 | + l.onResponse(update); |
| 288 | + }, exception -> handleUpdateFailureWithRetry(l, request, exception, retryCount))) |
290 | 289 | ); |
291 | | - update.setGetResult( |
292 | | - UpdateHelper.extractGetResult( |
293 | | - request, |
294 | | - request.concreteIndex(), |
295 | | - mappingLookup, |
296 | | - response.getSeqNo(), |
297 | | - response.getPrimaryTerm(), |
298 | | - response.getVersion(), |
299 | | - result.updatedSourceAsMap(), |
300 | | - result.updateSourceContentType(), |
301 | | - null |
302 | | - ) |
| 290 | + } |
| 291 | + case DELETED -> { |
| 292 | + DeleteRequest deleteRequest = result.action(); |
| 293 | + client.bulk( |
| 294 | + toSingleItemBulkRequest(deleteRequest), |
| 295 | + unwrappingSingleItemBulkResponse(ActionListener.<DeleteResponse>wrap(response -> { |
| 296 | + UpdateResponse update = new UpdateResponse( |
| 297 | + response.getShardInfo(), |
| 298 | + response.getShardId(), |
| 299 | + response.getId(), |
| 300 | + response.getSeqNo(), |
| 301 | + response.getPrimaryTerm(), |
| 302 | + response.getVersion(), |
| 303 | + response.getResult() |
| 304 | + ); |
| 305 | + update.setGetResult( |
| 306 | + UpdateHelper.extractGetResult( |
| 307 | + request, |
| 308 | + request.concreteIndex(), |
| 309 | + mappingLookup, |
| 310 | + response.getSeqNo(), |
| 311 | + response.getPrimaryTerm(), |
| 312 | + response.getVersion(), |
| 313 | + result.updatedSourceAsMap(), |
| 314 | + result.updateSourceContentType(), |
| 315 | + null |
| 316 | + ) |
| 317 | + ); |
| 318 | + update.setForcedRefresh(response.forcedRefresh()); |
| 319 | + l.onResponse(update); |
| 320 | + }, exception -> handleUpdateFailureWithRetry(l, request, exception, retryCount))) |
303 | 321 | ); |
304 | | - update.setForcedRefresh(response.forcedRefresh()); |
305 | | - listener.onResponse(update); |
306 | | - }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) |
307 | | - ); |
308 | | - } |
309 | | - case NOOP -> { |
310 | | - UpdateResponse update = result.action(); |
311 | | - IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex()); |
312 | | - if (indexServiceOrNull != null) { |
313 | | - IndexShard shard = indexService.getShardOrNull(shardId.getId()); |
314 | | - if (shard != null) { |
315 | | - shard.noopUpdate(); |
316 | 322 | } |
| 323 | + case NOOP -> { |
| 324 | + UpdateResponse update = result.action(); |
| 325 | + IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex()); |
| 326 | + if (indexServiceOrNull != null) { |
| 327 | + IndexShard shard = indexService.getShardOrNull(shardId.getId()); |
| 328 | + if (shard != null) { |
| 329 | + shard.noopUpdate(); |
| 330 | + } |
| 331 | + } |
| 332 | + l.onResponse(update); |
| 333 | + } |
| 334 | + default -> throw new IllegalStateException("Illegal result " + result.getResponseResult()); |
317 | 335 | } |
318 | | - listener.onResponse(update); |
319 | | - } |
320 | | - default -> throw new IllegalStateException("Illegal result " + result.getResponseResult()); |
321 | | - } |
| 336 | + }) |
| 337 | + .addListener(listener); |
322 | 338 | } |
323 | 339 |
|
324 | 340 | private void handleUpdateFailureWithRetry( |
|
0 commit comments