-
Notifications
You must be signed in to change notification settings - Fork 1k
Add support for Apache Elasticjob #14933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+1,066
−0
Merged
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
456a8b4
feat: add support for elasticJob
steverao 011d725
feat: add support for elasticjob
steverao c501345
Improve code formatting
steverao 36ded9c
fix: remove redundant newline in settings.gradle.kts
steverao a3efe69
Fix CI failings
steverao 0c4aabe
Merge branch 'main' into elasticjob
laurit b751d37
Address review comments
steverao 29f5b47
refactor: improve ElasticJobTest and EmbedZookeeperServer for better …
steverao 901516b
./gradlew spotlessApply
otelbot[bot] 14dc3ce
refactor: streamline ElasticJobTest structure and enhance attribute a…
steverao 9b0ead0
./gradlew spotlessApply
otelbot[bot] d712d3c
refactor: rename ElasticJobExecutorAdvice to ProcessAdvice
steverao b4604cf
Optimize code
steverao de49063
Address review comment
steverao 7c1dd52
Address review comment
steverao 08804f7
Merge branch 'main' into elasticjob
steverao 149ef02
Merge branch 'main' into elasticjob
laurit 150a7cc
review
laurit 6163e73
review
laurit 4303a78
Address review comments
steverao 32d463b
Address review comments
steverao 4e306cf
review
laurit File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| # Settings for the Apache ElasticJob instrumentation | ||
|
|
||
| | System property | Type | Default | Description | | ||
| |-----------------------------------------------------------------------|---------|---------|-----------------------------------------------------| | ||
| | `otel.instrumentation.apache-elasticjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | |
25 changes: 25 additions & 0 deletions
25
instrumentation/apache-elasticjob-3.0/javaagent/build.gradle.kts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| plugins { | ||
| id("otel.javaagent-instrumentation") | ||
| } | ||
|
|
||
| muzzle { | ||
| pass { | ||
| group.set("org.apache.shardingsphere.elasticjob") | ||
| module.set("elasticjob-lite-core") | ||
| versions.set("[3.0.0,)") | ||
| assertInverse.set(true) | ||
| } | ||
| } | ||
|
|
||
| dependencies { | ||
| library("org.apache.shardingsphere.elasticjob:elasticjob-lite-core:3.0.0") | ||
|
|
||
| testImplementation("org.apache.curator:curator-test:5.1.0") | ||
| } | ||
|
|
||
| tasks.withType<Test>().configureEach { | ||
| // required on jdk17 | ||
| jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED") | ||
| jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| jvmArgs("-Dotel.instrumentation.apache-elasticjob.experimental-span-attributes=true") | ||
| } | ||
76 changes: 76 additions & 0 deletions
76
...y/javaagent/instrumentation/apacheelasticjob/v3_0/DataflowJobExecutorInstrumentation.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; | ||
|
|
||
| import static io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobSingletons.helper; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
|
|
||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
| import javax.annotation.Nullable; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.description.type.TypeDescription; | ||
| import net.bytebuddy.matcher.ElementMatcher; | ||
| import org.apache.shardingsphere.elasticjob.api.ShardingContext; | ||
| import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob; | ||
|
|
||
| public class DataflowJobExecutorInstrumentation implements TypeInstrumentation { | ||
|
|
||
| @Override | ||
| public ElementMatcher<TypeDescription> typeMatcher() { | ||
| return named("org.apache.shardingsphere.elasticjob.dataflow.executor.DataflowJobExecutor"); | ||
| } | ||
|
|
||
| @Override | ||
| public void transform(TypeTransformer transformer) { | ||
| transformer.applyAdviceToMethod( | ||
| isMethod() | ||
| .and(named("process")) | ||
| .and( | ||
| takesArgument( | ||
| 0, named("org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob"))) | ||
| .and( | ||
| takesArgument( | ||
| 3, named("org.apache.shardingsphere.elasticjob.api.ShardingContext"))), | ||
| DataflowJobExecutorInstrumentation.class.getName() + "$ProcessAdvice"); | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| public static class ProcessAdvice { | ||
|
|
||
| @Nullable | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static ElasticJobHelper.ElasticJobScope onEnter( | ||
| @Advice.Argument(0) DataflowJob<?> elasticJob, | ||
| @Advice.Argument(3) ShardingContext shardingContext) { | ||
|
|
||
| ElasticJobProcessRequest request = | ||
| ElasticJobProcessRequest.createWithUserJobInfo( | ||
| shardingContext.getJobName(), | ||
| shardingContext.getTaskId(), | ||
| shardingContext.getShardingItem(), | ||
| shardingContext.getShardingTotalCount(), | ||
| shardingContext.getShardingParameter() != null | ||
| ? shardingContext.getShardingParameter() | ||
| : "", | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "DATAFLOW", | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| elasticJob.getClass(), | ||
| "processData"); | ||
|
|
||
| return helper().startSpan(request); | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void onExit( | ||
| @Advice.Enter ElasticJobHelper.ElasticJobScope scope, @Advice.Thrown Throwable throwable) { | ||
| if (scope != null) { | ||
| helper().endSpan(scope, throwable); | ||
| } | ||
| } | ||
| } | ||
| } | ||
29 changes: 29 additions & 0 deletions
29
...metry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobCodeAttributesGetter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; | ||
|
|
||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| class ElasticJobCodeAttributesGetter implements CodeAttributesGetter<ElasticJobProcessRequest> { | ||
| @Nullable | ||
| @Override | ||
| public Class<?> getCodeClass(ElasticJobProcessRequest request) { | ||
| if (request.isScriptJob() || request.isHttpJob()) { | ||
| return null; | ||
| } | ||
| return request.getUserJobClass(); | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getMethodName(ElasticJobProcessRequest request) { | ||
| if (request.isScriptJob() || request.isHttpJob()) { | ||
| return null; | ||
| } | ||
| return request.getUserMethodName() != null ? request.getUserMethodName() : "process"; | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
80 changes: 80 additions & 0 deletions
80
...ry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobExecutorInstrumentation.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; | ||
|
|
||
| import static io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.ElasticJobSingletons.helper; | ||
| import static io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0.JobTypeHelper.determineJobTypeFromExecutor; | ||
| import static net.bytebuddy.matcher.ElementMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
|
|
||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
| import javax.annotation.Nullable; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.description.type.TypeDescription; | ||
| import net.bytebuddy.matcher.ElementMatcher; | ||
| import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts; | ||
|
|
||
| public class ElasticJobExecutorInstrumentation implements TypeInstrumentation { | ||
| @Override | ||
| public ElementMatcher<TypeDescription> typeMatcher() { | ||
| return named("org.apache.shardingsphere.elasticjob.executor.ElasticJobExecutor"); | ||
| } | ||
|
|
||
| @Override | ||
| public void transform(TypeTransformer transformer) { | ||
| transformer.applyAdviceToMethod( | ||
| named("process") | ||
| .and( | ||
| takesArgument( | ||
| 0, named("org.apache.shardingsphere.elasticjob.api.JobConfiguration"))) | ||
| .and( | ||
| takesArgument( | ||
| 1, | ||
| named("org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts"))) | ||
| .and(takesArgument(2, int.class)) | ||
| .and( | ||
| takesArgument( | ||
| 3, | ||
| named("org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent"))), | ||
| ElasticJobExecutorInstrumentation.class.getName() + "$ProcessAdvice"); | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| public static class ProcessAdvice { | ||
|
|
||
| @Nullable | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static ElasticJobHelper.ElasticJobScope onEnter( | ||
| @Advice.FieldValue("jobItemExecutor") Object jobItemExecutor, | ||
| @Advice.Argument(1) ShardingContexts shardingContexts, | ||
| @Advice.Argument(2) int item) { | ||
|
|
||
| String jobType = determineJobTypeFromExecutor(jobItemExecutor); | ||
| if (!"SCRIPT".equals(jobType) && !"HTTP".equals(jobType)) { | ||
| return null; | ||
| } | ||
| ElasticJobProcessRequest request = | ||
| ElasticJobProcessRequest.create( | ||
| shardingContexts.getJobName(), | ||
| shardingContexts.getTaskId(), | ||
| item, | ||
| shardingContexts.getShardingTotalCount(), | ||
| shardingContexts.getShardingItemParameters() == null | ||
| ? "" | ||
| : shardingContexts.getShardingItemParameters().toString(), | ||
| jobType); | ||
| return helper().startSpan(request); | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void onExit( | ||
| @Advice.Enter @Nullable ElasticJobHelper.ElasticJobScope scope, | ||
| @Advice.Thrown @Nullable Throwable throwable) { | ||
| helper().endSpan(scope, throwable); | ||
| } | ||
| } | ||
| } |
52 changes: 52 additions & 0 deletions
52
...agent/instrumentation/apacheelasticjob/v3_0/ElasticJobExperimentalAttributeExtractor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; | ||
|
|
||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.common.AttributesBuilder; | ||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| class ElasticJobExperimentalAttributeExtractor | ||
| implements AttributesExtractor<ElasticJobProcessRequest, Void> { | ||
|
|
||
| private static final AttributeKey<String> ELASTICJOB_JOB_NAME = | ||
| AttributeKey.stringKey("scheduling.apache-elasticjob.job.name"); | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private static final AttributeKey<String> ELASTICJOB_TASK_ID = | ||
| AttributeKey.stringKey("scheduling.apache-elasticjob.task.id"); | ||
| private static final AttributeKey<Long> ELASTICJOB_ITEM = | ||
| AttributeKey.longKey("scheduling.apache-elasticjob.item"); | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private static final AttributeKey<Long> ELASTICJOB_SHARDING_TOTAL_COUNT = | ||
| AttributeKey.longKey("scheduling.apache-elasticjob.sharding.total.count"); | ||
| private static final AttributeKey<String> ELASTICJOB_SHARDING_ITEM_PARAMETERS = | ||
| AttributeKey.stringKey("scheduling.apache-elasticjob.sharding.item.parameters"); | ||
|
|
||
| @Override | ||
| public void onStart( | ||
| AttributesBuilder attributes, | ||
| Context parentContext, | ||
| ElasticJobProcessRequest elasticJobProcessRequest) { | ||
| attributes.put(ELASTICJOB_JOB_NAME, elasticJobProcessRequest.getJobName()); | ||
| attributes.put(ELASTICJOB_TASK_ID, elasticJobProcessRequest.getTaskId()); | ||
| attributes.put(ELASTICJOB_ITEM, elasticJobProcessRequest.getItem()); | ||
| attributes.put( | ||
| ELASTICJOB_SHARDING_TOTAL_COUNT, elasticJobProcessRequest.getShardingTotalCount()); | ||
| if (elasticJobProcessRequest.getShardingItemParameters() != null) { | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| attributes.put( | ||
| ELASTICJOB_SHARDING_ITEM_PARAMETERS, | ||
| elasticJobProcessRequest.getShardingItemParameters()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onEnd( | ||
| AttributesBuilder attributes, | ||
| Context context, | ||
| ElasticJobProcessRequest elasticJobProcessRequest, | ||
| @Nullable Void unused, | ||
| @Nullable Throwable error) {} | ||
| } | ||
52 changes: 52 additions & 0 deletions
52
...va/io/opentelemetry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobHelper.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; | ||
|
|
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| public final class ElasticJobHelper { | ||
| private final Instrumenter<ElasticJobProcessRequest, Void> instrumenter; | ||
|
|
||
| private ElasticJobHelper(Instrumenter<ElasticJobProcessRequest, Void> instrumenter) { | ||
| this.instrumenter = instrumenter; | ||
| } | ||
|
|
||
| public static ElasticJobHelper create(Instrumenter<ElasticJobProcessRequest, Void> instrumenter) { | ||
| return new ElasticJobHelper(instrumenter); | ||
| } | ||
|
|
||
| @Nullable | ||
| public ElasticJobScope startSpan(ElasticJobProcessRequest request) { | ||
| Context parentContext = Context.current(); | ||
| if (!this.instrumenter.shouldStart(parentContext, request)) { | ||
| return null; | ||
| } | ||
| Context context = this.instrumenter.start(parentContext, request); | ||
| return new ElasticJobScope(request, context, context.makeCurrent()); | ||
| } | ||
|
|
||
| public void endSpan(@Nullable ElasticJobScope scope, @Nullable Throwable throwable) { | ||
| if (scope != null) { | ||
| scope.scope.close(); | ||
| this.instrumenter.end(scope.context, scope.request, null, throwable); | ||
| } | ||
| } | ||
|
|
||
| public static class ElasticJobScope { | ||
| private final ElasticJobProcessRequest request; | ||
| private final Context context; | ||
| private final Scope scope; | ||
|
|
||
| private ElasticJobScope(ElasticJobProcessRequest request, Context context, Scope scope) { | ||
| this.request = request; | ||
| this.context = context; | ||
| this.scope = scope; | ||
| } | ||
| } | ||
| } |
28 changes: 28 additions & 0 deletions
28
...etry/javaagent/instrumentation/apacheelasticjob/v3_0/ElasticJobInstrumentationModule.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.apacheelasticjob.v3_0; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
|
|
||
| @AutoService(InstrumentationModule.class) | ||
| public class ElasticJobInstrumentationModule extends InstrumentationModule { | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| public ElasticJobInstrumentationModule() { | ||
| super("apache-elasticjob", "apache-elasticjob-3.0"); | ||
| } | ||
|
|
||
| @Override | ||
| public List<TypeInstrumentation> typeInstrumentations() { | ||
| return Arrays.asList( | ||
steverao marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| new ElasticJobExecutorInstrumentation(), | ||
| new SimpleJobExecutorInstrumentation(), | ||
| new DataflowJobExecutorInstrumentation()); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.