55import java .util .*;
66import java .util .concurrent .*;
77import java .util .concurrent .atomic .*;
8+ import java .util .concurrent .StructuredTaskScope .Joiner ;
89
910// NOTE: this is NOT production-ready, just an experiment
1011//
11- // this is similar in spirit to StructuredTaskScope. ShutdownOnSuccess<T>, but for N tasks
12+ // this is similar in spirit to ShutdownOnSuccess<T>, but for N tasks
1213// i.e. given M tasks, success is defined when N of them complete successfully
1314// to wit: `invokeSome(n)`
1415
15- public class CustomStructuredTaskScope <T > extends StructuredTaskScope < T > {
16- private static final String LOG_PREFIX = "TRACER CustomStructuredTaskScope " ;
16+ public class CustomJoiner <T > implements Joiner < T , List < T > > {
17+ private static final String LOG_PREFIX = "TRACER CustomJoiner " ;
1718
1819 private AtomicInteger successCounter = new AtomicInteger (0 );
1920 private AtomicBoolean hasReachedThreshold = new AtomicBoolean (false );
@@ -23,10 +24,50 @@ public class CustomStructuredTaskScope<T> extends StructuredTaskScope<T> {
2324 // sanity check:
2425 private AtomicInteger failCounter = new AtomicInteger (0 );
2526
26- public CustomStructuredTaskScope (int numTasksForSuccess ) {
27+ public CustomJoiner (int numTasksForSuccess ) {
2728 this .numTasksForSuccess = numTasksForSuccess ;
2829 }
2930
31+ @ Override
32+ public boolean onComplete (StructuredTaskScope .Subtask <? extends T > subtask ) {
33+ boolean result = false ;
34+
35+ try {
36+ var state = subtask .state ();
37+ if (state == StructuredTaskScope .Subtask .State .SUCCESS ) {
38+ int numSuccess = successCounter .incrementAndGet ();
39+ if (numSuccess <= numTasksForSuccess ) {
40+ results .add (subtask .get ());
41+ }
42+
43+ if (numSuccess == numTasksForSuccess ) {
44+ hasReachedThreshold .getAndSet (true );
45+ System .out .println (LOG_PREFIX + " success threshold reached..." );
46+ result = true ;
47+ }
48+ } else if (state == StructuredTaskScope .Subtask .State .FAILED ) {
49+ failCounter .incrementAndGet ();
50+ }
51+ } catch (Exception ex ) {
52+ System .err .println (LOG_PREFIX + " ERROR caught ex: " + ex .getMessage ());
53+ }
54+
55+ return result ;
56+ }
57+
58+ @ Override
59+ public List <T > result () {
60+ if (! hasReachedThreshold .get ()) {
61+ throw new IllegalStateException ("success threshold not met" );
62+ }
63+ System .out .println (LOG_PREFIX + " success! num ok: " + numTasksForSuccess +
64+ " num failed: " + failCounter .get ());
65+
66+ return results ;
67+ }
68+
69+ /*
70+ *
3071 @Override
3172 protected void handleComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
3273 try {
@@ -60,4 +101,5 @@ public List<T> results() {
60101
61102 return new ArrayList<T>(results);
62103 }
104+ */
63105}
0 commit comments