Fix concurrency issues with StubServiceEmitter #18249

Merged
kgyrtkirk merged 5 commits into apache:master from kgyrtkirk:qt-fix
Jul 15, 2025

@kgyrtkirk
Member

@kgyrtkirk kgyrtkirk commented Jul 14, 2025

  • Concurrency issue with StubServiceEmitter: "Add more metric reporting to MSQ" (#18121) added a few new metrics, which increased the load on the emitter and caused intermittent failures, because it used an ArrayList under the hood.
  • Added some catches to shut down queries properly when unexpected exceptions occur; this could give better exceptions and reduce time-to-fix in the future.

This should reduce the probability that QTest splits remain hanging.
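
For illustration, a minimal sketch of why the backing collection matters when several threads emit at once. The class `ConcurrentEmitDemo` is a hypothetical stand-in, not the actual `StubServiceEmitter`, and `ConcurrentLinkedQueue` is an assumption: the PR only shows that the exposed type changed from `List` to `Queue`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a test emitter; not the actual StubServiceEmitter code.
class ConcurrentEmitDemo
{
  // A thread-safe queue tolerates concurrent add() calls. A plain ArrayList is not
  // safe here: unsynchronized concurrent adds may lose elements or throw.
  private final Queue<String> events = new ConcurrentLinkedQueue<>();

  void emit(String event)
  {
    events.add(event);
  }

  int size()
  {
    return events.size();
  }

  // Emits eventsPerThread events from each of numThreads threads; returns the stored count.
  static int emitConcurrently(int numThreads, int eventsPerThread)
  {
    ConcurrentEmitDemo emitter = new ConcurrentEmitDemo();
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    CountDownLatch start = new CountDownLatch(1);
    for (int t = 0; t < numThreads; t++) {
      final int threadId = t;
      pool.execute(() -> {
        try {
          start.await(); // line all threads up so that their emits overlap
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        for (int i = 0; i < eventsPerThread; i++) {
          emitter.emit("metric-" + threadId + "-" + i);
        }
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Emitting threads did not finish in time");
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for emitters", e);
    }
    return emitter.size();
  }

  public static void main(String[] args)
  {
    System.out.println(emitConcurrently(8, 10_000)); // prints 80000
  }
}
```

Swapping the field for `new ArrayList<>()` in this sketch would make the final count unreliable under load, which matches the kind of random failure described above.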

@github-actions github-actions bot added the labels Area - Batch Ingestion, Area - Dependencies, Area - Ingestion, Area - MSQ (For multi stage queries - https://github.com/apache/druid/issues/12262), GHA on Jul 14, 2025
@capistrant capistrant added this to the 34.0.0 milestone Jul 14, 2025
@kgyrtkirk kgyrtkirk changed the title from "Fix some issues with Dart testing" to "Fix concurrency issues with StubServiceEmitter" on Jul 14, 2025
@kgyrtkirk
Member Author

Removed the JFR-related enhancements, as they were causing some OOMs in the embedded tests.

  <groupId>org.junit</groupId>
  <artifactId>junit-bom</artifactId>
- <version>5.10.2</version>
+ <version>5.13.3</version>
Member

Is this related to anything, or just opportunistic?

Member Author

Not closely related; in an earlier version I tried out a JUnit 5.12.x feature and then undid it, but retained the BOM upgrade as it might still be useful later (it didn't make things worse).

- public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
+ public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
  {
    return metricEvents;
Contributor

Nit: I guess we can later update this method to also return a map containing read-only lists rather than the Queue or maybe just update the method to accept a metric name.
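
One way the suggestion above could look, as a rough sketch only: a getter that takes a metric name and hands back an unmodifiable snapshot instead of the live queue. `ReadOnlyMetricsDemo` and `getMetricValues` are hypothetical names, and `Double` stands in for `ServiceMetricEventSnapshot` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical emitter; Double stands in for ServiceMetricEventSnapshot.
class ReadOnlyMetricsDemo
{
  private final Map<String, Queue<Double>> metricEvents = new ConcurrentHashMap<>();

  void emit(String metricName, double value)
  {
    metricEvents.computeIfAbsent(metricName, name -> new ConcurrentLinkedQueue<>()).add(value);
  }

  // Returns an unmodifiable copy of the values seen so far for one metric,
  // so callers never touch the internal queue.
  List<Double> getMetricValues(String metricName)
  {
    Queue<Double> queue = metricEvents.get(metricName);
    if (queue == null) {
      return Collections.emptyList();
    }
    return Collections.unmodifiableList(new ArrayList<>(queue));
  }

  public static void main(String[] args)
  {
    ReadOnlyMetricsDemo emitter = new ReadOnlyMetricsDemo();
    emitter.emit("query/time", 12.0);
    emitter.emit("query/time", 15.0);
    System.out.println(emitter.getMetricValues("query/time").size()); // prints 2
  }
}
```

The copy costs O(n) per call, so this shape suits occasional assertions in tests rather than polling in a tight loop.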

Member Author

Yes; some enhancements could be made there. Actually, in the LatchableEmitter, an asymptotically n^2 computation (over time) remained even after this patch.

I already had to undo a JFR fix in this patch as it was causing some other issues, and I'm afraid this won't fully solve the problem; sometimes I could still reproduce the issue, with a lower probability, but it's still there.

It seems to happen more frequently on apache/master recently; I have to collect more details.

Contributor

> actually in the LatchableEmitter even after this patch an asymptotically n^2 computation remained (over time)

What is the n^2 computation? Is it the evaluation of events against a given WaitCondition?
I had tried to keep it O(n); I might have missed some case.

Member Author

It is O(n), but for every new event, which is technically n^2 over time.

Contributor

Must be a bug then, because every WaitCondition tracks a processedUntil index.
And only processes new items after that.
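
A minimal sketch of the read-offset idea described above, assuming events are visible to the condition as an append-only list. Only the name `processedUntil` comes from the discussion; `OffsetCondition`, `evaluateNew` and the evaluation counter are hypothetical and exist purely to show that each event is tested once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical wait condition; only the processedUntil name comes from the discussion.
class OffsetCondition
{
  private final Predicate<String> matcher;
  private int processedUntil = 0; // events before this index were already evaluated
  private int evaluations = 0;    // counts matcher invocations, for illustration only
  private boolean satisfied = false;

  OffsetCondition(Predicate<String> matcher)
  {
    this.matcher = matcher;
  }

  // Evaluates only the events at index >= processedUntil, then advances the offset.
  boolean evaluateNew(List<String> allEvents)
  {
    while (!satisfied && processedUntil < allEvents.size()) {
      satisfied = matcher.test(allEvents.get(processedUntil));
      processedUntil++;
      evaluations++;
    }
    return satisfied;
  }

  int evaluations()
  {
    return evaluations;
  }

  public static void main(String[] args)
  {
    List<String> events = new ArrayList<>();
    OffsetCondition condition = new OffsetCondition("segment/loaded"::equals);
    for (String event : new String[]{"a", "b", "c", "segment/loaded"}) {
      events.add(event);
      condition.evaluateNew(events); // called once per newly received event
    }
    System.out.println(condition.evaluations()); // prints 4
  }
}
```

With the offset, the matcher runs once per event in total. Any per-event cost that grows with the history, such as copying the event collection before each evaluation, would still add up to quadratic work over time.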

Member Author

I don't think it's a good pattern to expose internals. I don't think it should need this read-until pattern.

Contributor

Well, it is just for tests and not really exposed outside the class, only for use by the sub-class.

The read until is actually very useful to process only the stuff that we need to process.

Otherwise, we would need to pipe every new event through each existing condition either sync or async.
But that would still not handle the case where a condition is registered later.
I guess we could maintain a list of historical events for that purpose.

But all of that just seems overkill for a simple testing use case.

Please let me know if you have some other suggestions to improve this flow.

Member Author

Reading pre-existing events at registration seems dodgy to me; I feel like that's an artifact of the fact that the usage pattern is like:

doSomething();
waitSomething();

which could have trouble if doSomething has already happened before the wait could even launch. The "read older events" approach is an attempt to fix this; however, the problem arises from the fact that the pattern itself is flawed.

It should be more like:

w = registerWait()
doSomething()
wait(w)

or possibly with some try-with-resources sugar:

try( waitForSomething() ) {
  doSomething();
}

so that the contract of waitForSomething would be:

  • only reads messages which are received after it's registered
  • no need to track read point/redispatch old events - dispatch only the event caught during the emit to the current readers
  • it won't even need access to how the events are stored by the StubEmitter

Not sure if this would apply to all existing use cases, but I hope so.
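
The register-first contract proposed above could be sketched roughly as follows. Everything here is hypothetical (`RegisterFirstEmitter`, `waitFor`, `Wait`, the timeout parameter); it is not the LatchableEmitter API. It only illustrates that a waiter registered before the action sees exactly the events emitted afterwards, without the emitter storing any history.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical emitter illustrating "register, act, then wait"; not the Druid API.
class RegisterFirstEmitter
{
  private final List<Wait> waiters = new CopyOnWriteArrayList<>();

  // Dispatches the event only to the currently registered waiters; nothing is stored.
  void emit(String event)
  {
    for (Wait waiter : waiters) {
      waiter.offer(event);
    }
  }

  // Registers the waiter immediately, so it sees every event emitted from now on.
  Wait waitFor(Predicate<String> condition, long timeoutMillis)
  {
    Wait waiter = new Wait(condition, timeoutMillis);
    waiters.add(waiter);
    return waiter;
  }

  final class Wait implements AutoCloseable
  {
    private final Predicate<String> condition;
    private final long timeoutMillis;
    private final CountDownLatch matched = new CountDownLatch(1);

    private Wait(Predicate<String> condition, long timeoutMillis)
    {
      this.condition = condition;
      this.timeoutMillis = timeoutMillis;
    }

    private void offer(String event)
    {
      if (condition.test(event)) {
        matched.countDown();
      }
    }

    // Blocks until a matching event has been seen since registration, then deregisters.
    @Override
    public void close()
    {
      try {
        if (!matched.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("Timed out waiting for a matching event");
        }
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting", e);
      }
      finally {
        waiters.remove(this);
      }
    }
  }

  public static void main(String[] args)
  {
    RegisterFirstEmitter emitter = new RegisterFirstEmitter();
    try (Wait ignored = emitter.waitFor("task/done"::equals, 1000)) {
      emitter.emit("task/done"); // stands in for doSomething()
    }
    System.out.println("condition met");
  }
}
```

One trade-off of blocking in `close()`: if the body throws, the waiter still waits out its timeout before the original exception propagates, so a real implementation might prefer an explicit `await()` call.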

Contributor

Yeah, I would have liked it to be something like that. But it would make the embedded tests (where the LatchableEmitter is actually used) much less readable.

Contributor

@kfaraz kfaraz Jul 15, 2025

The original flow actually feels more intuitive to me.
We are essentially registering a consumer for the stream of metric events coming in.
And the last read point is just the offset that we have consumed so far.

(now that the events field is a queue, this is even more applicable)

@kgyrtkirk kgyrtkirk closed this Jul 15, 2025
@kgyrtkirk kgyrtkirk reopened this Jul 15, 2025
@kgyrtkirk kgyrtkirk merged commit 201de30 into apache:master Jul 15, 2025
121 of 153 checks passed
@kgyrtkirk kgyrtkirk deleted the qt-fix branch July 15, 2025 07:20
capistrant pushed a commit to capistrant/incubator-druid that referenced this pull request Jul 15, 2025
capistrant added a commit that referenced this pull request Jul 15, 2025
Co-authored-by: Zoltan Haindrich <kirk@rxd.hu>
kfaraz added a commit that referenced this pull request Jul 16, 2025
Follow up to #18249

Changes:
- Maintain a List of processed events in `LatchableEmitter`.
This is an improvement over the current flow where a copy of events is created upon receiving every new event.
- When a new condition is registered, evaluate all past events upfront, then add it to the set of wait conditions
- Evaluate each new event as it is received

Other changes:
- Hide the internal queue implementation of `StubServiceEmitter` from tests and sub-classes
- Reduce the usage of `StubServiceEmitter.getEvents()`. Use the inbuilt `verifyValue` methods instead.
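
The flow listed in this follow-up could be sketched roughly like this. It is not the actual `LatchableEmitter` code: `HistoryReplayEmitter`, `Condition` and `register` are hypothetical names, and guarding everything with a single `synchronized` lock is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;

// Hypothetical sketch of "replay history on registration, then evaluate new events".
class HistoryReplayEmitter
{
  static final class Condition
  {
    private final Predicate<String> matcher;
    private final CountDownLatch latch = new CountDownLatch(1);

    private Condition(Predicate<String> matcher)
    {
      this.matcher = matcher;
    }

    boolean isSatisfied()
    {
      return latch.getCount() == 0;
    }
  }

  private final List<String> processedEvents = new ArrayList<>();
  private final Set<Condition> pendingConditions = new LinkedHashSet<>();

  // Records the event, then evaluates it against every pending condition exactly once.
  synchronized void emit(String event)
  {
    processedEvents.add(event);
    pendingConditions.removeIf(condition -> {
      if (condition.matcher.test(event)) {
        condition.latch.countDown();
        return true; // satisfied conditions are dropped from the pending set
      }
      return false;
    });
  }

  // Evaluates all past events upfront; only an unsatisfied condition becomes pending.
  synchronized Condition register(Predicate<String> matcher)
  {
    Condition condition = new Condition(matcher);
    for (String pastEvent : processedEvents) {
      if (matcher.test(pastEvent)) {
        condition.latch.countDown();
        return condition;
      }
    }
    pendingConditions.add(condition);
    return condition;
  }

  public static void main(String[] args)
  {
    HistoryReplayEmitter emitter = new HistoryReplayEmitter();
    emitter.emit("task/started");
    Condition late = emitter.register("task/started"::equals);
    System.out.println(late.isSatisfied()); // prints true
  }
}
```

Because registration and emission share one lock, an event cannot slip between the replay of past events and the addition to the pending set.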
ashibhardwaj pushed a commit to ashibhardwaj/druid that referenced this pull request Jul 23, 2025