-
Notifications
You must be signed in to change notification settings - Fork 1k
Add support for PowerJob #12086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for PowerJob #12086
Changes from 22 commits
352214c
95acfcf
4f903b1
ddacbdc
32782bc
7fd29e5
29344b1
e1681c3
54efc0c
7f3236e
6f4faeb
ec6cd9a
029e00e
353b669
2bac40b
878e7d2
6cbfb63
28a9ff1
d933e0b
fe81649
2222dd8
ef3e686
fd29bb0
2b43e92
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| # Settings for the PowerJob instrumentation | ||
|
|
||
| | System property | Type | Default | Description | | ||
| |--------------------------------------------------------------|---------|---------|-----------------------------------------------------| | ||
| | `otel.instrumentation.powerjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| plugins { | ||
| id("otel.javaagent-instrumentation") | ||
| } | ||
|
|
||
| muzzle { | ||
| pass { | ||
| group.set("tech.powerjob") | ||
| module.set("powerjob-worker") | ||
| versions.set("[4.0.0,)") | ||
| assertInverse.set(true) | ||
| extraDependency("tech.powerjob:powerjob-official-processors:1.1.0") | ||
| } | ||
| } | ||
|
|
||
| dependencies { | ||
| library("tech.powerjob:powerjob-worker:4.0.0") | ||
| library("tech.powerjob:powerjob-official-processors:1.1.0") | ||
| } | ||
|
|
||
| tasks.withType<Test>().configureEach { | ||
| // required on jdk17 | ||
| jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED") | ||
| jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") | ||
| jvmArgs("-Dotel.instrumentation.powerjob.experimental-span-attributes=true") | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
|
|
||
| import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; | ||
| import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; | ||
| import static io.opentelemetry.javaagent.instrumentation.powerjob.v4_0.PowerJobSingletons.helper; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isPublic; | ||
| import static net.bytebuddy.matcher.ElementMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.description.type.TypeDescription; | ||
| import net.bytebuddy.matcher.ElementMatcher; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
| import tech.powerjob.worker.core.processor.TaskContext; | ||
| import tech.powerjob.worker.core.processor.sdk.BasicProcessor; | ||
|
|
||
| public class BasicProcessorInstrumentation implements TypeInstrumentation { | ||
| @Override | ||
| public ElementMatcher<TypeDescription> typeMatcher() { | ||
| return implementsInterface(named("tech.powerjob.worker.core.processor.sdk.BasicProcessor")); | ||
| } | ||
|
|
||
| @Override | ||
| public void transform(TypeTransformer transformer) { | ||
| transformer.applyAdviceToMethod( | ||
| named("process") | ||
| .and(isPublic()) | ||
| .and( | ||
| takesArguments(1) | ||
| .and( | ||
| takesArgument( | ||
| 0, named("tech.powerjob.worker.core.processor.TaskContext")))), | ||
| BasicProcessorInstrumentation.class.getName() + "$ProcessAdvice"); | ||
| } | ||
|
|
||
| public static class ProcessAdvice { | ||
|
|
||
| @SuppressWarnings("unused") | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static void onSchedule( | ||
| @Advice.This BasicProcessor handler, | ||
| @Advice.Argument(0) TaskContext taskContext, | ||
| @Advice.Local("otelRequest") PowerJobProcessRequest request, | ||
| @Advice.Local("otelContext") Context context, | ||
| @Advice.Local("otelScope") Scope scope) { | ||
| Context parentContext = currentContext(); | ||
| request = | ||
| PowerJobProcessRequest.createRequest( | ||
| taskContext.getJobId(), | ||
| handler, | ||
| "process", | ||
| taskContext.getJobParams(), | ||
| taskContext.getInstanceParams()); | ||
| context = helper().startSpan(parentContext, request); | ||
| if (context == null) { | ||
| return; | ||
| } | ||
| scope = context.makeCurrent(); | ||
| } | ||
|
|
||
| @SuppressWarnings("unused") | ||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void stopSpan( | ||
| @Advice.Return ProcessResult result, | ||
| @Advice.Thrown Throwable throwable, | ||
| @Advice.Local("otelRequest") PowerJobProcessRequest request, | ||
| @Advice.Local("otelContext") Context context, | ||
| @Advice.Local("otelScope") Scope scope) { | ||
| helper().stopSpan(result, request, throwable, scope, context); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
|
|
||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| class PowerJobCodeAttributesGetter implements CodeAttributesGetter<PowerJobProcessRequest> { | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public Class<?> getCodeClass(PowerJobProcessRequest powerJobProcessRequest) { | ||
| return powerJobProcessRequest.getDeclaringClass(); | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getMethodName(PowerJobProcessRequest powerJobProcessRequest) { | ||
| return powerJobProcessRequest.getMethodName(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
|
|
||
| public final class PowerJobConstants { | ||
|
|
||
| private PowerJobConstants() {} | ||
|
|
||
| public static final String BASIC_PROCESSOR = "BasicProcessor"; | ||
|
||
| public static final String BROADCAST_PROCESSOR = "BroadcastProcessor"; | ||
| public static final String MAP_PROCESSOR = "MapProcessor"; | ||
| public static final String MAP_REDUCE_PROCESSOR = "MapReduceProcessor"; | ||
|
|
||
| // Official processors | ||
| public static final String SHELL_PROCESSOR = "ShellProcessor"; | ||
| public static final String PYTHON_PROCESSOR = "PythonProcessor"; | ||
| public static final String HTTP_PROCESSOR = "HttpProcessor"; | ||
| public static final String FILE_CLEANUP_PROCESSOR = "FileCleanupProcessor"; | ||
| public static final String SPRING_DATASOURCE_SQL_PROCESSOR = "SpringDatasourceSqlProcessor"; | ||
| public static final String DYNAMIC_DATASOURCE_SQL_PROCESSOR = "DynamicDatasourceSqlProcessor"; | ||
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
|
|
||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.common.AttributesBuilder; | ||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| class PowerJobExperimentalAttributeExtractor | ||
| implements AttributesExtractor<PowerJobProcessRequest, Void> { | ||
|
|
||
| private static final AttributeKey<Long> POWERJOB_JOB_ID = | ||
| AttributeKey.longKey("scheduling.powerjob.job.id"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_PARAM = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.param"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_PARAM = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.instance.param"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_TYPE = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.type"); | ||
|
|
||
| @Override | ||
| public void onStart( | ||
| AttributesBuilder attributes, | ||
| Context parentContext, | ||
| PowerJobProcessRequest powerJobProcessRequest) { | ||
| attributes.put(POWERJOB_JOB_ID, powerJobProcessRequest.getJobId()); | ||
| attributes.put(POWERJOB_JOB_PARAM, powerJobProcessRequest.getJobParams()); | ||
| attributes.put(POWERJOB_JOB_INSTANCE_PARAM, powerJobProcessRequest.getInstanceParams()); | ||
| attributes.put(POWERJOB_JOB_INSTANCE_TYPE, powerJobProcessRequest.getJobType()); | ||
| } | ||
|
|
||
| @Override | ||
| public void onEnd( | ||
| AttributesBuilder attributes, | ||
| Context context, | ||
| PowerJobProcessRequest powerJobProcessRequest, | ||
| @Nullable Void unused, | ||
| @Nullable Throwable error) {} | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
|
|
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
| import java.util.function.Predicate; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
|
|
||
| public final class PowerJobHelper { | ||
|
||
|
|
||
| private final Instrumenter<PowerJobProcessRequest, Void> instrumenter; | ||
|
|
||
| private final Predicate<ProcessResult> failedStatusPredicate; | ||
|
|
||
| private PowerJobHelper( | ||
| Instrumenter<PowerJobProcessRequest, Void> instrumenter, | ||
| Predicate<ProcessResult> failedStatusPredicate) { | ||
| this.instrumenter = instrumenter; | ||
| this.failedStatusPredicate = failedStatusPredicate; | ||
| } | ||
|
|
||
| public static PowerJobHelper create( | ||
| Instrumenter<PowerJobProcessRequest, Void> instrumenter, | ||
| Predicate<ProcessResult> failedStatusPredicate) { | ||
| return new PowerJobHelper(instrumenter, failedStatusPredicate); | ||
| } | ||
|
|
||
| public Context startSpan(Context parentContext, PowerJobProcessRequest request) { | ||
| if (!instrumenter.shouldStart(parentContext, request)) { | ||
| return null; | ||
| } | ||
| return instrumenter.start(parentContext, request); | ||
| } | ||
|
|
||
| public void stopSpan( | ||
| ProcessResult result, | ||
| PowerJobProcessRequest request, | ||
| Throwable throwable, | ||
| Scope scope, | ||
| Context context) { | ||
| if (scope == null) { | ||
| return; | ||
| } | ||
| if (failedStatusPredicate.test(result)) { | ||
| request.setFailed(); | ||
| } | ||
|
||
| scope.close(); | ||
| instrumenter.end(context, request, null, throwable); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| @AutoService(InstrumentationModule.class) | ||
| public class PowerJobInstrumentationModule extends InstrumentationModule { | ||
| public PowerJobInstrumentationModule() { | ||
| super("powerjob", "powerjob-4.0"); | ||
| } | ||
|
|
||
| @Override | ||
| public List<TypeInstrumentation> typeInstrumentations() { | ||
| return Collections.singletonList(new BasicProcessorInstrumentation()); | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.