|
21 | 21 | import org.elasticsearch.common.Strings; |
22 | 22 | import org.elasticsearch.common.io.stream.StreamInput; |
23 | 23 | import org.elasticsearch.common.io.stream.StreamOutput; |
24 | | -import org.elasticsearch.core.Tuple; |
25 | 24 | import org.elasticsearch.index.IndexService; |
26 | 25 | import org.elasticsearch.index.engine.Engine; |
27 | 26 | import org.elasticsearch.index.engine.InternalEngine; |
@@ -58,68 +57,57 @@ protected TransportShardMultiGetFomTranslogAction( |
58 | 57 | @Override |
59 | 58 | protected void doExecute(Task task, Request request, ActionListener<Response> listener) { |
60 | 59 | var multiGetShardRequest = request.getMultiGetShardRequest(); |
61 | | - IndexShard indexShard = getIndexShard(indicesService, request); |
| 60 | + var shardId = request.getShardId(); |
| 61 | + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); |
| 62 | + IndexShard indexShard = indexService.getShard(shardId.id()); |
| 63 | + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); |
| 64 | + assert multiGetShardRequest.realtime(); |
62 | 65 | ActionListener.completeWith(listener, () -> { |
63 | | - Tuple<MultiGetShardResponse, Boolean> multiGetShardResponse = getResponse(multiGetShardRequest, indexShard); |
| 66 | + var multiGetShardResponse = new MultiGetShardResponse(); |
| 67 | + var someItemsNotFoundInTranslog = false; |
| 68 | + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { |
| 69 | + var item = multiGetShardRequest.items.get(i); |
| 70 | + try { |
| 71 | + var result = indexShard.getService() |
| 72 | + .getFromTranslog( |
| 73 | + item.id(), |
| 74 | + item.storedFields(), |
| 75 | + multiGetShardRequest.realtime(), |
| 76 | + item.version(), |
| 77 | + item.versionType(), |
| 78 | + item.fetchSourceContext(), |
| 79 | + multiGetShardRequest.isForceSyntheticSource() |
| 80 | + ); |
| 81 | + GetResponse getResponse = null; |
| 82 | + if (result == null) { |
| 83 | + someItemsNotFoundInTranslog = true; |
| 84 | + } else { |
| 85 | + getResponse = new GetResponse(result); |
| 86 | + } |
| 87 | + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); |
| 88 | + } catch (RuntimeException | IOException e) { |
| 89 | + if (TransportActions.isShardNotAvailableException(e)) { |
| 90 | + throw e; |
| 91 | + } |
| 92 | + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); |
| 93 | + multiGetShardResponse.add( |
| 94 | + multiGetShardRequest.locations.get(i), |
| 95 | + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) |
| 96 | + ); |
| 97 | + } |
| 98 | + } |
64 | 99 | long segmentGeneration = -1; |
65 | | - if (multiGetShardResponse.v2()) { |
| 100 | + if (someItemsNotFoundInTranslog) { |
66 | 101 | Engine engine = indexShard.getEngineOrNull(); |
67 | 102 | if (engine == null) { |
68 | 103 | throw new AlreadyClosedException("engine closed"); |
69 | 104 | } |
70 | 105 | segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); |
71 | 106 | } |
72 | | - return new Response(multiGetShardResponse.v1(), indexShard.getOperationPrimaryTerm(), segmentGeneration); |
| 107 | + return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); |
73 | 108 | }); |
74 | 109 | } |
75 | 110 |
|
76 | | - public static IndexShard getIndexShard(IndicesService indicesService, Request request) { |
77 | | - var shardId = request.getShardId(); |
78 | | - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); |
79 | | - IndexShard indexShard = indexService.getShard(shardId.id()); |
80 | | - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); |
81 | | - assert request.getMultiGetShardRequest().realtime(); |
82 | | - return indexShard; |
83 | | - } |
84 | | - |
85 | | - public static Tuple<MultiGetShardResponse, Boolean> getResponse(MultiGetShardRequest multiGetShardRequest, IndexShard indexShard) |
86 | | - throws IOException { |
87 | | - var multiGetShardResponse = new MultiGetShardResponse(); |
88 | | - var someItemsNotFoundInTranslog = false; |
89 | | - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { |
90 | | - var item = multiGetShardRequest.items.get(i); |
91 | | - try { |
92 | | - var result = indexShard.getService() |
93 | | - .getFromTranslog( |
94 | | - item.id(), |
95 | | - item.storedFields(), |
96 | | - multiGetShardRequest.realtime(), |
97 | | - item.version(), |
98 | | - item.versionType(), |
99 | | - item.fetchSourceContext(), |
100 | | - multiGetShardRequest.isForceSyntheticSource() |
101 | | - ); |
102 | | - GetResponse getResponse = null; |
103 | | - if (result == null) { |
104 | | - someItemsNotFoundInTranslog = true; |
105 | | - } else { |
106 | | - getResponse = new GetResponse(result); |
107 | | - } |
108 | | - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); |
109 | | - } catch (RuntimeException | IOException e) { |
110 | | - if (TransportActions.isShardNotAvailableException(e)) { |
111 | | - throw e; |
112 | | - } |
113 | | - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", multiGetShardRequest.shardId(), item.id(), e); |
114 | | - multiGetShardResponse.add( |
115 | | - multiGetShardRequest.locations.get(i), |
116 | | - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) |
117 | | - ); |
118 | | - } |
119 | | - } |
120 | | - return Tuple.tuple(multiGetShardResponse, someItemsNotFoundInTranslog); |
121 | | - } |
122 | | - |
123 | 111 | public static class Request extends ActionRequest { |
124 | 112 |
|
125 | 113 | private final MultiGetShardRequest multiGetShardRequest; |
|
0 commit comments