- 
                Notifications
    
You must be signed in to change notification settings  - Fork 1k
 
Add support for PowerJob #12086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for PowerJob #12086
Changes from 18 commits
352214c
              95acfcf
              4f903b1
              ddacbdc
              32782bc
              7fd29e5
              29344b1
              e1681c3
              54efc0c
              7f3236e
              6f4faeb
              ec6cd9a
              029e00e
              353b669
              2bac40b
              878e7d2
              6cbfb63
              28a9ff1
              d933e0b
              fe81649
              2222dd8
              ef3e686
              fd29bb0
              2b43e92
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| # Settings for the PowerJob instrumentation | ||
| 
     | 
||
| | System property | Type | Default | Description | | ||
| |--------------------------------------------------------------|---------|---------|-----------------------------------------------------| | ||
| | `otel.instrumentation.powerjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| plugins { | ||
| id("otel.javaagent-instrumentation") | ||
| } | ||
| 
     | 
||
| muzzle { | ||
| pass { | ||
| group.set("tech.powerjob") | ||
| module.set("powerjob-worker") | ||
| versions.set("[4.0.0,)") | ||
| assertInverse.set(true) | ||
| extraDependency("tech.powerjob:powerjob-official-processors:1.1.0") | ||
| } | ||
| } | ||
| 
     | 
||
| dependencies { | ||
| library("tech.powerjob:powerjob-worker:4.0.0") | ||
                
      
                  trask marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| library("tech.powerjob:powerjob-official-processors:1.1.0") | ||
| } | ||
| 
     | 
||
| tasks.withType<Test>().configureEach { | ||
| // required on jdk17 | ||
| jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED") | ||
| jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") | ||
| jvmArgs("-Dotel.instrumentation.powerjob.experimental-span-attributes=true") | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; | ||
| import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; | ||
| import static io.opentelemetry.javaagent.instrumentation.powerjob.v4_0.PowerJobSingletons.helper; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isPublic; | ||
| import static net.bytebuddy.matcher.ElementMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
| 
     | 
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.description.type.TypeDescription; | ||
| import net.bytebuddy.matcher.ElementMatcher; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
| import tech.powerjob.worker.core.processor.TaskContext; | ||
| import tech.powerjob.worker.core.processor.sdk.BasicProcessor; | ||
| 
     | 
||
| public class BasicProcessorInstrumentation implements TypeInstrumentation { | ||
| @Override | ||
| public ElementMatcher<TypeDescription> typeMatcher() { | ||
| return implementsInterface(named("tech.powerjob.worker.core.processor.sdk.BasicProcessor")); | ||
| } | ||
| 
     | 
||
| @Override | ||
| public void transform(TypeTransformer transformer) { | ||
| transformer.applyAdviceToMethod( | ||
| named("process").and(isPublic()).and(takesArguments(1)), | ||
                
       | 
||
| BasicProcessorInstrumentation.class.getName() + "$ProcessAdvice"); | ||
| } | ||
| 
     | 
||
| public static class ProcessAdvice { | ||
| 
     | 
||
| @SuppressWarnings("unused") | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static void onSchedule( | ||
| @Advice.This BasicProcessor handler, | ||
| @Advice.Argument(0) TaskContext taskContext, | ||
| @Advice.Local("otelRequest") PowerJobProcessRequest request, | ||
| @Advice.Local("otelContext") Context context, | ||
| @Advice.Local("otelScope") Scope scope) { | ||
| Context parentContext = currentContext(); | ||
| request = PowerJobProcessRequest.createRequest(taskContext.getJobId(), handler, "process"); | ||
| request.setInstanceParams(taskContext.getInstanceParams()); | ||
| request.setJobParams(taskContext.getJobParams()); | ||
                
      
                  crossoverJie marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| context = helper().startSpan(parentContext, request); | ||
| if (context == null) { | ||
| return; | ||
| } | ||
| scope = context.makeCurrent(); | ||
| } | ||
| 
     | 
||
| @SuppressWarnings("unused") | ||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void stopSpan( | ||
| @Advice.Return ProcessResult result, | ||
| @Advice.Thrown Throwable throwable, | ||
| @Advice.Local("otelRequest") PowerJobProcessRequest request, | ||
| @Advice.Local("otelContext") Context context, | ||
| @Advice.Local("otelScope") Scope scope) { | ||
| helper().stopSpan(result, request, throwable, scope, context); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; | ||
| import javax.annotation.Nullable; | ||
| 
     | 
||
| class PowerJobCodeAttributesGetter implements CodeAttributesGetter<PowerJobProcessRequest> { | ||
| 
     | 
||
| @Nullable | ||
| @Override | ||
| public Class<?> getCodeClass(PowerJobProcessRequest powerJobProcessRequest) { | ||
| return powerJobProcessRequest.getDeclaringClass(); | ||
| } | ||
| 
     | 
||
| @Nullable | ||
| @Override | ||
| public String getMethodName(PowerJobProcessRequest powerJobProcessRequest) { | ||
| return powerJobProcessRequest.getMethodName(); | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| public final class PowerJobConstants { | ||
| 
     | 
||
| private PowerJobConstants() {} | ||
| 
     | 
||
| public static final String BASIC_PROCESSOR = "BasicProcessor"; | ||
                
       | 
||
| public static final String BROADCAST_PROCESSOR = "BroadcastProcessor"; | ||
| public static final String MAP_PROCESSOR = "MapProcessor"; | ||
| public static final String MAP_REDUCE_PROCESSOR = "MapReduceProcessor"; | ||
| 
     | 
||
| // Official processors | ||
| public static final String SHELL_PROCESSOR = "ShellProcessor"; | ||
| public static final String PYTHON_PROCESSOR = "PythonProcessor"; | ||
| public static final String HTTP_PROCESSOR = "HttpProcessor"; | ||
| public static final String FILE_CLEANUP_PROCESSOR = "FileCleanupProcessor"; | ||
| public static final String SPRING_DATASOURCE_SQL_PROCESSOR = "SpringDatasourceSqlProcessor"; | ||
| public static final String DYNAMIC_DATASOURCE_SQL_PROCESSOR = "DynamicDatasourceSqlProcessor"; | ||
                
       | 
||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.common.AttributesBuilder; | ||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
| import javax.annotation.Nullable; | ||
| 
     | 
||
| class PowerJobExperimentalAttributeExtractor | ||
| implements AttributesExtractor<PowerJobProcessRequest, Void> { | ||
| 
     | 
||
| private static final AttributeKey<Long> POWERJOB_JOB_ID = | ||
| AttributeKey.longKey("scheduling.powerjob.job.id"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_PARAM = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.param"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_PARAM = | ||
| AttributeKey.stringKey("scheduling.powerjob.job.instance.param"); | ||
| private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_TRPE = | ||
                
      
                  crossoverJie marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| AttributeKey.stringKey("scheduling.powerjob.job.type"); | ||
| 
     | 
||
| @Override | ||
| public void onStart( | ||
| AttributesBuilder attributes, | ||
| Context parentContext, | ||
| PowerJobProcessRequest powerJobProcessRequest) { | ||
| attributes.put(POWERJOB_JOB_ID, powerJobProcessRequest.getJobId()); | ||
| attributes.put(POWERJOB_JOB_PARAM, powerJobProcessRequest.getJobParams()); | ||
| attributes.put(POWERJOB_JOB_INSTANCE_PARAM, powerJobProcessRequest.getInstanceParams()); | ||
| attributes.put(POWERJOB_JOB_INSTANCE_TRPE, powerJobProcessRequest.getJobType()); | ||
| } | ||
| 
     | 
||
| @Override | ||
| public void onEnd( | ||
| AttributesBuilder attributes, | ||
| Context context, | ||
| PowerJobProcessRequest powerJobProcessRequest, | ||
| @Nullable Void unused, | ||
| @Nullable Throwable error) {} | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
| import java.util.function.Predicate; | ||
| import tech.powerjob.worker.core.processor.ProcessResult; | ||
| 
     | 
||
| public final class PowerJobHelper { | ||
                
       | 
||
| 
     | 
||
| private final Instrumenter<PowerJobProcessRequest, Void> instrumenter; | ||
| 
     | 
||
| private final Predicate<ProcessResult> failedStatusPredicate; | ||
| 
     | 
||
| private PowerJobHelper( | ||
| Instrumenter<PowerJobProcessRequest, Void> instrumenter, | ||
| Predicate<ProcessResult> failedStatusPredicate) { | ||
| this.instrumenter = instrumenter; | ||
| this.failedStatusPredicate = failedStatusPredicate; | ||
| } | ||
| 
     | 
||
| public static PowerJobHelper create( | ||
| Instrumenter<PowerJobProcessRequest, Void> instrumenter, | ||
| Predicate<ProcessResult> failedStatusPredicate) { | ||
| return new PowerJobHelper(instrumenter, failedStatusPredicate); | ||
| } | ||
| 
     | 
||
| public Context startSpan(Context parentContext, PowerJobProcessRequest request) { | ||
| if (!instrumenter.shouldStart(parentContext, request)) { | ||
| return null; | ||
| } | ||
| return instrumenter.start(parentContext, request); | ||
| } | ||
| 
     | 
||
| public void stopSpan( | ||
| ProcessResult result, | ||
| PowerJobProcessRequest request, | ||
| Throwable throwable, | ||
| Scope scope, | ||
| Context context) { | ||
| if (scope == null) { | ||
| return; | ||
| } | ||
| if (failedStatusPredicate.test(result)) { | ||
| request.setFailed(); | ||
| } | ||
                
       | 
||
| scope.close(); | ||
| instrumenter.end(context, request, null, throwable); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import static java.util.Arrays.asList; | ||
| 
     | 
||
| import com.google.auto.service.AutoService; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; | ||
| import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
| import java.util.List; | ||
| 
     | 
||
| @AutoService(InstrumentationModule.class) | ||
| public class PowerJobInstrumentationModule extends InstrumentationModule { | ||
| public PowerJobInstrumentationModule() { | ||
| super("powerjob", "powerjob-4.0"); | ||
| } | ||
| 
     | 
||
| @Override | ||
| public List<TypeInstrumentation> typeInstrumentations() { | ||
| return asList(new BasicProcessorInstrumentation()); | ||
                
      
                  crossoverJie marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
| 
     | 
||
| package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; | ||
| 
     | 
||
| import io.opentelemetry.api.GlobalOpenTelemetry; | ||
| import io.opentelemetry.api.common.AttributeKey; | ||
| import io.opentelemetry.api.trace.StatusCode; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesExtractor; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; | ||
| import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; | ||
| 
     | 
||
| public final class PowerJobInstrumenterFactory { | ||
                
       | 
||
| 
     | 
||
| static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = | ||
| AgentInstrumentationConfig.get() | ||
| .getBoolean("otel.instrumentation.powerjob.experimental-span-attributes", false); | ||
| 
     | 
||
| public static Instrumenter<PowerJobProcessRequest, Void> create(String instrumentationName) { | ||
| PowerJobCodeAttributesGetter codeAttributesGetter = new PowerJobCodeAttributesGetter(); | ||
| PowerJobSpanNameExtractor spanNameExtractor = | ||
| new PowerJobSpanNameExtractor(codeAttributesGetter); | ||
| 
     | 
||
| InstrumenterBuilder<PowerJobProcessRequest, Void> builder = | ||
| Instrumenter.<PowerJobProcessRequest, Void>builder( | ||
| GlobalOpenTelemetry.get(), instrumentationName, spanNameExtractor) | ||
| .addAttributesExtractor(CodeAttributesExtractor.create(codeAttributesGetter)) | ||
| .setSpanStatusExtractor( | ||
| (spanStatusBuilder, powerJobProcessRequest, response, error) -> { | ||
| if (error != null || powerJobProcessRequest.isFailed()) { | ||
| spanStatusBuilder.setStatus(StatusCode.ERROR); | ||
| } | ||
| }); | ||
| 
     | 
||
| if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { | ||
| builder.addAttributesExtractor( | ||
| AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "powerjob")); | ||
| builder.addAttributesExtractor(new PowerJobExperimentalAttributeExtractor()); | ||
| } | ||
| 
     | 
||
| return builder.buildInstrumenter(); | ||
| } | ||
| 
     | 
||
| private PowerJobInstrumenterFactory() {} | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.