diff --git a/src/main/java/mondrian/xmla/XmlaHandler.java b/src/main/java/mondrian/xmla/XmlaHandler.java index 8f85bb2..43f76fa 100644 --- a/src/main/java/mondrian/xmla/XmlaHandler.java +++ b/src/main/java/mondrian/xmla/XmlaHandler.java @@ -64,6 +64,8 @@ public class XmlaHandler { final ConnectionFactory connectionFactory; private final String prefix; + private Map queryResultLookup; + private boolean streamExecuteResults; /** * Returns a new OlapConnection opened with the credentials specified in the @@ -610,6 +612,29 @@ public XmlaHandler(ConnectionFactory connectionFactory, String prefix) assert prefix != null; this.connectionFactory = connectionFactory; this.prefix = prefix; + this.streamExecuteResults = false; + } + + public void setStreamExecuteResults(final int maxResultsToDefer) { + if (!this.streamExecuteResults) { + this.streamExecuteResults = true; + this.queryResultLookup = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry( + java.util.Map.Entry eldest) + { + if (size() > maxResultsToDefer) { + try { + eldest.getValue().close(); + } catch (Exception e) { + //ignore + } + return true; + } + return false; + } + }; + } } /** @@ -693,22 +718,6 @@ private void execute( XmlaResponse response) throws XmlaException { - final Map properties = request.getProperties(); - - // Default responseMimeType is SOAP. - Enumeration.ResponseMimeType responseMimeType = - getResponseMimeType(request); - - // Default value is SchemaData, or Data for JSON responses. - final String contentName = - properties.get(PropertyDefinition.Content.name()); - Content content = Util.lookup( - Content.class, - contentName, - responseMimeType == Enumeration.ResponseMimeType.JSON - ? Content.Data - : Content.DEFAULT); - // Handle execute QueryResult result = null; try { @@ -717,8 +726,80 @@ private void execute( } else { result = executeQuery(request); } + } catch (XmlaException xe) { + if (result != null) { + try { + result.close(); + } catch (SQLException e) { + //ignore + } + } + throw xe; + } catch (RuntimeException re) { + if (result != null) { + try { + result.close(); + } catch (SQLException e) { + //ignore + } + } + throw re; + } + if (streamExecuteResults) { + synchronized(queryResultLookup) { + queryResultLookup.put(request, result); + } + } else { + writeQueryResult(result, request, response.getWriter()); + } + } + + public void writeQueryResult(XmlaRequest request, SaxWriter writer) { + QueryResult result; + synchronized(queryResultLookup) { + result = queryResultLookup.remove(request); + } + if (result != null) { + writeQueryResult(result, request, writer); + } + } + + public void closeQueryResult(XmlaRequest request) { + QueryResult result = null; + if (queryResultLookup != null) { + synchronized(queryResultLookup) { + result = queryResultLookup.remove(request); + } + } + if (result != null) { + try { + result.close(); + } catch (SQLException e) { + //ignore + } + } + } + + private void writeQueryResult(QueryResult result, + XmlaRequest request, SaxWriter writer) { + try { + final Map properties = request.getProperties(); + + // Default responseMimeType is SOAP. + Enumeration.ResponseMimeType responseMimeType = + getResponseMimeType(request); + + // Default value is SchemaData, or Data for JSON responses. + final String contentName = + properties.get(PropertyDefinition.Content.name()); + Content content = Util.lookup( + Content.class, + contentName, + responseMimeType == Enumeration.ResponseMimeType.JSON + ? Content.Data + : Content.DEFAULT); + - SaxWriter writer = response.getWriter(); writer.startDocument(); writer.startElement( diff --git a/src/main/java/mondrian/xmla/XmlaServlet.java b/src/main/java/mondrian/xmla/XmlaServlet.java index a3650d5..123b240 100644 --- a/src/main/java/mondrian/xmla/XmlaServlet.java +++ b/src/main/java/mondrian/xmla/XmlaServlet.java @@ -155,6 +155,7 @@ protected void doPost( Phase phase = Phase.VALIDATE_HTTP_HEAD; Enumeration.ResponseMimeType mimeType = Enumeration.ResponseMimeType.SOAP; + Map context = new HashMap(); try { if (charEncoding != null) { @@ -172,8 +173,6 @@ protected void doPost( response.setContentType(mimeType.getMimeType()); - Map context = new HashMap(); - try { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Invoking validate http header callbacks"); @@ -192,7 +191,7 @@ protected void doPost( "Errors when invoking callbacks validateHttpHeader", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } catch (Exception ex) { LOGGER.error( @@ -206,7 +205,7 @@ protected void doPost( CHH_FAULT_FS, ex)); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } @@ -249,7 +248,7 @@ protected void doPost( LOGGER.error("Unable to unmarshall SOAP message", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } @@ -270,7 +269,7 @@ protected void doPost( LOGGER.error("Errors when handling XML/A message", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } @@ -289,7 +288,7 @@ protected void doPost( LOGGER.error("Errors when invoking callbacks preaction", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } catch (Exception ex) { LOGGER.error("Errors when invoking callbacks preaction", ex); @@ -302,7 +301,7 @@ protected void doPost( CPREA_FAULT_FS, ex)); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } @@ -323,7 +322,7 @@ protected void doPost( LOGGER.error("Errors when handling XML/A message", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } @@ -346,7 +345,7 @@ protected void doPost( LOGGER.error("Errors when invoking callbacks postaction", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } catch (Exception ex) { LOGGER.error("Errors when invoking callbacks postaction", ex); @@ -360,7 +359,7 @@ protected void doPost( CPOSTA_FAULT_FS, ex)); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); return; } @@ -368,17 +367,17 @@ protected void doPost( try { response.setStatus(HttpServletResponse.SC_OK); - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); } catch (XmlaException xex) { LOGGER.error("Errors when handling XML/A message", xex); handleFault(response, responseSoapParts, phase, xex); phase = Phase.SEND_ERROR; - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); } } catch (Throwable t) { LOGGER.error("Unknown Error when handling XML/A message", t); handleFault(response, responseSoapParts, phase, t); - marshallSoapMessage(response, responseSoapParts, mimeType); + marshallSoapMessage(response, responseSoapParts, mimeType, context); } } @@ -416,7 +415,8 @@ protected abstract void handleSoapBody( protected abstract void marshallSoapMessage( HttpServletResponse response, byte[][] responseSoapParts, - Enumeration.ResponseMimeType responseMimeType) + Enumeration.ResponseMimeType responseMimeType, + Map context) throws XmlaException; /** diff --git a/src/main/java/mondrian/xmla/impl/DefaultXmlaServlet.java b/src/main/java/mondrian/xmla/impl/DefaultXmlaServlet.java index bc66612..73adcc6 100644 --- a/src/main/java/mondrian/xmla/impl/DefaultXmlaServlet.java +++ b/src/main/java/mondrian/xmla/impl/DefaultXmlaServlet.java @@ -43,10 +43,26 @@ public abstract class DefaultXmlaServlet extends XmlaServlet { */ private static final String REQUIRE_AUTHENTICATED_SESSIONS = "requireAuthenticatedSessions"; + /** + * Servlet config parameter that determines whether the xmla + * body will be streamed. + */ + private static final String STREAM_XMLA_BODY = + "streamXmlaBody"; + /** + * Servlet config parameter that determines the maximum number + * of query results that can be concurrently deferred. + * Used only when streaming results. + */ + private static final String MAX_RESULTS_TO_DEFER = "maxResultsToDefer"; + private static final int DEFAULT_MAX_RESULTS_TO_DEFER = 30; + protected static final String CONTEXT_XMLA_REQUEST = "xmlaRequest"; private DocumentBuilderFactory domFactory = null; private boolean requireAuthenticatedSessions = false; + protected boolean streamXmlaBody = false; + protected int maxResultsToRetain; /** * Session properties, keyed by session ID. Currently just username and @@ -61,6 +77,23 @@ public void init(ServletConfig servletConfig) throws ServletException { this.requireAuthenticatedSessions = Boolean.parseBoolean( servletConfig.getInitParameter(REQUIRE_AUTHENTICATED_SESSIONS)); + this.streamXmlaBody = + Boolean.parseBoolean( + servletConfig.getInitParameter(STREAM_XMLA_BODY)); + if (streamXmlaBody) { + String initMaxResultsToDefer = servletConfig.getInitParameter(MAX_RESULTS_TO_DEFER); + if (initMaxResultsToDefer != null) { + try { + this.maxResultsToRetain = + Integer.parseInt(initMaxResultsToDefer); + } catch (NumberFormatException nfe) { + LOGGER.warn("Max results to defer servlet parameter must be an integer.", nfe); + this.maxResultsToRetain = DEFAULT_MAX_RESULTS_TO_DEFER; + } + } else { + this.maxResultsToRetain = DEFAULT_MAX_RESULTS_TO_DEFER; + } + } } protected static DocumentBuilderFactory getDocumentBuilderFactory() { @@ -502,7 +535,14 @@ protected void handleSoapBody( new DefaultXmlaResponse(osBuf, encoding, responseMimeType); try { - getXmlaHandler().process(xmlaReq, xmlaRes); + XmlaHandler handler = getXmlaHandler(); + if (streamXmlaBody) { + handler.setStreamExecuteResults(maxResultsToRetain); + } + handler.process(xmlaReq, xmlaRes); + if (streamXmlaBody) { + context.put(CONTEXT_XMLA_REQUEST, xmlaReq); + } } catch (XmlaException ex) { throw ex; } catch (Exception ex) { @@ -528,7 +568,8 @@ protected void handleSoapBody( protected void marshallSoapMessage( HttpServletResponse response, byte[][] responseSoapParts, - Enumeration.ResponseMimeType responseMimeType) + Enumeration.ResponseMimeType responseMimeType, + Map context) throws XmlaException { try { @@ -567,6 +608,7 @@ protected void marshallSoapMessage( byte[] soapBody = responseSoapParts[1]; Object[] byteChunks = null; + int bodyChunkIndex = 0; try { switch (responseMimeType) { @@ -574,6 +616,7 @@ protected void marshallSoapMessage( byteChunks = new Object[] { soapBody, }; + bodyChunkIndex = 0; break; case SOAP: @@ -599,6 +642,7 @@ protected void marshallSoapMessage( soapBody, s4.getBytes(encoding), }; + bodyChunkIndex = 3; break; } } catch (UnsupportedEncodingException uee) { @@ -633,25 +677,14 @@ protected void marshallSoapMessage( int bufferSize = 4096; ByteBuffer buffer = ByteBuffer.allocate(bufferSize); WritableByteChannel wch = Channels.newChannel(outputStream); - ReadableByteChannel rch; + int index = 0; for (Object byteChunk : byteChunks) { - if (byteChunk == null || ((byte[]) byteChunk).length == 0) { - continue; + // Write the byte chunk even when streaming the XML/A body. + writeBytes(byteChunk, buffer, wch, bufferSize); + if (index == bodyChunkIndex && this.streamXmlaBody) { + streamBody(response, outputStream, context); } - rch = Channels.newChannel( - new ByteArrayInputStream((byte[]) byteChunk)); - - int readSize; - do { - buffer.clear(); - readSize = rch.read(buffer); - buffer.flip(); - - int writeSize = 0; - while ((writeSize += wch.write(buffer)) < readSize) { - } - } while (readSize == bufferSize); - rch.close(); + index++; } outputStream.flush(); } catch (IOException ioe) { @@ -660,8 +693,10 @@ protected void marshallSoapMessage( ioe); } } catch (XmlaException xex) { + closeQueryResult(context); throw xex; } catch (Exception ex) { + closeQueryResult(context); throw new XmlaException( SERVER_FAULT_FC, MSM_UNKNOWN_CODE, @@ -670,6 +705,61 @@ protected void marshallSoapMessage( } } + private void writeBytes(Object byteChunk, ByteBuffer buffer, + WritableByteChannel wch, int bufferSize) + throws IOException { + if (byteChunk == null || ((byte[]) byteChunk).length == 0) { + return; + } + ReadableByteChannel rch = Channels.newChannel( + new ByteArrayInputStream((byte[]) byteChunk)); + + int readSize; + do { + buffer.clear(); + readSize = rch.read(buffer); + buffer.flip(); + + int writeSize = 0; + while ((writeSize += wch.write(buffer)) < readSize) { + } + } while (readSize == bufferSize); + rch.close(); + } + + protected void streamBody( + HttpServletResponse response, + OutputStream os, + Map context) + { + XmlaRequest xmlaRequest = (XmlaRequest)context.get(CONTEXT_XMLA_REQUEST); + // "ResponseMimeType" may be in the context if the "Accept" HTTP + // header was specified. But override if the SOAP request has the + // "ResponseMimeType" property. + Enumeration.ResponseMimeType responseMimeType = + Enumeration.ResponseMimeType.SOAP; + final String responseMimeTypeName = + xmlaRequest.getProperties().get("ResponseMimeType"); + if (responseMimeTypeName != null) { + responseMimeType = + Enumeration.ResponseMimeType.MAP.get( + responseMimeTypeName); + if (responseMimeType != null) { + context.put(CONTEXT_MIME_TYPE, responseMimeType); + } + } + + XmlaResponse xmlaResponse = + new DefaultXmlaResponse( + os, response.getCharacterEncoding(), responseMimeType); + getXmlaHandler().writeQueryResult(xmlaRequest, xmlaResponse.getWriter()); + } + + protected void closeQueryResult(Map context) { + XmlaRequest xmlaRequest = (XmlaRequest)context.get(CONTEXT_XMLA_REQUEST); + getXmlaHandler().closeQueryResult(xmlaRequest); + } + /** * This produces a SOAP 1.1 version Fault element - not a 1.2 version. * @@ -680,6 +770,10 @@ protected void handleFault( Phase phase, Throwable t) { + // If we were streaming results, we cannot send an XML/A fault + if (response.isCommitted()) { + return; + } // Regardless of whats been put into the response so far, clear // it out. response.reset(); @@ -781,7 +875,7 @@ protected void handleFault( uee); } catch (Exception e) { LOGGER.error( - "Unexcepted runimt exception when handing SOAP fault :("); + "Unexpected runtime exception when handing SOAP fault :("); } responseSoapParts[1] = osBuf.toByteArray();