Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
package org.apache.hadoop.fs.statistics.impl;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* A map of functions which can be invoked to dynamically
Expand Down Expand Up @@ -132,11 +136,10 @@ public Set<String> keySet() {
*/
@Override
public Collection<E> values() {
Set<Entry<String, Function<String, E>>> evalEntries =
evaluators.entrySet();
return evalEntries.parallelStream().map((e) ->
e.getValue().apply(e.getKey()))
.collect(Collectors.toList());
List<E> result = new ArrayList<>(size());
evaluators.forEach((k, f) ->
result.add(f.apply(k)));
return result;
}

/**
Expand All @@ -149,22 +152,37 @@ public Map<String, E> snapshot() {

/**
* Creating the entry set forces an evaluation of the functions.
*
* <p>
* Not synchronized, though thread safe.
* <p>
* This is not a snapshot, so if the evaluators actually return
* references to mutable objects (e.g. a MeanStatistic instance)
* then that value may still change.
*
* The evaluation may be parallelized.
* @return an evaluated set of values
*/
@Override
public synchronized Set<Entry<String, E>> entrySet() {
Set<Entry<String, Function<String, E>>> evalEntries =
evaluators.entrySet();
Set<Entry<String, E>> r = evalEntries.parallelStream().map((e) ->
new EntryImpl<>(e.getKey(), e.getValue().apply(e.getKey())))
.collect(Collectors.toSet());
return r;
public Set<Entry<String, E>> entrySet() {
Set<Entry<String, E>> result = new LinkedHashSet<>(size());
evaluators.forEach((key, evaluator) -> {
final E current = evaluator.apply(key);
result.add(new EntryImpl<>(key, current));
});
return result;
}


/**
* Hand down to the foreach iterator of the evaluators, by evaluating as each
* entry is processed and passing that in to the {@code action} consumer.
* @param action consumer of entries.
*/
@Override
public void forEach(final BiConsumer<? super String, ? super E> action) {
BiConsumer<String, Function<String, E>> biConsumer = (key, value) -> {
action.accept(key, value.apply(key));
};
evaluators.forEach(biConsumer);
}

/**
Expand All @@ -173,7 +191,7 @@ public synchronized Set<Entry<String, E>> entrySet() {
*/
private static final class EntryImpl<E> implements Entry<String, E> {

private String key;
private final String key;

private E value;

Expand All @@ -197,6 +215,20 @@ public E setValue(final E val) {
this.value = val;
return val;
}

@Override
public boolean equals(final Object o) {
if (!(o instanceof Entry)) {
return false;
}
Entry<String, ?> entry = (Entry<String, ?>) o;
return Objects.equals(key, entry.getKey()) && Objects.equals(value, entry.getValue());
}

@Override
public int hashCode() {
return Objects.hashCode(key);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,8 @@ private static <E> Map<String, E> copyMap(
// we have to clone the values so that they aren't
// bound to the original values
dest.clear();
source.entrySet()
.forEach(entry ->
dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
source.forEach((key, current) ->
dest.put(key, copyFn.apply(current)));
return dest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package org.apache.hadoop.fs.statistics;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -174,4 +180,59 @@ public void testNegativeCounterIncrementIgnored() throws Throwable {
.isEqualTo(2);
}

@Test
public void testForeach() throws Throwable {

final IOStatisticsStore store = iostatisticsStore()
.withCounters(COUNT, "c1", "c2")
.withGauges(GAUGE)
.withMinimums(MIN)
.withMaximums(MAX)
.withMeanStatistics(MEAN)
.build();
store.setCounter(COUNT, 10);
store.setCounter("c1", 1);
store.setCounter("c2", 2);

// get the counter map, which is evaluated on demand
final Map<String, Long> counters = store.counters();
LongAdder entryCount = new LongAdder();
LongAdder sum = new LongAdder();

// apply the foreach iteration
counters.forEach((k, v) -> {
entryCount.increment();
sum.add(v);
});
Assertions.assertThat(entryCount.longValue())
.describedAs("entry count")
.isEqualTo(3);
Assertions.assertThat(sum.longValue())
.describedAs("sum of values")
.isEqualTo(13);

// keyset is as expected
final Set<String> keys = counters.keySet();
Assertions.assertThat(keys)
.describedAs("keys")
.hasSize(3)
.contains("c1", "c2", COUNT);

// values are as expected
final Collection<Long> values = counters.values();
Assertions.assertThat(values)
.describedAs("values")
.hasSize(3)
.contains(10L, 1L, 2L);

// entries will all be evaluated
final Set<Map.Entry<String, Long>> entries = counters.entrySet();
entryCount.reset();
sum.reset();
entries.forEach(e -> {
entryCount.increment();
sum.add(e.getValue());
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.statistics.impl;

import java.util.Map;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.assertj.core.api.Assertions.assertThat;

public class TestEvaluatingStatisticsMap extends AbstractHadoopTestBase {


@Test
public void testEvaluatingStatisticsMap() {
EvaluatingStatisticsMap<String> map = new EvaluatingStatisticsMap<>();

Assertions.assertThat(map).isEmpty();
Assertions.assertThat(map.keySet()).isEmpty();
Assertions.assertThat(map.values()).isEmpty();
Assertions.assertThat(map.entrySet()).isEmpty();

// fill the map with the environment vars
final Map<String, String> env = System.getenv();
env.forEach((k, v) -> map.addFunction(k, any -> v));

// verify the keys match
assertThat(map.keySet())
.describedAs("keys")
.containsExactlyInAnyOrderElementsOf(env.keySet());

// and that the values do
assertThat(map.values())
.describedAs("Evaluated values")
.containsExactlyInAnyOrderElementsOf(env.values());

// now assert that this holds for the entryset.
env.forEach((k, v) ->
assertThat(map.get(k))
.describedAs("looked up key %s", k)
.isNotNull()
.isEqualTo(v));

map.forEach((k, v) ->
assertThat(env.get(k))
.describedAs("env var %s", k)
.isNotNull()
.isEqualTo(v));


}
}