Skip to content

Commit 5f75a09

Browse files
committed
BUG 35114254 - [34625628->14.1.1.0.13] ENH: Stream query results to client by partition to avoid hitting 2GB buffer limit (14.1.1.0 cl 99033 --> 14.1.1.0/ce)
back out cl's 98748, 98799, and 98806 RQ: job.9.20230310195716.229 #nobug80 [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v14.1.1.0/": change = 99081]
1 parent a670685 commit 5f75a09

File tree

19 files changed

+110
-455
lines changed

19 files changed

+110
-455
lines changed

prj/coherence-core/src/main/java/com/tangosol/internal/util/DefaultAsyncNamedCache.java

Lines changed: 7 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,27 @@
88

99
import com.oracle.coherence.common.util.Options;
1010

11-
import com.tangosol.internal.util.processor.CacheProcessors;
12-
1311
import com.tangosol.net.AsyncNamedCache;
1412
import com.tangosol.net.CacheService;
1513
import com.tangosol.net.Member;
1614
import com.tangosol.net.NamedCache;
1715
import com.tangosol.net.PartitionedService;
18-
import com.tangosol.net.partition.PartitionSet;
1916

2017
import com.tangosol.util.Filter;
21-
import com.tangosol.util.ImmutableArrayList;
2218
import com.tangosol.util.InvocableMap;
19+
20+
import com.tangosol.internal.util.processor.CacheProcessors;
2321
import com.tangosol.util.InvocableMap.StreamingAggregator;
22+
2423
import com.tangosol.util.aggregator.AsynchronousAggregator;
25-
import com.tangosol.util.filter.PartitionedFilter;
24+
2625
import com.tangosol.util.processor.AsynchronousProcessor;
2726
import com.tangosol.util.processor.SingleEntryAsynchronousProcessor;
2827
import com.tangosol.util.processor.StreamingAsynchronousProcessor;
2928

30-
import java.util.ArrayList;
3129
import java.util.Collection;
3230
import java.util.HashMap;
33-
import java.util.List;
3431
import java.util.Map;
35-
import java.util.Set;
3632

3733
import java.util.concurrent.CompletableFuture;
3834

@@ -80,81 +76,6 @@ public NamedCache<K, V> getNamedCache()
8076
return m_cache;
8177
}
8278

83-
@SuppressWarnings("unchecked")
84-
@Override
85-
public CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter<?> filter)
86-
{
87-
// optimized implementation that runs query against individual partitions
88-
// in parallel and aggregates the results
89-
90-
if (m_cache.getCacheService() instanceof PartitionedService && !(filter instanceof PartitionedFilter))
91-
{
92-
int cParts = ((PartitionedService) m_cache.getCacheService()).getPartitionCount();
93-
PartitionSet parts = new PartitionSet(cParts);
94-
95-
List<CompletableFuture<Void>> futures = new ArrayList<>(cParts);
96-
List<Map.Entry<K, V>> listEntries = new ArrayList<>();
97-
98-
for (int i = 0; i < cParts; i++)
99-
{
100-
parts.add(i);
101-
futures.add(invokeAll(new PartitionedFilter<>(filter, parts), new AsynchronousProcessor<>(CacheProcessors.binaryGet(), i))
102-
.thenAccept(results -> listEntries.addAll(results.entrySet())));
103-
parts.remove(i);
104-
}
105-
106-
CompletableFuture<Set<Map.Entry<K, V>>> result = new CompletableFuture<>();
107-
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
108-
.whenComplete((v, err) ->
109-
{
110-
if (err != null)
111-
{
112-
result.completeExceptionally(err);
113-
}
114-
else
115-
{
116-
result.complete(new ImmutableArrayList(listEntries).getSet());
117-
}
118-
});
119-
120-
return result;
121-
}
122-
else
123-
{
124-
return AsyncNamedCache.super.entrySet(filter);
125-
}
126-
}
127-
128-
@Override
129-
public CompletableFuture<Void> entrySet(Filter<?> filter, Consumer<? super Map.Entry<? extends K, ? extends V>> callback)
130-
{
131-
// optimized implementation that runs query against individual partitions
132-
// in parallel and aggregates the results
133-
134-
if (m_cache.getCacheService() instanceof PartitionedService && !(filter instanceof PartitionedFilter))
135-
{
136-
int cParts = ((PartitionedService) m_cache.getCacheService()).getPartitionCount();
137-
PartitionSet parts = new PartitionSet(cParts);
138-
139-
List<CompletableFuture<Void>> futures = new ArrayList<>(cParts);
140-
141-
for (int i = 0; i < cParts; i++)
142-
{
143-
parts.add(i);
144-
futures.add(invokeAll(new PartitionedFilter<>(filter, parts),
145-
new StreamingAsynchronousProcessor<>(CacheProcessors.binaryGet(), i, callback),
146-
callback)); // needed to ensure the streaming invokeAll is called!
147-
parts.remove(i);
148-
}
149-
150-
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
151-
}
152-
else
153-
{
154-
return AsyncNamedCache.super.entrySet(filter, callback);
155-
}
156-
}
157-
15879
@Override
15980
public <R> CompletableFuture<R> invoke(K key,
16081
InvocableMap.EntryProcessor<K, V, R> processor)
@@ -180,7 +101,7 @@ public <R> CompletableFuture<Map<K, R>> invokeAll(Collection<? extends K> collKe
180101
}
181102

182103
@Override
183-
public <R> CompletableFuture<Map<K, R>> invokeAll(Filter<?> filter,
104+
public <R> CompletableFuture<Map<K, R>> invokeAll(Filter filter,
184105
InvocableMap.EntryProcessor<K, V, R> processor)
185106
{
186107
AsynchronousProcessor<K, V, R> asyncProcessor =
@@ -205,7 +126,7 @@ public <R> CompletableFuture<Void> invokeAll(Collection<? extends K> collKeys,
205126
}
206127

207128
@Override
208-
public <R> CompletableFuture<Void> invokeAll(Filter<?> filter,
129+
public <R> CompletableFuture<Void> invokeAll(Filter filter,
209130
InvocableMap.EntryProcessor<K, V, R> processor,
210131
Consumer<? super Map.Entry<? extends K, ? extends R>> callback)
211132
{
@@ -231,7 +152,7 @@ public <R> CompletableFuture<R> aggregate(
231152

232153
@Override
233154
public <R> CompletableFuture<R> aggregate(
234-
Filter<?> filter, InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator)
155+
Filter filter, InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator)
235156
{
236157
AsynchronousAggregator<? super K, ? super V, ?, R> asyncAggregator =
237158
instantiateAsyncAggregator(aggregator);

prj/coherence-core/src/main/java/com/tangosol/internal/util/processor/CacheProcessors.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.tangosol.net.NamedCache;
1919

2020
import com.tangosol.util.BinaryEntry;
21-
import com.tangosol.util.ConverterCollections;
2221
import com.tangosol.util.ExternalizableHelper;
2322
import com.tangosol.util.InvocableMap;
2423
import com.tangosol.util.LiteMap;
@@ -60,11 +59,6 @@ public static <K, V> InvocableMap.EntryProcessor<K, V, V> get()
6059
return new Get<>();
6160
}
6261

63-
public static <K, V> InvocableMap.EntryProcessor<K, V, V> binaryGet()
64-
{
65-
return new BinaryGet<>();
66-
}
67-
6862
public static <K, V> InvocableMap.EntryProcessor<K, V, Optional<V>> getOrDefault()
6963
{
7064
return new GetOrDefault<>();
@@ -388,28 +382,6 @@ public Map<K, V> processAll(Set<? extends InvocableMap.Entry<K, V>> setEntries)
388382
}
389383
}
390384

391-
/**
392-
* Get entry processor that avoids value deserialization by returning
393-
* Binary value directly, assuming it will be converted on the client
394-
* via {@link ConverterCollections.ConverterEntry} or similar.
395-
*
396-
* @param <K> the type of the Map entry key
397-
* @param <V> the type of the Map entry value
398-
*/
399-
public static class BinaryGet<K, V>
400-
extends Get<K, V>
401-
{
402-
@SuppressWarnings("unchecked")
403-
@Override
404-
public V process(InvocableMap.Entry<K, V> entry)
405-
{
406-
// cast below is a hack to make compiler happy,
407-
// and only works because of type erasure, as both
408-
// V and Binary erase into Object
409-
return (V) ((BinaryEntry) entry).getBinaryValue();
410-
}
411-
}
412-
413385
/**
414386
* GetOrDefault entry processor.
415387
*

prj/coherence-core/src/main/java/com/tangosol/net/AsyncNamedCache.java

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,13 @@
3232
import java.util.Map;
3333
import java.util.Random;
3434
import java.util.Set;
35-
import java.util.TreeSet;
3635

3736
import java.util.concurrent.CompletableFuture;
3837

3938
import java.util.function.BiConsumer;
4039
import java.util.function.Consumer;
4140
import java.util.function.IntSupplier;
4241

43-
import java.util.stream.Collectors;
44-
4542
/**
4643
* Asynchronous {@link NamedCache}.
4744
*
@@ -106,7 +103,7 @@ public default CompletableFuture<Map<K, V>> getAll(Collection<? extends K> colKe
106103
* @return a {@link CompletableFuture} for a Map of keys to values for the
107104
* specified filter
108105
*/
109-
public default CompletableFuture<Map<K, V>> getAll(Filter<?> filter)
106+
public default CompletableFuture<Map<K, V>> getAll(Filter filter)
110107
{
111108
return invokeAll(filter, CacheProcessors.get());
112109
}
@@ -234,7 +231,7 @@ public default CompletableFuture<Void> removeAll(Collection<? extends K> colKeys
234231
*
235232
* @return a {@link CompletableFuture}
236233
*/
237-
public default CompletableFuture<Void> removeAll(Filter<?> filter)
234+
public default CompletableFuture<Void> removeAll(Filter filter)
238235
{
239236
return invokeAll(filter, CacheProcessors.removeBlind()).thenAccept(ANY);
240237
}
@@ -260,7 +257,7 @@ public default CompletableFuture<Set<K>> keySet()
260257
*
261258
* @return a set of keys for entries that satisfy the specified criteria
262259
*/
263-
public default CompletableFuture<Set<K>> keySet(Filter<?> filter)
260+
public default CompletableFuture<Set<K>> keySet(Filter filter)
264261
{
265262
return invokeAll(filter, CacheProcessors.nop())
266263
.thenApply(Map::keySet);
@@ -291,7 +288,7 @@ public default CompletableFuture<Void> keySet(Consumer<? super K> callback)
291288
* @return a {@link CompletableFuture} that can be used to determine whether
292289
* the operation completed
293290
*/
294-
public default CompletableFuture<Void> keySet(Filter<?> filter,
291+
public default CompletableFuture<Void> keySet(Filter filter,
295292
Consumer<? super K> callback)
296293
{
297294
return invokeAll(filter, CacheProcessors.nop(),
@@ -319,9 +316,9 @@ public default CompletableFuture<Set<Map.Entry<K, V>>> entrySet()
319316
*
320317
* @return a set of entries that satisfy the specified criteria
321318
*/
322-
public default CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter<?> filter)
319+
public default CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter filter)
323320
{
324-
return invokeAll(filter, CacheProcessors.binaryGet())
321+
return invokeAll(filter, CacheProcessors.get())
325322
.thenApply(Map::entrySet);
326323
}
327324

@@ -341,14 +338,13 @@ public default CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter<?> filter
341338
*
342339
* @return a set of entries that satisfy the specified criteria
343340
*/
344-
@SuppressWarnings("unchecked")
345-
default CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter<?> filter, Comparator<?> comparator)
341+
public default CompletableFuture<Set<Map.Entry<K, V>>> entrySet(Filter filter, Comparator comparator)
346342
{
347-
return entrySet(filter)
348-
.thenApply(setResult ->
343+
return invokeAll(filter, CacheProcessors.get())
344+
.thenApply(mapResult ->
349345
{
350-
int cEntries = setResult.size();
351-
Map.Entry<K, V>[] aEntries = setResult.toArray(new Map.Entry[cEntries]);
346+
int cEntries = mapResult.size();
347+
Map.Entry[] aEntries = mapResult.entrySet().toArray(new Map.Entry[cEntries]);
352348

353349
Arrays.sort(aEntries, new EntryComparator(
354350
comparator == null ? SafeComparator.INSTANCE : comparator));
@@ -393,9 +389,10 @@ public default CompletableFuture<Void> entrySet(Consumer<? super Map.Entry<? ext
393389
* @return a {@link CompletableFuture} that can be used to determine whether
394390
* the operation completed
395391
*/
396-
public default CompletableFuture<Void> entrySet(Filter<?> filter, BiConsumer<? super K, ? super V> callback)
392+
public default CompletableFuture<Void> entrySet(Filter filter, BiConsumer<? super K, ? super V> callback)
397393
{
398-
return entrySet(filter, entry -> callback.accept(entry.getKey(), entry.getValue()));
394+
return invokeAll(filter, CacheProcessors.get(),
395+
entry -> callback.accept(entry.getKey(), entry.getValue()));
399396
}
400397

401398
/**
@@ -409,10 +406,10 @@ public default CompletableFuture<Void> entrySet(Filter<?> filter, BiConsumer<? s
409406
* @return a {@link CompletableFuture} that can be used to determine whether
410407
* the operation completed
411408
*/
412-
public default CompletableFuture<Void> entrySet(Filter<?> filter,
409+
public default CompletableFuture<Void> entrySet(Filter filter,
413410
Consumer<? super Map.Entry<? extends K, ? extends V>> callback)
414411
{
415-
return invokeAll(filter, CacheProcessors.binaryGet(), callback);
412+
return invokeAll(filter, CacheProcessors.get(), callback);
416413
}
417414

418415
/**
@@ -435,10 +432,10 @@ public default CompletableFuture<Collection<V>> values()
435432
* @return a collection of values for entries that satisfy the specified
436433
* criteria
437434
*/
438-
public default CompletableFuture<Collection<V>> values(Filter<?> filter)
435+
public default CompletableFuture<Collection<V>> values(Filter filter)
439436
{
440-
return entrySet(filter)
441-
.thenApply(entries -> entries.stream().map(Map.Entry::getValue).collect(Collectors.toList()));
437+
return invokeAll(filter, CacheProcessors.get())
438+
.thenApply(Map::values);
442439
}
443440

444441
/**
@@ -454,10 +451,15 @@ public default CompletableFuture<Collection<V>> values(Filter<?> filter)
454451
* @return a collection of values for entries that satisfy the specified
455452
* criteria
456453
*/
457-
public default CompletableFuture<Collection<V>> values(Filter<?> filter, Comparator<? super V> comparator)
454+
public default CompletableFuture<Collection<V>> values(Filter filter, Comparator<? super V> comparator)
458455
{
459-
return entrySet(filter)
460-
.thenApply(entries -> entries.stream().map(Map.Entry::getValue).collect(Collectors.toCollection(() -> new TreeSet<>(comparator))));
456+
return values(filter)
457+
.thenApply(colValues ->
458+
{
459+
List<V> values = new ArrayList<>(colValues);
460+
values.sort(comparator);
461+
return values;
462+
});
461463
}
462464

463465
/**
@@ -485,9 +487,10 @@ public default CompletableFuture<Void> values(Consumer<? super V> callback)
485487
* @return a {@link CompletableFuture} that can be used to determine whether
486488
* the operation completed
487489
*/
488-
public default CompletableFuture<Void> values(Filter<?> filter, Consumer<? super V> callback)
490+
public default CompletableFuture<Void> values(Filter filter, Consumer<? super V> callback)
489491
{
490-
return entrySet(filter, entry -> callback.accept(entry.getValue()));
492+
return invokeAll(filter, CacheProcessors.get(),
493+
entry -> callback.accept(entry.getValue()));
491494
}
492495

493496
// ---- Asynchronous InvocableMap methods -------------------------------
@@ -553,7 +556,7 @@ public <R> CompletableFuture<Map<K, R>> invokeAll(Collection<? extends K> collKe
553556
* @return a {@link CompletableFuture} that can be used to obtain the result
554557
* of the invocation for each entry
555558
*/
556-
public <R> CompletableFuture<Map<K, R>> invokeAll(Filter<?> filter,
559+
public <R> CompletableFuture<Map<K, R>> invokeAll(Filter filter,
557560
InvocableMap.EntryProcessor<K, V, R> processor);
558561

559562
/**
@@ -672,7 +675,7 @@ public default <R> CompletableFuture<Void> invokeAll(Collection<? extends K> col
672675
* @return a {@link CompletableFuture} that can be used to determine if the
673676
* operation completed successfully
674677
*/
675-
public <R> CompletableFuture<Void> invokeAll(Filter<?> filter,
678+
public <R> CompletableFuture<Void> invokeAll(Filter filter,
676679
InvocableMap.EntryProcessor<K, V, R> processor,
677680
Consumer<? super Map.Entry<? extends K,? extends R>> callback);
678681

@@ -696,7 +699,7 @@ public <R> CompletableFuture<Void> invokeAll(Filter<?> filter,
696699
* @return a {@link CompletableFuture} that can be used to determine if the
697700
* operation completed successfully
698701
*/
699-
public default <R> CompletableFuture<Void> invokeAll(Filter<?> filter,
702+
public default <R> CompletableFuture<Void> invokeAll(Filter filter,
700703
InvocableMap.EntryProcessor<K, V, R> processor, BiConsumer<? super K,? super R> callback)
701704
{
702705
return invokeAll(filter, processor,
@@ -747,7 +750,7 @@ public <R> CompletableFuture<R> aggregate(Collection<? extends K> collKeys,
747750
* @return a {@link CompletableFuture} that can be used to obtain the result
748751
* of the aggregation
749752
*/
750-
public <R> CompletableFuture<R> aggregate(Filter<?> filter,
753+
public <R> CompletableFuture<R> aggregate(Filter filter,
751754
InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator);
752755

753756
// ---- Asynchronous Map methods ----------------------------------------
@@ -1047,7 +1050,7 @@ public default CompletableFuture<Map<K, Void>> replaceAll(Collection<? extends K
10471050
* @param filter the filter that should be used to select entries
10481051
* @param function the function to apply to each entry
10491052
*/
1050-
public default CompletableFuture<Map<K, Void>> replaceAll(Filter<?> filter,
1053+
public default CompletableFuture<Map<K, Void>> replaceAll(Filter filter,
10511054
Remote.BiFunction<? super K, ? super V, ? extends V> function)
10521055
{
10531056
return invokeAll(filter, CacheProcessors.replace(function));

0 commit comments

Comments
 (0)