Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testPendingRequestContentThenTotalTimeout() throws Exception
completed.incrementAndGet();
assertThat(result.getRequestFailure(), notNullValue());
assertThat(result.getResponseFailure(), nullValue());
assertThat(result.getResponse().getStatus(), is(HttpStatus.OK_200));
assertThat(result.getResponse().getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
resultLatch.countDown();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ private enum StreamSendState
LAST_SENDING,

/** Last content sent and completed */
LAST_COMPLETE
LAST_COMPLETE,

/** Failing, so last send will never happen */
FAILED
}

private static final Logger LOG = LoggerFactory.getLogger(HttpChannelState.class);
Expand Down Expand Up @@ -340,7 +343,7 @@ public Runnable onContentAvailable()
}

@Override
public Invocable.InvocationType getInvocationType()
public InvocationType getInvocationType()
{
// TODO Can this actually be done, as we may need to invoke other Runnables after onContent?
// Could we at least avoid the lock???
Expand Down Expand Up @@ -575,6 +578,8 @@ private Throwable lockedStreamSend(boolean last, long length)
case LAST_SENDING, LAST_COMPLETE -> (length > 0)
? new IllegalStateException("last already written")
: NOTHING_TO_SEND;

case FAILED -> null;
};
}

Expand All @@ -588,7 +593,7 @@ private void lockedStreamSendCompleted(boolean success)
private boolean lockedIsLastStreamSendCompleted()
{
assert _lock.isHeldByCurrentThread();
return _streamSendState == StreamSendState.LAST_COMPLETE;
return _streamSendState == StreamSendState.LAST_COMPLETE || _streamSendState == StreamSendState.FAILED;
}

private boolean lockedLastStreamSend()
Expand Down Expand Up @@ -619,7 +624,9 @@ public String toString()
}
}

private class HandlerInvoker implements Invocable.Task, Callback
// HandlerInvoker is used as the Response's _writeCallback when ChannelCallback is succeeded and the last send still
// needs to be done, i.e.: _streamSendState set to LAST_SENDING by lockedLastStreamSend().
private class HandlerInvoker implements Task, Callback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should split the functionality of this class.
Leave run() in HandlerInvoker, but move the Callback functionality into a LastStreamSendCallback class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really not a 12.0 thing then

{
@Override
public void run()
Expand Down Expand Up @@ -719,25 +726,19 @@ public void run()
@Override
public void succeeded()
{
HttpStream stream;
boolean completeStream;
try (AutoLock ignored = _lock.lock())
{
assert _callbackCompleted;
_streamSendState = StreamSendState.LAST_COMPLETE;
completeStream = _handling == null;
stream = _stream;
}

if (completeStream)
completeStream(stream, null);
completeLastWrite(null);
}

/**
* Called only as {@link Callback} by last send from {@link ChannelCallback#succeeded}
*/
@Override
public void failed(Throwable failure)
{
completeLastWrite(failure);
}

private void completeLastWrite(Throwable failure)
{
HttpStream stream;
boolean completeStream;
Expand Down Expand Up @@ -1353,7 +1354,7 @@ public void succeeded()
httpChannel.lockedStreamSendCompleted(true);
}
if (callback != null)
httpChannel._writeInvoker.run(callback::succeeded);
httpChannel._writeInvoker.run(new ReadyTask(callback.getInvocationType(), callback::succeeded));
}

/**
Expand Down Expand Up @@ -1517,16 +1518,32 @@ private ChannelCallback(ChannelRequest request)
@Override
public void succeeded()
{
// Called when the request/response cycle is completing successfully.
completeChannelCallback(null);
}

/**
* Called when the {@link Handler} (or it's delegates) fail the request handling.
*
* @param failure The reason for the failure.
*/
@Override
public void failed(Throwable failure)
{
completeChannelCallback(failure);
}

private void completeChannelCallback(Throwable failure)
{
// Called when the request/response cycle is completing.
HttpStream stream;
boolean needLastStreamSend;
boolean doLastStreamSend = false;
HttpChannelState httpChannelState;
Throwable failure = null;
ChannelRequest request;
ChannelResponse response;
MetaData.Response responseMetaData = null;
boolean completeStream;
boolean completeStream = false;
ErrorResponse errorResponse = null;
Callback failedCallback = null;

try (AutoLock ignored = _request._lock.lock())
{
Expand All @@ -1537,115 +1554,105 @@ public void succeeded()
httpChannelState = _request._httpChannelState;
response = httpChannelState._response;
stream = httpChannelState._stream;
assert httpChannelState._callbackFailure == null;

// We convert a call to succeeded with pending demand/write into a call to failed.
// Turn pending demand or unconsumed input on persistent connections into failure
if (httpChannelState._onContentAvailable != null)
failure = ExceptionUtil.combine(failure, new IllegalStateException("demand pending"));
else if (httpChannelState.getConnectionMetaData().isPersistent())
failure = ExceptionUtil.combine(failure, stream.consumeAvailable());
else
{
Throwable unconsumed = stream.consumeAvailable();
if (failure != null)
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);
Comment on lines +1566 to +1568
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just ExceptionUtil.combine(failure, stream.consumeAvailable()); as the line above?
But then, this else block is identical to the else-if above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this is different. If failure==null, then it remains null. i.e. it is not a failure to not consume. I'll comment....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I will try without the whole branch....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Branch is needed

if (LOG.isDebugEnabled())
LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState);
}

