Skip to content

Commit bda41fb

Browse files
committed
BUG 35114255 - [34625628->14.1.1.0.13-CE] ENH: Stream query results to client by partition to avoid hitting 2GB buffer limit (v14.1.1.0 -> ce/v14.1.1.0 @ 98748)
[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v14.1.1.0/": change = 98756]
1 parent 3019597 commit bda41fb

File tree

18 files changed

+482
-137
lines changed

18 files changed

+482
-137
lines changed

prj/coherence-core/src/main/java/com/tangosol/internal/util/DefaultAsyncNamedCache.java

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,38 @@
11
/*
2-
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
5-
* http://oss.oracle.com/licenses/upl.
5+
* https://oss.oracle.com/licenses/upl.
66
*/
77
package com.tangosol.internal.util;
88

99
import com.oracle.coherence.common.util.Options;
1010

11+
import com.tangosol.internal.util.processor.CacheProcessors;
12+
1113
import com.tangosol.net.AsyncNamedCache;
1214
import com.tangosol.net.CacheService;
1315
import com.tangosol.net.Member;
1416
import com.tangosol.net.NamedCache;
1517
import com.tangosol.net.PartitionedService;
18+
import com.tangosol.net.partition.PartitionSet;
1619

1720
import com.tangosol.util.Filter;
21+
import com.tangosol.util.ImmutableArrayList;
1822
import com.tangosol.util.InvocableMap;
19-
20-
import com.tangosol.internal.util.processor.CacheProcessors;
2123
import com.tangosol.util.InvocableMap.StreamingAggregator;
22-
2324
import com.tangosol.util.aggregator.AsynchronousAggregator;
24-
25+
import com.tangosol.util.filter.PartitionedFilter;
2526
import com.tangosol.util.processor.AsynchronousProcessor;
2627
import com.tangosol.util.processor.SingleEntryAsynchronousProcessor;
2728
import com.tangosol.util.processor.StreamingAsynchronousProcessor;
2829

30+
import java.util.ArrayList;
2931
import java.util.Collection;
3032
import java.util.HashMap;
33+
import java.util.List;
3134
import java.util.Map;
35+
import java.util.Set;
3236

3337
import java.util.concurrent.CompletableFuture;
3438

@@ -76,6 +80,81 @@ public NamedCache<K, V> getNamedCache()
7680
return m_cache;
7781
}
7882

83+
@SuppressWarnings("unchecked")
84+
@Override
85+
public CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter<?> filter)
86+
{
87+
// optimized implementation that runs query against individual partitions
88+
// in parallel and aggregates the results
89+
90+
if (m_cache.getCacheService() instanceof PartitionedService && !(filter instanceof PartitionedFilter))
91+
{
92+
int cParts = ((PartitionedService) m_cache.getCacheService()).getPartitionCount();
93+
PartitionSet parts = new PartitionSet(cParts);
94+
95+
List<CompletableFuture<Void>> futures = new ArrayList<>(cParts);
96+
List<Map.Entry<K, V>> listEntries = new ArrayList<>();
97+
98+
for (int i = 0; i < cParts; i++)
99+
{
100+
parts.add(i);
101+
futures.add(invokeAll(new PartitionedFilter<>(filter, parts), new AsynchronousProcessor<>(CacheProcessors.binaryGet(), i))
102+
.thenAccept(results -> listEntries.addAll(results.entrySet())));
103+
parts.remove(i);
104+
}
105+
106+
CompletableFuture<Set<Map.Entry<K, V>>> result = new CompletableFuture<>();
107+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
108+
.whenComplete((v, err) ->
109+
{
110+
if (err != null)
111+
{
112+
result.completeExceptionally(err);
113+
}
114+
else
115+
{
116+
result.complete(new ImmutableArrayList(listEntries).getSet());
117+
}
118+
});
119+
120+
return result;
121+
}
122+
else
123+
{
124+
return AsyncNamedCache.super.entrySet(filter);
125+
}
126+
}
127+
128+
@Override
129+
public CompletableFuture<Void> entrySet(Filter<?> filter, Consumer<? super Map.Entry<? extends K, ? extends V>> callback)
130+
{
131+
// optimized implementation that runs query against individual partitions
132+
// in parallel and aggregates the results
133+
134+
if (m_cache.getCacheService() instanceof PartitionedService && !(filter instanceof PartitionedFilter))
135+
{
136+
int cParts = ((PartitionedService) m_cache.getCacheService()).getPartitionCount();
137+
PartitionSet parts = new PartitionSet(cParts);
138+
139+
List<CompletableFuture<Void>> futures = new ArrayList<>(cParts);
140+
141+
for (int i = 0; i < cParts; i++)
142+
{
143+
parts.add(i);
144+
futures.add(invokeAll(new PartitionedFilter<>(filter, parts),
145+
new StreamingAsynchronousProcessor<>(CacheProcessors.binaryGet(), i, callback),
146+
callback)); // needed to ensure the streaming invokeAll is called!
147+
parts.remove(i);
148+
}
149+
150+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
151+
}
152+
else
153+
{
154+
return AsyncNamedCache.super.entrySet(filter, callback);
155+
}
156+
}
157+
79158
@Override
80159
public <R> CompletableFuture<R> invoke(K key,
81160
InvocableMap.EntryProcessor<K, V, R> processor)
@@ -101,7 +180,7 @@ public <R> CompletableFuture<Map<K, R>> invokeAll(Collection<? extends K> collKe
101180
}
102181

