Skip to content

Commit 57f378b

Browse files
committed
* log-processor: removed log forwarding
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
1 parent c7d1db9 commit 57f378b

File tree

10 files changed

+6
-290
lines changed

10 files changed

+6
-290
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
### 9.2.5 (8/27/2025 - )
44

55
* db: check jdbc url on DBConfig
6+
* log-processor: removed log forwarding
7+
> not really useful to mix business message with system message
68
79
### 9.2.4 (8/8/2025 - 8/21/2025)
810

ext/log-processor/conf/prod/resources/app.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,3 @@ app.index.shards=
55
app.kibana.apiKey=
66
app.kibana.banner=
77
app.kibana.url=
8-
app.log.forward.config=

ext/log-processor/src/main/java/core/LogProcessorApp.java

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package core;
22

33
import core.framework.http.HTTPClient;
4-
import core.framework.json.Bean;
5-
import core.framework.kafka.MessagePublisher;
64
import core.framework.log.message.ActionLogMessage;
75
import core.framework.log.message.EventMessage;
86
import core.framework.log.message.LogTopics;
97
import core.framework.log.message.StatMessage;
108
import core.framework.module.App;
11-
import core.framework.module.KafkaConfig;
129
import core.framework.search.module.SearchConfig;
13-
import core.log.LogForwardConfig;
1410
import core.log.domain.ActionDocument;
1511
import core.log.domain.EventDocument;
1612
import core.log.domain.StatDocument;
@@ -19,8 +15,6 @@
1915
import core.log.kafka.ActionLogMessageHandler;
2016
import core.log.kafka.EventMessageHandler;
2117
import core.log.kafka.StatMessageHandler;
22-
import core.log.service.ActionLogForwarder;
23-
import core.log.service.EventForwarder;
2418
import core.log.service.IndexOption;
2519
import core.log.service.IndexService;
2620
import core.log.service.JobConfig;
@@ -48,8 +42,7 @@ protected void initialize() {
4842

4943
configureKibanaService();
5044

51-
Forwarders forwarders = configureLogForwarders();
52-
configureKafka(forwarders);
45+
configureKafka();
5346
configureJob();
5447

5548
load(new DiagramModule());
@@ -73,15 +66,15 @@ private void configureIndexOption() {
7366
bind(option);
7467
}
7568

76-
private void configureKafka(Forwarders forwarders) {
69+
private void configureKafka() {
7770
kafka().uri(requiredProperty("sys.kafka.uri"));
7871
kafka().concurrency(2);
7972
kafka().minPoll(1024 * 1024, Duration.ofMillis(500)); // try to get at least 1M message
8073
kafka().maxPoll(3000, 3 * 1024 * 1024); // get 3M message at max
8174

82-
kafka().subscribe(LogTopics.TOPIC_ACTION_LOG, ActionLogMessage.class, bind(new ActionLogMessageHandler(forwarders.action)));
75+
kafka().subscribe(LogTopics.TOPIC_ACTION_LOG, ActionLogMessage.class, bind(ActionLogMessageHandler.class));
8376
kafka().subscribe(LogTopics.TOPIC_STAT, StatMessage.class, bind(StatMessageHandler.class));
84-
kafka().subscribe(LogTopics.TOPIC_EVENT, EventMessage.class, bind(new EventMessageHandler(forwarders.event)));
77+
kafka().subscribe(LogTopics.TOPIC_EVENT, EventMessage.class, bind(EventMessageHandler.class));
8578
}
8679

8780
private void configureSearch() {
@@ -109,30 +102,4 @@ private void configureJob() {
109102

110103
schedule().dailyAt("cleanup-old-index-job", bind(CleanupOldIndexJob.class), LocalTime.of(1, 0));
111104
}
112-
113-
private Forwarders configureLogForwarders() {
114-
var forwarders = new Forwarders();
115-
// configure by env APP_LOG_FORWARD_CONFIG
116-
String configValue = property("app.log.forward.config").orElse(null);
117-
if (configValue != null) {
118-
Bean.register(LogForwardConfig.class);
119-
LogForwardConfig config = Bean.fromJSON(LogForwardConfig.class, configValue);
120-
KafkaConfig kafka = kafka("forward");
121-
kafka.uri(config.kafkaURI);
122-
if (config.action != null) {
123-
MessagePublisher<ActionLogMessage> publisher = kafka.publish(config.action.topic, ActionLogMessage.class);
124-
forwarders.action = new ActionLogForwarder(publisher, config.action);
125-
}
126-
if (config.event != null) {
127-
MessagePublisher<EventMessage> publisher = kafka.publish(config.event.topic, EventMessage.class);
128-
forwarders.event = new EventForwarder(publisher, config.event);
129-
}
130-
}
131-
return forwarders;
132-
}
133-
134-
static class Forwarders {
135-
ActionLogForwarder action;
136-
EventForwarder event;
137-
}
138105
}

ext/log-processor/src/main/java/core/log/kafka/ActionLogMessageHandler.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import core.framework.util.Maps;
1010
import core.log.domain.ActionDocument;
1111
import core.log.domain.TraceDocument;
12-
import core.log.service.ActionLogForwarder;
1312
import core.log.service.IndexService;
14-
import org.jspecify.annotations.Nullable;
1513

1614
import java.time.LocalDate;
1715
import java.util.List;
@@ -21,25 +19,16 @@
2119
* @author neo
2220
*/
2321
public class ActionLogMessageHandler implements BulkMessageHandler<ActionLogMessage> {
24-
@Nullable
25-
final ActionLogForwarder forwarder;
26-
2722
@Inject
2823
IndexService indexService;
2924
@Inject
3025
ElasticSearchType<ActionDocument> actionType;
3126
@Inject
3227
ElasticSearchType<TraceDocument> traceType;
3328

34-
public ActionLogMessageHandler(@Nullable ActionLogForwarder forwarder) {
35-
this.forwarder = forwarder;
36-
}
37-
3829
@Override
3930
public void handle(List<Message<ActionLogMessage>> messages) {
4031
index(messages, LocalDate.now());
41-
42-
if (forwarder != null) forwarder.forward(messages);
4332
}
4433

4534
void index(List<Message<ActionLogMessage>> messages, LocalDate now) {

ext/log-processor/src/main/java/core/log/kafka/EventMessageHandler.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@
88
import core.framework.search.ElasticSearchType;
99
import core.framework.util.Maps;
1010
import core.log.domain.EventDocument;
11-
import core.log.service.EventForwarder;
1211
import core.log.service.IndexService;
13-
import org.jspecify.annotations.Nullable;
1412

1513
import java.time.LocalDate;
1614
import java.util.List;
@@ -20,23 +18,14 @@
2018
* @author neo
2119
*/
2220
public class EventMessageHandler implements BulkMessageHandler<EventMessage> {
23-
@Nullable
24-
final EventForwarder forwarder;
25-
2621
@Inject
2722
IndexService indexService;
2823
@Inject
2924
ElasticSearchType<EventDocument> eventType;
3025

31-
public EventMessageHandler(@Nullable EventForwarder forwarder) {
32-
this.forwarder = forwarder;
33-
}
34-
3526
@Override
3627
public void handle(List<Message<EventMessage>> messages) {
3728
index(messages, LocalDate.now());
38-
39-
if (forwarder != null) forwarder.forward(messages);
4029
}
4130

4231
void index(List<Message<EventMessage>> messages, LocalDate now) {

ext/log-processor/src/main/java/core/log/service/ActionLogForwarder.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

ext/log-processor/src/main/java/core/log/service/EventForwarder.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

ext/log-processor/src/main/resources/app.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,3 @@ app.index.shards=1
55
app.kibana.apiKey=
66
app.kibana.banner=# local
77
app.kibana.url=http://localhost:5601
8-
app.log.forward.config=

ext/log-processor/src/test/java/core/log/service/ActionLogForwarderTest.java

Lines changed: 0 additions & 93 deletions
This file was deleted.

0 commit comments

Comments
 (0)