Skip to content

Commit 69baee9

Browse files
authored
Add parameter to acknowledge group events on conclude immediately, and a parameter to disable group acknowledgments in aggregate processor (opensearch-project#6430)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent c0dba45 commit 69baee9

File tree

7 files changed

+101
-10
lines changed

7 files changed

+101
-10
lines changed

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizer.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,17 @@ class AggregateActionSynchronizer {
4646
private final AggregateAction aggregateAction;
4747
private final AggregateGroupManager aggregateGroupManager;
4848

49+
private final boolean disableGroupAcknowledgments;
50+
4951
private static final Logger LOG = LoggerFactory.getLogger(AggregateActionSynchronizer.class);
5052

51-
private AggregateActionSynchronizer(final AggregateAction aggregateAction, final AggregateGroupManager aggregateGroupManager, final PluginMetrics pluginMetrics) {
53+
private AggregateActionSynchronizer(final AggregateAction aggregateAction,
54+
final AggregateGroupManager aggregateGroupManager,
55+
final PluginMetrics pluginMetrics,
56+
final AggregateProcessorConfig aggregateProcessorConfig) {
5257
this.aggregateAction = aggregateAction;
5358
this.aggregateGroupManager = aggregateGroupManager;
59+
this.disableGroupAcknowledgments = aggregateProcessorConfig.getDisableGroupAcknowledgments();
5460

5561
this.actionHandleEventsProcessingErrors = pluginMetrics.counter(ACTION_HANDLE_EVENTS_PROCESSING_ERRORS);
5662
this.actionConcludeGroupEventsProcessingErrors = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_PROCESSING_ERRORS);
@@ -92,7 +98,9 @@ AggregateActionResponse handleEventForGroup(final Event event, final Identificat
9298
handleEventForGroupLock.lock();
9399
try {
94100
LOG.debug("Start critical section in handleEventForGroup");
95-
aggregateGroup.attachToEventAcknowledgementSet(event);
101+
if (!disableGroupAcknowledgments) {
102+
aggregateGroup.attachToEventAcknowledgementSet(event);
103+
}
96104
handleEventResponse = aggregateAction.handleEvent(event, aggregateGroup);
97105
aggregateGroupManager.putGroupWithHash(hash, aggregateGroup);
98106
} catch (final Exception e) {
@@ -107,8 +115,11 @@ AggregateActionResponse handleEventForGroup(final Event event, final Identificat
107115
}
108116

109117
static class AggregateActionSynchronizerProvider {
110-
public AggregateActionSynchronizer provide(final AggregateAction aggregateAction, final AggregateGroupManager aggregateGroupManager, final PluginMetrics pluginMetrics) {
111-
return new AggregateActionSynchronizer(aggregateAction, aggregateGroupManager, pluginMetrics);
118+
public AggregateActionSynchronizer provide(final AggregateAction aggregateAction,
119+
final AggregateGroupManager aggregateGroupManager,
120+
final PluginMetrics pluginMetrics,
121+
final AggregateProcessorConfig aggregateProcessorConfig) {
122+
return new AggregateActionSynchronizer(aggregateAction, aggregateGroupManager, pluginMetrics, aggregateProcessorConfig);
112123
}
113124
}
114125
}

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.opensearch.dataprepper.plugins.processor.aggregate;
1111

1212
import com.google.common.collect.Maps;
13+
import org.opensearch.dataprepper.model.event.EventHandle;
1314
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
1415

1516
import java.time.Duration;
@@ -21,9 +22,11 @@ class AggregateGroupManager {
2122

2223
private final Map<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> allGroups = Maps.newConcurrentMap();
2324
private final Duration groupDuration;
25+
private final boolean acknowledgeOnConclude;
2426

25-
AggregateGroupManager(final Duration groupDuration) {
27+
AggregateGroupManager(final Duration groupDuration, final boolean acknowledgeOnConclude) {
2628
this.groupDuration = groupDuration;
29+
this.acknowledgeOnConclude = acknowledgeOnConclude;
2730
}
2831

2932
AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
@@ -43,6 +46,13 @@ List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>>
4346

4447
void closeGroup(final IdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
4548
allGroups.remove(hashKeyMap, group);
49+
50+
if (acknowledgeOnConclude) {
51+
EventHandle handle = group.getEventHandle();
52+
if (handle != null) {
53+
handle.release(true);
54+
}
55+
}
4656
group.resetGroup();
4757
}
4858

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
6060

6161
@DataPrepperPluginConstructor
6262
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final ExpressionEvaluator expressionEvaluator) {
63-
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration()),
63+
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration(), aggregateProcessorConfig.getAcknowledgeOnConclude()),
6464
new IdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator);
6565
}
6666
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager,
@@ -73,7 +73,7 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
7373
this.expressionEvaluator = expressionEvaluator;
7474
this.identificationKeysHasher = identificationKeysHasher;
7575
this.aggregateAction = loadAggregateAction(pluginFactory);
76-
this.aggregateActionSynchronizer = aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics);
76+
this.aggregateActionSynchronizer = aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics, aggregateProcessorConfig);
7777

7878
this.actionConcludeGroupEventsOutCounter = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_OUT);
7979
this.actionConcludeGroupEventsDroppedCounter = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_DROPPED);

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ public class AggregateProcessorConfig {
8181
})
8282
private String whenCondition;
8383