103182
@Override
104-
public <R> CompletableFuture<Map<K, R>> invokeAll(Filter filter,
183+
public <R> CompletableFuture<Map<K, R>> invokeAll(Filter<?> filter,
105184
InvocableMap.EntryProcessor<K, V, R> processor)
106185
{
107186
AsynchronousProcessor<K, V, R> asyncProcessor =
@@ -126,7 +205,7 @@ public <R> CompletableFuture<Void> invokeAll(Collection<? extends K> collKeys,
126205
}
127206

128207
@Override
129-
public <R> CompletableFuture<Void> invokeAll(Filter filter,
208+
public <R> CompletableFuture<Void> invokeAll(Filter<?> filter,
130209
InvocableMap.EntryProcessor<K, V, R> processor,
131210
Consumer<? super Map.Entry<? extends K, ? extends R>> callback)
132211
{
@@ -152,7 +231,7 @@ public <R> CompletableFuture<R> aggregate(
152231

153232
@Override
154233
public <R> CompletableFuture<R> aggregate(
155-
Filter filter, InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator)
234+
Filter<?> filter, InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator)
156235
{
157236
AsynchronousAggregator<? super K, ? super V, ?, R> asyncAggregator =
158237
instantiateAsyncAggregator(aggregator);

prj/coherence-core/src/main/java/com/tangosol/internal/util/processor/CacheProcessors.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
2-
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
5-
* http://oss.oracle.com/licenses/upl.
5+
* https://oss.oracle.com/licenses/upl.
66
*/
77
package com.tangosol.internal.util.processor;
88

@@ -18,6 +18,7 @@
1818
import com.tangosol.net.NamedCache;
1919

2020
import com.tangosol.util.BinaryEntry;
21+
import com.tangosol.util.ConverterCollections;
2122
import com.tangosol.util.ExternalizableHelper;
2223
import com.tangosol.util.InvocableMap;
2324
import com.tangosol.util.LiteMap;
@@ -59,6 +60,11 @@ public static <K, V> InvocableMap.EntryProcessor<K, V, V> get()
5960
return new Get<>();
6061
}
6162

63+
public static <K, V> InvocableMap.EntryProcessor<K, V, V> binaryGet()
64+
{
65+
return new BinaryGet<>();
66+
}
67+
6268
public static <K, V> InvocableMap.EntryProcessor<K, V, Optional<V>> getOrDefault()
6369
{
6470
return new GetOrDefault<>();
@@ -382,6 +388,28 @@ public Map<K, V> processAll(Set<? extends InvocableMap.Entry<K, V>> setEntries)
382388
}
383389
}
384390

391+
/**
392+
* Get entry processor that avoids value deserialization by returning
393+
* Binary value directly, assuming it will be converted on the client
394+
* via {@link ConverterCollections.ConverterEntry} or similar.
395+
*
396+
* @param <K> the type of the Map entry key
397+
* @param <V> the type of the Map entry value
398+
*/
399+
public static class BinaryGet<K, V>
400+
extends Get<K, V>
401+
{
402+
@SuppressWarnings("unchecked")
403+
@Override
404+
public V process(InvocableMap.Entry<K, V> entry)
405+
{
406+
// cast below is a hack to make compiler happy,
407+
// and only works because of type erasure, as both
408+
// V and Binary erase into Object
409+
return (V) ((BinaryEntry) entry).getBinaryValue();
410+
}
411+
}
412+
385413
/**
386414
* GetOrDefault entry processor.
387415
*

0 commit comments

Comments
 (0)