diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java index e4680f2d81fa0..fc1ab9e2bdd2c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java @@ -19,12 +19,16 @@ package org.apache.hadoop.fs.statistics.impl; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.stream.Collectors; /** * A map of functions which can be invoked to dynamically @@ -132,11 +136,10 @@ public Set keySet() { */ @Override public Collection values() { - Set>> evalEntries = - evaluators.entrySet(); - return evalEntries.parallelStream().map((e) -> - e.getValue().apply(e.getKey())) - .collect(Collectors.toList()); + List result = new ArrayList<>(size()); + evaluators.forEach((k, f) -> + result.add(f.apply(k))); + return result; } /** @@ -149,22 +152,37 @@ public Map snapshot() { /** * Creating the entry set forces an evaluation of the functions. - * + *

+ * Not synchronized, though thread safe. + *

* This is not a snapshot, so if the evaluators actually return * references to mutable objects (e.g. a MeanStatistic instance) * then that value may still change. * - * The evaluation may be parallelized. * @return an evaluated set of values */ @Override - public synchronized Set> entrySet() { - Set>> evalEntries = - evaluators.entrySet(); - Set> r = evalEntries.parallelStream().map((e) -> - new EntryImpl<>(e.getKey(), e.getValue().apply(e.getKey()))) - .collect(Collectors.toSet()); - return r; + public Set> entrySet() { + Set> result = new LinkedHashSet<>(size()); + evaluators.forEach((key, evaluator) -> { + final E current = evaluator.apply(key); + result.add(new EntryImpl<>(key, current)); + }); + return result; + } + + + /** + * Hand down to the foreach iterator of the evaluators, by evaluating as each + * entry is processed and passing that in to the {@code action} consumer. + * @param action consumer of entries. + */ + @Override + public void forEach(final BiConsumer action) { + BiConsumer> biConsumer = (key, value) -> { + action.accept(key, value.apply(key)); + }; + evaluators.forEach(biConsumer); } /** @@ -173,7 +191,7 @@ public synchronized Set> entrySet() { */ private static final class EntryImpl implements Entry { - private String key; + private final String key; private E value; @@ -197,6 +215,20 @@ public E setValue(final E val) { this.value = val; return val; } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof Entry)) { + return false; + } + Entry entry = (Entry) o; + return Objects.equals(key, entry.getKey()) && Objects.equals(value, entry.getValue()); + } + + @Override + public int hashCode() { + return Objects.hashCode(key); + } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java index 6a5d01fb3b074..882648abd2059 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java @@ -170,9 +170,8 @@ private static Map copyMap( // we have to clone the values so that they aren't // bound to the original values dest.clear(); - source.entrySet() - .forEach(entry -> - dest.put(entry.getKey(), copyFn.apply(entry.getValue()))); + source.forEach((key, current) -> + dest.put(key, copyFn.apply(current))); return dest; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java index 200c87e43327d..9963ff9456c83 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java @@ -18,6 +18,12 @@ package org.apache.hadoop.fs.statistics; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.LongAdder; + +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -174,4 +180,59 @@ public void testNegativeCounterIncrementIgnored() throws Throwable { .isEqualTo(2); } + @Test + public void testForeach() throws Throwable { + + final IOStatisticsStore store = iostatisticsStore() + .withCounters(COUNT, "c1", "c2") + .withGauges(GAUGE) + .withMinimums(MIN) + .withMaximums(MAX) + .withMeanStatistics(MEAN) + .build(); + store.setCounter(COUNT, 10); + store.setCounter("c1", 1); + store.setCounter("c2", 2); + + // get the counter map, which is evaluated on demand + final Map counters = store.counters(); + LongAdder entryCount = new LongAdder(); + LongAdder sum = new LongAdder(); + + // apply the foreach iteration + counters.forEach((k, v) -> { + entryCount.increment(); + sum.add(v); + }); + Assertions.assertThat(entryCount.longValue()) + .describedAs("entry count") + .isEqualTo(3); + Assertions.assertThat(sum.longValue()) + .describedAs("sum of values") + .isEqualTo(13); + + // keyset is as expected + final Set keys = counters.keySet(); + Assertions.assertThat(keys) + .describedAs("keys") + .hasSize(3) + .contains("c1", "c2", COUNT); + + // values are as expected + final Collection values = counters.values(); + Assertions.assertThat(values) + .describedAs("values") + .hasSize(3) + .contains(10L, 1L, 2L); + + // entries will all be evaluated + final Set> entries = counters.entrySet(); + entryCount.reset(); + sum.reset(); + entries.forEach(e -> { + entryCount.increment(); + sum.add(e.getValue()); + }); + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java new file mode 100644 index 0000000000000..d1cc0e362fbd6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.statistics.impl; + +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestEvaluatingStatisticsMap extends AbstractHadoopTestBase { + + + @Test + public void testEvaluatingStatisticsMap() { + EvaluatingStatisticsMap map = new EvaluatingStatisticsMap<>(); + + Assertions.assertThat(map).isEmpty(); + Assertions.assertThat(map.keySet()).isEmpty(); + Assertions.assertThat(map.values()).isEmpty(); + Assertions.assertThat(map.entrySet()).isEmpty(); + + // fill the map with the environment vars + final Map env = System.getenv(); + env.forEach((k, v) -> map.addFunction(k, any -> v)); + + // verify the keys match + assertThat(map.keySet()) + .describedAs("keys") + .containsExactlyInAnyOrderElementsOf(env.keySet()); + + // and that the values do + assertThat(map.values()) + .describedAs("Evaluated values") + .containsExactlyInAnyOrderElementsOf(env.values()); + + // now assert that this holds for the entryset. + env.forEach((k, v) -> + assertThat(map.get(k)) + .describedAs("looked up key %s", k) + .isNotNull() + .isEqualTo(v)); + + map.forEach((k, v) -> + assertThat(env.get(k)) + .describedAs("env var %s", k) + .isNotNull() + .isEqualTo(v)); + + + } +}