diff --git a/.fossa.yml b/.fossa.yml index 5b1f524cc4f9..a9916e5447ed 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -73,6 +73,9 @@ targets: - type: gradle path: ./ target: ':instrumentation:apache-dubbo-2.7:library-autoconfigure' + - type: gradle + path: ./ + target: ':instrumentation:apache-elasticjob-3.0:javaagent' - type: gradle path: ./ target: ':instrumentation:apache-httpasyncclient-4.1:javaagent' diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index 4b9d839b0414..2860490745db 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -29,6 +29,7 @@ These are the supported libraries and frameworks: | [Apache CXF JAX-WS](https://cxf.apache.org/) | 3.0+ (not including 4.0+ yet) | N/A | Provides `http.route` [2], Controller Spans [3] | | [Apache DBCP](https://commons.apache.org/proper/commons-dbcp/) | 2.0+ | [opentelemetry-apache-dbcp-2.0](../instrumentation/apache-dbcp-2.0/library) | [Database Pool Metrics] | | [Apache Dubbo](https://github.com/apache/dubbo/) | 2.7+ | [opentelemetry-apache-dubbo-2.7](../instrumentation/apache-dubbo-2.7/library-autoconfigure) | [RPC Client Spans], [RPC Server Spans] | +| [Apache ElasticJob](https://shardingsphere.apache.org/elasticjob/) | 3.0+ | N/A | none | | [Apache HttpAsyncClient](https://hc.apache.org/index.html) | 4.1+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] | | [Apache HttpClient](https://hc.apache.org/index.html) | 2.0+ | [opentelemetry-apache-httpclient-4.3](../instrumentation/apache-httpclient/apache-httpclient-4.3/library),
[opentelemetry-apache-httpclient-5.2](../instrumentation/apache-httpclient/apache-httpclient-5.2/library) | [HTTP Client Spans], [HTTP Client Metrics] | | [Apache ShenYu](https://shenyu.apache.org/) | 2.4+ | N/A | Provides `http.route` [2] | diff --git a/instrumentation/apache-elasticjob-3.0/README.md b/instrumentation/apache-elasticjob-3.0/README.md new file mode 100644 index 000000000000..3a30d62aa418 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/README.md @@ -0,0 +1,5 @@ +# Settings for the Apache ElasticJob instrumentation + +| System property | Type | Default | Description | +|-----------------------------------------------------------------------|---------|---------|-----------------------------------------------------| +| `otel.instrumentation.apache-elasticjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/build.gradle.kts b/instrumentation/apache-elasticjob-3.0/javaagent/build.gradle.kts new file mode 100644 index 000000000000..8c71224ec6e3 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/build.gradle.kts @@ -0,0 +1,25 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.apache.shardingsphere.elasticjob") + module.set("elasticjob-lite-core") + versions.set("[3.0.0,)") + assertInverse.set(true) + } +} + +dependencies { + library("org.apache.shardingsphere.elasticjob:elasticjob-lite-core:3.0.0") + + testImplementation("org.apache.curator:curator-test:5.1.0") +} + +tasks.withType().configureEach { + // required on jdk17 + jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED") + jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") + jvmArgs("-Dotel.instrumentation.apache-elasticjob.experimental-span-attributes=true") +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/DataflowJobExecutorInstrumentation.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/DataflowJobExecutorInstrumentation.java new file mode 100644 index 000000000000..322dd3580905 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/DataflowJobExecutorInstrumentation.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import static io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobSingletons.helper; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob; + +public class DataflowJobExecutorInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.shardingsphere.elasticjob.dataflow.executor.DataflowJobExecutor"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("process")).and(takesArguments(4)), + DataflowJobExecutorInstrumentation.class.getName() + "$ProcessAdvice"); + } + + @SuppressWarnings("unused") + public static class ProcessAdvice { + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ElasticJobHelper.ElasticJobScope onEnter( + @Advice.Argument(0) DataflowJob elasticJob, + @Advice.Argument(3) ShardingContext shardingContext) { + + ElasticJobProcessRequest request = + ElasticJobProcessRequest.createWithUserJobInfo( + shardingContext.getJobName(), + shardingContext.getTaskId(), + shardingContext.getShardingItem(), + shardingContext.getShardingTotalCount(), + shardingContext.getShardingParameter() != null + ? shardingContext.getShardingParameter() + : "", + "DATAFLOW", + elasticJob.getClass(), + "processData"); + + return helper().startSpan(request); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Enter ElasticJobHelper.ElasticJobScope scope, @Advice.Thrown Throwable throwable) { + if (scope != null) { + helper().endSpan(scope, throwable); + } + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobCodeAttributesGetter.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobCodeAttributesGetter.java new file mode 100644 index 000000000000..a1585686b483 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobCodeAttributesGetter.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; +import javax.annotation.Nullable; + +class ElasticJobCodeAttributesGetter implements CodeAttributesGetter { + @Nullable + @Override + public Class getCodeClass(ElasticJobProcessRequest request) { + if (request.isScriptJob() || request.isHttpJob()) { + return null; + } + return request.getUserJobClass(); + } + + @Nullable + @Override + public String getMethodName(ElasticJobProcessRequest request) { + if (request.isScriptJob() || request.isHttpJob()) { + return null; + } + return request.getUserMethodName() != null ? request.getUserMethodName() : "process"; + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExecutorInstrumentation.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExecutorInstrumentation.java new file mode 100644 index 000000000000..ac8d86164f93 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExecutorInstrumentation.java @@ -0,0 +1,98 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import static io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobSingletons.helper; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts; + +public class ElasticJobExecutorInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.shardingsphere.elasticjob.executor.ElasticJobExecutor"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("process") + .and( + takesArgument( + 0, named("org.apache.shardingsphere.elasticjob.api.JobConfiguration"))) + .and( + takesArgument( + 1, + named("org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts"))) + .and(takesArgument(2, int.class)) + .and( + takesArgument( + 3, + named("org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent"))), + ElasticJobExecutorInstrumentation.class.getName() + "$ElasticJobExecutorAdvice"); + } + + @SuppressWarnings("unused") + public static class ElasticJobExecutorAdvice { + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ElasticJobHelper.ElasticJobScope onEnter( + @Advice.FieldValue("jobItemExecutor") Object jobItemExecutor, + @Advice.Argument(1) ShardingContexts shardingContexts, + @Advice.Argument(2) int item) { + + String jobType = determineJobTypeFromExecutor(jobItemExecutor); + if (!"SCRIPT".equals(jobType) && !"HTTP".equals(jobType)) { + return null; + } + ElasticJobProcessRequest request = + ElasticJobProcessRequest.create( + shardingContexts.getJobName(), + shardingContexts.getTaskId(), + item, + shardingContexts.getShardingTotalCount(), + shardingContexts.getShardingItemParameters() == null + ? "" + : shardingContexts.getShardingItemParameters().toString(), + jobType); + return helper().startSpan(request); + } + + public static String determineJobTypeFromExecutor(Object jobItemExecutor) { + if (jobItemExecutor == null) { + return "UNKNOWN"; + } else { + switch (jobItemExecutor.getClass().getSimpleName()) { + case "HttpJobExecutor": + return "HTTP"; + case "ScriptJobExecutor": + return "SCRIPT"; + case "SimpleJobExecutor": + return "SIMPLE"; + case "DataflowJobExecutor": + return "DATAFLOW"; + default: + return "UNKNOWN"; + } + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Enter @Nullable ElasticJobHelper.ElasticJobScope scope, + @Advice.Thrown @Nullable Throwable throwable) { + helper().endSpan(scope, throwable); + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExperimentalAttributeExtractor.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExperimentalAttributeExtractor.java new file mode 100644 index 000000000000..2e8b7ec35a9b --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExperimentalAttributeExtractor.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import javax.annotation.Nullable; + +class ElasticJobExperimentalAttributeExtractor + implements AttributesExtractor { + + private static final AttributeKey ELASTICJOB_JOB_NAME = + AttributeKey.stringKey("scheduling.apache-elasticjob.job.name"); + private static final AttributeKey ELASTICJOB_TASK_ID = + AttributeKey.stringKey("scheduling.apache-elasticjob.task.id"); + private static final AttributeKey ELASTICJOB_ITEM = + AttributeKey.longKey("scheduling.apache-elasticjob.item"); + private static final AttributeKey ELASTICJOB_SHARDING_TOTAL_COUNT = + AttributeKey.longKey("scheduling.apache-elasticjob.sharding.total.count"); + private static final AttributeKey ELASTICJOB_SHARDING_ITEM_PARAMETERS = + AttributeKey.stringKey("scheduling.apache-elasticjob.sharding.item.parameters"); + + @Override + public void onStart( + AttributesBuilder attributes, + Context parentContext, + ElasticJobProcessRequest elasticJobProcessRequest) { + attributes.put(ELASTICJOB_JOB_NAME, elasticJobProcessRequest.getJobName()); + attributes.put(ELASTICJOB_TASK_ID, elasticJobProcessRequest.getTaskId()); + attributes.put(ELASTICJOB_ITEM, elasticJobProcessRequest.getItem()); + attributes.put( + ELASTICJOB_SHARDING_TOTAL_COUNT, elasticJobProcessRequest.getShardingTotalCount()); + if (elasticJobProcessRequest.getShardingItemParameters() != null) { + attributes.put( + ELASTICJOB_SHARDING_ITEM_PARAMETERS, + elasticJobProcessRequest.getShardingItemParameters()); + } + } + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + ElasticJobProcessRequest elasticJobProcessRequest, + @Nullable Void unused, + @Nullable Throwable error) {} +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobHelper.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobHelper.java new file mode 100644 index 000000000000..2b06befa99fe --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobHelper.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import javax.annotation.Nullable; + +public final class ElasticJobHelper { + private final Instrumenter instrumenter; + + private ElasticJobHelper(Instrumenter instrumenter) { + this.instrumenter = instrumenter; + } + + public static ElasticJobHelper create(Instrumenter instrumenter) { + return new ElasticJobHelper(instrumenter); + } + + @Nullable + public ElasticJobScope startSpan(ElasticJobProcessRequest request) { + Context parentContext = Context.current(); + if (!this.instrumenter.shouldStart(parentContext, request)) { + return null; + } else { + Context context = this.instrumenter.start(parentContext, request); + return new ElasticJobScope(request, context, context.makeCurrent()); + } + } + + public void endSpan(@Nullable ElasticJobScope scope, @Nullable Throwable throwable) { + if (scope != null) { + if (throwable != null) { + scope.request.setFailed(); + } + + scope.scope.close(); + this.instrumenter.end(scope.context, scope.request, null, throwable); + } + } + + public static class ElasticJobScope { + private final ElasticJobProcessRequest request; + private final Context context; + private final Scope scope; + + private ElasticJobScope(ElasticJobProcessRequest request, Context context, Scope scope) { + this.request = request; + this.context = context; + this.scope = scope; + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumentationModule.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumentationModule.java new file mode 100644 index 000000000000..f81690c3c71d --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumentationModule.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; +import java.util.Arrays; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class ElasticJobInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { + + public ElasticJobInstrumentationModule() { + super("apache-elasticjob", "apache-elasticjob-3.0"); + } + + @Override + public List typeInstrumentations() { + return Arrays.asList( + new ElasticJobExecutorInstrumentation(), + new SimpleJobExecutorInstrumentation(), + new DataflowJobExecutorInstrumentation()); + } + + @Override + public boolean isIndyReady() { + return true; + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumenterFactory.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumenterFactory.java new file mode 100644 index 000000000000..66479e03d034 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumenterFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; + +public final class ElasticJobInstrumenterFactory { + private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = + AgentInstrumentationConfig.get() + .getBoolean("otel.instrumentation.apache-elasticjob.experimental-span-attributes", false); + + public static Instrumenter create(String instrumentationName) { + ElasticJobCodeAttributesGetter codeAttributesGetter = new ElasticJobCodeAttributesGetter(); + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), + instrumentationName, + new ElasticJobSpanNameExtractor(codeAttributesGetter)) + .addAttributesExtractor(CodeAttributesExtractor.create(codeAttributesGetter)) + .setSpanStatusExtractor( + (spanStatusBuilder, elasticJobProcessRequest, unused, error) -> { + if (error != null || elasticJobProcessRequest.isFailed()) { + spanStatusBuilder.setStatus(StatusCode.ERROR); + } + }); + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + builder.addAttributesExtractor( + AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "elasticjob")); + builder.addAttributesExtractor(new ElasticJobExperimentalAttributeExtractor()); + } + + return builder.buildInstrumenter(); + } + + private ElasticJobInstrumenterFactory() {} +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobProcessRequest.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobProcessRequest.java new file mode 100644 index 000000000000..7c37baaa367d --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobProcessRequest.java @@ -0,0 +1,104 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +public final class ElasticJobProcessRequest { + private String jobName; + private String taskId; + private int item; + private int shardingTotalCount; + private String shardingItemParameters; + private boolean failed; + private String jobType; + private Class userJobClass; + private String userMethodName; + + public static ElasticJobProcessRequest create( + String jobName, + String taskId, + int item, + int shardingTotalCount, + String shardingItemParameters, + String jobType) { + ElasticJobProcessRequest request = new ElasticJobProcessRequest(); + request.jobName = jobName; + request.taskId = taskId; + request.item = item; + request.shardingTotalCount = shardingTotalCount; + request.shardingItemParameters = shardingItemParameters; + request.jobType = jobType; + return request; + } + + public static ElasticJobProcessRequest createWithUserJobInfo( + String jobName, + String taskId, + int item, + int shardingTotalCount, + String shardingItemParameters, + String jobType, + Class userJobClass, + String userMethodName) { + ElasticJobProcessRequest request = new ElasticJobProcessRequest(); + request.jobName = jobName; + request.taskId = taskId; + request.item = item; + request.shardingTotalCount = shardingTotalCount; + request.shardingItemParameters = shardingItemParameters; + request.jobType = jobType; + request.userJobClass = userJobClass; + request.userMethodName = userMethodName; + return request; + } + + public void setFailed() { + this.failed = true; + } + + public boolean isFailed() { + return this.failed; + } + + public String getJobName() { + return this.jobName; + } + + public String getTaskId() { + return this.taskId; + } + + public int getItem() { + return this.item; + } + + public int getShardingTotalCount() { + return this.shardingTotalCount; + } + + public String getShardingItemParameters() { + return this.shardingItemParameters; + } + + public String getJobType() { + return this.jobType; + } + + public boolean isScriptJob() { + return "SCRIPT".equals(this.jobType); + } + + public boolean isHttpJob() { + return "HTTP".equals(this.jobType); + } + + public Class getUserJobClass() { + return this.userJobClass; + } + + public String getUserMethodName() { + return this.userMethodName; + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobSingletons.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobSingletons.java new file mode 100644 index 000000000000..a5ff6b7a3e33 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobSingletons.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; + +public final class ElasticJobSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.apache-elasticjob-3.0"; + private static final Instrumenter INSTRUMENTER = + ElasticJobInstrumenterFactory.create(INSTRUMENTATION_NAME); + private static final ElasticJobHelper HELPER = ElasticJobHelper.create(INSTRUMENTER); + + public static ElasticJobHelper helper() { + return HELPER; + } + + private ElasticJobSingletons() {} +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobSpanNameExtractor.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobSpanNameExtractor.java new file mode 100644 index 000000000000..0692ad9c4ce4 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobSpanNameExtractor.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; + +class ElasticJobSpanNameExtractor implements SpanNameExtractor { + private final SpanNameExtractor codeSpanNameExtractor; + + ElasticJobSpanNameExtractor(CodeAttributesGetter getter) { + this.codeSpanNameExtractor = CodeSpanNameExtractor.create(getter); + } + + @Override + public String extract(ElasticJobProcessRequest request) { + if (request.isScriptJob()) { + return "SCRIPT"; + } + if (request.isHttpJob()) { + return "HTTP"; + } + return this.codeSpanNameExtractor.extract(request); + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/SimpleJobExecutorInstrumentation.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/SimpleJobExecutorInstrumentation.java new file mode 100644 index 000000000000..8d7cc7900b6d --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/SimpleJobExecutorInstrumentation.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import static io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobSingletons.helper; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; + +public class SimpleJobExecutorInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.shardingsphere.elasticjob.simple.executor.SimpleJobExecutor"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("process")).and(takesArguments(4)), + SimpleJobExecutorInstrumentation.class.getName() + "$ProcessAdvice"); + } + + @SuppressWarnings("unused") + public static class ProcessAdvice { + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ElasticJobHelper.ElasticJobScope onEnter( + @Advice.Argument(0) SimpleJob elasticJob, + @Advice.Argument(3) ShardingContext shardingContext) { + + ElasticJobProcessRequest request = + ElasticJobProcessRequest.createWithUserJobInfo( + shardingContext.getJobName(), + shardingContext.getTaskId(), + shardingContext.getShardingItem(), + shardingContext.getShardingTotalCount(), + shardingContext.getShardingParameter() != null + ? shardingContext.getShardingParameter() + : "", + "SIMPLE", + elasticJob.getClass(), + "execute"); + + return helper().startSpan(request); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Enter ElasticJobHelper.ElasticJobScope scope, @Advice.Thrown Throwable throwable) { + if (scope != null) { + helper().endSpan(scope, throwable); + } + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobTest.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobTest.java new file mode 100644 index 000000000000..ec0238ccdf59 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobTest.java @@ -0,0 +1,475 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermissions; +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties; +import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; +import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; +import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; +import org.apache.shardingsphere.elasticjob.script.props.ScriptJobProperties; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class ElasticJobTest { + + private static final int EMBED_ZOOKEEPER_PORT = 4181; + private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT; + + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static CoordinatorRegistryCenter regCenter; + private static int port; + private static HttpServer httpServer; + + @BeforeAll + public static void init() throws IOException { + EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT); + regCenter = setUpRegistryCenter(); + port = PortUtils.findOpenPort(); + httpServer = HttpServer.create(new InetSocketAddress(port), 0); + httpServer.createContext( + "/hello", + new HttpHandler() { + @Override + public void handle(HttpExchange exchange) throws IOException { + byte[] response = "{\"success\": true}".getBytes(UTF_8); + exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + } + }); + new Thread(() -> httpServer.start()).start(); + } + + @AfterAll + public static void stop() { + httpServer.stop(0); + } + + @AfterEach + public void clearSpans() { + testing.clearData(); + + try { + Thread.sleep(100L); + } catch (InterruptedException var2) { + Thread.currentThread().interrupt(); + } + } + + @Test + public void testHttpJob() throws InterruptedException { + ScheduleJobBootstrap bootstrap = setUpHttpJob(regCenter); + try { + bootstrap.schedule(); + await().atMost(Duration.ofSeconds(12)).until(() -> testing.spans().size() >= 3); + } finally { + bootstrap.shutdown(); + } + + List spans = testing.spans(); + assertThat(spans).hasSize(3); + + for (SpanData span : spans) { + assertThat(span.getName()).isEqualTo("HTTP"); + assertThat(span.getKind()).isEqualTo(SpanKind.INTERNAL); + assertThat(span.getAttributes().get(AttributeKey.stringKey("job.system"))) + .isEqualTo("elasticjob"); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.job.name"))) + .isEqualTo("javaHttpJob"); + assertThat( + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.sharding.total.count"))) + .isEqualTo(3L); + assertThat( + span.getAttributes() + .get( + AttributeKey.stringKey( + "scheduling.apache-elasticjob.sharding.item.parameters"))) + .isEqualTo("{0=Beijing, 1=Shanghai, 2=Guangzhou}"); + + assertThat( + span.getAttributes().get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .isIn(0L, 1L, 2L); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.task.id"))) + .contains("javaHttpJob"); + } + + Set shardItems = + spans.stream() + .map( + span -> + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .collect(Collectors.toSet()); + assertThat(shardItems).containsExactlyInAnyOrder(0L, 1L, 2L); + } + + @Test + public void testSimpleJob() throws InterruptedException { + SynchronizedTestSimpleJob job = new SynchronizedTestSimpleJob(2); + ScheduleJobBootstrap bootstrap = + new ScheduleJobBootstrap( + regCenter, + job, + JobConfiguration.newBuilder("simpleElasticJob", 2) + .cron("0/5 * * * * ?") + .shardingItemParameters("0=A,1=B") + .build()); + + try { + bootstrap.schedule(); + assertThat(job.awaitExecution(10, TimeUnit.SECONDS)) + .as("Job should execute within timeout") + .isTrue(); + await().atMost(Duration.ofSeconds(2)).until(() -> testing.spans().size() >= 2); + } finally { + bootstrap.shutdown(); + } + + List spans = testing.spans(); + assertThat(spans).hasSize(2); + + for (SpanData span : spans) { + assertThat(span.getName()).isEqualTo("SynchronizedTestSimpleJob.execute"); + assertThat(span.getKind()).isEqualTo(SpanKind.INTERNAL); + assertThat(span.getAttributes().get(AttributeKey.stringKey("job.system"))) + .isEqualTo("elasticjob"); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.job.name"))) + .isEqualTo("simpleElasticJob"); + assertThat( + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.sharding.total.count"))) + .isEqualTo(2L); + assertThat(span.getAttributes().get(AttributeKey.stringKey("code.function"))) + .isEqualTo("execute"); + assertThat(span.getAttributes().get(AttributeKey.stringKey("code.namespace"))) + .isEqualTo( + "io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobTest$SynchronizedTestSimpleJob"); + assertThat( + span.getAttributes().get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .isIn(0L, 1L); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.task.id"))) + .contains("simpleElasticJob"); + } + + Set shardItems = + spans.stream() + .map( + span -> + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .collect(Collectors.toSet()); + assertThat(shardItems).containsExactlyInAnyOrder(0L, 1L); + } + + @Test + public void testDataflowJob() throws InterruptedException { + SynchronizedTestDataflowJob job = new SynchronizedTestDataflowJob(2); + ScheduleJobBootstrap bootstrap = + new ScheduleJobBootstrap( + regCenter, + job, + JobConfiguration.newBuilder("dataflowElasticJob", 2) + .cron("0/5 * * * * ?") + .shardingItemParameters("0=X,1=Y") + .build()); + + try { + bootstrap.schedule(); + assertThat(job.awaitExecution(10, TimeUnit.SECONDS)) + .as("Job should execute within timeout") + .isTrue(); + await().atMost(Duration.ofSeconds(2)).until(() -> testing.spans().size() >= 2); + } finally { + bootstrap.shutdown(); + } + + List spans = testing.spans(); + assertThat(spans).hasSize(2); + + for (SpanData span : spans) { + assertThat(span.getName()).isEqualTo("SynchronizedTestDataflowJob.processData"); + assertThat(span.getKind()).isEqualTo(SpanKind.INTERNAL); + assertThat(span.getAttributes().get(AttributeKey.stringKey("job.system"))) + .isEqualTo("elasticjob"); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.job.name"))) + .isEqualTo("dataflowElasticJob"); + assertThat( + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.sharding.total.count"))) + .isEqualTo(2L); + assertThat(span.getAttributes().get(AttributeKey.stringKey("code.function"))) + .isEqualTo("processData"); + assertThat(span.getAttributes().get(AttributeKey.stringKey("code.namespace"))) + .isEqualTo( + "io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobTest$SynchronizedTestDataflowJob"); + assertThat( + span.getAttributes().get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .isIn(0L, 1L); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.task.id"))) + .contains("dataflowElasticJob"); + } + + Set shardItems = + spans.stream() + .map( + span -> + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .collect(Collectors.toSet()); + assertThat(shardItems).containsExactlyInAnyOrder(0L, 1L); + } + + @Test + public void testScriptJob() throws IOException { + ScheduleJobBootstrap bootstrap = setUpScriptJob(regCenter); + try { + testing.waitAndAssertTracesWithoutScopeVersionVerification( + trace -> + trace + .hasSize(1) + .hasSpansSatisfyingExactly( + span -> + span.hasKind(SpanKind.INTERNAL) + .hasName("SCRIPT") + .hasAttributesSatisfyingExactly( + equalTo(AttributeKey.stringKey("job.system"), "elasticjob"), + equalTo( + AttributeKey.stringKey( + "scheduling.apache-elasticjob.job.name"), + "scriptElasticJob"), + equalTo( + AttributeKey.longKey("scheduling.apache-elasticjob.item"), + 0L), + equalTo( + AttributeKey.longKey( + "scheduling.apache-elasticjob.sharding.total.count"), + 1L), + equalTo( + AttributeKey.stringKey( + "scheduling.apache-elasticjob.sharding.item.parameters"), + "{0=null}"), + satisfies( + AttributeKey.stringKey( + "scheduling.apache-elasticjob.task.id"), + taskId -> taskId.contains("scriptElasticJob"))))); + } finally { + bootstrap.shutdown(); + } + } + + @Test + public void testFailedJob() throws InterruptedException { + SynchronizedTestFailedJob job = new SynchronizedTestFailedJob(1); + ScheduleJobBootstrap bootstrap = + new ScheduleJobBootstrap( + regCenter, + job, + JobConfiguration.newBuilder("failedElasticJob", 1) + .cron("0/5 * * * * ?") + .shardingItemParameters("0=failed") + .build()); + + try { + bootstrap.schedule(); + assertThat(job.awaitExecution(10, TimeUnit.SECONDS)) + .as("Job should execute within timeout") + .isTrue(); + await().atMost(Duration.ofSeconds(2)).until(() -> testing.spans().size() >= 1); + } finally { + bootstrap.shutdown(); + } + + List spans = testing.spans(); + assertThat(spans).hasSize(1); + + SpanData span = spans.get(0); + assertThat(span.getName()).isEqualTo("SynchronizedTestFailedJob.execute"); + assertThat(span.getKind()).isEqualTo(SpanKind.INTERNAL); + assertThat(span.getStatus().getStatusCode()).isEqualTo(StatusCode.ERROR); + assertThat(span.getAttributes().get(AttributeKey.stringKey("job.system"))) + .isEqualTo("elasticjob"); + assertThat( + span.getAttributes() + .get(AttributeKey.stringKey("scheduling.apache-elasticjob.job.name"))) + .isEqualTo("failedElasticJob"); + assertThat(span.getAttributes().get(AttributeKey.longKey("scheduling.apache-elasticjob.item"))) + .isEqualTo(0L); + assertThat( + span.getAttributes() + .get(AttributeKey.longKey("scheduling.apache-elasticjob.sharding.total.count"))) + .isEqualTo(1L); + assertThat( + span.getAttributes() + .get( + AttributeKey.stringKey( + "scheduling.apache-elasticjob.sharding.item.parameters"))) + .isEqualTo("failed"); + assertThat(span.getAttributes().get(AttributeKey.stringKey("code.function"))) + .isEqualTo("execute"); + assertThat(span.getAttributes().get(AttributeKey.stringKey("code.namespace"))) + .isEqualTo( + "io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobTest$SynchronizedTestFailedJob"); + + assertThat(span.getEvents()).hasSize(1); + assertThat(span.getEvents().get(0).getName()).isEqualTo("exception"); + assertThat( + span.getEvents().get(0).getAttributes().get(AttributeKey.stringKey("exception.type"))) + .isEqualTo("java.lang.RuntimeException"); + assertThat( + span.getEvents() + .get(0) + .getAttributes() + .get(AttributeKey.stringKey("exception.message"))) + .isEqualTo("Simulated job failure for testing"); + } + + private static CoordinatorRegistryCenter setUpRegistryCenter() { + ZookeeperConfiguration zkConfig = + new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, "elasticjob-example-lite-java"); + CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); + result.init(); + return result; + } + + private static ScheduleJobBootstrap setUpHttpJob(CoordinatorRegistryCenter regCenter) { + return new ScheduleJobBootstrap( + regCenter, + "HTTP", + JobConfiguration.newBuilder("javaHttpJob", 3) + .setProperty(HttpJobProperties.URI_KEY, "http://localhost:" + port + "/hello") + .setProperty(HttpJobProperties.METHOD_KEY, "GET") + .cron("0/5 * * * * ?") + .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou") + .build()); + } + + private static ScheduleJobBootstrap setUpScriptJob(CoordinatorRegistryCenter regCenter) + throws IOException { + ScheduleJobBootstrap bootstrap = + new ScheduleJobBootstrap( + regCenter, + "SCRIPT", + JobConfiguration.newBuilder("scriptElasticJob", 1) + .cron("0/5 * * * * ?") + .setProperty(ScriptJobProperties.SCRIPT_KEY, buildScriptCommandLine()) + .build()); + bootstrap.schedule(); + return bootstrap; + } + + private static String buildScriptCommandLine() throws IOException { + Path result = Paths.get(ElasticJobTest.class.getResource("/script/demo.sh").getPath()); + Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x")); + return result.toString(); + } + + static class SynchronizedTestSimpleJob extends TestSimpleJob { + private final CountDownLatch executionLatch; + + public SynchronizedTestSimpleJob(int expectedExecutions) { + this.executionLatch = new CountDownLatch(expectedExecutions); + } + + @Override + public void execute(ShardingContext shardingContext) { + super.execute(shardingContext); + executionLatch.countDown(); + } + + public boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return executionLatch.await(timeout, unit); + } + } + + static class SynchronizedTestDataflowJob extends TestDataflowJob { + private final CountDownLatch executionLatch; + + public SynchronizedTestDataflowJob(int expectedExecutions) { + this.executionLatch = new CountDownLatch(expectedExecutions); + } + + @Override + public void processData(ShardingContext shardingContext, List data) { + super.processData(shardingContext, data); + executionLatch.countDown(); + } + + public boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return executionLatch.await(timeout, unit); + } + } + + static class SynchronizedTestFailedJob extends TestFailedJob { + private final CountDownLatch executionLatch; + + public SynchronizedTestFailedJob(int expectedExecutions) { + this.executionLatch = new CountDownLatch(expectedExecutions); + } + + @Override + public void execute(ShardingContext shardingContext) { + try { + super.execute(shardingContext); + } finally { + executionLatch.countDown(); + } + } + + public boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return executionLatch.await(timeout, unit); + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/EmbedZookeeperServer.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/EmbedZookeeperServer.java new file mode 100644 index 000000000000..998abbc0f802 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/EmbedZookeeperServer.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import java.io.File; +import java.io.IOException; +import org.apache.curator.test.TestingServer; + +public final class EmbedZookeeperServer { + + private static TestingServer testingServer; + + public static void start(int port) { + try { + testingServer = + new TestingServer( + port, new File(String.format("target/test_zk_data/%s/", System.nanoTime()))); + } catch (Exception ex) { + // ignore + } finally { + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + try { + Thread.sleep(1000L); + testingServer.close(); + } catch (InterruptedException | IOException ignore) { + // ignore + } + })); + } + } + + private EmbedZookeeperServer() {} +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestDataflowJob.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestDataflowJob.java new file mode 100644 index 000000000000..688873771cdb --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestDataflowJob.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import java.util.Arrays; +import java.util.List; +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob; + +public class TestDataflowJob implements DataflowJob { + + @Override + public List fetchData(ShardingContext context) { + // Simulate fetching data based on sharding item + switch (context.getShardingItem()) { + case 0: + return Arrays.asList("data-0-1", "data-0-2"); + case 1: + return Arrays.asList("data-1-1", "data-1-2"); + default: + return Arrays.asList("data-default"); + } + } + + @Override + public void processData(ShardingContext shardingContext, List data) { + // Simulate processing + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestFailedJob.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestFailedJob.java new file mode 100644 index 000000000000..267d4cb43b96 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestFailedJob.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; + +public class TestFailedJob implements SimpleJob { + + @Override + public void execute(ShardingContext context) { + // Simulate a failed job by throwing an exception + throw new RuntimeException("Simulated job failure for testing"); + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestSimpleJob.java b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestSimpleJob.java new file mode 100644 index 000000000000..e74e0bb11dda --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/TestSimpleJob.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; + +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; + +public class TestSimpleJob implements SimpleJob { + + @Override + public void execute(ShardingContext context) { + // Simulate some work + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/instrumentation/apache-elasticjob-3.0/javaagent/src/test/resources/script/demo.sh b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/resources/script/demo.sh new file mode 100644 index 000000000000..8aad6f5817b9 --- /dev/null +++ b/instrumentation/apache-elasticjob-3.0/javaagent/src/test/resources/script/demo.sh @@ -0,0 +1,2 @@ +#!/bin/bash +echo "Sharding Context $*" diff --git a/settings.gradle.kts b/settings.gradle.kts index 3b6bf858fb56..4c97122a379f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -142,6 +142,7 @@ include(":instrumentation:apache-dbcp-2.0:testing") include(":instrumentation:apache-dubbo-2.7:javaagent") include(":instrumentation:apache-dubbo-2.7:library-autoconfigure") include(":instrumentation:apache-dubbo-2.7:testing") +include(":instrumentation:apache-elasticjob-3.0:javaagent") include(":instrumentation:apache-httpasyncclient-4.1:javaagent") include(":instrumentation:apache-httpclient:apache-httpclient-2.0:javaagent") include(":instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent")