- 
                Notifications
    
You must be signed in to change notification settings  - Fork 1k
 
Add support for PowerJob #12086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for PowerJob #12086
Changes from 23 commits
352214c
              95acfcf
              4f903b1
              ddacbdc
              32782bc
              7fd29e5
              29344b1
              e1681c3
              54efc0c
              7f3236e
              6f4faeb
              ec6cd9a
              029e00e
              353b669
              2bac40b
              878e7d2
              6cbfb63
              28a9ff1
              d933e0b
              fe81649
              2222dd8
              ef3e686
              fd29bb0
              2b43e92
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| # Settings for the PowerJob instrumentation | ||
| 
     | 
||
| | System property | Type | Default | Description | | ||
| |--------------------------------------------------------------|---------|---------|-----------------------------------------------------| | ||
| | `otel.instrumentation.powerjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| plugins { | ||
| id("otel.javaagent-instrumentation") | ||
| } | ||
| 
     | 
||
| muzzle { | ||
| pass { | ||
| group.set("tech.powerjob") | ||
| module.set("powerjob-worker") | ||
| versions.set("[4.0.0,)") | ||
| assertInverse.set(true) | ||
| extraDependency("tech.powerjob:powerjob-official-processors:1.1.0") | ||
| } | ||
| } | ||
| 
     | 
||
| dependencies { | ||
| library("tech.powerjob:powerjob-worker:4.0.0") | ||
| library("tech.powerjob:powerjob-official-processors:1.1.0") | ||
| } | ||
| 
     | 
||
| tasks.withType<Test>().configureEach { | ||
| // required on jdk17 | ||
| jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED") | ||
| jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") | ||
| jvmArgs("-Dotel.instrumentation.powerjob.experimental-span-attributes=true") | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; | ||
| import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; | ||
| import static io.opentelemetry.javaagent.instrumentation.powerjob.v4_0.PowerJobSingletons.instrumenter; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isPublic; | ||
| import static net.bytebuddy.matcher.ElementMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
| 
     | 
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.description.type.TypeDescription; | ||
| import net.bytebuddy.matcher.ElementMatcher; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
| import tech.powerjob.worker.core.processor.TaskContext; | ||
| import tech.powerjob.worker.core.processor.sdk.BasicProcessor; | ||
| 
     | 
||
| public class BasicProcessorInstrumentation implements TypeInstrumentation { | ||
| @Override | ||
| public ElementMatcher<TypeDescription> typeMatcher() { | ||
| return implementsInterface(named("tech.powerjob.worker.core.processor.sdk.BasicProcessor")); | ||
| } | ||
| 
     | 
||
| @Override | ||
| public void transform(TypeTransformer transformer) { | ||
| transformer.applyAdviceToMethod( | ||
| named("process") | ||
| .and(isPublic()) | ||
| .and( | ||
| takesArguments(1) | ||
| .and( | ||
| takesArgument( | ||
| 0, named("tech.powerjob.worker.core.processor.TaskContext")))), | ||
| BasicProcessorInstrumentation.class.getName() + "$ProcessAdvice"); | ||
| } | ||
| 
     | 
||
| public static class ProcessAdvice { | ||
| 
     | 
||
| @SuppressWarnings("unused") | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static void onSchedule( | ||
| @Advice.This BasicProcessor handler, | ||
| @Advice.Argument(0) TaskContext taskContext, | ||
| @Advice.Local("otelRequest") PowerJobProcessRequest request, | ||
| @Advice.Local("otelContext") Context context, | ||
| @Advice.Local("otelScope") Scope scope) { | ||
| Context parentContext = currentContext(); | ||
| request = | ||
| PowerJobProcessRequest.createRequest( | ||
| taskContext.getJobId(), | ||
| handler, | ||
| "process", | ||
| taskContext.getJobParams(), | ||
| taskContext.getInstanceParams()); | ||
| 
     | 
||
| if (!instrumenter().shouldStart(parentContext, request)) { | ||
| return; | ||
| } | ||
| context = instrumenter().start(parentContext, request); | ||
| scope = context.makeCurrent(); | ||
| } | ||
| 
     | 
||
| @SuppressWarnings("unused") | ||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void stopSpan( | ||
| @Advice.Return ProcessResult result, | ||
| @Advice.Thrown Throwable throwable, | ||
| @Advice.Local("otelRequest") PowerJobProcessRequest request, | ||
| @Advice.Local("otelContext") Context context, | ||
| @Advice.Local("otelScope") Scope scope) { | ||
| if (scope == null) { | ||
| return; | ||
| } | ||
| scope.close(); | ||
| instrumenter().end(context, request, result, throwable); | ||
| } | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; | ||
| import javax.annotation.Nullable; | ||
| 
     | 
||
| class PowerJobCodeAttributesGetter implements CodeAttributesGetter<PowerJobProcessRequest> { | ||
| 
     | 
||
| @Nullable | ||
| @Override | ||
| public Class<?> getCodeClass(PowerJobProcessRequest powerJobProcessRequest) { | ||
| return powerJobProcessRequest.getDeclaringClass(); | ||
| } | ||
| 
     | 
||
| @Nullable | ||
| @Override | ||
| public String getMethodName(PowerJobProcessRequest powerJobProcessRequest) { | ||
| return powerJobProcessRequest.getMethodName(); | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.common.AttributesBuilder; | ||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
| import javax.annotation.Nullable; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
| 
     | 
||
| class PowerJobExperimentalAttributeExtractor | ||
| implements AttributesExtractor<PowerJobProcessRequest, ProcessResult> { | ||
| 
     | 
||
| private static final AttributeKey<Long> POWERJOB_JOB_ID = | ||
| AttributeKey.longKey("scheduling.powerjob.job.id"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_PARAM = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.param"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_PARAM = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.instance.param"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_TYPE = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.type"); | ||
| 
     | 
||
| @Override | ||
| public void onStart( | ||
| AttributesBuilder attributes, | ||
| Context parentContext, | ||
| PowerJobProcessRequest powerJobProcessRequest) { | ||
| attributes.put(POWERJOB_JOB_ID, powerJobProcessRequest.getJobId()); | ||
| attributes.put(POWERJOB_JOB_PARAM, powerJobProcessRequest.getJobParams()); | ||
| attributes.put(POWERJOB_JOB_INSTANCE_PARAM, powerJobProcessRequest.getInstanceParams()); | ||
| attributes.put(POWERJOB_JOB_INSTANCE_TYPE, powerJobProcessRequest.getJobType()); | ||
| } | ||
| 
     | 
||
| @Override | ||
| public void onEnd( | ||
| AttributesBuilder attributes, | ||
| Context context, | ||
| PowerJobProcessRequest powerJobProcessRequest, | ||
| @Nullable ProcessResult unused, | ||
| @Nullable Throwable error) {} | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import com.google.auto.service.AutoService; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| 
     | 
||
| @AutoService(InstrumentationModule.class) | ||
| public class PowerJobInstrumentationModule extends InstrumentationModule { | ||
| public PowerJobInstrumentationModule() { | ||
| super("powerjob", "powerjob-4.0"); | ||
| } | ||
| 
     | 
||
| @Override | ||
| public List<TypeInstrumentation> typeInstrumentations() { | ||
| return Collections.singletonList(new BasicProcessorInstrumentation()); | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import tech.powerjob.official.processors.impl.FileCleanupProcessor; | ||
| import tech.powerjob.official.processors.impl.HttpProcessor; | ||
| import tech.powerjob.official.processors.impl.script.PythonProcessor; | ||
| import tech.powerjob.official.processors.impl.script.ShellProcessor; | ||
| import tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor; | ||
| import tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor; | ||
| import tech.powerjob.worker.core.processor.sdk.BasicProcessor; | ||
| import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; | ||
| import tech.powerjob.worker.core.processor.sdk.MapProcessor; | ||
| import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; | ||
| 
     | 
||
| public final class PowerJobProcessRequest { | ||
| private final String methodName; | ||
| private final Long jobId; | ||
| private final String jobType; | ||
| private final Class<?> declaringClass; | ||
| private final String jobParams; | ||
| private final String instanceParams; | ||
| private static final List<Class<?>> KNOWN_PROCESSORS = | ||
| Arrays.asList( | ||
| FileCleanupProcessor.class, | ||
| BroadcastProcessor.class, | ||
| MapReduceProcessor.class, | ||
| MapProcessor.class, | ||
| ShellProcessor.class, | ||
| PythonProcessor.class, | ||
| HttpProcessor.class, | ||
| SpringDatasourceSqlProcessor.class, | ||
| DynamicDatasourceSqlProcessor.class); | ||
| 
     | 
||
| private PowerJobProcessRequest( | ||
| Long jobId, | ||
| String methodName, | ||
| Class<?> declaringClass, | ||
| String jobParams, | ||
| String instanceParams, | ||
| String jobType) { | ||
| this.jobId = jobId; | ||
| this.methodName = methodName; | ||
| this.jobType = jobType; | ||
| this.declaringClass = declaringClass; | ||
| this.jobParams = jobParams; | ||
| this.instanceParams = instanceParams; | ||
| } | ||
| 
     | 
||
| public static PowerJobProcessRequest createRequest( | ||
| Long jobId, | ||
| BasicProcessor handler, | ||
| String methodName, | ||
| String jobParams, | ||
| String instanceParams) { | ||
| String jobType = "BasicProcessor"; | ||
| for (Class<?> processorClass : KNOWN_PROCESSORS) { | ||
| if (processorClass.isInstance(handler)) { | ||
| jobType = processorClass.getSimpleName(); | ||
| break; | ||
| } | ||
| } | ||
| return new PowerJobProcessRequest( | ||
| jobId, methodName, handler.getClass(), jobParams, instanceParams, jobType); | ||
| } | ||
| 
     | 
||
| public String getMethodName() { | ||
| return methodName; | ||
| } | ||
| 
     | 
||
| public Long getJobId() { | ||
| return jobId; | ||
| } | ||
| 
     | 
||
| public Class<?> getDeclaringClass() { | ||
| return declaringClass; | ||
| } | ||
| 
     | 
||
| public String getJobParams() { | ||
| return jobParams; | ||
| } | ||
| 
     | 
||
| public String getInstanceParams() { | ||
| return instanceParams; | ||
| } | ||
| 
     | 
||
| public String getJobType() { | ||
| return jobType; | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.api.GlobalOpenTelemetry; | ||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.trace.StatusCode; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesExtractor; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeSpanNameExtractor; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; | ||
| import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
| 
     | 
||
| public final class PowerJobSingletons { | ||
| private static final String INSTRUMENTATION_NAME = "io.opentelemetry.powerjob-4.0"; | ||
| 
     | 
||
| private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = | ||
| AgentInstrumentationConfig.get() | ||
| .getBoolean("otel.instrumentation.powerjob.experimental-span-attributes", false); | ||
| private static final Instrumenter<PowerJobProcessRequest, ProcessResult> INSTRUMENTER = create(); | ||
| 
     | 
||
| public static Instrumenter<PowerJobProcessRequest, ProcessResult> instrumenter() { | ||
| return INSTRUMENTER; | ||
| } | ||
| 
     | 
||
| private static Instrumenter<PowerJobProcessRequest, ProcessResult> create() { | ||
| PowerJobCodeAttributesGetter codeAttributesGetter = new PowerJobCodeAttributesGetter(); | ||
| SpanNameExtractor<PowerJobProcessRequest> spanNameExtractor = | ||
| CodeSpanNameExtractor.create(codeAttributesGetter); | ||
| 
     | 
||
| InstrumenterBuilder<PowerJobProcessRequest, ProcessResult> builder = | ||
| Instrumenter.<PowerJobProcessRequest, ProcessResult>builder( | ||
| GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) | ||
| .addAttributesExtractor(CodeAttributesExtractor.create(codeAttributesGetter)) | ||
| .setSpanStatusExtractor( | ||
| (spanStatusBuilder, powerJobProcessRequest, response, error) -> { | ||
| if (error != null || response == null || !response.isSuccess()) { | ||
                
       | 
||
| spanStatusBuilder.setStatus(StatusCode.ERROR); | ||
| } | ||
| }); | ||
| 
     | 
||
| if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { | ||
| builder.addAttributesExtractor( | ||
| AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "powerjob")); | ||
| builder.addAttributesExtractor(new PowerJobExperimentalAttributeExtractor()); | ||
| } | ||
| 
     | 
||
| return builder.buildInstrumenter(); | ||
| } | ||
| 
     | 
||
| private PowerJobSingletons() {} | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.