// Pending writes are also failures
if (response.lockedIsWriting())
failure = ExceptionUtil.combine(failure, new IllegalStateException("write pending"));

assert httpChannelState._callbackFailure == null;

needLastStreamSend = httpChannelState.lockedLastStreamSend();
completeStream = !needLastStreamSend && httpChannelState._handling == null && httpChannelState.lockedIsLastStreamSendCompleted();
if (needLastStreamSend)
response._writeCallback = httpChannelState._handlerInvoker;

if (httpChannelState._responseHeaders.commit())
// If we are not failing (yet) and not committed, then commit and check headers
if (failure == null && httpChannelState._responseHeaders.commit())
{
responseMetaData = response.lockedPrepareResponse(httpChannelState, true);
// Check the response headers against the content written.
long totalWritten = response._contentBytesWritten;
long committedContentLength = httpChannelState._committedContentLength;
if (committedContentLength >= 0 &&
committedContentLength != totalWritten &&
!(totalWritten == 0 && (HttpMethod.HEAD.is(_request.getMethod()) || response.getStatus() == HttpStatus.NOT_MODIFIED_304)))
failure = ExceptionUtil.combine(failure, new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten)));
}

long totalWritten = response._contentBytesWritten;
long committedContentLength = httpChannelState._committedContentLength;

if (committedContentLength >= 0 &&
committedContentLength != totalWritten &&
!(totalWritten == 0 && (HttpMethod.HEAD.is(_request.getMethod()) || response.getStatus() == HttpStatus.NOT_MODIFIED_304)))
failure = ExceptionUtil.combine(failure, new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten)));

// Is the request fully consumed?
Throwable unconsumed = stream.consumeAvailable();
if (LOG.isDebugEnabled())
LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState);

if (unconsumed != null && httpChannelState.getConnectionMetaData().isPersistent())
failure = ExceptionUtil.combine(failure, unconsumed);

if (failure != null)
// If we are still not failing, is a last stream send needed or can we complete?
if (failure == null)
{
doLastStreamSend = httpChannelState.lockedLastStreamSend();
if (doLastStreamSend)
response._writeCallback = httpChannelState._handlerInvoker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels really sneaky.

Why not just doing response.write(true, null, httpChannelState._handlerInvoker)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have done half the write mechanism already by doing the commit at 1578 above. Calling write now will not send the headers.

// or complete the stream if everything is done.
else if (httpChannelState.lockedIsLastStreamSendCompleted())
completeStream = httpChannelState._handled;
}
else
{
// We are failing...
httpChannelState._callbackFailure = failure;
if (!stream.isCommitted())

// Can we and should we generate an error response?
if (!stream.isCommitted() && !ExceptionUtil.isAssociated(failure, Request.Handler.AbortException.class))
{
// We are not committed, so we can send an error response.
errorResponse = new ErrorResponse(request);
else
completeStream = true;
}
else if (httpChannelState._handled)
{
// Callback and handling are completed, so it is just a matter of the last write
if (httpChannelState.lockedIsLastStreamSendCompleted())
{
// We are committed, handling completed, and last write completed, so complete the stream now.
completeStream = true;
}
else if (response.lockedIsWriting())
{
// We are currently writing, so let the completion of that write handle the failure
httpChannelState._callbackFailure = failure;
failedCallback = response._writeCallback;
response._writeCallback = httpChannelState._handlerInvoker;
Comment on lines +1625 to +1626
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this way, we may wait forever for the write to complete and invoke the _handlerInvoker.

How about this:

Suggested change
failedCallback = response._writeCallback;
response._writeCallback = httpChannelState._handlerInvoker;
Runnable task = response.lockedFailWrite(failure);
failedCallback = Callback.from(task, httpChannelState._handlerInvoker);

In 12.1.x we will leverage the new cancelSend() feature automatically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that continuing on with a pending write pointing at the HttpChannelState is a good idea. Feels like an invitation for a race. What is wrong waiting for the write to complete. It won't be forever unless they have disabled idle timeout.

}
else
{
// There has been no last write, but we will just fail the stream instead.
completeStream = true;
}
}
else if (!httpChannelState.lockedIsLastStreamSendCompleted() && !response.lockedIsWriting())
{
// last write is not going to happen after failure, so we can just fail anyway
httpChannelState._streamSendState = StreamSendState.LAST_COMPLETE;
}
}
}