84+
@JsonPropertyDescription("When set to true, releases the group's event handle when the group concludes. " +
85+
"This sacrifices true end-to-end acknowledgments for aggregated events but prevents reprocessing.")
86+
@JsonProperty("acknowledge_on_conclude")
87+
private Boolean acknowledgeOnConclude = false;
88+
89+
@JsonProperty("disable_group_acknowledgments")
90+
private Boolean disableGroupAcknowledgments = false;
91+
8492
public List<String> getIdentificationKeys() {
8593
return identificationKeys;
8694
}
@@ -112,4 +120,12 @@ boolean isValidConfig() {
112120

113121
public PluginModel getAggregateAction() { return aggregateAction; }
114122

123+
public Boolean getAcknowledgeOnConclude() {
124+
return acknowledgeOnConclude;
125+
}
126+
127+
public Boolean getDisableGroupAcknowledgments() {
128+
return disableGroupAcknowledgments;
129+
}
130+
115131
}

data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizerTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.mockito.Mockito.times;
3737
import static org.mockito.Mockito.verify;
3838
import static org.mockito.Mockito.verifyNoInteractions;
39+
import static org.mockito.Mockito.verifyNoMoreInteractions;
3940
import static org.mockito.Mockito.when;
4041

4142
@ExtendWith(MockitoExtension.class)
@@ -75,6 +76,9 @@ public class AggregateActionSynchronizerTest {
7576
@Mock
7677
private Counter actionConcludeGroupEventsProcessingErrors;
7778

79+
@Mock
80+
private AggregateProcessorConfig aggregateProcessorConfig;
81+
7882
@BeforeEach
7983
void setup() {
8084
doNothing().when(handleEventForGroupLock).lock();
@@ -93,7 +97,7 @@ void setup() {
9397

9498
private AggregateActionSynchronizer createObjectUnderTest() {
9599
final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider = new AggregateActionSynchronizer.AggregateActionSynchronizerProvider();
96-
return aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics);
100+
return aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics, aggregateProcessorConfig);
97101
}
98102

99103
@Test
@@ -170,6 +174,29 @@ void handleEventForGroup_calls_expected_functions_and_returns_correct_AggregateA
170174
assertThat(handleEventResponse, equalTo(aggregateActionResponse));
171175
}
172176

