Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/client/ClientPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import io.temporal.common.Experimental;
import io.temporal.common.SimplePlugin;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubs.ClientPluginCallback;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import javax.annotation.Nonnull;

/**
* Plugin interface for customizing Temporal client configuration and lifecycle.
*
* <p>Plugins participate in two phases:
*
* <ul>
* <li><b>Configuration phase:</b> Plugins are called in registration order to modify options
* <li><b>Connection phase:</b> Plugins are called in reverse order to wrap service client
* creation
* </ul>
*
* <p>Example implementation:
*
* <pre>{@code
* public class LoggingPlugin extends SimplePlugin {
* public LoggingPlugin() {
* super("my-org.logging");
* }
*
* @Override
* public WorkflowClientOptions.Builder configureClient(
* WorkflowClientOptions.Builder builder) {
* // Add custom interceptor
* return builder.setInterceptors(new LoggingInterceptor());
* }
*
* @Override
* public WorkflowServiceStubs connectServiceClient(
* WorkflowServiceStubsOptions options,
* ServiceStubsSupplier next) throws Exception {
* logger.info("Connecting to Temporal at {}", options.getTarget());
* WorkflowServiceStubs stubs = next.get();
* logger.info("Connected successfully");
* return stubs;
* }
* }
* }</pre>
*
* @see io.temporal.worker.WorkerPlugin
* @see SimplePlugin
*/
@Experimental
public interface ClientPlugin extends ClientPluginCallback {

/**
* Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended
* format: "organization.plugin-name" (e.g., "io.temporal.tracing")
*
* @return fully qualified plugin name
*/
@Nonnull
String getName();

/**
* Allows the plugin to modify service stubs options before the service stubs are created. Called
* during configuration phase in forward (registration) order.
*
* @param builder the options builder to modify
* @return the modified builder (may return same instance or new builder)
*/
@Override
@Nonnull
default WorkflowServiceStubsOptions.Builder configureServiceStubs(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default

Even in places where we could provide default implementations (e.g. Python and Ruby), we intentionally chose not to because we want to force implementers to implement these (even if they choose no-op). Granted in the simple plugin it makes sense to have default implementations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably this shouldn't return a value since builder is mutable, though it's mostly harmless to do so

@Nonnull WorkflowServiceStubsOptions.Builder builder) {
return builder;
}

/**
* Allows the plugin to modify workflow client options before the client is created. Called during
* configuration phase in forward (registration) order.
*
* @param builder the options builder to modify
* @return the modified builder
*/
@Nonnull
default WorkflowClientOptions.Builder configureClient(
@Nonnull WorkflowClientOptions.Builder builder) {
return builder;
}

/**
* Allows the plugin to wrap service client connection. Called during connection phase in reverse
* order (first plugin wraps all others).
*
* <p>Example:
*
* <pre>{@code
* @Override
* public WorkflowServiceStubs connectServiceClient(
* WorkflowServiceStubsOptions options,
* ClientPluginCallback.ServiceStubsSupplier next) throws Exception {
* logger.info("Connecting to Temporal...");
* WorkflowServiceStubs stubs = next.get();
* logger.info("Connected successfully");
* return stubs;
* }
* }</pre>
*
* @param options the final options being used for connection
* @param next supplier that creates the service stubs (calls next plugin or actual connection)
* @return the service stubs (possibly wrapped or decorated)
* @throws Exception if connection fails
*/
@Override
@Nonnull
default WorkflowServiceStubs connectServiceClient(
@Nonnull WorkflowServiceStubsOptions options,
@Nonnull ClientPluginCallback.ServiceStubsSupplier next)
throws Exception {
return next.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static WorkflowClient newInstance(

WorkflowClientInternalImpl(
WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) {
// Apply plugin configuration phase (forward order)
options = applyClientPluginConfiguration(options);
options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WorkflowClientOptions.newBuilder(options)

Should just pass this builder into applying plugin configuration (or just inline applying plugin configuration), no need to make the builder twice.

workflowServiceStubs =
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
Expand Down Expand Up @@ -771,4 +773,24 @@ public NexusStartWorkflowResponse startNexus(
WorkflowInvocationHandler.closeAsyncInvocation();
}
}

/**
* Applies client plugin configuration phase. Plugins are called in forward (registration) order
* to modify the client options.
*/
private static WorkflowClientOptions applyClientPluginConfiguration(
WorkflowClientOptions options) {
List<?> plugins = options.getPlugins();
if (plugins == null || plugins.isEmpty()) {
return options;
}

WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(options);
for (Object plugin : plugins) {
if (plugin instanceof ClientPlugin) {
builder = ((ClientPlugin) plugin).configureClient(builder);
}
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.temporal.client;

import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -47,6 +49,7 @@ public static final class Builder {
private String binaryChecksum;
private List<ContextPropagator> contextPropagators;
private QueryRejectCondition queryRejectCondition;
private List<Object> plugins;

private Builder() {}

Expand All @@ -61,6 +64,7 @@ private Builder(WorkflowClientOptions options) {
binaryChecksum = options.binaryChecksum;
contextPropagators = options.contextPropagators;
queryRejectCondition = options.queryRejectCondition;
plugins = options.plugins != null ? new ArrayList<>(options.plugins) : null;
}

public Builder setNamespace(String namespace) {
Expand Down Expand Up @@ -132,6 +136,47 @@ public Builder setQueryRejectCondition(QueryRejectCondition queryRejectCondition
return this;
}

/**
* Sets the plugins to use with this client. Plugins can modify client and worker configuration,
* intercept connection, and wrap execution lifecycle.
*
* <p>Each plugin should implement {@link io.temporal.client.ClientPlugin} and/or {@link
* io.temporal.worker.WorkerPlugin}. Plugins that implement both interfaces are automatically
* propagated to workers created from this client.
*
* @param plugins the list of plugins to use (each should implement Plugin)
* @return this builder for chaining
* @see io.temporal.client.ClientPlugin
* @see io.temporal.worker.WorkerPlugin
*/
@Experimental
public Builder setPlugins(List<?> plugins) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider matching our approach for interceptors (just a varargs setter) instead of doing something different here. How interceptors are configured is an ideal model for plugins throughout the SDK even if just for consistency reasons IMO (e.g. as arrays, etc).

this.plugins = plugins != null ? new ArrayList<>(plugins) : null;
return this;
}

/**
* Adds a plugin to use with this client. Plugins can modify client and worker configuration,
* intercept connection, and wrap execution lifecycle.
*
* <p>The plugin should implement {@link io.temporal.client.ClientPlugin} and/or {@link
* io.temporal.worker.WorkerPlugin}. Plugins that implement both interfaces are automatically
* propagated to workers created from this client.
*
* @param plugin the plugin to add (should implement Plugin)
* @return this builder for chaining
* @see io.temporal.client.ClientPlugin
* @see io.temporal.worker.WorkerPlugin
*/
@Experimental
public Builder addPlugin(Object plugin) {
if (this.plugins == null) {
this.plugins = new ArrayList<>();
}
this.plugins.add(Objects.requireNonNull(plugin, "Plugin cannot be null"));
return this;
}

public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
Expand All @@ -140,7 +185,8 @@ public WorkflowClientOptions build() {
identity,
binaryChecksum,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
plugins);
}

public WorkflowClientOptions validateAndBuildWithDefaults() {
Expand All @@ -154,7 +200,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators,
queryRejectCondition == null
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
: queryRejectCondition);
: queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
}
}

Expand All @@ -163,6 +210,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private static final List<ContextPropagator> EMPTY_CONTEXT_PROPAGATORS = Collections.emptyList();

private static final List<Object> EMPTY_PLUGINS = Collections.emptyList();

private final String namespace;

private final DataConverter dataConverter;
Expand All @@ -177,21 +226,25 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private final QueryRejectCondition queryRejectCondition;

private final List<Object> plugins;

private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
WorkflowClientInterceptor[] interceptors,
String identity,
String binaryChecksum,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition) {
QueryRejectCondition queryRejectCondition,
List<Object> plugins) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
this.identity = identity;
this.binaryChecksum = binaryChecksum;
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.plugins = plugins;
}

/**
Expand Down Expand Up @@ -236,6 +289,20 @@ public QueryRejectCondition getQueryRejectCondition() {
return queryRejectCondition;
}

/**
* Returns the list of plugins configured for this client.
*
* <p>Each plugin implements {@link io.temporal.client.ClientPlugin} and/or {@link
* io.temporal.worker.WorkerPlugin}. Plugins that implement both interfaces are automatically
* propagated to workers created from this client.
*
* @return an unmodifiable list of plugins, never null
*/
@Experimental
public List<?> getPlugins() {
return plugins != null ? Collections.unmodifiableList(plugins) : Collections.emptyList();
}

@Override
public String toString() {
return "WorkflowClientOptions{"
Expand All @@ -256,6 +323,8 @@ public String toString() {
+ contextPropagators
+ ", queryRejectCondition="
+ queryRejectCondition
+ ", plugins="
+ plugins
+ '}';
}

Expand All @@ -270,7 +339,8 @@ public boolean equals(Object o) {
&& com.google.common.base.Objects.equal(identity, that.identity)
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition;
&& queryRejectCondition == that.queryRejectCondition
&& com.google.common.base.Objects.equal(plugins, that.plugins);
}

@Override
Expand All @@ -282,6 +352,7 @@ public int hashCode() {
identity,
binaryChecksum,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
plugins);
}
}
Loading
Loading