Skip to content

Commit d7eeaef

Browse files
committed
Refactor connector context with strategy pattern
Refactored DumConnectorContextImpl to use a strategy and chain-of-responsibility pattern for job item processing. Introduced JobProcessingChain and JobProcessingStrategy interfaces with implementations for index, reindex, deindex, ignore rules, and unchanged strategies. Added JobItemBatchProcessor for efficient batch queueing. Updated related classes and dependency injection. Updated POM files to version 2026.1.7 and added spring-boot-starter-json dependency.
1 parent 003c716 commit d7eeaef

File tree

39 files changed

+933
-323
lines changed

39 files changed

+933
-323
lines changed

aem-commons/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>com.viglet.dumont</groupId>
77
<artifactId>dumont</artifactId>
8-
<version>2026.1.4</version>
8+
<version>2026.1.7</version>
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111
<groupId>com.viglet.dumont.connector</groupId>

aem/aem-plugin-sample/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>com.viglet.dumont.connector</groupId>
77
<artifactId>dumont-aem</artifactId>
8-
<version>2026.1.4</version>
8+
<version>2026.1.7</version>
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111
<artifactId>aem-plugin-sample</artifactId>

aem/aem-plugin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>com.viglet.dumont.connector</groupId>
77
<artifactId>dumont-aem</artifactId>
8-
<version>2026.1.4</version>
8+
<version>2026.1.7</version>
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111
<artifactId>aem-plugin</artifactId>

