Skip to content

Commit 8eb6f9a

Browse files
CURATOR-725: Allow for global compression (#512)
1 parent 914f2f7 commit 8eb6f9a

25 files changed

+1100
-32
lines changed

curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable {
345345
*/
346346
SchemaSet getSchemaSet();
347347

348+
/**
349+
* Return whether compression is enabled by default for all create, setData and getData operations.
350+
*
351+
* @return if compression is enabled
352+
*/
353+
boolean compressionEnabled();
354+
348355
/**
349356
* Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
350357
* done from the {@link #runSafe(Runnable)} thread.

curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public static class Builder {
163163
private List<AuthInfo> authInfos = null;
164164
private byte[] defaultData = LOCAL_ADDRESS;
165165
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
166+
private boolean compressionEnabled = false;
166167
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
167168
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
168169
private boolean canBeReadOnly = false;
@@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) {
367368
return this;
368369
}
369370

371+
/**
372+
* By default, each write or read call must explicitly use compression.
373+
* Call this method to enable compression by default on all read and write calls.
374+
* <p>
375+
* In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter.
376+
* @return this
377+
*/
378+
public Builder enableCompression() {
379+
this.compressionEnabled = true;
380+
return this;
381+
}
382+
370383
/**
371384
* @param zookeeperFactory the zookeeper factory to use
372385
* @return this
@@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() {
542555
return compressionProvider;
543556
}
544557

558+
public boolean compressionEnabled() {
559+
return compressionEnabled;
560+
}
561+
545562
public ThreadFactory getThreadFactory() {
546563
return threadFactory;
547564
}

curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,12 @@ public interface Compressible<T> {
2626
* @return this
2727
*/
2828
public T compressed();
29+
30+
/**
31+
* Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework}
32+
* has compressionEnabled
33+
*
34+
* @return this
35+
*/
36+
public T uncompressed();
2937
}

curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,12 @@ public interface Decompressible<T> {
2626
* @return this
2727
*/
2828
public T decompressed();
29+
30+
/**
31+
* Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
32+
* has compressionEnabled
33+
*
34+
* @return this
35+
*/
36+
public T undecompressed();
2937
}

curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class CreateBuilderImpl
100100
acling = new ACLing(client.getAclProvider());
101101
createParentsIfNeeded = false;
102102
createParentsAsContainers = false;
103-
compress = false;
103+
compress = client.compressionEnabled();
104104
setDataIfExists = false;
105105
storingStat = null;
106106
ttl = -1;
@@ -193,6 +193,12 @@ public ACLCreateModePathAndBytesable<T> compressed() {
193193
return this;
194194
}
195195

196+
@Override
197+
public ACLCreateModePathAndBytesable<T> uncompressed() {
198+
CreateBuilderImpl.this.uncompressed();
199+
return this;
200+
}
201+
196202
@Override
197203
public T forPath(String path) throws Exception {
198204
return forPath(path, client.getDefaultData());
@@ -216,7 +222,16 @@ public T forPath(String path, byte[] data) throws Exception {
216222

217223
@Override
218224
public CreateBackgroundModeStatACLable compressed() {
219-
compress = true;
225+
return withCompression(true);
226+
}
227+
228+
@Override
229+
public CreateBackgroundModeStatACLable uncompressed() {
230+
return withCompression(false);
231+
}
232+
233+
private CreateBackgroundModeStatACLable withCompression(boolean compress) {
234+
this.compress = compress;
220235
return new CreateBackgroundModeStatACLable() {
221236
@Override
222237
public CreateBackgroundModeACLable storingStatIn(Stat stat) {

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
109109
private final FailedDeleteManager failedDeleteManager;
110110
private final FailedRemoveWatchManager failedRemoveWatcherManager;
111111
private final CompressionProvider compressionProvider;
112+
private final boolean compressionEnabled;
112113
private final ACLProvider aclProvider;
113114
private final NamespaceFacadeCache namespaceFacadeCache;
114115
private final boolean useContainerParentsIfAvailable;
@@ -185,6 +186,7 @@ public void process(WatchedEvent watchedEvent) {
185186
builder.getSimulatedSessionExpirationPercent(),
186187
builder.getConnectionStateListenerManagerFactory());
187188
compressionProvider = builder.getCompressionProvider();
189+
compressionEnabled = builder.compressionEnabled();
188190
aclProvider = builder.getAclProvider();
189191
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
190192
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
@@ -284,6 +286,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
284286
failedDeleteManager = parent.failedDeleteManager;
285287
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
286288
compressionProvider = parent.compressionProvider;
289+
compressionEnabled = parent.compressionEnabled;
287290
aclProvider = parent.aclProvider;
288291
namespaceFacadeCache = parent.namespaceFacadeCache;
289292
namespace = parent.namespace;
@@ -628,6 +631,11 @@ public SchemaSet getSchemaSet() {
628631
return schemaSet;
629632
}
630633

634+
@Override
635+
public boolean compressionEnabled() {
636+
return compressionEnabled;
637+
}
638+
631639
ACLProvider getAclProvider() {
632640
return aclProvider;
633641
}

curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
4646
responseStat = null;
4747
watching = new Watching(client);
4848
backgrounding = new Backgrounding();
49-
decompress = false;
49+
decompress = client.compressionEnabled();
5050
}
5151

5252
public GetDataBuilderImpl(
@@ -64,7 +64,16 @@ public GetDataBuilderImpl(
6464

6565
@Override
6666
public GetDataWatchBackgroundStatable decompressed() {
67-
decompress = true;
67+
return withDecompression(true);
68+
}
69+
70+
@Override
71+
public GetDataWatchBackgroundStatable undecompressed() {
72+
return withDecompression(false);
73+
}
74+
75+
private GetDataWatchBackgroundStatable withDecompression(boolean decompress) {
76+
this.decompress = decompress;
6877
return new GetDataWatchBackgroundStatable() {
6978
@Override
7079
public ErrorListenerPathable<byte[]> inBackground() {

curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class SetDataBuilderImpl
5454
this.client = client;
5555
backgrounding = new Backgrounding();
5656
version = -1;
57-
compress = false;
57+
compress = client.compressionEnabled();
5858
}
5959

6060
public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) {
@@ -94,12 +94,27 @@ public VersionPathAndBytesable<T> compressed() {
9494
compress = true;
9595
return this;
9696
}
97+
98+
@Override
99+
public VersionPathAndBytesable<T> uncompressed() {
100+
compress = false;
101+
return this;
102+
}
97103
};
98104
}
99105

100106
@Override
101107
public SetDataBackgroundVersionable compressed() {
102-
compress = true;
108+
return withCompression(true);
109+
}
110+
111+
@Override
112+
public SetDataBackgroundVersionable uncompressed() {
113+
return withCompression(false);
114+
}
115+
116+
public SetDataBackgroundVersionable withCompression(boolean compress) {
117+
this.compress = compress;
103118
return new SetDataBackgroundVersionable() {
104119
@Override
105120
public ErrorListenerPathAndBytesable<Stat> inBackground() {

curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
3535
TempGetDataBuilderImpl(CuratorFrameworkImpl client) {
3636
this.client = client;
3737
responseStat = null;
38-
decompress = false;
38+
decompress = client.compressionEnabled();
3939
}
4040

4141
@Override
@@ -44,6 +44,12 @@ public StatPathable<byte[]> decompressed() {
4444
return this;
4545
}
4646

47+
@Override
48+
public StatPathable<byte[]> undecompressed() {
49+
decompress = false;
50+
return this;
51+
}
52+
4753
@Override
4854
public Pathable<byte[]> storingStatIn(Stat stat) {
4955
responseStat = stat;

curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertNotEquals;
25+
import static org.junit.jupiter.api.Assertions.assertThrows;
2526
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.zip.ZipException;
2628
import org.apache.curator.framework.CuratorFramework;
2729
import org.apache.curator.framework.CuratorFrameworkFactory;
2830
import org.apache.curator.framework.api.CompressionProvider;
@@ -97,6 +99,73 @@ public void testSetData() throws Exception {
9799
}
98100
}
99101

102+
@Test
103+
public void testSetDataGlobalCompression() throws Exception {
104+
final byte[] data = "here's a string".getBytes();
105+
final byte[] gzipedData = GzipCompressionProvider.doCompress(data);
106+
107+
CuratorFramework client = CuratorFrameworkFactory.builder()
108+
.connectString(server.getConnectString())
109+
.retryPolicy(new RetryOneTime(1))
110+
.enableCompression()
111+
.build();
112+
try {
113+
client.start();
114+
115+
// Create with explicit compression
116+
client.create().compressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
117+
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
118+
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
119+
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
120+
assertEquals(
121+
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());
122+
123+
// Create explicitly without compression
124+
client.delete().forPath("/a/b/c");
125+
client.create().uncompressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
126+
assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c"));
127+
assertThrows(
128+
ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c"));
129+
assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c"));
130+
assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());
131+
132+
// Create with implicit (global) compression
133+
client.delete().forPath("/a/b/c");
134+
client.create().creatingParentsIfNeeded().forPath("/a/b/c", data);
135+
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
136+
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
137+
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
138+
assertEquals(
139+
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());
140+
141+
// SetData with explicit compression
142+
client.setData().compressed().forPath("/a/b/c", data);
143+
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
144+
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
145+
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
146+
assertEquals(
147+
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());
148+
149+
// SetData explicitly without compression
150+
client.setData().uncompressed().forPath("/a/b/c", data);
151+
assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c"));
152+
assertThrows(
153+
ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c"));
154+
assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c"));
155+
assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());
156+
157+
// SetData with implicit (global) compression
158+
client.setData().forPath("/a/b/c", data);
159+
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
160+
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
161+
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
162+
assertEquals(
163+
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());
164+
} finally {
165+
CloseableUtils.closeQuietly(client);
166+
}
167+
}
168+
100169
@Test
101170
public void testSimple() throws Exception {
102171
final byte[] data = "here's a string".getBytes();

0 commit comments

Comments
 (0)