3434import io .narayana .lra .LRAConstants ;
3535import io .narayana .lra .LRAData ;
3636import io .narayana .lra .logging .LRALogger ;
37+ import io .smallrye .stork .Stork ;
38+ import io .smallrye .stork .api .Service ;
39+ import io .smallrye .stork .api .ServiceDefinition ;
40+ import io .smallrye .stork .servicediscovery .staticlist .StaticConfiguration ;
3741import jakarta .enterprise .context .RequestScoped ;
3842import jakarta .ws .rs .DELETE ;
3943import jakarta .ws .rs .GET ;
7276import java .util .concurrent .ExecutionException ;
7377import java .util .concurrent .TimeUnit ;
7478import java .util .concurrent .TimeoutException ;
79+ import org .eclipse .microprofile .config .ConfigProvider ;
7580import org .eclipse .microprofile .lra .annotation .AfterLRA ;
7681import org .eclipse .microprofile .lra .annotation .Compensate ;
7782import org .eclipse .microprofile .lra .annotation .Complete ;
@@ -94,13 +99,14 @@ public class NarayanaLRAClient implements Closeable {
9499 * to connect to the Narayana LRA coordinator
95100 */
96101 public static final String LRA_COORDINATOR_URL_KEY = "lra.coordinator.url" ;
102+ public static final String COORDINATOR_URLS_KEY = "lra.coordinator.urls" ;
97103
98104 // LRA Coordinator API
99105 private static final String START_PATH = "/start" ;
100- private static final String LEAVE_PATH = "/ %s/remove" ;
101- private static final String STATUS_PATH = "/ %s/status" ;
102- private static final String CLOSE_PATH = "/ %s/close" ;
103- private static final String CANCEL_PATH = "/ %s/cancel" ;
106+ private static final String LEAVE_PATH = "%s/remove" ;
107+ private static final String STATUS_PATH = "%s/status" ;
108+ private static final String CLOSE_PATH = "%s/close" ;
109+ private static final String CANCEL_PATH = "%s/cancel" ;
104110
105111 private static final String LINK_TEXT = "Link" ;
106112
@@ -117,6 +123,7 @@ public class NarayanaLRAClient implements Closeable {
117123 private static final long LEAVE_TIMEOUT = Long .getLong ("lra.internal.client.leave.timeout" , CLIENT_TIMEOUT );
118124 private static final long QUERY_TIMEOUT = Long .getLong ("lra.internal.client.query.timeout" , CLIENT_TIMEOUT );
119125
126+ private Service coordinatorService ;
120127 private URI coordinatorUrl ;
121128
122129 /**
@@ -142,7 +149,7 @@ public NarayanaLRAClient() {
142149 * @param coordinatorPath path where the LRA coordinator will be contacted
143150 */
144151 public NarayanaLRAClient (String protocol , String host , int port , String coordinatorPath ) {
145- coordinatorUrl = UriBuilder .fromPath (coordinatorPath ).scheme (protocol ).host (host ).port (port ).build ();
152+ clusterConfig ( UriBuilder .fromPath (coordinatorPath ).scheme (protocol ).host (host ).port (port ).build () );
146153 }
147154
148155 /**
@@ -153,7 +160,7 @@ public NarayanaLRAClient(String protocol, String host, int port, String coordina
153160 * @param coordinatorUrl uri of the LRA coordinator
154161 */
155162 public NarayanaLRAClient (URI coordinatorUrl ) {
156- this . coordinatorUrl = coordinatorUrl ;
163+ clusterConfig ( coordinatorUrl ) ;
157164 }
158165
159166 /**
@@ -165,13 +172,44 @@ public NarayanaLRAClient(URI coordinatorUrl) {
165172 * @throws IllegalStateException thrown when the provided URL String is not a URL format
166173 */
167174 public NarayanaLRAClient (String coordinatorUrl ) {
175+ clusterConfig (toURI (coordinatorUrl ));
176+ }
177+
178+ private URI toURI (String coordinatorUrl ) {
168179 try {
169- this . coordinatorUrl = new URI (coordinatorUrl );
180+ return new URI (coordinatorUrl );
170181 } catch (URISyntaxException use ) {
171182 String errMsg = LRALogger .i18nLogger .warn_invalid_uri (
172183 coordinatorUrl , use .getMessage () + " - NarayanaLRAClient constructor" );
173- LRALogger .logger .info (errMsg );
174- throw new IllegalStateException (errMsg , use );
184+ throw new RuntimeException (errMsg , use ); // TODO or throw use and change the caller
185+ }
186+ }
187+
188+ private void clusterConfig (URI coordinatorUrl ) {
189+ try {
190+ String coordinators = ConfigProvider .getConfig ().getValue (COORDINATOR_URLS_KEY , String .class );
191+
192+ if (coordinators == null || coordinators .isEmpty ()) {
193+ // COORDINATOR_URLS_KEY property is set but empty
194+ this .coordinatorUrl = coordinatorUrl ;
195+ } else {
196+ try {
197+ Stork .initialize ();
198+ var stork = Stork .getInstance ()
199+ .defineIfAbsent (COORDINATOR_PATH_NAME , ServiceDefinition .of (new StaticConfiguration ()
200+ .withAddressList (coordinators )
201+ .withShuffle ("true" ))); // default is round-robin, stork supports other algorithms
202+ this .coordinatorService = stork .getService (COORDINATOR_PATH_NAME ); // add a hook to shut it down
203+ this .coordinatorUrl = toURI (coordinators .split ("," )[0 ]);
204+ } catch (IllegalArgumentException e ) {
205+ this .coordinatorUrl = coordinatorUrl ;
206+ LRALogger .logger .infof ("Stork service discovery unavailable (%s), using coordinator %s" ,
207+ e .getMessage (), coordinatorUrl );
208+ }
209+ }
210+ } catch (Exception e ) {
211+ // COORDINATOR_URLS_KEY property not set
212+ this .coordinatorUrl = coordinatorUrl ;
175213 }
176214 }
177215
@@ -295,14 +333,29 @@ public URI startLRA(URI parentLRA, String clientID, Long timeout, ChronoUnit uni
295333 if (unit == null ) {
296334 unit = ChronoUnit .SECONDS ;
297335 }
298-
299336 try {
337+ URI coordinatorInstance ; // URI of one of a cluster of coordinators
338+
339+ if (coordinatorService != null ) {
340+ var instance = coordinatorService .selectInstance ().await ().indefinitely ();
341+ if (LRALogger .logger .isDebugEnabled ()) {
342+ LRALogger .logger .debugf ("Selected coordinator %s:%d%n" ,
343+ instance .getHost (), instance .getPort ());
344+ }
345+ coordinatorInstance = UriBuilder .fromPath (LRAConstants .COORDINATOR_PATH_NAME )
346+ .scheme (instance .isSecure () ? "https" : "http" ) // remark do we want to support the "storks" scheme
347+ .host (instance .getHost ())
348+ .port (instance .getPort ()).build ();
349+ } else {
350+ coordinatorInstance = coordinatorUrl ;
351+ }
352+
300353 String encodedParentLRA = parentLRA == null ? ""
301354 : URLEncoder .encode (parentLRA .toString (), StandardCharsets .UTF_8 );
302355
303356 client = getClient ();
304357
305- response = client .target (coordinatorUrl )
358+ response = client .target (coordinatorInstance )
306359 .path (START_PATH )
307360 .queryParam (CLIENT_ID_PARAM_NAME , clientID )
308361 .queryParam (TIMELIMIT_PARAM_NAME , Duration .of (timeout , unit ).toMillis ())
@@ -416,8 +469,7 @@ public void leaveLRA(URI lraId, String body) throws WebApplicationException {
416469 try {
417470 client = getClient ();
418471
419- response = client .target (coordinatorUrl )
420- .path (String .format (LEAVE_PATH , LRAConstants .getLRAUid (lraId )))
472+ response = client .target (String .format (LEAVE_PATH , lraId ))
421473 .request ()
422474 .header (NARAYANA_LRA_API_VERSION_HEADER_NAME , LRAConstants .CURRENT_API_VERSION_STRING )
423475 .async ()
@@ -576,7 +628,8 @@ public LRAStatus getStatus(URI uri) throws WebApplicationException {
576628 URL lraId ;
577629
578630 try {
579- lraId = uri .toURL ();
631+ // remove the query parameter since it's not in the spec
632+ lraId = UriBuilder .fromUri (uri ).replaceQuery (null ).build ().toURL ();
580633 } catch (MalformedURLException mue ) {
581634 throwGenericLRAException (null ,
582635 Response .Status .INTERNAL_SERVER_ERROR .getStatusCode (),
@@ -586,8 +639,7 @@ public LRAStatus getStatus(URI uri) throws WebApplicationException {
586639
587640 try {
588641 client = getClient ();
589- response = client .target (coordinatorUrl )
590- .path (String .format (STATUS_PATH , LRAConstants .getLRAUid (uri )))
642+ response = client .target (String .format (STATUS_PATH , lraId ))
591643 .request ()
592644 .header (NARAYANA_LRA_API_VERSION_HEADER_NAME , LRAConstants .CURRENT_API_VERSION_STRING )
593645 .async ()
@@ -769,17 +821,14 @@ public URI enlistCompensator(URI uri, Long timelimit, String linkHeader, StringB
769821 }
770822
771823 private void endLRA (URI lra , boolean confirm , String compensator , String userData ) throws WebApplicationException {
772- Client client = null ;
773- Response response = null ;
824+ Response response ;
774825
775826 lraTracef (lra , "%s LRA" , confirm ? "close" : "compensate" );
776827
777- try {
778- client = getClient ();
779- String lraUid = LRAConstants .getLRAUid (lra );
828+ try (Client client = getClient ()) {
780829 try {
781- response = client . target ( coordinatorUrl )
782- . path (confirm ? String .format (CLOSE_PATH , lraUid ) : String .format (CANCEL_PATH , lraUid ))
830+ URI uri = UriBuilder . fromUri ( lra ). replaceQuery ( null ). build ();
831+ response = client . target (confirm ? String .format (CLOSE_PATH , uri ) : String .format (CANCEL_PATH , uri ))
783832 .request ()
784833 .header (NARAYANA_LRA_API_VERSION_HEADER_NAME , LRAConstants .CURRENT_API_VERSION_STRING )
785834 .header (NARAYANA_LRA_PARTICIPANT_LINK_HEADER_NAME , compensator )
@@ -806,9 +855,6 @@ private void endLRA(URI lra, boolean confirm, String compensator, String userDat
806855 Current .pop (lra );
807856 Current .removeActiveLRACache (lra );
808857
809- if (client != null ) {
810- client .close ();
811- }
812858 }
813859 }
814860
0 commit comments