-
Notifications
You must be signed in to change notification settings - Fork 2k
Call completeStream only once on failure. #13814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: jetty-12.0.x
Are you sure you want to change the base?
Changes from all commits
daa9a47
83c1718
2973561
2f7aa04
bcbf2c8
42e209f
baca52e
a326415
6bea600
649e48b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -93,7 +93,10 @@ private enum StreamSendState | |||||||||
| LAST_SENDING, | ||||||||||
|
|
||||||||||
| /** Last content sent and completed */ | ||||||||||
| LAST_COMPLETE | ||||||||||
| LAST_COMPLETE, | ||||||||||
|
|
||||||||||
| /** Failing, so last send will never happen */ | ||||||||||
| FAILED | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private static final Logger LOG = LoggerFactory.getLogger(HttpChannelState.class); | ||||||||||
|
|
@@ -340,7 +343,7 @@ public Runnable onContentAvailable() | |||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public Invocable.InvocationType getInvocationType() | ||||||||||
| public InvocationType getInvocationType() | ||||||||||
| { | ||||||||||
| // TODO Can this actually be done, as we may need to invoke other Runnables after onContent? | ||||||||||
| // Could we at least avoid the lock??? | ||||||||||
|
|
@@ -575,6 +578,8 @@ private Throwable lockedStreamSend(boolean last, long length) | |||||||||
| case LAST_SENDING, LAST_COMPLETE -> (length > 0) | ||||||||||
| ? new IllegalStateException("last already written") | ||||||||||
| : NOTHING_TO_SEND; | ||||||||||
|
|
||||||||||
| case FAILED -> null; | ||||||||||
| }; | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -588,7 +593,7 @@ private void lockedStreamSendCompleted(boolean success) | |||||||||
| private boolean lockedIsLastStreamSendCompleted() | ||||||||||
| { | ||||||||||
| assert _lock.isHeldByCurrentThread(); | ||||||||||
| return _streamSendState == StreamSendState.LAST_COMPLETE; | ||||||||||
| return _streamSendState == StreamSendState.LAST_COMPLETE || _streamSendState == StreamSendState.FAILED; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private boolean lockedLastStreamSend() | ||||||||||
|
|
@@ -619,7 +624,9 @@ public String toString() | |||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private class HandlerInvoker implements Invocable.Task, Callback | ||||||||||
| // HandlerInvoker is used as the Response's _writeCallback when ChannelCallback is succeeded and the last send still | ||||||||||
| // needs to be done, i.e.: _streamSendState set to LAST_SENDING by lockedLastStreamSend(). | ||||||||||
| private class HandlerInvoker implements Task, Callback | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should split the functionality of this class. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is really not a 12.0 thing then |
||||||||||
| { | ||||||||||
| @Override | ||||||||||
| public void run() | ||||||||||
|
|
@@ -719,33 +726,27 @@ public void run() | |||||||||
| @Override | ||||||||||
| public void succeeded() | ||||||||||
| { | ||||||||||
| HttpStream stream; | ||||||||||
| boolean completeStream; | ||||||||||
| try (AutoLock ignored = _lock.lock()) | ||||||||||
| { | ||||||||||
| assert _callbackCompleted; | ||||||||||
| _streamSendState = StreamSendState.LAST_COMPLETE; | ||||||||||
| completeStream = _handling == null; | ||||||||||
| stream = _stream; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| if (completeStream) | ||||||||||
| completeStream(stream, null); | ||||||||||
| completeLastWrite(null); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Called only as {@link Callback} by last send from {@link ChannelCallback#succeeded} | ||||||||||
| */ | ||||||||||
| @Override | ||||||||||
| public void failed(Throwable failure) | ||||||||||
| { | ||||||||||
| completeLastWrite(failure); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private void completeLastWrite(Throwable failure) | ||||||||||
| { | ||||||||||
| HttpStream stream; | ||||||||||
| boolean completeStream; | ||||||||||
| try (AutoLock ignored = _lock.lock()) | ||||||||||
| { | ||||||||||
| assert _callbackCompleted; | ||||||||||
| _streamSendState = StreamSendState.LAST_COMPLETE; | ||||||||||
| completeStream = _handling == null; | ||||||||||
| completeStream = _handling == null; // if we have not handled yet or have completed handling | ||||||||||
| stream = _stream; | ||||||||||
| failure = _callbackFailure = ExceptionUtil.combine(_callbackFailure, failure); | ||||||||||
| } | ||||||||||
|
|
@@ -1307,7 +1308,7 @@ else if (last && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod()) | |||||||||
|
|
||||||||||
| if (writeFailure == NOTHING_TO_SEND) | ||||||||||
| { | ||||||||||
| httpChannelState._writeInvoker.run(callback::succeeded); | ||||||||||
| httpChannelState._writeInvoker.run(new ReadyTask(callback.getInvocationType(), callback::succeeded)); | ||||||||||
| return; | ||||||||||
| } | ||||||||||
| // Have we failed in some way? | ||||||||||
|
|
@@ -1353,7 +1354,7 @@ public void succeeded() | |||||||||
| httpChannel.lockedStreamSendCompleted(true); | ||||||||||
| } | ||||||||||
| if (callback != null) | ||||||||||
| httpChannel._writeInvoker.run(callback::succeeded); | ||||||||||
| httpChannel._writeInvoker.run(new ReadyTask(callback.getInvocationType(), callback::succeeded)); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
|
|
@@ -1517,16 +1518,32 @@ private ChannelCallback(ChannelRequest request) | |||||||||
| @Override | ||||||||||
| public void succeeded() | ||||||||||
| { | ||||||||||
| // Called when the request/response cycle is completing successfully. | ||||||||||
| completeChannelCallback(null); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Called when the {@link Handler} (or it's delegates) fail the request handling. | ||||||||||
| * | ||||||||||
| * @param failure The reason for the failure. | ||||||||||
| */ | ||||||||||
| @Override | ||||||||||
| public void failed(Throwable failure) | ||||||||||
| { | ||||||||||
| completeChannelCallback(failure); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private void completeChannelCallback(Throwable failure) | ||||||||||
| { | ||||||||||
| // Called when the request/response cycle is completing. | ||||||||||
| HttpStream stream; | ||||||||||
| boolean needLastStreamSend; | ||||||||||
| boolean doLastStreamSend = false; | ||||||||||
| HttpChannelState httpChannelState; | ||||||||||
| Throwable failure = null; | ||||||||||
| ChannelRequest request; | ||||||||||
| ChannelResponse response; | ||||||||||
| MetaData.Response responseMetaData = null; | ||||||||||
| boolean completeStream; | ||||||||||
| boolean completeStream = false; | ||||||||||
| ErrorResponse errorResponse = null; | ||||||||||
| Callback failedCallback = null; | ||||||||||
|
|
||||||||||
| try (AutoLock ignored = _request._lock.lock()) | ||||||||||
| { | ||||||||||
|
|
@@ -1537,115 +1554,125 @@ public void succeeded() | |||||||||
| httpChannelState = _request._httpChannelState; | ||||||||||
| response = httpChannelState._response; | ||||||||||
| stream = httpChannelState._stream; | ||||||||||
| assert httpChannelState._callbackFailure == null; | ||||||||||
|
|
||||||||||
| // We convert a call to succeeded with pending demand/write into a call to failed. | ||||||||||
| // Turn pending demand or unconsumed input on persistent connections into failure | ||||||||||
| if (httpChannelState._onContentAvailable != null) | ||||||||||
| failure = ExceptionUtil.combine(failure, new IllegalStateException("demand pending")); | ||||||||||
| else if (httpChannelState.getConnectionMetaData().isPersistent()) | ||||||||||
| failure = ExceptionUtil.combine(failure, stream.consumeAvailable()); | ||||||||||
| else | ||||||||||
| { | ||||||||||
| Throwable unconsumed = stream.consumeAvailable(); | ||||||||||
| if (failure != null) | ||||||||||
| ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed); | ||||||||||
|
Comment on lines
+1566
to
+1568
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No this is different. If failure==null, then it remains null. i.e. it is not a failure to not consume. I'll comment.... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I will try without the whole branch.... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Branch is needed |
||||||||||
| if (LOG.isDebugEnabled()) | ||||||||||
| LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Pending writes are also failures | ||||||||||
| if (response.lockedIsWriting()) | ||||||||||
| failure = ExceptionUtil.combine(failure, new IllegalStateException("write pending")); | ||||||||||
|
|
||||||||||
| assert httpChannelState._callbackFailure == null; | ||||||||||
|
|
||||||||||
| needLastStreamSend = httpChannelState.lockedLastStreamSend(); | ||||||||||
| completeStream = !needLastStreamSend && httpChannelState._handling == null && httpChannelState.lockedIsLastStreamSendCompleted(); | ||||||||||
| if (needLastStreamSend) | ||||||||||
| response._writeCallback = httpChannelState._handlerInvoker; | ||||||||||
|
|
||||||||||
| if (httpChannelState._responseHeaders.commit()) | ||||||||||
| // If we are not failing (yet) and not committed, then commit and check headers | ||||||||||
| if (failure == null && httpChannelState._responseHeaders.commit()) | ||||||||||
| { | ||||||||||
| responseMetaData = response.lockedPrepareResponse(httpChannelState, true); | ||||||||||
| // Check the response headers against the content written. | ||||||||||
| long totalWritten = response._contentBytesWritten; | ||||||||||
| long committedContentLength = httpChannelState._committedContentLength; | ||||||||||
| if (committedContentLength >= 0 && | ||||||||||
| committedContentLength != totalWritten && | ||||||||||
| !(totalWritten == 0 && (HttpMethod.HEAD.is(_request.getMethod()) || response.getStatus() == HttpStatus.NOT_MODIFIED_304))) | ||||||||||
| failure = ExceptionUtil.combine(failure, new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten))); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| long totalWritten = response._contentBytesWritten; | ||||||||||
| long committedContentLength = httpChannelState._committedContentLength; | ||||||||||
|
|
||||||||||
| if (committedContentLength >= 0 && | ||||||||||
| committedContentLength != totalWritten && | ||||||||||
| !(totalWritten == 0 && (HttpMethod.HEAD.is(_request.getMethod()) || response.getStatus() == HttpStatus.NOT_MODIFIED_304))) | ||||||||||
| failure = ExceptionUtil.combine(failure, new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten))); | ||||||||||
|
|
||||||||||
| // Is the request fully consumed? | ||||||||||
| Throwable unconsumed = stream.consumeAvailable(); | ||||||||||
| if (LOG.isDebugEnabled()) | ||||||||||
| LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState); | ||||||||||
|
|
||||||||||
| if (unconsumed != null && httpChannelState.getConnectionMetaData().isPersistent()) | ||||||||||
| failure = ExceptionUtil.combine(failure, unconsumed); | ||||||||||
|
|
||||||||||
| if (failure != null) | ||||||||||
| // If we are still not failing, is a last stream send needed or can we complete? | ||||||||||
| if (failure == null) | ||||||||||
| { | ||||||||||
| doLastStreamSend = httpChannelState.lockedLastStreamSend(); | ||||||||||
| if (doLastStreamSend) | ||||||||||
| response._writeCallback = httpChannelState._handlerInvoker; | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels really sneaky. Why not just doing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we have done half the write mechanism already by doing the commit at 1578 above. Calling write now will not send the headers. |
||||||||||
| // or complete the stream if everything is done. | ||||||||||
| else if (httpChannelState.lockedIsLastStreamSendCompleted()) | ||||||||||
| completeStream = httpChannelState._handled; | ||||||||||
| } | ||||||||||
| else | ||||||||||
| { | ||||||||||
| // We are failing... | ||||||||||
| httpChannelState._callbackFailure = failure; | ||||||||||
| if (!stream.isCommitted()) | ||||||||||
|
|
||||||||||
| // Can we and should we generate an error response? | ||||||||||
| if (!stream.isCommitted() && !ExceptionUtil.hasAssociated(failure, Request.Handler.AbortException.class)) | ||||||||||
| { | ||||||||||
| // We are not committed, so we can send an error response. | ||||||||||
| errorResponse = new ErrorResponse(request); | ||||||||||
| } | ||||||||||
| else if (httpChannelState._handled) | ||||||||||
| { | ||||||||||
| // Callback and handling are completed, so it is just a matter of the last write | ||||||||||
| if (httpChannelState.lockedIsLastStreamSendCompleted()) | ||||||||||
| { | ||||||||||
| // We are committed, handling completed, and last write completed, so complete the stream now. | ||||||||||
| completeStream = true; | ||||||||||
| } | ||||||||||
| else if (response.lockedIsWriting()) | ||||||||||
| { | ||||||||||
| // We are currently writing so fail the app callback now and let the write completion handle the failure | ||||||||||
| // TODO If we don't want to wait for write completion then do | ||||||||||
| // Runnable task = response.lockedFailWrite(failure); | ||||||||||
| // failedCallback = Callback.from(task, httpChannelState._handlerInvoker); | ||||||||||
| failedCallback = response._writeCallback; | ||||||||||
| response._writeCallback = httpChannelState._handlerInvoker; | ||||||||||
|
Comment on lines
+1625
to
+1626
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this way, we may wait forever for the write to complete and invoke the How about this:
Suggested change
In 12.1.x we will leverage the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure that continuing on with a pending write pointing at the HttpChannelState is a good idea. Feels like an invitation for a race. What is wrong waiting for the write to complete. It won't be forever unless they have disabled idle timeout. |
||||||||||
| } | ||||||||||
| else | ||||||||||
| { | ||||||||||
| // There has been no last write, but we will just fail the stream instead. | ||||||||||
|
|
||||||||||
| httpChannelState._streamSendState = StreamSendState.FAILED; | ||||||||||
| completeStream = true; | ||||||||||
gregw marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
| } | ||||||||||
| } | ||||||||||
| else | ||||||||||
| completeStream = true; | ||||||||||
| { | ||||||||||
| // We are still handling, sof for the most part | ||||||||||
| // let the HandlerInvoker deal with completion | ||||||||||
|
|
||||||||||
| // But if we are writing | ||||||||||
| if (response.lockedIsWriting()) | ||||||||||
| { | ||||||||||
| // We are currently writing so fail the app callback now and let the write completion handle the failure | ||||||||||
| // TODO If we don't want to wait for write completion then do | ||||||||||
| // Runnable task = response.lockedFailWrite(failure); | ||||||||||
| // failedCallback = Callback.from(task, httpChannelState._handlerInvoker); | ||||||||||
| failedCallback = response._writeCallback; | ||||||||||
| response._writeCallback = httpChannelState._handlerInvoker; | ||||||||||
| } | ||||||||||
| else if (!httpChannelState.lockedIsLastStreamSendCompleted()) | ||||||||||
| { | ||||||||||
| // last write it is not going to happen after failure, so we can just fail anyway | ||||||||||
| httpChannelState._streamSendState = StreamSendState.FAILED; | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| if (LOG.isDebugEnabled()) | ||||||||||
| LOG.debug("succeeded: failure={} needLastStreamSend={} {}", failure, needLastStreamSend, this); | ||||||||||
| LOG.debug("succeeded: failure={} doLastStreamSend={} {}", failure, doLastStreamSend, this); | ||||||||||
|
|
||||||||||
| if (failedCallback != null) | ||||||||||
| failedCallback.failed(Objects.requireNonNullElseGet(failure, IOException::new)); | ||||||||||
|
|
||||||||||
| if (errorResponse != null) | ||||||||||
| Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure); | ||||||||||
| else if (needLastStreamSend) | ||||||||||
| else if (doLastStreamSend) | ||||||||||
| stream.send(_request._metaData, responseMetaData, true, null, response); | ||||||||||
| else if (completeStream) | ||||||||||
| httpChannelState._handlerInvoker.completeStream(stream, failure); | ||||||||||
| else if (LOG.isDebugEnabled()) | ||||||||||
| LOG.debug("No action on succeeded {}", this); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Called when the {@link Handler} (or it's delegates) fail the request handling. | ||||||||||
| * | ||||||||||
| * @param failure The reason for the failure. | ||||||||||
| */ | ||||||||||
| @Override | ||||||||||
| public void failed(Throwable failure) | ||||||||||
| { | ||||||||||
| try | ||||||||||
| { | ||||||||||
| // Called when the request/response cycle is completing with a failure. | ||||||||||
| HttpStream stream; | ||||||||||
| ChannelRequest request; | ||||||||||
| HttpChannelState httpChannelState; | ||||||||||
| ErrorResponse errorResponse = null; | ||||||||||
| try (AutoLock ignored = _request._lock.lock()) | ||||||||||
| { | ||||||||||
| if (lockedCompleteCallback()) | ||||||||||
| return; | ||||||||||
| httpChannelState = _request._httpChannelState; | ||||||||||
| stream = httpChannelState._stream; | ||||||||||
| request = _request; | ||||||||||
|
|
||||||||||
| assert httpChannelState._callbackFailure == null; | ||||||||||
|
|
||||||||||
| httpChannelState._callbackFailure = failure; | ||||||||||
|
|
||||||||||
| if (!stream.isCommitted() && !(failure instanceof Request.Handler.AbortException)) | ||||||||||
| { | ||||||||||
| // Consume any input. | ||||||||||
| Throwable unconsumed = stream.consumeAvailable(); | ||||||||||
| ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed); | ||||||||||
|
|
||||||||||
| ChannelResponse response = httpChannelState._response; | ||||||||||
| if (LOG.isDebugEnabled()) | ||||||||||
| LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this); | ||||||||||
|
|
||||||||||
| errorResponse = new ErrorResponse(request); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| if (errorResponse != null) | ||||||||||
| Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure); | ||||||||||
| else | ||||||||||
| _request.getHttpChannelState()._handlerInvoker.failed(failure); | ||||||||||
| } | ||||||||||
| catch (Throwable t) | ||||||||||
| { | ||||||||||
| ExceptionUtil.addSuppressedIfNotAssociated(t, failure); | ||||||||||
| throw t; | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private boolean lockedCompleteCallback() | ||||||||||
| { | ||||||||||
| assert _request._lock.isHeldByCurrentThread(); | ||||||||||
|
|
@@ -1685,7 +1712,7 @@ public InvocationType getInvocationType() | |||||||||
|
|
||||||||||
| /** | ||||||||||
| * Used as the {@link Response} when writing the error response | ||||||||||
| * from {@link HttpChannelState.ChannelCallback#failed(Throwable)}. | ||||||||||
| * from {@link ChannelCallback#failed(Throwable)}. | ||||||||||
| */ | ||||||||||
| private static class ErrorResponse extends ChannelResponse | ||||||||||
| { | ||||||||||
|
|
@@ -1726,7 +1753,7 @@ MetaData.Response lockedPrepareResponse(HttpChannelState httpChannelState, boole | |||||||||
|
|
||||||||||
| /** | ||||||||||
| * Used as the {@link Response} and {@link Callback} when writing the error response | ||||||||||
| * from {@link HttpChannelState.ChannelCallback#failed(Throwable)}. | ||||||||||
| * from {@link ChannelCallback#failed(Throwable)}. | ||||||||||
| */ | ||||||||||
| private static class ErrorCallback implements Callback | ||||||||||
| { | ||||||||||
|
|
@@ -1744,7 +1771,7 @@ public ErrorCallback(ChannelRequest request, ErrorResponse response, HttpStream | |||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Called when the error write in {@link HttpChannelState.ChannelCallback#failed(Throwable)} succeeds. | ||||||||||
| * Called when the error write in {@link ChannelCallback#failed(Throwable)} succeeds. | ||||||||||
| */ | ||||||||||
| @Override | ||||||||||
| public void succeeded() | ||||||||||
|
|
@@ -1783,7 +1810,7 @@ public void succeeded() | |||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Called when the error write in {@link HttpChannelState.ChannelCallback#failed(Throwable)} fails. | ||||||||||
| * Called when the error write in {@link ChannelCallback#failed(Throwable)} fails. | ||||||||||
| * | ||||||||||
| * @param x The reason for the failure. | ||||||||||
| */ | ||||||||||
|
|
||||||||||
Uh oh!
There was an error while loading. Please reload this page.