Skip to content

Commit bfb91e8

Browse files
committed
HADOOP-19712 S3A: Deadlock in EvaluatingStatisticsMap.entryset()
Reworked how entrySet() and values() work, using .forEach() iterators after reviewing what ConcurrentHashMap does internally; it does a (safe) traverse. Add EvaluatingStatisticsMap.forEach() implementation which maps the passed in BiConsumer down to the evaluators.forEach, evaluating each value as it goes. Use that in IOStatisticsBinding.snapshot() code. Tests for all this.
1 parent d092171 commit bfb91e8

File tree

3 files changed

+95
-18
lines changed

3 files changed

+95
-18
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
package org.apache.hadoop.fs.statistics.impl;
2020

2121
import java.io.Serializable;
22+
import java.util.ArrayList;
2223
import java.util.Collection;
24+
import java.util.LinkedHashSet;
25+
import java.util.List;
2326
import java.util.Map;
2427
import java.util.Set;
2528
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.function.BiConsumer;
2630
import java.util.function.Function;
27-
import java.util.stream.Collectors;
2831

2932
/**
3033
* A map of functions which can be invoked to dynamically
@@ -132,11 +135,10 @@ public Set<String> keySet() {
132135
*/
133136
@Override
134137
public Collection<E> values() {
135-
Set<Entry<String, Function<String, E>>> evalEntries =
136-
evaluators.entrySet();
137-
return evalEntries.parallelStream().map((e) ->
138-
e.getValue().apply(e.getKey()))
139-
.collect(Collectors.toList());
138+
List<E> result = new ArrayList<>(size());
139+
evaluators.forEach((k, f) ->
140+
result.add(f.apply(k)));
141+
return result;
140142
}
141143

142144
/**
@@ -149,22 +151,37 @@ public Map<String, E> snapshot() {
149151

150152
/**
151153
* Creating the entry set forces an evaluation of the functions.
152-
*
154+
* <p>
155+
* Not synchronized, though thread safe.
156+
* <p>
153157
* This is not a snapshot, so if the evaluators actually return
154158
* references to mutable objects (e.g. a MeanStatistic instance)
155159
* then that value may still change.
156160
*
157-
* The evaluation may be parallelized.
158161
* @return an evaluated set of values
159162
*/
160163
@Override
161-
public synchronized Set<Entry<String, E>> entrySet() {
162-
Set<Entry<String, Function<String, E>>> evalEntries =
163-
evaluators.entrySet();
164-
Set<Entry<String, E>> r = evalEntries.parallelStream().map((e) ->
165-
new EntryImpl<>(e.getKey(), e.getValue().apply(e.getKey())))
166-
.collect(Collectors.toSet());
167-
return r;
164+
public Set<Entry<String, E>> entrySet() {
165+
Set<Entry<String, E>> result = new LinkedHashSet<>(size());
166+
evaluators.forEach((key, evaluator) -> {
167+
final E current = evaluator.apply(key);
168+
result.add(new EntryImpl<>(key, current));
169+
});
170+
return result;
171+
}
172+
173+
174+
/**
175+
* Hand down to the foreach iterator of the evaluators, by evaluating as each
176+
* entry is processed and passing that in to the {@code action} consumer.
177+
* @param action consumer of entries.
178+
*/
179+
@Override
180+
public void forEach(final BiConsumer<? super String, ? super E> action) {
181+
BiConsumer<String, Function<String, E>> biConsumer = (key, value) -> {
182+
action.accept(key, value.apply(key));
183+
};
184+
evaluators.forEach(biConsumer);
168185
}
169186

170187
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,8 @@ private static <E> Map<String, E> copyMap(
170170
// we have to clone the values so that they aren't
171171
// bound to the original values
172172
dest.clear();
173-
source.entrySet()
174-
.forEach(entry ->
175-
dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
173+
source.forEach((key, current) ->
174+
dest.put(key, copyFn.apply(current)));
176175
return dest;
177176
}
178177

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
package org.apache.hadoop.fs.statistics;
2020

21+
import java.util.Collection;
22+
import java.util.Map;
23+
import java.util.Set;
24+
import java.util.concurrent.atomic.LongAdder;
25+
26+
import org.assertj.core.api.Assertions;
2127
import org.junit.jupiter.api.AfterEach;
2228
import org.junit.jupiter.api.BeforeEach;
2329
import org.junit.jupiter.api.Test;
@@ -174,4 +180,59 @@ public void testNegativeCounterIncrementIgnored() throws Throwable {
174180
.isEqualTo(2);
175181
}
176182

183+
@Test
184+
public void testForeach() throws Throwable {
185+
186+
final IOStatisticsStore store = iostatisticsStore()
187+
.withCounters(COUNT, "c1", "c2")
188+
.withGauges(GAUGE)
189+
.withMinimums(MIN)
190+
.withMaximums(MAX)
191+
.withMeanStatistics(MEAN)
192+
.build();
193+
store.setCounter(COUNT, 10);
194+
store.setCounter("c1", 1);
195+
store.setCounter("c2", 2);
196+
197+
// get the counter map, which is evaluated on demand
198+
final Map<String, Long> counters = store.counters();
199+
LongAdder entryCount = new LongAdder();
200+
LongAdder sum = new LongAdder();
201+
202+
// apply the foreach iteration
203+
counters.forEach((k, v) -> {
204+
entryCount.increment();
205+
sum.add(v);
206+
});
207+
Assertions.assertThat(entryCount.longValue())
208+
.describedAs("entry count")
209+
.isEqualTo(3);
210+
Assertions.assertThat(sum.longValue())
211+
.describedAs("sum of values")
212+
.isEqualTo(13);
213+
214+
// keyset is as expected
215+
final Set<String> keys = counters.keySet();
216+
Assertions.assertThat(keys)
217+
.describedAs("keys")
218+
.hasSize(3)
219+
.contains("c1", "c2", COUNT);
220+
221+
// values are as expected
222+
final Collection<Long> values = counters.values();
223+
Assertions.assertThat(values)
224+
.describedAs("values")
225+
.hasSize(3)
226+
.contains(10L, 1L, 2L);
227+
228+
// entries will all be evaluated
229+
final Set<Map.Entry<String, Long>> entries = counters.entrySet();
230+
entryCount.reset();
231+
sum.reset();
232+
entries.forEach(e -> {
233+
entryCount.increment();
234+
sum.add(e.getValue());
235+
});
236+
}
237+
177238
}

0 commit comments

Comments
 (0)