Skip to content

Commit ee22eac

Browse files
authored
Merge pull request #33918: Remove type suppressions from MetricsPusher and MetricsSink
2 parents cccf870 + b7d1e06 commit ee22eac

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsPusher.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@
3535
import org.checkerframework.checker.nullness.qual.Nullable;
3636

3737
/** Component that regularly merges metrics and pushes them to a metrics sink. */
38-
@SuppressWarnings({
39-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
40-
})
4138
public class MetricsPusher implements Serializable {
4239

4340
private MetricsSink metricsSink;
@@ -76,7 +73,7 @@ public void start() {
7673

7774
private void tearDown() {
7875
pushMetrics();
79-
if (!scheduledFuture.isCancelled()) {
76+
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
8077
scheduledFuture.cancel(true);
8178
}
8279
}

runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.metrics.MetricResult;
3232
import org.apache.beam.sdk.metrics.MetricsOptions;
3333
import org.apache.beam.sdk.metrics.MetricsSink;
34+
import org.checkerframework.checker.nullness.qual.Nullable;
3435

3536
/**
3637
* Sink to push metrics to Graphite. Graphite requires a timestamp. So metrics are reported with the
@@ -40,9 +41,6 @@
4041
* {@code beam.counter.throughput.nbRecords.attempted.value} Or {@code
4142
* beam.distribution.throughput.nbRecordsPerSec.attempted.mean}
4243
*/
43-
@SuppressWarnings({
44-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
45-
})
4644
public class MetricsGraphiteSink implements MetricsSink {
4745
private static final Charset UTF_8 = Charset.forName("UTF-8");
4846
private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
@@ -103,7 +101,8 @@ public String toString() {
103101
try {
104102
messagePayload.append(createCommittedMessage());
105103
} catch (UnsupportedOperationException e) {
106-
if (!e.getMessage().contains("committed metrics")) {
104+
@Nullable String message = e.getMessage();
105+
if (message != null && !message.contains("committed metrics")) {
107106
throw e;
108107
}
109108
}

runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@
4141
import org.apache.beam.sdk.metrics.MetricsSink;
4242

4343
/** HTTP Sink to push metrics in a POST HTTP request. */
44-
@SuppressWarnings({
45-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
46-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
47-
})
4844
public class MetricsHttpSink implements MetricsSink {
4945
private final String urlString;
5046
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -112,6 +108,7 @@ public MetricKeySerializer(Class<MetricKey> t) {
112108
super(t);
113109
}
114110

111+
@SuppressWarnings("nullness") // gen.writeObjectField expects nulls but is unannotated
115112
public void inline(MetricKey value, JsonGenerator gen, SerializerProvider provider)
116113
throws IOException {
117114
gen.writeObjectField("name", value.metricName());
@@ -131,32 +128,41 @@ public void serialize(MetricKey value, JsonGenerator gen, SerializerProvider pro
131128
* JSON serializer for {@link MetricResult}; conform to an older format where the {@link MetricKey
132129
* key's} {@link MetricName name} and "step" (ptransform) are inlined.
133130
*/
134-
public static class MetricResultSerializer extends StdSerializer<MetricResult> {
131+
public static class MetricResultSerializer extends StdSerializer<MetricResult<?>> {
135132
private final MetricKeySerializer keySerializer;
136133

137-
public MetricResultSerializer(Class<MetricResult> t) {
134+
public MetricResultSerializer(Class<MetricResult<?>> t) {
138135
super(t);
139136
keySerializer = new MetricKeySerializer(MetricKey.class);
140137
}
141138

142139
@Override
143-
public void serialize(MetricResult value, JsonGenerator gen, SerializerProvider provider)
140+
public void serialize(MetricResult<?> value, JsonGenerator gen, SerializerProvider provider)
144141
throws IOException {
145142
gen.writeStartObject();
143+
writeAttemptedAndCommitted(value, gen);
144+
keySerializer.inline(value.getKey(), gen, provider);
145+
gen.writeEndObject();
146+
}
147+
148+
@SuppressWarnings("nullness") // gen.writeObjectField expects nulls but is unannotated
149+
private void writeAttemptedAndCommitted(MetricResult<?> value, JsonGenerator gen)
150+
throws IOException {
146151
gen.writeObjectField("attempted", value.getAttempted());
147152
if (value.hasCommitted()) {
148153
gen.writeObjectField("committed", value.getCommitted());
149154
}
150-
keySerializer.inline(value.getKey(), gen, provider);
151-
gen.writeEndObject();
152155
}
153156
}
154157

155158
private String serializeMetrics(MetricQueryResults metricQueryResults) throws Exception {
156159
SimpleModule module = new JodaModule();
157160
module.addSerializer(new MetricNameSerializer(MetricName.class));
158161
module.addSerializer(new MetricKeySerializer(MetricKey.class));
159-
module.addSerializer(new MetricResultSerializer(MetricResult.class));
162+
// This odd cast converts from rawtype Class<MetricResult> to Class<MetricResult<?>>
163+
// so the rest of the file can be properly typed
164+
module.addSerializer(
165+
new MetricResultSerializer((Class<MetricResult<?>>) (Object) MetricResult.class));
160166
objectMapper.registerModule(module);
161167
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
162168
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
@@ -171,6 +177,7 @@ private String serializeMetrics(MetricQueryResults metricQueryResults) throws Ex
171177
result = objectMapper.writeValueAsString(metricQueryResults);
172178
} catch (JsonMappingException exception) {
173179
if ((exception.getCause() instanceof UnsupportedOperationException)
180+
&& exception.getCause().getMessage() != null
174181
&& exception.getCause().getMessage().contains("committed metrics")) {
175182
filterProvider.removeFilter("committedMetrics");
176183
filter = SimpleBeanPropertyFilter.serializeAllExcept("committed");

0 commit comments

Comments
 (0)