Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@
<groupId>org.eclipse.microprofile.lra</groupId>
<artifactId>microprofile-lra-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.config</groupId>
<artifactId>microprofile-config-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-core</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-service-discovery-static-list</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.narayana.lra</groupId>
<artifactId>lra-service-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import io.narayana.lra.LRAConstants;
import io.narayana.lra.LRAData;
import io.narayana.lra.logging.LRALogger;
import io.smallrye.stork.Stork;
import io.smallrye.stork.api.Service;
import io.smallrye.stork.api.ServiceDefinition;
import io.smallrye.stork.servicediscovery.staticlist.StaticConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
Expand Down Expand Up @@ -72,6 +76,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.lra.annotation.AfterLRA;
import org.eclipse.microprofile.lra.annotation.Compensate;
import org.eclipse.microprofile.lra.annotation.Complete;
Expand All @@ -94,13 +99,14 @@ public class NarayanaLRAClient implements Closeable {
* to connect to the Narayana LRA coordinator
*/
public static final String LRA_COORDINATOR_URL_KEY = "lra.coordinator.url";
public static final String COORDINATOR_URLS_KEY = "lra.coordinator.urls";

// LRA Coordinator API
private static final String START_PATH = "/start";
private static final String LEAVE_PATH = "/%s/remove";
private static final String STATUS_PATH = "/%s/status";
private static final String CLOSE_PATH = "/%s/close";
private static final String CANCEL_PATH = "/%s/cancel";
private static final String LEAVE_PATH = "%s/remove";
private static final String STATUS_PATH = "%s/status";
private static final String CLOSE_PATH = "%s/close";
private static final String CANCEL_PATH = "%s/cancel";

private static final String LINK_TEXT = "Link";

Expand All @@ -117,6 +123,7 @@ public class NarayanaLRAClient implements Closeable {
private static final long LEAVE_TIMEOUT = Long.getLong("lra.internal.client.leave.timeout", CLIENT_TIMEOUT);
private static final long QUERY_TIMEOUT = Long.getLong("lra.internal.client.query.timeout", CLIENT_TIMEOUT);

private Service coordinatorService;
private URI coordinatorUrl;

/**
Expand All @@ -142,7 +149,7 @@ public NarayanaLRAClient() {
* @param coordinatorPath path where the LRA coordinator will be contacted
*/
public NarayanaLRAClient(String protocol, String host, int port, String coordinatorPath) {
coordinatorUrl = UriBuilder.fromPath(coordinatorPath).scheme(protocol).host(host).port(port).build();
clusterConfig(UriBuilder.fromPath(coordinatorPath).scheme(protocol).host(host).port(port).build());
}

/**
Expand All @@ -153,7 +160,7 @@ public NarayanaLRAClient(String protocol, String host, int port, String coordina
* @param coordinatorUrl uri of the LRA coordinator
*/
public NarayanaLRAClient(URI coordinatorUrl) {
this.coordinatorUrl = coordinatorUrl;
clusterConfig(coordinatorUrl);
}

/**
Expand All @@ -165,13 +172,44 @@ public NarayanaLRAClient(URI coordinatorUrl) {
* @throws IllegalStateException thrown when the provided URL String is not a URL format
*/
public NarayanaLRAClient(String coordinatorUrl) {
clusterConfig(toURI(coordinatorUrl));
}

private URI toURI(String coordinatorUrl) {
try {
this.coordinatorUrl = new URI(coordinatorUrl);
return new URI(coordinatorUrl);
} catch (URISyntaxException use) {
String errMsg = LRALogger.i18nLogger.warn_invalid_uri(
coordinatorUrl, use.getMessage() + " - NarayanaLRAClient constructor");
LRALogger.logger.info(errMsg);
throw new IllegalStateException(errMsg, use);
throw new RuntimeException(errMsg, use); // TODO or throw use and change the caller
}
}

private void clusterConfig(URI coordinatorUrl) {
try {
String coordinators = ConfigProvider.getConfig().getValue(COORDINATOR_URLS_KEY, String.class);

if (coordinators == null || coordinators.isEmpty()) {
// COORDINATOR_URLS_KEY property is set but empty
this.coordinatorUrl = coordinatorUrl;
} else {
try {
Stork.initialize();
var stork = Stork.getInstance()
.defineIfAbsent(COORDINATOR_PATH_NAME, ServiceDefinition.of(new StaticConfiguration()
.withAddressList(coordinators)
.withShuffle("true"))); // default is round-robin, stork supports other algorithms
this.coordinatorService = stork.getService(COORDINATOR_PATH_NAME); // add a hook to shut it down
this.coordinatorUrl = toURI(coordinators.split(",")[0]);
} catch (IllegalArgumentException e) {
this.coordinatorUrl = coordinatorUrl;
LRALogger.logger.infof("Stork service discovery unavailable (%s), using coordinator %s",
e.getMessage(), coordinatorUrl);
}
}
} catch (Exception e) {
// COORDINATOR_URLS_KEY property not set
this.coordinatorUrl = coordinatorUrl;
}
}

Expand Down Expand Up @@ -295,14 +333,29 @@ public URI startLRA(URI parentLRA, String clientID, Long timeout, ChronoUnit uni
if (unit == null) {
unit = ChronoUnit.SECONDS;
}

try {
URI coordinatorInstance; // URI of one of a cluster of coordinators

if (coordinatorService != null) {
var instance = coordinatorService.selectInstance().await().indefinitely();
if (LRALogger.logger.isDebugEnabled()) {
LRALogger.logger.debugf("Selected coordinator %s:%d%n",
instance.getHost(), instance.getPort());
}
coordinatorInstance = UriBuilder.fromPath(LRAConstants.COORDINATOR_PATH_NAME)
.scheme(instance.isSecure() ? "https" : "http") // remark do we want to support the "storks" scheme
.host(instance.getHost())
.port(instance.getPort()).build();
} else {
coordinatorInstance = coordinatorUrl;
}

String encodedParentLRA = parentLRA == null ? ""
: URLEncoder.encode(parentLRA.toString(), StandardCharsets.UTF_8);

client = getClient();

response = client.target(coordinatorUrl)
response = client.target(coordinatorInstance)
.path(START_PATH)
.queryParam(CLIENT_ID_PARAM_NAME, clientID)
.queryParam(TIMELIMIT_PARAM_NAME, Duration.of(timeout, unit).toMillis())
Expand Down Expand Up @@ -416,8 +469,7 @@ public void leaveLRA(URI lraId, String body) throws WebApplicationException {
try {
client = getClient();

response = client.target(coordinatorUrl)
.path(String.format(LEAVE_PATH, LRAConstants.getLRAUid(lraId)))
response = client.target(String.format(LEAVE_PATH, lraId))
.request()
.header(NARAYANA_LRA_API_VERSION_HEADER_NAME, LRAConstants.CURRENT_API_VERSION_STRING)
.async()
Expand Down Expand Up @@ -576,7 +628,8 @@ public LRAStatus getStatus(URI uri) throws WebApplicationException {
URL lraId;

try {
lraId = uri.toURL();
// remove the query parameter since it's not in the spec
lraId = UriBuilder.fromUri(uri).replaceQuery(null).build().toURL();
} catch (MalformedURLException mue) {
throwGenericLRAException(null,
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
Expand All @@ -586,8 +639,7 @@ public LRAStatus getStatus(URI uri) throws WebApplicationException {

try {
client = getClient();
response = client.target(coordinatorUrl)
.path(String.format(STATUS_PATH, LRAConstants.getLRAUid(uri)))
response = client.target(String.format(STATUS_PATH, lraId))
.request()
.header(NARAYANA_LRA_API_VERSION_HEADER_NAME, LRAConstants.CURRENT_API_VERSION_STRING)
.async()
Expand Down Expand Up @@ -769,17 +821,14 @@ public URI enlistCompensator(URI uri, Long timelimit, String linkHeader, StringB
}

private void endLRA(URI lra, boolean confirm, String compensator, String userData) throws WebApplicationException {
Client client = null;
Response response = null;
Response response;

lraTracef(lra, "%s LRA", confirm ? "close" : "compensate");

try {
client = getClient();
String lraUid = LRAConstants.getLRAUid(lra);
try (Client client = getClient()) {
try {
response = client.target(coordinatorUrl)
.path(confirm ? String.format(CLOSE_PATH, lraUid) : String.format(CANCEL_PATH, lraUid))
URI uri = UriBuilder.fromUri(lra).replaceQuery(null).build();
response = client.target(confirm ? String.format(CLOSE_PATH, uri) : String.format(CANCEL_PATH, uri))
.request()
.header(NARAYANA_LRA_API_VERSION_HEADER_NAME, LRAConstants.CURRENT_API_VERSION_STRING)
.header(NARAYANA_LRA_PARTICIPANT_LINK_HEADER_NAME, compensator)
Expand All @@ -806,9 +855,6 @@ private void endLRA(URI lra, boolean confirm, String compensator, String userDat
Current.pop(lra);
Current.removeActiveLRACache(lra);

if (client != null) {
client.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ public LongRunningAction(LRAService lraService, String baseUrl, LongRunningActio
this.lraService = lraService;

if (parent != null) {
this.id = Current.buildFullLRAUrl(String.format("%s/%s", baseUrl, get_uid().fileStringForm()), parent.getId());
this.parentId = parent.getId();
// encode the parent in the child URI (by rights we'd use LRA_HTTP_PARENT_CONTEXT_HEADER)
// the parent is used by children to contact parents in certain scenarios
// BTW this technique is historical and needs to be changed to use the header
this.id = Current.buildFullLRAUrl(String.format("%s/%s", baseUrl, get_uid().fileStringForm()), parent.getId());
} else {
this.id = new URI(String.format("%s/%s", baseUrl, get_uid().fileStringForm()));
}
Expand Down
Loading
Loading