Skip to content

Commit 6d9873a

Browse files
authored
ASM_DD product listeners are not synced properly (#10226)
What Does This Do: Changes the order in which listeners are called from apply/remove to remove/apply, for the ASM_DD product only. Motivation: https://datadog.zendesk.com/agent/tickets/2388805 Additional Notes: ASM_DD product listeners need to remove configs before adding or updating them, due to WAF requirements. This is a provisional fix; we still need to discuss config management as a whole.
1 parent fbdb1cf commit 6d9873a

File tree

2 files changed

+335
-2
lines changed

2 files changed

+335
-2
lines changed

remote-config/remote-config-core/src/main/java/datadog/remoteconfig/state/ProductState.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,10 @@ public boolean apply(
5858
errors = null;
5959

6060
List<ParsedConfigKey> configBeenUsedByProduct = new ArrayList<>();
61+
List<ParsedConfigKey> changedKeys = new ArrayList<>();
6162
boolean changesDetected = false;
6263

64+
// Step 1: Detect all changes
6365
for (ParsedConfigKey configKey : relevantKeys) {
6466
try {
6567
RemoteConfigResponse.Targets.ConfigTarget target =
@@ -68,14 +70,28 @@ public boolean apply(
6870

6971
if (isTargetChanged(configKey, target)) {
7072
changesDetected = true;
71-
byte[] content = getTargetFileContent(fleetResponse, configKey);
72-
callListenerApplyTarget(fleetResponse, hinter, configKey, content);
73+
changedKeys.add(configKey);
7374
}
7475
} catch (ReportableException e) {
7576
recordError(e);
7677
}
7778
}
7879

80+
// Step 2: For products other than ASM_DD, apply changes immediately
81+
if (product != Product.ASM_DD) {
82+
for (ParsedConfigKey configKey : changedKeys) {
83+
try {
84+
byte[] content = getTargetFileContent(fleetResponse, configKey);
85+
callListenerApplyTarget(fleetResponse, hinter, configKey, content);
86+
} catch (ReportableException e) {
87+
recordError(e);
88+
}
89+
}
90+
}
91+
92+
// Step 3: Remove obsolete configurations (for all products)
93+
// For ASM_DD, this is critical: removes MUST happen before applies to prevent
94+
// duplicate rule warnings from the ddwaf rule parser and memory spikes.
7995
List<ParsedConfigKey> keysToRemove =
8096
cachedTargetFiles.keySet().stream()
8197
.filter(configKey -> !configBeenUsedByProduct.contains(configKey))
@@ -86,6 +102,22 @@ public boolean apply(
86102
callListenerRemoveTarget(hinter, configKey);
87103
}
88104

105+
// Step 4: For ASM_DD, apply changes AFTER removes
106+
// TODO: This is a temporary solution. The proper fix requires better synchronization
107+
// between remove and add/update operations. This should be discussed
108+
// with the guild to determine the best long-term design approach.
109+
if (product == Product.ASM_DD) {
110+
for (ParsedConfigKey configKey : changedKeys) {
111+
try {
112+
byte[] content = getTargetFileContent(fleetResponse, configKey);
113+
callListenerApplyTarget(fleetResponse, hinter, configKey, content);
114+
} catch (ReportableException e) {
115+
recordError(e);
116+
}
117+
}
118+
}
119+
120+
// Step 5: Commit if there were changes
89121
if (changesDetected) {
90122
try {
91123
callListenerCommit(hinter);
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
package datadog.remoteconfig.state
2+
3+
import datadog.remoteconfig.PollingRateHinter
4+
import datadog.remoteconfig.Product
5+
import datadog.remoteconfig.ReportableException
6+
import datadog.remoteconfig.tuf.RemoteConfigResponse
7+
import spock.lang.Specification
8+
9+
class ProductStateSpecification extends Specification {
10+
11+
PollingRateHinter hinter = Mock()
12+
13+
void 'test apply for non-ASM_DD product applies changes before removes'() {
14+
given: 'a ProductState for ASM_DATA'
15+
def productState = new ProductState(Product.ASM_DATA)
16+
def listener = new OrderRecordingListener()
17+
productState.addProductListener(listener)
18+
19+
and: 'first apply with config1 to cache it'
20+
def response1 = buildResponse([
21+
'org/ASM_DATA/config1/foo': [version: 1, length: 8, hash: 'oldhash1']
22+
])
23+
def key1 = ParsedConfigKey.parse('org/ASM_DATA/config1/foo')
24+
productState.apply(response1, [key1], hinter)
25+
listener.operations.clear() // Clear for the actual test
26+
27+
and: 'a new response with config1 (changed hash) and config2 (new)'
28+
def response2 = buildResponse([
29+
'org/ASM_DATA/config1/foo': [version: 2, length: 8, hash: 'newhash1'],
30+
'org/ASM_DATA/config2/foo': [version: 1, length: 8, hash: 'hash2']
31+
])
32+
def key2 = ParsedConfigKey.parse('org/ASM_DATA/config2/foo')
33+
34+
when: 'apply is called'
35+
def changed = productState.apply(response2, [key1, key2], hinter)
36+
37+
then: 'changes are detected'
38+
changed
39+
40+
and: 'operations happen in order: apply config1, apply config2, commit (no removes)'
41+
listener.operations == [
42+
'accept:org/ASM_DATA/config1/foo',
43+
'accept:org/ASM_DATA/config2/foo',
44+
'commit'
45+
]
46+
}
47+
48+
void 'test apply for ASM_DD product applies changes after removes'() {
49+
given: 'a ProductState for ASM_DD'
50+
def productState = new ProductState(Product.ASM_DD)
51+
def listener = new OrderRecordingListener()
52+
productState.addProductListener(listener)
53+
54+
and: 'first apply with config1 and config2 to cache them'
55+
def response1 = buildResponse([
56+
'org/ASM_DD/config1/foo': [version: 1, length: 8, hash: 'oldhash1'],
57+
'org/ASM_DD/config2/foo': [version: 1, length: 8, hash: 'hash2']
58+
])
59+
def key1 = ParsedConfigKey.parse('org/ASM_DD/config1/foo')
60+
def key2 = ParsedConfigKey.parse('org/ASM_DD/config2/foo')
61+
productState.apply(response1, [key1, key2], hinter)
62+
listener.operations.clear() // Clear for the actual test
63+
64+
and: 'a new response with only config1 (changed hash) - config2 will be removed'
65+
def response2 = buildResponse([
66+
'org/ASM_DD/config1/foo': [version: 2, length: 8, hash: 'newhash1']
67+
])
68+
69+
when: 'apply is called'
70+
def changed = productState.apply(response2, [key1], hinter)
71+
72+
then: 'changes are detected'
73+
changed
74+
75+
and: 'operations happen in order: remove config2 FIRST, then apply config1, then commit'
76+
listener.operations == ['remove:org/ASM_DD/config2/foo', 'accept:org/ASM_DD/config1/foo', 'commit']
77+
}
78+
79+
void 'test ASM_DD with multiple new configs removes before applies all'() {
80+
given: 'a ProductState for ASM_DD'
81+
def productState = new ProductState(Product.ASM_DD)
82+
def listener = new OrderRecordingListener()
83+
productState.addProductListener(listener)
84+
85+
and: 'first apply with old configs'
86+
def response1 = buildResponse([
87+
'org/ASM_DD/old1/foo': [version: 1, length: 8, hash: 'hash_old1'],
88+
'org/ASM_DD/old2/foo': [version: 1, length: 8, hash: 'hash_old2']
89+
])
90+
def oldKey1 = ParsedConfigKey.parse('org/ASM_DD/old1/foo')
91+
def oldKey2 = ParsedConfigKey.parse('org/ASM_DD/old2/foo')
92+
productState.apply(response1, [oldKey1, oldKey2], hinter)
93+
listener.operations.clear() // Clear for the actual test
94+
95+
and: 'a response with completely new configs'
96+
def response2 = buildResponse([
97+
'org/ASM_DD/new1/foo': [version: 1, length: 8, hash: 'hash_new1'],
98+
'org/ASM_DD/new2/foo': [version: 1, length: 8, hash: 'hash_new2']
99+
])
100+
def newKey1 = ParsedConfigKey.parse('org/ASM_DD/new1/foo')
101+
def newKey2 = ParsedConfigKey.parse('org/ASM_DD/new2/foo')
102+
103+
when: 'apply is called'
104+
def changed = productState.apply(response2, [newKey1, newKey2], hinter)
105+
106+
then: 'changes are detected'
107+
changed
108+
109+
and: 'all removes happen before all applies'
110+
listener.operations.size() == 5 // 2 removes + 2 accepts + 1 commit
111+
listener.operations.findAll { it.startsWith('remove:') }.size() == 2
112+
listener.operations.findAll { it.startsWith('accept:') }.size() == 2
113+
114+
and: 'removes come before accepts'
115+
def lastRemoveIdx = listener.operations.findLastIndexOf { it.startsWith('remove:') }
116+
def firstAcceptIdx = listener.operations.findIndexOf { it.startsWith('accept:') }
117+
lastRemoveIdx < firstAcceptIdx
118+
}
119+
120+
void 'test no changes detected when config hashes match'() {
121+
given: 'a ProductState'
122+
def productState = new ProductState(Product.ASM_DATA)
123+
def listener = new OrderRecordingListener()
124+
productState.addProductListener(listener)
125+
126+
and: 'first apply with a config'
127+
def response = buildResponse([
128+
'org/ASM_DATA/config1/foo': [version: 1, length: 8, hash: 'hash1']
129+
])
130+
def key1 = ParsedConfigKey.parse('org/ASM_DATA/config1/foo')
131+
productState.apply(response, [key1], hinter)
132+
listener.operations.clear() // Clear for the actual test
133+
134+
when: 'apply is called again with the same hash'
135+
def changed = productState.apply(response, [key1], hinter)
136+
137+
then: 'no changes are detected'
138+
!changed
139+
140+
and: 'no listener operations occurred'
141+
listener.operations.isEmpty()
142+
}
143+
144+
void 'test error handling during apply'() {
145+
given: 'a ProductState'
146+
def productState = new ProductState(Product.ASM_DATA)
147+
def listener = Mock(ProductListener)
148+
productState.addProductListener(listener)
149+
150+
and: 'a response with a config'
151+
def response = buildResponse([
152+
'org/ASM_DATA/config1/foo': [version: 1, length: 8, hash: 'hash1']
153+
])
154+
155+
and: 'listener throws an exception'
156+
listener.accept(_, _, _) >> { throw new RuntimeException('Listener error') }
157+
158+
def key1 = ParsedConfigKey.parse('org/ASM_DATA/config1/foo')
159+
160+
when: 'apply is called'
161+
def changed = productState.apply(response, [key1], hinter)
162+
163+
then: 'changes are still detected'
164+
changed
165+
166+
and: 'commit is still called despite the error'
167+
1 * listener.commit(hinter)
168+
}
169+
170+
void 'test reportable exception is recorded'() {
171+
given: 'a ProductState'
172+
def productState = new ProductState(Product.ASM_DATA)
173+
def listener = Mock(ProductListener)
174+
productState.addProductListener(listener)
175+
176+
and: 'a response with a config'
177+
def response = buildResponse([
178+
'org/ASM_DATA/config1/foo': [version: 1, length: 8, hash: 'hash1']
179+
])
180+
181+
and: 'listener throws a ReportableException'
182+
def exception = new ReportableException('Test error')
183+
listener.accept(_, _, _) >> { throw exception }
184+
185+
def key1 = ParsedConfigKey.parse('org/ASM_DATA/config1/foo')
186+
187+
when: 'apply is called'
188+
productState.apply(response, [key1], hinter)
189+
190+
then: 'error is recorded'
191+
productState.hasError()
192+
productState.getErrors().contains(exception)
193+
}
194+
195+
void 'test configListeners are called in addition to productListeners'() {
196+
given: 'a ProductState'
197+
def productState = new ProductState(Product.ASM_DATA)
198+
def productListener = new OrderRecordingListener()
199+
def configListener = new OrderRecordingListener()
200+
productState.addProductListener(productListener)
201+
productState.addProductListener('config1', configListener)
202+
203+
and: 'a response with two configs'
204+
def response = buildResponse([
205+
'org/ASM_DATA/config1/foo': [version: 1, length: 8, hash: 'hash1'],
206+
'org/ASM_DATA/config2/foo': [version: 1, length: 8, hash: 'hash2']
207+
])
208+
209+
def key1 = ParsedConfigKey.parse('org/ASM_DATA/config1/foo')
210+
def key2 = ParsedConfigKey.parse('org/ASM_DATA/config2/foo')
211+
212+
when: 'apply is called'
213+
productState.apply(response, [key1, key2], hinter)
214+
215+
then: 'productListener received both configs'
216+
productListener.operations.findAll { it.startsWith('accept:') }.size() == 2
217+
218+
and: 'configListener only received config1'
219+
configListener.operations == ['accept:org/ASM_DATA/config1/foo', 'commit']
220+
}
221+
222+
void 'test remove operations cleanup cached data'() {
223+
given: 'a ProductState'
224+
def productState = new ProductState(Product.ASM_DATA)
225+
def listener = Mock(ProductListener)
226+
productState.addProductListener(listener)
227+
228+
and: 'first apply with a config to cache it'
229+
def response1 = buildResponse([
230+
'org/ASM_DATA/config1/foo': [version: 1, length: 8, hash: 'hash1']
231+
])
232+
def key1 = ParsedConfigKey.parse('org/ASM_DATA/config1/foo')
233+
productState.apply(response1, [key1], hinter)
234+
235+
and: 'an empty response (config should be removed)'
236+
def response2 = buildResponse([:])
237+
238+
when: 'apply is called'
239+
def changed = productState.apply(response2, [], hinter)
240+
241+
then: 'changes are detected'
242+
changed
243+
244+
and: 'listener remove was called'
245+
1 * listener.remove(key1, hinter)
246+
247+
and: 'cached data is cleaned up'
248+
productState.getCachedTargetFiles().isEmpty()
249+
productState.getConfigStates().isEmpty()
250+
}
251+
252+
// Helper methods
253+
254+
RemoteConfigResponse buildResponse(Map<String, Map> targets) {
255+
def response = Mock(RemoteConfigResponse)
256+
257+
for (def entry : targets.entrySet()) {
258+
def path = entry.key
259+
def targetData = entry.value
260+
261+
def target = new RemoteConfigResponse.Targets.ConfigTarget()
262+
def hashString = targetData.hash as String
263+
target.hashes = ['sha256': hashString]
264+
target.length = targetData.length as long
265+
266+
def custom = new RemoteConfigResponse.Targets.ConfigTarget.ConfigTargetCustom()
267+
custom.version = targetData.version as long
268+
target.custom = custom
269+
270+
response.getTarget(path) >> target
271+
response.getFileContents(path) >> "content_${targetData.hash}".bytes
272+
}
273+
274+
// Handle empty targets case
275+
if (targets.isEmpty()) {
276+
response.getTarget(_) >> null
277+
}
278+
279+
return response
280+
}
281+
282+
// Test helper class to record operation order
283+
static class OrderRecordingListener implements ProductListener {
284+
List<String> operations = []
285+
286+
@Override
287+
void accept(datadog.remoteconfig.state.ConfigKey configKey, byte[] content, PollingRateHinter pollingRateHinter) {
288+
operations << "accept:${configKey.toString()}"
289+
}
290+
291+
@Override
292+
void remove(datadog.remoteconfig.state.ConfigKey configKey, PollingRateHinter pollingRateHinter) {
293+
operations << "remove:${configKey.toString()}"
294+
}
295+
296+
@Override
297+
void commit(PollingRateHinter pollingRateHinter) {
298+
operations << 'commit'
299+
}
300+
}
301+
}

0 commit comments

Comments
 (0)