177+
@Test
178+
void handleEventForGroup_calls_expected_functions_and_returns_correct_AggregateActionResponse_with_disable_group_acknowledgments_true() {
179+
when(aggregateProcessorConfig.getDisableGroupAcknowledgments()).thenReturn(true);
180+
181+
final AggregateActionSynchronizer objectUnderTest = createObjectUnderTest();
182+
when(aggregateAction.handleEvent(event, aggregateGroup)).thenReturn(aggregateActionResponse);
183+
184+
final AggregateActionResponse handleEventResponse = objectUnderTest.handleEventForGroup(event, identificationKeysMap, aggregateGroup);
185+
186+
final InOrder inOrder = Mockito.inOrder(concludeGroupLock, handleEventForGroupLock, aggregateGroup, aggregateAction, aggregateGroupManager);
187+
inOrder.verify(aggregateGroup).getConcludeGroupLock();
188+
inOrder.verify(aggregateGroup).getHandleEventForGroupLock();
189+
inOrder.verify(concludeGroupLock).lock();
190+
inOrder.verify(concludeGroupLock).unlock();
191+
inOrder.verify(handleEventForGroupLock).lock();
192+
inOrder.verify(aggregateAction).handleEvent(event, aggregateGroup);
193+
inOrder.verify(aggregateGroupManager).putGroupWithHash(identificationKeysMap, aggregateGroup);
194+
inOrder.verify(handleEventForGroupLock).unlock();
195+
196+
verifyNoMoreInteractions(aggregateGroup);
197+
assertThat(handleEventResponse, equalTo(aggregateActionResponse));
198+
}
199+
173200
@Test
174201
void locks_are_unlocked_and_event_returned_when_aggregateAction_handleEvent_throws_exception() {
175202
final AggregateActionSynchronizer objectUnderTest = createObjectUnderTest();

data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.opensearch.dataprepper.plugins.processor.aggregate;
1111

12+
import org.opensearch.dataprepper.model.event.EventHandle;
1213
import org.junit.jupiter.api.BeforeEach;
1314
import org.junit.jupiter.api.Test;
1415
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
@@ -27,6 +28,7 @@
2728
import static org.hamcrest.MatcherAssert.assertThat;
2829
import static org.hamcrest.Matchers.is;
2930
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.verify;
3032
import static org.mockito.Mockito.when;
3133

3234
public class AggregateGroupManagerTest {
@@ -46,7 +48,7 @@ void setup() {
4648
}
4749

4850
private AggregateGroupManager createObjectUnderTest() {
49-
return new AggregateGroupManager(TEST_GROUP_DURATION);
51+
return new AggregateGroupManager(TEST_GROUP_DURATION, false);
5052
}
5153

5254
@Test
@@ -145,4 +147,29 @@ void getGroupsToConclude_with_force_conclude_return_all() {
145147
assertThat(groupsToConclude.get(1).getValue(), equalTo(groupToConclude1));
146148
}
147149
}
150+
151+
@Test
152+
void closeGroup_with_acknowledge_on_conclude_releases_event_handle() {
153+
aggregateGroupManager = new AggregateGroupManager(TEST_GROUP_DURATION, true);
154+
155+
final AggregateGroup group = mock(AggregateGroup.class);
156+
final EventHandle eventHandle = mock(EventHandle.class);
157+
when(group.getEventHandle()).thenReturn(eventHandle);
158+
159+
aggregateGroupManager.closeGroup(identificationKeysMap, group);
160+
161+
verify(eventHandle).release(true);
162+
verify(group).resetGroup();
163+
}
164+
165+
@Test
166+
void closeGroup_without_acknowledge_on_conclude_does_not_release_event_handle() {
167+
aggregateGroupManager = new AggregateGroupManager(TEST_GROUP_DURATION, false);
168+
169+
final AggregateGroup group = mock(AggregateGroup.class);
170+
171+
aggregateGroupManager.closeGroup(identificationKeysMap, group);
172+
173+
verify(group).resetGroup();
174+
}
148175
}

data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void setUp() {
148148
.withEventType("event")
149149
.build();
150150

151-
when(aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics)).thenReturn(aggregateActionSynchronizer);
151+
when(aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics, aggregateProcessorConfig)).thenReturn(aggregateActionSynchronizer);
152152

153153
when(pluginMetrics.counter(AggregateProcessor.ACTION_HANDLE_EVENTS_OUT)).thenReturn(actionHandleEventsOutCounter);
154154
when(pluginMetrics.counter(AggregateProcessor.ACTION_HANDLE_EVENTS_DROPPED)).thenReturn(actionHandleEventsDroppedCounter);

0 commit comments

Comments
 (0)