Skip to content

Commit 1bfeaaf

Browse files
Transparent async content (de)compression with gzip (#700)
1 parent 40b4111 commit 1bfeaaf

File tree

7 files changed

+764
-4
lines changed

7 files changed

+764
-4
lines changed
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.async.methods;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Set;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.zip.CRC32;
36+
import java.util.zip.Deflater;
37+
38+
import org.apache.hc.core5.http.Header;
39+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
40+
import org.apache.hc.core5.http.nio.DataStreamChannel;
41+
import org.apache.hc.core5.util.Args;
42+
43+
/**
44+
* Streams an {@link AsyncEntityProducer} through raw DEFLATE
45+
* and wraps the result in a valid GZIP container.
46+
* <p>
47+
* Memory usage is bounded (8 KiB buffers) and back-pressure
48+
* from the I/O reactor is honoured.
49+
*
50+
* @since 5.6
51+
*/
52+
public final class DeflatingGzipEntityProducer implements AsyncEntityProducer {
53+
54+
/* ---------------- constants & buffers --------------------------- */
55+
56+
private static final int IN_BUF = 8 * 1024;
57+
private static final int OUT_BUF = 8 * 1024;
58+
59+
private final AsyncEntityProducer delegate;
60+
private final CRC32 crc = new CRC32();
61+
private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
62+
private final byte[] in = new byte[IN_BUF];
63+
private final ByteBuffer outBuf = ByteBuffer.allocate(OUT_BUF);
64+
65+
private boolean headerSent = false;
66+
private boolean finished = false;
67+
private long uncompressed = 0;
68+
69+
private final AtomicBoolean released = new AtomicBoolean(false);
70+
71+
public DeflatingGzipEntityProducer(final AsyncEntityProducer delegate) {
72+
this.delegate = Args.notNull(delegate, "delegate");
73+
outBuf.flip(); // start in “read mode” with no data
74+
}
75+
76+
/* ------------------- metadata ------------------- */
77+
78+
@Override
79+
public boolean isRepeatable() {
80+
return delegate.isRepeatable();
81+
}
82+
83+
@Override
84+
public long getContentLength() {
85+
return -1;
86+
} // unknown
87+
88+
@Override
89+
public String getContentType() {
90+
return delegate.getContentType();
91+
}
92+
93+
@Override
94+
public String getContentEncoding() {
95+
return "gzip";
96+
}
97+
98+
@Override
99+
public boolean isChunked() {
100+
return true;
101+
}
102+
103+
@Override
104+
public Set<String> getTrailerNames() {
105+
return Collections.emptySet();
106+
}
107+
108+
@Override
109+
public int available() {
110+
return outBuf.hasRemaining() ? outBuf.remaining() : delegate.available();
111+
}
112+
113+
/* ------------------- core ----------------------- */
114+
115+
@Override
116+
public void produce(final DataStreamChannel chan) throws IOException {
117+
118+
flushOut(chan); // 1) flush any pending data
119+
120+
if (finished) {
121+
return; // already done
122+
}
123+
124+
delegate.produce(new InnerChannel(chan)); // 2) pull more input
125+
126+
/* 3) when delegate is done → finish deflater, drain, trailer */
127+
if (delegate.available() == 0 && !finished) {
128+
129+
deflater.finish(); // signal EOF to compressor
130+
while (!deflater.finished()) { // drain *everything*
131+
deflateToOut();
132+
flushOut(chan);
133+
}
134+
135+
/* ---------------- little-endian trailer ---------------- */
136+
final int crcVal = (int) crc.getValue();
137+
final int size = (int) uncompressed;
138+
139+
final byte[] trailer = {
140+
(byte) crcVal, (byte) (crcVal >>> 8),
141+
(byte) (crcVal >>> 16), (byte) (crcVal >>> 24),
142+
(byte) size, (byte) (size >>> 8),
143+
(byte) (size >>> 16), (byte) (size >>> 24)
144+
};
145+
chan.write(ByteBuffer.wrap(trailer));
146+
147+
finished = true;
148+
chan.endStream();
149+
}
150+
}
151+
152+
/* copy all currently available bytes from deflater into outBuf */
153+
private void deflateToOut() {
154+
outBuf.compact(); // switch to “write mode”
155+
byte[] arr = outBuf.array();
156+
int pos = outBuf.position();
157+
int lim = outBuf.limit();
158+
int n;
159+
while ((n = deflater.deflate(arr, pos, lim - pos, Deflater.NO_FLUSH)) > 0) {
160+
pos += n;
161+
if (pos == lim) { // buffer full → grow 2×
162+
final ByteBuffer bigger = ByteBuffer.allocate(arr.length * 2);
163+
outBuf.flip();
164+
bigger.put(outBuf);
165+
outBuf.clear();
166+
outBuf.put(bigger);
167+
arr = outBuf.array();
168+
lim = outBuf.limit();
169+
pos = outBuf.position();
170+
}
171+
}
172+
outBuf.position(pos);
173+
outBuf.flip(); // back to “read mode”
174+
}
175+
176+
/* send as much of outBuf as the channel will accept */
177+
private void flushOut(final DataStreamChannel chan) throws IOException {
178+
while (outBuf.hasRemaining()) {
179+
final int written = chan.write(outBuf);
180+
if (written == 0) {
181+
break; // back-pressure
182+
}
183+
}
184+
}
185+
186+
/* --------------- inner channel feeding deflater ---------------- */
187+
188+
private final class InnerChannel implements DataStreamChannel {
189+
private final DataStreamChannel chan;
190+
191+
InnerChannel(final DataStreamChannel chan) {
192+
this.chan = chan;
193+
}
194+
195+
@Override
196+
public void requestOutput() {
197+
chan.requestOutput();
198+
}
199+
200+
@Override
201+
public int write(final ByteBuffer src) throws IOException {
202+
203+
if (!headerSent) { // write 10-byte GZIP header
204+
chan.write(ByteBuffer.wrap(new byte[]{
205+
0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0
206+
}));
207+
headerSent = true;
208+
}
209+
210+
int consumed = 0;
211+
while (src.hasRemaining()) {
212+
final int chunk = Math.min(src.remaining(), in.length);
213+
src.get(in, 0, chunk);
214+
215+
crc.update(in, 0, chunk);
216+
uncompressed += chunk;
217+
218+
deflater.setInput(in, 0, chunk);
219+
consumed += chunk;
220+
221+
deflateToOut();
222+
flushOut(chan);
223+
}
224+
return consumed;
225+
}
226+
227+
@Override
228+
public void endStream() { /* delegate.available()==0 is our signal */ }
229+
230+
@Override
231+
public void endStream(final List<? extends Header> t) {
232+
endStream();
233+
}
234+
}
235+
236+
/* ---------------- failure / cleanup ---------------------------- */
237+
238+
@Override
239+
public void failed(final Exception cause) {
240+
delegate.failed(cause);
241+
}
242+
243+
@Override
244+
public void releaseResources() {
245+
if (released.compareAndSet(false, true)) {
246+
deflater.end();
247+
delegate.releaseResources();
248+
}
249+
}
250+
}

0 commit comments

Comments
 (0)