|
39 | 39 | import org.elasticsearch.core.RefCounted; |
40 | 40 | import org.elasticsearch.core.Strings; |
41 | 41 | import org.elasticsearch.core.TimeValue; |
| 42 | +import org.elasticsearch.core.UpdateForV9; |
42 | 43 | import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; |
43 | 44 | import org.elasticsearch.node.Node; |
44 | 45 | import org.elasticsearch.plugins.Plugin; |
|
49 | 50 | import org.elasticsearch.test.ESTestCase; |
50 | 51 | import org.elasticsearch.test.tasks.MockTaskManager; |
51 | 52 | import org.elasticsearch.threadpool.ThreadPool; |
| 53 | +import org.elasticsearch.transport.BytesTransportRequest; |
52 | 54 | import org.elasticsearch.transport.ClusterConnectionManager; |
53 | 55 | import org.elasticsearch.transport.ConnectTransportException; |
54 | 56 | import org.elasticsearch.transport.ConnectionProfile; |
@@ -584,8 +586,13 @@ public void sendRequest( |
584 | 586 | // poor mans request cloning... |
585 | 587 | BytesStreamOutput bStream = new BytesStreamOutput(); |
586 | 588 | request.writeTo(bStream); |
587 | | - RequestHandlerRegistry<?> reg = MockTransportService.this.getRequestHandler(action); |
588 | | - final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); |
| 589 | + final TransportRequest clonedRequest; |
| 590 | + if (request instanceof BytesTransportRequest) { |
| 591 | + clonedRequest = copyRawBytesForBwC(bStream); |
| 592 | + } else { |
| 593 | + RequestHandlerRegistry<?> reg = MockTransportService.this.getRequestHandler(action); |
| 594 | + clonedRequest = reg.newRequest(bStream.bytes().streamInput()); |
| 595 | + } |
589 | 596 | assert clonedRequest.getClass().equals(MasterNodeRequestHelper.unwrapTermOverride(request).getClass()) |
590 | 597 | : clonedRequest + " vs " + request; |
591 | 598 |
|
@@ -633,6 +640,15 @@ protected void doRun() throws IOException { |
633 | 640 | } |
634 | 641 | } |
635 | 642 |
|
| 643 | + // Some request handlers read back a BytesTransportRequest |
| 644 | + // into a different class that cannot be re-serialized (i.e. JOIN_VALIDATE_ACTION_NAME), |
| 645 | + // in those cases we just copy the raw bytes back to a BytesTransportRequest. |
| 646 | + // This is only needed for the BwC for JOIN_VALIDATE_ACTION_NAME and can be removed in the next major |
| 647 | + @UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION) |
| 648 | + private static TransportRequest copyRawBytesForBwC(BytesStreamOutput bStream) throws IOException { |
| 649 | + return new BytesTransportRequest(bStream.bytes().streamInput()); |
| 650 | + } |
| 651 | + |
636 | 652 | @Override |
637 | 653 | public void clearCallback() { |
638 | 654 | synchronized (this) { |
|
0 commit comments