if (LOG.isDebugEnabled())
LOG.debug("succeeded: failure={} needLastStreamSend={} {}", failure, needLastStreamSend, this);
LOG.debug("succeeded: failure={} doLastStreamSend={} {}", failure, doLastStreamSend, this);

if (failedCallback != null)
failedCallback.failed(Objects.requireNonNullElseGet(failure, IOException::new));

if (errorResponse != null)
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
else if (needLastStreamSend)
else if (doLastStreamSend)
stream.send(_request._metaData, responseMetaData, true, null, response);
else if (completeStream)
httpChannelState._handlerInvoker.completeStream(stream, failure);
else if (LOG.isDebugEnabled())
LOG.debug("No action on succeeded {}", this);
}

/**
* Called when the {@link Handler} (or it's delegates) fail the request handling.
*
* @param failure The reason for the failure.
*/
@Override
public void failed(Throwable failure)
{
try
{
// Called when the request/response cycle is completing with a failure.
HttpStream stream;
ChannelRequest request;
HttpChannelState httpChannelState;
ErrorResponse errorResponse = null;
try (AutoLock ignored = _request._lock.lock())
{
if (lockedCompleteCallback())
return;
httpChannelState = _request._httpChannelState;
stream = httpChannelState._stream;
request = _request;

assert httpChannelState._callbackFailure == null;

httpChannelState._callbackFailure = failure;

if (!stream.isCommitted() && !(failure instanceof Request.Handler.AbortException))
{
// Consume any input.
Throwable unconsumed = stream.consumeAvailable();
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);

ChannelResponse response = httpChannelState._response;
if (LOG.isDebugEnabled())
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);

errorResponse = new ErrorResponse(request);
}
}

if (errorResponse != null)
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
else
_request.getHttpChannelState()._handlerInvoker.failed(failure);
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
throw t;
}
}

private boolean lockedCompleteCallback()
{
assert _request._lock.isHeldByCurrentThread();
Expand Down Expand Up @@ -1685,7 +1692,7 @@ public InvocationType getInvocationType()

/**
* Used as the {@link Response} when writing the error response
* from {@link HttpChannelState.ChannelCallback#failed(Throwable)}.
* from {@link ChannelCallback#failed(Throwable)}.
*/
private static class ErrorResponse extends ChannelResponse
{
Expand Down Expand Up @@ -1726,7 +1733,7 @@ MetaData.Response lockedPrepareResponse(HttpChannelState httpChannelState, boole

/**
* Used as the {@link Response} and {@link Callback} when writing the error response
* from {@link HttpChannelState.ChannelCallback#failed(Throwable)}.
* from {@link ChannelCallback#failed(Throwable)}.
*/
private static class ErrorCallback implements Callback
{
Expand All @@ -1744,7 +1751,7 @@ public ErrorCallback(ChannelRequest request, ErrorResponse response, HttpStream
}

/**
* Called when the error write in {@link HttpChannelState.ChannelCallback#failed(Throwable)} succeeds.
* Called when the error write in {@link ChannelCallback#failed(Throwable)} succeeds.
*/
@Override
public void succeeded()
Expand Down Expand Up @@ -1783,7 +1790,7 @@ public void succeeded()
}

/**
* Called when the error write in {@link HttpChannelState.ChannelCallback#failed(Throwable)} fails.
* Called when the error write in {@link ChannelCallback#failed(Throwable)} fails.
*
* @param x The reason for the failure.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -360,9 +359,13 @@ public boolean handle(Request request, Response response, Callback callback)
\r
""";

String response = _connector.getResponse(request);
assertThat(response, is(nullValue()));
await().atMost(5, TimeUnit.SECONDS).until(_statsHandler::getResponses5xx, is(1));
try (StacklessLogging ignored = new StacklessLogging(Response.class))
{
String response = _connector.getResponse(request);
assertThat(response, containsString("HTTP/1.1 500 "));
assertThat(response, containsString("content-length 1 != 0 written"));
await().atMost(5, TimeUnit.SECONDS).until(_statsHandler::getResponses5xx, is(1));
}
}

@Test
Expand Down
Loading