Skip to content

Commit dc9d968

Browse files
arturobernalgok2c
authored andcommitted
HTTPCLIENT-1822: async transparent content decompression. Add DeflatingAsyncEntityProducer and InflatingAsyncEntityCunsumer using Deflater / Inflater API directly
1 parent 9780a8f commit dc9d968

File tree

9 files changed

+1352
-0
lines changed

9 files changed

+1352
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.async.methods;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Set;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.zip.Deflater;
36+
37+
import org.apache.hc.core5.http.Header;
38+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
39+
import org.apache.hc.core5.http.nio.DataStreamChannel;
40+
import org.apache.hc.core5.util.Args;
41+
42+
/**
43+
* {@code AsyncEntityProducer} that streams the output of another producer
44+
* through the raw DEFLATE compression algorithm.
45+
*
46+
* <p>The delegate’s bytes are read in small chunks, compressed with
47+
* {@link java.util.zip.Deflater} and written immediately to the HTTP I/O
48+
* layer. Memory use is therefore bounded even for very large request
49+
* entities.</p>
50+
*
51+
* @since 5.6
52+
*/
53+
public final class DeflatingAsyncEntityProducer implements AsyncEntityProducer {
54+
55+
/**
56+
* inbound copy‐buffer
57+
*/
58+
private static final int IN_BUF = 8 * 1024;
59+
/**
60+
* outbound staging buffer
61+
*/
62+
private static final int OUT_BUF = 8 * 1024;
63+
64+
private final AsyncEntityProducer delegate;
65+
private final String contentType;
66+
private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, /*nowrap=*/true);
67+
68+
/**
69+
* holds compressed bytes not yet sent downstream
70+
*/
71+
private final ByteBuffer pending = ByteBuffer.allocate(OUT_BUF);
72+
private final byte[] in = new byte[IN_BUF];
73+
74+
private final AtomicBoolean delegateEnded = new AtomicBoolean(false);
75+
private boolean finished = false;
76+
77+
public DeflatingAsyncEntityProducer(final AsyncEntityProducer delegate) {
78+
this.delegate = Args.notNull(delegate, "delegate");
79+
this.contentType = delegate.getContentType();
80+
/* place pending into “read-mode” with no data */
81+
pending.flip();
82+
}
83+
84+
// ------------------------------------------------------------------ metadata
85+
86+
@Override
87+
public boolean isRepeatable() {
88+
return false;
89+
}
90+
91+
@Override
92+
public long getContentLength() {
93+
return -1;
94+
} // unknown
95+
96+
@Override
97+
public String getContentType() {
98+
return contentType;
99+
}
100+
101+
@Override
102+
public String getContentEncoding() {
103+
return "deflate";
104+
}
105+
106+
@Override
107+
public boolean isChunked() {
108+
return true;
109+
}
110+
111+
@Override
112+
public Set<String> getTrailerNames() {
113+
return Collections.emptySet();
114+
}
115+
116+
@Override
117+
public int available() {
118+
if (pending.hasRemaining()) {
119+
return pending.remaining();
120+
}
121+
return delegate.available();
122+
}
123+
124+
// ------------------------------------------------------------------ core
125+
126+
@Override
127+
public void produce(final DataStreamChannel channel) throws IOException {
128+
/* 1 — flush any leftover compressed bytes first */
129+
if (flushPending(channel)) {
130+
return; // back-pressure: outer channel could not accept more
131+
}
132+
133+
/* 2 — pull more data from delegate */
134+
delegate.produce(new InnerChannel(channel));
135+
136+
/* 3 — if delegate ended, finish the deflater */
137+
if (delegateEnded.get() && !finished) {
138+
deflater.finish();
139+
deflateToPending();
140+
flushPending(channel);
141+
if (!pending.hasRemaining()) {
142+
finished = true;
143+
channel.endStream();
144+
}
145+
}
146+
}
147+
148+
/**
149+
* copy as much as possible from {@link #pending} to the wire
150+
*/
151+
private boolean flushPending(final DataStreamChannel ch) throws IOException {
152+
while (pending.hasRemaining()) {
153+
final int written = ch.write(pending);
154+
if (written == 0) {
155+
return true; // back-pressure
156+
}
157+
}
158+
pending.clear().flip(); // no data left → empty read-mode
159+
return false;
160+
}
161+
162+
/**
163+
* drain {@link #deflater} into {@link #pending}
164+
*/
165+
private void deflateToPending() {
166+
/* switch pending to write-mode */
167+
pending.compact();
168+
final byte[] out = pending.array();
169+
int total;
170+
do {
171+
total = deflater.deflate(out, pending.position(), pending.remaining(),
172+
Deflater.NO_FLUSH);
173+
pending.position(pending.position() + total);
174+
if (!pending.hasRemaining() && total > 0) {
175+
/* buffer full: grow to the next power of two */
176+
final ByteBuffer bigger = ByteBuffer.allocate(pending.capacity() * 2);
177+
pending.flip();
178+
bigger.put(pending);
179+
pending.clear();
180+
pending.put(bigger);
181+
}
182+
} while (total > 0);
183+
pending.flip(); // back to read-mode
184+
}
185+
186+
// ------------------------------------------------------------------ inner channel that receives raw bytes
187+
188+
private final class InnerChannel implements DataStreamChannel {
189+
private final DataStreamChannel outer;
190+
191+
InnerChannel(final DataStreamChannel outer) {
192+
this.outer = outer;
193+
}
194+
195+
@Override
196+
public void requestOutput() {
197+
outer.requestOutput();
198+
}
199+
200+
@Override
201+
public int write(final ByteBuffer src) throws IOException {
202+
int consumed = 0;
203+
while (src.hasRemaining()) {
204+
final int chunk = Math.min(src.remaining(), in.length);
205+
src.get(in, 0, chunk);
206+
deflater.setInput(in, 0, chunk);
207+
consumed += chunk;
208+
deflateToPending();
209+
if (flushPending(outer)) { // honour back-pressure
210+
break;
211+
}
212+
}
213+
return consumed;
214+
}
215+
216+
@Override
217+
public void endStream() {
218+
delegateEnded.set(true);
219+
}
220+
221+
@Override
222+
public void endStream(final List<? extends Header> trailers) {
223+
endStream();
224+
}
225+
}
226+
227+
// ------------------------------------------------------------------ error / cleanup
228+
229+
@Override
230+
public void failed(final Exception cause) {
231+
delegate.failed(cause);
232+
}
233+
234+
@Override
235+
public void releaseResources() {
236+
delegate.releaseResources();
237+
deflater.end();
238+
}
239+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.async.methods;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.List;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.zip.DataFormatException;
34+
import java.util.zip.Inflater;
35+
36+
import org.apache.hc.core5.http.Header;
37+
import org.apache.hc.core5.http.HttpException;
38+
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
39+
import org.apache.hc.core5.http.nio.CapacityChannel;
40+
41+
/**
42+
* <p>Streaming, non-blocking {@link AsyncDataConsumer} that transparently
43+
* inflates a response whose {@code Content-Encoding} is {@code deflate}.
44+
* The decoded bytes are pushed straight to the wrapped downstream consumer
45+
* while honouring reactor back-pressure.</p>
46+
*
47+
* <p>The implementation understands both formats that exist “in the wild”: the
48+
* raw DEFLATE stream (RFC 1951) and the zlib-wrapped variant (RFC 1950).
49+
* If the caller does not specify which one to expect, the first two bytes of
50+
* the stream are inspected and the proper decoder is chosen automatically.</p>
51+
*
52+
* <p>No {@code InputStream}/{@code OutputStream} buffering is used; memory
53+
* footprint is bounded and suitable for very large payloads.</p>
54+
*
55+
* @since 5.6
56+
*/
57+
public final class InflatingAsyncDataConsumer implements AsyncDataConsumer {
58+
59+
private final AsyncDataConsumer downstream;
60+
private final Boolean nowrapHint;
61+
private Inflater inflater;
62+
private boolean formatChosen;
63+
private final byte[] out = new byte[8 * 1024];
64+
private final AtomicBoolean closed = new AtomicBoolean(false);
65+
66+
public InflatingAsyncDataConsumer(
67+
final AsyncDataConsumer downstream, final Boolean nowrapHint) {
68+
this.downstream = downstream;
69+
this.nowrapHint = nowrapHint;
70+
this.inflater = new Inflater(nowrapHint == null || nowrapHint);
71+
}
72+
73+
@Override
74+
public void updateCapacity(final CapacityChannel ch) throws IOException {
75+
downstream.updateCapacity(ch);
76+
}
77+
78+
@Override
79+
public void consume(final ByteBuffer src) throws IOException {
80+
if (closed.get()) {
81+
return;
82+
}
83+
84+
if (nowrapHint == null && !formatChosen && src.remaining() >= 2) {
85+
src.mark();
86+
final int b0 = src.get() & 0xFF;
87+
final int b1 = src.get() & 0xFF;
88+
src.reset();
89+
final boolean zlib = b0 == 0x78 &&
90+
(b1 == 0x01 || b1 == 0x5E || b1 == 0x9C || b1 == 0xDA);
91+
if (zlib) {
92+
inflater.end();
93+
inflater = new Inflater(false);
94+
}
95+
formatChosen = true;
96+
}
97+
98+
final byte[] in = new byte[src.remaining()];
99+
src.get(in);
100+
inflater.setInput(in);
101+
102+
try {
103+
int n;
104+
while ((n = inflater.inflate(out)) > 0) {
105+
downstream.consume(ByteBuffer.wrap(out, 0, n));
106+
}
107+
if (inflater.needsDictionary()) {
108+
throw new IOException("Deflate dictionary required");
109+
}
110+
} catch (final DataFormatException ex) {
111+
throw new IOException("Corrupt DEFLATE stream", ex);
112+
}
113+
}
114+
115+
@Override
116+
public void streamEnd(final List<? extends Header> trailers)
117+
throws HttpException, IOException {
118+
if (closed.compareAndSet(false, true)) {
119+
inflater.end();
120+
downstream.streamEnd(trailers);
121+
}
122+
}
123+
124+
@Override
125+
public void releaseResources() {
126+
inflater = null;
127+
downstream.releaseResources();
128+
}
129+
}

0 commit comments

Comments
 (0)