Skip to content

Commit 4e203bb

Browse files
[TH2-5063] Made error collector to collect and publish errors to even… (#105)
1 parent 80db17c commit 4e203bb

13 files changed

+468
-110
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Overview (5.2.3)
1+
# Overview (5.3.0)
22

33
Message store (mstore) is an important th2 component responsible for storing raw messages into Cradle. Please refer to [Cradle repository] (https://github.com/th2-net/cradleapi/blob/master/README.md) for more details. This component has a pin for listening messages via MQ.
44

@@ -115,6 +115,12 @@ spec:
115115
This is a list of supported features provided by libraries.
116116
Please see more details about this feature via [link](https://github.com/th2-net/th2-common-j#configuration-formats).
117117
118+
## 5.3.0
119+
120+
* Mstore publishes event with aggregated statistics about internal errors into event router periodically
121+
* Updated common: `5.6.0-dev`
122+
* Added common-utils: `2.2.2-dev`
123+
118124
## 5.2.4
119125

120126
* Migrated to the cradle version with fixed load pages where `removed` field is null problem.

build.gradle

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ plugins {
99
id 'java-library'
1010
id 'application'
1111
id 'com.palantir.docker' version '0.25.0'
12-
id "org.owasp.dependencycheck" version "8.3.1"
12+
id "org.owasp.dependencycheck" version "8.4.0"
1313
id 'com.github.jk1.dependency-license-report' version '2.5'
1414
id "de.undercouch.download" version "5.4.0"
1515
}
1616

1717
ext {
1818
cradleVersion = '5.1.4-dev'
19-
commonVersion = '5.4.1-dev'
19+
commonVersion = '5.6.0-dev'
20+
commonUtilsVersion = '2.2.2-dev'
2021
}
2122

2223
group = 'com.exactpro.th2'
@@ -88,7 +89,7 @@ tasks.withType(Sign).configureEach {
8889
}
8990
// disable running task 'initializeSonatypeStagingRepository' on a gitlab
9091
tasks.configureEach { task ->
91-
if (task.name.equals('initializeSonatypeStagingRepository') &&
92+
if (task.name == 'initializeSonatypeStagingRepository' &&
9293
!(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword'))
9394
) {
9495
task.enabled = false
@@ -159,6 +160,9 @@ dependencies {
159160
api platform('com.exactpro.th2:bom:4.5.0')
160161

161162
implementation "com.exactpro.th2:common:$commonVersion"
163+
implementation("com.exactpro.th2:common-utils:$commonUtilsVersion") {
164+
because("executor service utils is used")
165+
}
162166
implementation 'com.exactpro.th2:task-utils:0.1.1'
163167

164168
implementation 'com.google.protobuf:protobuf-java-util'

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
release_version=5.2.4
1+
release_version=5.3.0
22
description='th2 mstore component'
33
vcs_url=https://github.com/th2-net/th2-mstore

src/main/java/com/exactpro/th2/mstore/AbstractMessageProcessor.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
public abstract class AbstractMessageProcessor implements AutoCloseable {
5858
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageProcessor.class);
59+
protected final ErrorCollector errorCollector;
5960
protected final CradleStorage cradleStorage;
6061
private final ScheduledExecutorService drainExecutor = Executors.newSingleThreadScheduledExecutor();
6162
protected final Map<SessionKey, MessageOrderingProperties> sessions = new ConcurrentHashMap<>();
@@ -69,11 +70,13 @@ public abstract class AbstractMessageProcessor implements AutoCloseable {
6970
private final ManualDrainTrigger manualDrain;
7071

7172
public AbstractMessageProcessor(
73+
@NotNull ErrorCollector errorCollector,
7274
@NotNull CradleStorage cradleStorage,
7375
@NotNull Persistor<GroupedMessageBatchToStore> persistor,
7476
@NotNull Configuration configuration,
7577
@NotNull Integer prefetchCount
7678
) {
79+
this.errorCollector = requireNonNull(errorCollector, "Error collector can't be null");
7780
this.cradleStorage = requireNonNull(cradleStorage, "Cradle storage can't be null");
7881
this.persistor = requireNonNull(persistor, "Persistor can't be null");
7982
this.configuration = requireNonNull(configuration, "'Configuration' parameter");
@@ -103,13 +106,13 @@ public void close() {
103106
future.cancel(false);
104107
}
105108
} catch (RuntimeException ex) {
106-
LOGGER.error("Cannot cancel drain task", ex);
109+
errorCollector.collect(LOGGER, "Cannot cancel drain task", ex);
107110
}
108111

109112
try {
110113
drain(true);
111114
} catch (RuntimeException ex) {
112-
LOGGER.error("Cannot drain left batches during shutdown", ex);
115+
errorCollector.collect(LOGGER, "Cannot drain left batches during shutdown", ex);
113116
}
114117

115118
try {
@@ -122,27 +125,27 @@ public void close() {
122125
}
123126
}
124127
} catch (InterruptedException e) {
125-
LOGGER.error("Cannot gracefully shutdown drain executor", e);
128+
errorCollector.collect(LOGGER, "Cannot gracefully shutdown drain executor", e);
126129
Thread.currentThread().interrupt();
127130
} catch (RuntimeException e) {
128-
LOGGER.error("Cannot gracefully shutdown drain executor", e);
131+
errorCollector.collect(LOGGER, "Cannot gracefully shutdown drain executor", e);
129132
}
130133
}
131134

132-
private static void confirm(Confirmation confirmation) {
135+
protected void confirm(Confirmation confirmation) {
133136
try {
134137
confirmation.confirm();
135138
} catch (Exception e) {
136-
LOGGER.error("Exception confirming message", e);
139+
errorCollector.collect(LOGGER, "Exception confirming message", e);
137140
}
138141
}
139142

140143

141-
private static void reject(Confirmation confirmation) {
144+
protected void reject(Confirmation confirmation) {
142145
try {
143146
confirmation.reject();
144147
} catch (Exception e) {
145-
LOGGER.error("Exception rejecting message", e);
148+
errorCollector.collect(LOGGER, "Exception rejecting message", e);
146149
}
147150
}
148151

@@ -316,23 +319,14 @@ private void drain(boolean force) {
316319
protected void persist(ConsolidatedBatch data) {
317320
GroupedMessageBatchToStore batch = data.batch;
318321
try (Histogram.Timer ignored = metrics.startMeasuringPersistenceLatency()) {
319-
persistor.persist(batch, new Callback<>() {
320-
@Override
321-
public void onSuccess(GroupedMessageBatchToStore batch) {
322-
data.confirmations.forEach(AbstractMessageProcessor::confirm);
323-
}
324-
325-
@Override
326-
public void onFail(GroupedMessageBatchToStore batch) {
327-
data.confirmations.forEach(AbstractMessageProcessor::reject);
328-
}
329-
});
322+
persistor.persist(batch, new ProcessorCallback(data));
330323
} catch (Exception e) {
324+
errorCollector.collect("Exception storing batch for group \"" + batch.getGroup() + '\"');
331325
if (LOGGER.isErrorEnabled()) {
332326
LOGGER.error("Exception storing batch for group \"{}\": {}", batch.getGroup(),
333327
formatMessageBatchToStore(batch, false), e);
334328
}
335-
data.confirmations.forEach(AbstractMessageProcessor::reject);
329+
data.confirmations.forEach(this::reject);
336330
}
337331
}
338332

@@ -432,4 +426,22 @@ public static String identifySessionGroup(String sessionGroup, String sessionAli
432426
return (sessionGroup == null || sessionGroup.isBlank()) ? requireNonBlank(sessionAlias, "'Session alias' parameter can not be blank") : sessionGroup;
433427
}
434428
}
429+
430+
private class ProcessorCallback implements Callback<GroupedMessageBatchToStore> {
431+
private final ConsolidatedBatch data;
432+
433+
public ProcessorCallback(@NotNull ConsolidatedBatch data) {
434+
this.data = requireNonNull(data, "Data can't be bull");
435+
}
436+
437+
@Override
438+
public void onSuccess(GroupedMessageBatchToStore batch) {
439+
data.confirmations.forEach(AbstractMessageProcessor.this::confirm);
440+
}
441+
442+
@Override
443+
public void onFail(GroupedMessageBatchToStore batch) {
444+
data.confirmations.forEach(AbstractMessageProcessor.this::reject);
445+
}
446+
}
435447
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright 2023 Exactpro (Exactpro Systems Limited)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.exactpro.th2.mstore;
17+
18+
import com.exactpro.th2.common.event.Event;
19+
import com.exactpro.th2.common.event.Event.Status;
20+
import com.exactpro.th2.common.event.IBodyData;
21+
import com.exactpro.th2.common.grpc.EventBatch;
22+
import com.exactpro.th2.common.grpc.EventID;
23+
import com.exactpro.th2.common.schema.message.MessageRouter;
24+
import com.fasterxml.jackson.annotation.JsonCreator;
25+
import org.jetbrains.annotations.NotNull;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.IOException;
30+
import java.time.Instant;
31+
import java.util.Collection;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.concurrent.ScheduledExecutorService;
35+
import java.util.concurrent.ScheduledFuture;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReentrantLock;
39+
40+
import static java.util.Objects.requireNonNull;
41+
42+
@SuppressWarnings("unused")
43+
public class ErrorCollector implements AutoCloseable {
44+
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorCollector.class);
45+
private final ScheduledFuture<?> drainFuture;
46+
private final MessageRouter<EventBatch> eventRouter;
47+
private final EventID rootEvent;
48+
private final Lock lock = new ReentrantLock();
49+
private Map<String, ErrorMetadata> errors = new HashMap<>();
50+
51+
public ErrorCollector(@NotNull ScheduledExecutorService executor,
52+
@NotNull MessageRouter<EventBatch> eventRouter,
53+
@NotNull EventID rootEvent,
54+
long period,
55+
@NotNull TimeUnit unit) {
56+
this.eventRouter = requireNonNull(eventRouter, "Event router can't be null");
57+
this.rootEvent = requireNonNull(rootEvent, "Root event can't be null");
58+
requireNonNull(unit, "Unit can't be null");
59+
this.drainFuture = requireNonNull(executor, "Executor can't be null")
60+
.scheduleAtFixedRate(this::drain, period, period, unit);
61+
}
62+
63+
public ErrorCollector(@NotNull ScheduledExecutorService executor,
64+
@NotNull MessageRouter<EventBatch> eventRouter,
65+
@NotNull EventID rootEvent) {
66+
this(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES);
67+
}
68+
69+
/**
70+
* Log error and call the {@link #collect(String)}} method
71+
* @param error is used as key identifier. Avoid put a lot of unique values
72+
*/
73+
public void collect(Logger logger, String error, Throwable cause) {
74+
logger.error(error, cause);
75+
collect(error);
76+
}
77+
78+
/**
79+
* @param error is used as key identifier. Avoid put a lot of unique values
80+
*/
81+
public void collect(String error) {
82+
lock.lock();
83+
try {
84+
errors.compute(error, (key, metadata) -> {
85+
if (metadata == null) {
86+
return new ErrorMetadata();
87+
}
88+
metadata.inc();
89+
return metadata;
90+
});
91+
} finally {
92+
lock.unlock();
93+
}
94+
}
95+
96+
@Override
97+
public void close() throws Exception {
98+
drainFuture.cancel(true);
99+
drain();
100+
}
101+
102+
private void drain() {
103+
try {
104+
Map<String, ErrorMetadata> map = clear();
105+
if (map.isEmpty()) { return; }
106+
107+
eventRouter.sendAll(Event.start()
108+
.name("mstore internal problem(s): " + calculateTotalQty(map.values()))
109+
.type("InternalError")
110+
.status(Status.FAILED)
111+
.bodyData(new BodyData(map))
112+
.toBatchProto(rootEvent));
113+
114+
} catch (IOException | RuntimeException e) {
115+
LOGGER.error("Drain events task failure", e);
116+
}
117+
}
118+
119+
private Map<String, ErrorMetadata> clear() {
120+
lock.lock();
121+
try {
122+
Map<String, ErrorMetadata> result = errors;
123+
errors = new HashMap<>();
124+
return result;
125+
} finally {
126+
lock.unlock();
127+
}
128+
}
129+
130+
private static int calculateTotalQty(Collection<ErrorMetadata> errors) {
131+
return errors.stream()
132+
.map(ErrorMetadata::getQuantity)
133+
.reduce(0, Integer::sum);
134+
}
135+
136+
private static class BodyData implements IBodyData {
137+
private final Map<String, ErrorMetadata> errors;
138+
@JsonCreator
139+
private BodyData(Map<String, ErrorMetadata> errors) {
140+
this.errors = errors;
141+
}
142+
public Map<String, ErrorMetadata> getErrors() {
143+
return errors;
144+
}
145+
}
146+
147+
private static class ErrorMetadata {
148+
private final Instant firstDate = Instant.now();
149+
private Instant lastDate;
150+
private int quantity = 1;
151+
152+
public void inc() {
153+
quantity += 1;
154+
lastDate = Instant.now();
155+
}
156+
157+
public Instant getFirstDate() {
158+
return firstDate;
159+
}
160+
161+
public Instant getLastDate() {
162+
return lastDate;
163+
}
164+
165+
public void setLastDate(Instant lastDate) {
166+
this.lastDate = lastDate;
167+
}
168+
169+
public int getQuantity() {
170+
return quantity;
171+
}
172+
173+
public void setQuantity(int quantity) {
174+
this.quantity = quantity;
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)