aem/aem-plugin/src/main/java/com/viglet/dumont/connector/plugin/aem/DumAemPluginProcess.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void sentToIndexStandalone(@NotNull String source,
115115
dumAemPathList.getPaths());
116116
dumAemSourceService.getDumAemSourceByName(source).ifPresentOrElse(dumAemSource -> {
117117
DumAemSession dumAemSession = dumAemSessionService.getDumAemSession(dumAemSource,
118-
dumAemPathList);
118+
dumAemPathList, true);
119119
indexContentIdList(dumAemPathList.getPaths(), dumAemSession);
120120
if (connectorDependencies) {
121121
indexDependencies(dumAemSession, dumAemPathList.getPaths());
@@ -187,7 +187,7 @@ public void indexAll(DumAemSource dumAemSource) {
187187
return;
188188
}
189189
runningSources.add(dumAemSource.getName());
190-
DumAemSession dumAemSession = dumAemSessionService.getDumAemSession(dumAemSource);
190+
DumAemSession dumAemSession = dumAemSessionService.getDumAemSession(dumAemSource, false);
191191
try {
192192
this.getNodesFromJson(dumAemSession);
193193
} catch (Exception e) {
@@ -204,8 +204,13 @@ private void indexDependencies(DumAemSession dumAemSession, List<String> idList)
204204
}
205205

206206
private void finished(DumAemSession dumAemSession) {
207-
if (!dumAemSession.isStandalone())
207+
if (!dumAemSession.isStandalone()) {
208+
log.info("Stopping source process. {}", dumAemSession.getSource());
208209
runningSources.remove(dumAemSession.getSource());
210+
} else {
211+
log.info("Finished standalone indexing for source: {}",
212+
dumAemSession.getSource());
213+
}
209214
dumConnectorContext.finishIndexing(dumAemSession, dumAemSession.isStandalone());
210215
}
211216

aem/aem-plugin/src/main/java/com/viglet/dumont/connector/plugin/aem/service/DumAemSessionService.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public DumAemSessionService(DumAemSourceService dumAemSourceService,
3030
}
3131

3232
public DumAemSession getDumAemSession(DumAemSource dumAemSource,
33-
DumAemPathList dumAemPathList) {
33+
DumAemPathList dumAemPathList, boolean standalone) {
3434
// Retrieve content mapping once and reuse
3535
DumAemContentMapping dumAemContentMapping = dumAemContentMappingService
3636
.getDumAemContentMapping(dumAemSource);
@@ -60,7 +60,7 @@ public DumAemSession getDumAemSession(DumAemSource dumAemSource,
6060
return DumAemSession.builder()
6161
.configuration(dumAemConfiguration)
6262
.event(event)
63-
.standalone(true)
63+
.standalone(standalone)
6464
.indexChildren(indexChildren)
6565
.source(session.getSource())
6666
.transactionId(session.getTransactionId())
@@ -73,8 +73,7 @@ public DumAemSession getDumAemSession(DumAemSource dumAemSource,
7373
.build();
7474
}
7575

76-
public DumAemSession getDumAemSession(DumAemSource dumAemSource) {
77-
return getDumAemSession(dumAemSource, (DumAemPathList) null);
78-
76+
public DumAemSession getDumAemSession(DumAemSource dumAemSource, boolean standalone) {
77+
return getDumAemSession(dumAemSource, (DumAemPathList) null, standalone);
7978
}
8079
}

aem/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>com.viglet.dumont</groupId>
77
<artifactId>dumont</artifactId>
8-
<version>2026.1.4</version>
8+
<version>2026.1.7</version>
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111
<groupId>com.viglet.dumont.connector</groupId>

commons/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<parent>
1212
<groupId>com.viglet.dumont</groupId>
1313
<artifactId>dumont</artifactId>
14-
<version>2026.1.4</version>
14+
<version>2026.1.7</version>
1515
<relativePath>../pom.xml</relativePath>
1616
</parent>
1717
<artifactId>dumont-commons</artifactId>

connector/connector-app/pom.xml

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>dumont-connector</artifactId>
77
<groupId>com.viglet.dumont.connector</groupId>
8-
<version>2026.1.4</version>
8+
<version>2026.1.7</version>
99
</parent>
1010

1111
<artifactId>connector-app</artifactId>
@@ -52,12 +52,6 @@
5252
<dependency>
5353
<groupId>com.viglet.turing</groupId>
5454
<artifactId>turing-java-sdk</artifactId>
55-
<exclusions>
56-
<exclusion>
57-
<groupId>com.fasterxml.jackson.datatype</groupId>
58-
<artifactId>jackson-datatype-jdk8</artifactId>
59-
</exclusion>
60-
</exclusions>
6155
</dependency>
6256
<dependency>
6357
<groupId>com.sezinkarli</groupId>
@@ -102,6 +96,10 @@
10296
<groupId>org.springframework.boot</groupId>
10397
<artifactId>spring-boot-starter-hazelcast</artifactId>
10498
</dependency>
99+
<dependency>
100+
<groupId>org.springframework.boot</groupId>
101+
<artifactId>spring-boot-starter-json</artifactId>
102+
</dependency>
105103
<dependency>
106104
<groupId>org.springframework.boot</groupId>
107105
<artifactId>spring-boot-starter-test</artifactId>
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
*
3+
* Copyright (C) 2016-2024 the original author or authors.
4+
*
5+
* This program is free software: you can redistribute it and/or modify it under the terms of the
6+
* GNU General Public License as published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10+
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11+
* General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU General Public License along with this program. If
14+
* not, see <https://www.gnu.org/licenses/>.
15+
*/
16+
17+
package com.viglet.dumont.connector.batch;
18+
19+
import static com.viglet.dumont.commons.indexing.DumIndexingStatus.SENT_TO_QUEUE;
20+
import static com.viglet.dumont.commons.sn.field.DumSNFieldName.ID;
21+
import static com.viglet.dumont.connector.commons.logging.DumConnectorLoggingUtils.setSuccessStatus;
22+
import static com.viglet.dumont.connector.constant.DumConnectorConstants.CONNECTOR_INDEXING_QUEUE;
23+
24+
import org.springframework.beans.factory.annotation.Value;
25+
import org.springframework.jms.core.JmsMessagingTemplate;
26+
import org.springframework.stereotype.Component;
27+
28+
import com.google.common.collect.Iterators;
29+
import com.viglet.dumont.connector.commons.DumConnectorSession;
30+
import com.viglet.turing.client.sn.job.TurSNJobItem;
31+
import com.viglet.turing.client.sn.job.TurSNJobItems;
32+
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
/**
36+
* Batch processor for managing job items efficiently.
37+
* Handles buffering and sending job items to the message queue.
38+
*
39+
* @author Alexandre Oliveira
40+
* @since 2026.1
41+
*/
42+
@Slf4j
43+
@Component
44+
public class JobItemBatchProcessor {
45+
46+
private final TurSNJobItems buffer = new TurSNJobItems();
47+
private final JmsMessagingTemplate jmsMessagingTemplate;
48+
private final int batchSize;
49+
50+
public JobItemBatchProcessor(
51+
@Value("${dumont.job.size:50}") int batchSize,
52+
JmsMessagingTemplate jmsMessagingTemplate) {
53+
this.batchSize = batchSize;
54+
this.jmsMessagingTemplate = jmsMessagingTemplate;
55+
}
56+
57+
/**
58+
* Adds a job item to the buffer.
59+
* Automatically flushes when batch size is reached.
60+
*
61+
* @param item the job item to add
62+
* @param session the connector session
63+
*/
64+
public synchronized void add(TurSNJobItem item, DumConnectorSession session) {
65+
buffer.add(item);
66+
logQueueInfo();
67+
68+
if (buffer.size() >= batchSize) {
69+
flush(session);
70+
}
71+
}
72+
73+
/**
74+
* Flushes all buffered items to the message queue.
75+
*
76+
* @param session the connector session
77+
*/
78+
public synchronized void flush(DumConnectorSession session) {
79+
if (buffer.size() == 0) {
80+
log.info("No job to send to connector queue.");
81+
return;
82+
}
83+
84+
log.info("Sending {} jobs to connector queue.", buffer.size());
85+
86+
if (log.isDebugEnabled()) {
87+
for (TurSNJobItem turSNJobItem : buffer) {
88+
log.debug("TurSNJobItem Id: {}", turSNJobItem.getAttributes().get(ID));
89+
}
90+
}
91+
92+
// Update status for all items
93+
for (TurSNJobItem turSNJobItem : buffer) {
94+
setSuccessStatus(turSNJobItem, session, SENT_TO_QUEUE);
95+
}
96+
97+
// Create a copy to send
98+
TurSNJobItems itemsToSend = new TurSNJobItems();
99+
for (TurSNJobItem turSNJobItem : buffer) {
100+
itemsToSend.add(turSNJobItem);
101+
}
102+
103+
jmsMessagingTemplate.convertAndSend(CONNECTOR_INDEXING_QUEUE, itemsToSend);
104+
buffer.clear();
105+
106+
log.info("Successfully sent batch to connector queue.");
107+
}
108+
109+
/**
110+
* Returns the current buffer size.
111+
*
112+
* @return the number of items in the buffer
113+
*/
114+
public synchronized int size() {
115+
return buffer.size();
116+
}
117+
118+
/**
119+
* Checks if the buffer is empty.
120+
*
121+
* @return true if buffer has no items
122+
*/
123+
public synchronized boolean isEmpty() {
124+
return buffer.size() == 0;
125+
}
126+
127+
private void logQueueInfo() {
128+
log.debug("Total Job Items in buffer: {}", Iterators.size(buffer.iterator()));
129+
}
130+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
*
3+
* Copyright (C) 2016-2024 the original author or authors.
4+
*
5+
* This program is free software: you can redistribute it and/or modify it under the terms of the
6+
* GNU General Public License as published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10+
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11+
* General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU General Public License along with this program. If
14+
* not, see <https://www.gnu.org/licenses/>.
15+
*/
16+
17+
package com.viglet.dumont.connector.chain;
18+
19+
import java.util.Comparator;
20+
import java.util.List;
21+
22+
import org.springframework.stereotype.Component;
23+
24+
import com.viglet.dumont.connector.batch.JobItemBatchProcessor;
25+
import com.viglet.dumont.connector.commons.domain.DumJobItemWithSession;
26+
import com.viglet.dumont.connector.strategy.JobProcessingStrategy;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
/**
31+
* Chain of Responsibility implementation for processing job items.
32+
* Routes job items to the appropriate strategy based on priority and
33+
* capability.
34+
*
35+
* @author Alexandre Oliveira
36+
* @since 2026.1
37+
*/
38+
@Slf4j
39+
@Component
40+
public class JobProcessingChain {
41+
42+
private final List<JobProcessingStrategy> strategies;
43+
44+
public JobProcessingChain(List<JobProcessingStrategy> strategies) {
45+
// Sort strategies by priority (lower values first)
46+
this.strategies = strategies.stream()
47+
.sorted(Comparator.comparingInt(JobProcessingStrategy::getPriority))
48+
.toList();
49+
50+
log.info("Initialized JobProcessingChain with {} strategies", strategies.size());
51+
strategies.forEach(strategy -> log.debug("Strategy: {} with priority {}",
52+
strategy.getClass().getSimpleName(),
53+
strategy.getPriority()));
54+
}
55+
56+
/**
57+
* Processes a job item by finding and executing the first matching strategy.
58+
*
59+
* @param jobItem the job item to process
60+
* @param batchProcessor the batch processor for queueing items
61+
*/
62+
public void process(DumJobItemWithSession jobItem, JobItemBatchProcessor batchProcessor) {
63+
if (jobItem == null || jobItem.turSNJobItem() == null) {
64+
log.warn("Received null job item, skipping processing");
65+
return;
66+
}
67+
68+
strategies.stream()
69+
.filter(strategy -> strategy.canHandle(jobItem))
70+
.findFirst()
71+
.ifPresentOrElse(
72+
strategy -> {
73+
log.debug("Processing {} with {}",
74+
jobItem.turSNJobItem().getId(),
75+
strategy.getClass().getSimpleName());
76+
strategy.process(jobItem, batchProcessor);
77+
},
78+
() -> log.warn("No strategy found for job item: {} (action: {})",
79+
jobItem.turSNJobItem().getId(),
80+
jobItem.turSNJobItem().getTurSNJobAction()));
81+
}
82+
83+
/**
84+
* Returns the list of registered strategies.
85+
*
86+
* @return list of strategies sorted by priority
87+
*/
88+
public List<JobProcessingStrategy> getStrategies() {
89+
return strategies;
90+
}
91+
}

0 commit comments

Comments
 (0)