diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index e304f25d5d373..6a3c7123b6a02 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -88,6 +88,7 @@ public enum Runtime { private String outputSerdeClassName; private String logTopic; + private String logLevel; private ProcessingGuarantees processingGuarantees; // Do we want function instances to process data in the same order as in the input topics // This essentially means that every partition of input topic is consumed by only one instance diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index 09b98249a4df1..6f1d4f7ddbd13 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -62,6 +62,8 @@ public class SinkConfig { private String deadLetterTopic; + private String logLevel; + private Map configs; // This is a map of secretName(aka how the secret is going to be // accessed in the function via context) to an object that diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 17b37008127ba..16ee03597afcd 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -48,6 +48,8 @@ public class SourceConfig { private String schemaType; + private String logLevel; + private Map configs; // This is a map of secretName(aka how the secret is going to be // accessed in the function via context) to an object that diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index c94041df0ffaf..0207cf7a67129 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -249,7 +249,9 @@ abstract class FunctionDetailsCommand extends BaseCommand { @Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Function are produced" + " #Java, Python, Go") protected String logTopic; - + @Parameter(names = "--logLevel", description = "Log level at which the logs of a Pulsar Function are produced" + + " #Java") + protected String logLevel; @Parameter(names = {"-st", "--schema-type"}, description = "The builtin schema type or " + "custom schema class name to be used for messages output by the function #Java") protected String schemaType = ""; @@ -518,6 +520,9 @@ void processArguments() throws Exception { if (null != logTopic) { functionConfig.setLogTopic(logTopic); } + if (null != logLevel) { + functionConfig.setLogLevel(logLevel); + } if (null != className) { functionConfig.setClassName(className); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 6d9619244a32e..4290f85915625 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -410,7 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand { @Parameter(names = "--transform-function-config", description = "Configuration of the transform function " + "applied before the Sink") protected String transformFunctionConfig; - + @Parameter(names = "--logLevel", description = "Log level at which the logs of a Pulsar Sink are produced") + protected String logLevel; protected SinkConfig sinkConfig; private void mergeArgs() { @@ -606,6 +607,10 @@ void processArguments() throws Exception { sinkConfig.setTransformFunctionConfig(transformFunctionConfig); } + if (null != logLevel) { + sinkConfig.setLogLevel(logLevel); + } + // check if configs are valid validateSinkConfigs(sinkConfig); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index be2e03206021b..6586e6fee95bf 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -365,6 +365,8 @@ abstract class SourceDetailsCommand extends BaseCommand { @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates " + "how the secret is fetched by the underlying secrets provider") protected String secretsString; + @Parameter(names = "--logLevel", description = "Log level at which the logs of a Pulsar Source are produced") + protected String logLevel; protected SourceConfig sourceConfig; @@ -499,6 +501,10 @@ void processArguments() throws Exception { sourceConfig.setSecrets(secretsMap); } + if (null != logLevel) { + sourceConfig.setLogLevel(logLevel); + } + // check if source configs are valid validateSourceConfigs(sourceConfig); } diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index de3f03a39008c..7985bfbbb6f29 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -98,6 +98,7 @@ message FunctionDetails { bool retainOrdering = 21; bool retainKeyOrdering = 22; SubscriptionPosition subscriptionPosition = 23; + string logLevel = 24; } message ConsumerSpec { diff --git a/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml index 190d9be92940b..bb78c80ea9374 100644 --- a/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml +++ b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml @@ -120,7 +120,7 @@ - info + ${sys:pulsar.log.level} ${sys:pulsar.log.appender} ${sys:pulsar.log.level} diff --git a/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml index f86d03e41793f..c5c596b6840c0 100644 --- a/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml +++ b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml @@ -50,7 +50,7 @@ - info + ${sys:pulsar.log.level} Console ${sys:pulsar.log.level} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 78347948688dd..1fa21304eefa4 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -78,6 +78,7 @@ public static List composeCmd(InstanceConfig instanceConfig, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, + String logLevel, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, @@ -92,7 +93,7 @@ public static List composeCmd(InstanceConfig instanceConfig, cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory, originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl, authConfig, shardId, grpcPort, expectedHealthCheckInterval, - logConfigFile, secretsProviderClassName, secretsProviderConfig, + logConfigFile, logLevel, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository, narExtractionDirectory, functionInstanceClassPath, false, pulsarWebServiceUrl)); @@ -309,6 +310,7 @@ public static List getCmd(InstanceConfig instanceConfig, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, + String logLevel, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, @@ -354,6 +356,7 @@ public static List getCmd(InstanceConfig instanceConfig, } args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath)); } + args.add("-Dpulsar.log.level=" + logLevel); args.add("-Dlog4j.configurationFile=" + logConfigFile); args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig)); args.add("-Dpulsar.function.log.file=" + String.format( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 7a69b822cbd89..9738833fba785 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -74,6 +74,7 @@ import lombok.extern.slf4j.Slf4j; import okhttp3.Response; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -236,6 +237,10 @@ public class KubernetesRuntime implements Runtime { case GO: break; } + String logLevel = instanceConfig.getFunctionDetails().getLogLevel(); + if (StringUtils.isBlank(logLevel)) { + logLevel = "info"; + } this.authConfig = authConfig; @@ -275,6 +280,7 @@ public class KubernetesRuntime implements Runtime { grpcPort, -1L, logConfigFile, + logLevel, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java index e59fcadc729ed..049b8fe4f09e9 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java @@ -122,6 +122,11 @@ class ProcessRuntime implements Runtime { case GO: break; } + String logLevel = instanceConfig.getFunctionDetails().getLogLevel(); + if (StringUtils.isBlank(logLevel)) { + logLevel = "info"; + } + this.extraDependenciesDir = extraDependenciesDir; this.narExtractionDirectory = narExtractionDirectory; this.processArgs = RuntimeUtils.composeCmd( @@ -142,6 +147,7 @@ class ProcessRuntime implements Runtime { instanceConfig.getPort(), expectedHealthCheckInterval, logConfigFile, + logLevel, secretsProviderClassName, secretsProviderConfig, false, diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java index b19be92e6ba81..4650b3496c9f0 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java @@ -213,6 +213,7 @@ public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Excepti 23, 1234L, "logConfigFile", + "info", "secretsProviderClassName", "secretsProviderConfig", false, diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 6ed9849412910..fb0895354e801 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -475,6 +475,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s String expectedArgs = "exec java -cp " + classpath + extraDepsEnv + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*" + + " -Dpulsar.log.level=info" + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml " + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID" diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index 365704ea0b4ed..91bfb245a5719 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -324,6 +324,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS String expectedArgs = "java -cp " + classpath + extraDepsEnv + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*" + + " -Dpulsar.log.level=info" + " -Dlog4j.configurationFile=java_instance_log4j2.xml " + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 3c96837e4374e..acfec25aae035 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -32,6 +32,7 @@ import com.google.gson.reflect.TypeToken; import java.io.File; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -70,6 +71,7 @@ public static class ExtractedFunctionDetails { static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000; static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE; + private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create(); @@ -272,6 +274,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu if (functionConfig.getLogTopic() != null) { functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic()); } + if (functionConfig.getLogLevel() != null) { + functionDetailsBuilder.setLogLevel(functionConfig.getLogLevel()); + } if (functionConfig.getRuntime() != null) { functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime())); } @@ -447,6 +452,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) if (!isEmpty(functionDetails.getLogTopic())) { functionConfig.setLogTopic(functionDetails.getLogTopic()); } + if (!isEmpty(functionDetails.getLogLevel())) { + functionConfig.setLogLevel(functionDetails.getLogLevel()); + } if (functionDetails.getSink().getForwardSourceMessageProperty()) { functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty()); } @@ -806,6 +814,13 @@ public static void doCommonChecks(FunctionConfig functionConfig) { } } + if (!isEmpty(functionConfig.getLogLevel())) { + if (!VALID_LOG_LEVELS.contains(functionConfig.getLogLevel().toUpperCase())) { + throw new IllegalArgumentException( + String.format("LogLevel %s is invalid", functionConfig.getLogLevel())); + } + } + if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) { throw new IllegalArgumentException("Function parallelism must be a positive number"); } @@ -1017,6 +1032,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct if (!StringUtils.isEmpty(newConfig.getLogTopic())) { mergedConfig.setLogTopic(newConfig.getLogTopic()); } + if (!StringUtils.isEmpty(newConfig.getLogLevel())) { + mergedConfig.setLogLevel(newConfig.getLogLevel()); + } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees() .equals(existingConfig.getProcessingGuarantees())) { throw new IllegalArgumentException("Processing Guarantees cannot be altered"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 36ff0ce2c7d26..37e0ca615ab2b 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -62,6 +63,8 @@ @Slf4j public class SinkConfigUtils { + private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + @Getter @Setter @AllArgsConstructor @@ -88,6 +91,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail functionDetailsBuilder.setName(sinkConfig.getName()); } functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + if (sinkConfig.getLogLevel() != null) { + functionDetailsBuilder.setLogLevel(sinkConfig.getLogLevel()); + } if (sinkConfig.getParallelism() != null) { functionDetailsBuilder.setParallelism(sinkConfig.getParallelism()); } else { @@ -396,6 +402,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { if (!isEmpty(functionDetails.getUserConfig())) { sinkConfig.setTransformFunctionConfig(functionDetails.getUserConfig()); } + if (!isEmpty(functionDetails.getLogLevel())) { + sinkConfig.setLogLevel(functionDetails.getLogLevel()); + } return sinkConfig; @@ -548,6 +557,13 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set"); } + if (!isEmpty(sinkConfig.getLogLevel())) { + if (!VALID_LOG_LEVELS.contains(sinkConfig.getLogLevel().toUpperCase())) { + throw new IllegalArgumentException( + String.format("LogLevel %s is invalid", sinkConfig.getLogLevel())); + } + } + // validate user defined config if enabled and classloading is enabled if (validateConnectorConfig) { if (sinkFunction.isEnableClassloading()) { @@ -712,6 +728,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (newConfig.getSourceSubscriptionPosition() != null) { mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition()); } + if (!StringUtils.isEmpty(newConfig.getLogLevel())) { + mergedConfig.setLogLevel(newConfig.getLogLevel()); + } return mergedConfig; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 2239f0fcbc2b7..eb3497782137a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -28,7 +28,9 @@ import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.AllArgsConstructor; import lombok.Getter; @@ -56,6 +58,8 @@ @Slf4j public class SourceConfigUtils { + private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + @Getter @Setter @AllArgsConstructor @@ -180,6 +184,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource if (!StringUtils.isEmpty(sourceConfig.getCustomRuntimeOptions())) { functionDetailsBuilder.setCustomRuntimeOptions(sourceConfig.getCustomRuntimeOptions()); } + if (sourceConfig.getLogLevel() != null) { + functionDetailsBuilder.setLogLevel(sourceConfig.getLogLevel()); + } return FunctionConfigUtils.validateFunctionDetails(functionDetailsBuilder.build()); } @@ -250,6 +257,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) { if (!isEmpty(functionDetails.getCustomRuntimeOptions())) { sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions()); } + if (!isEmpty(functionDetails.getLogLevel())) { + sourceConfig.setLogLevel(functionDetails.getLogLevel()); + } return sourceConfig; } @@ -356,6 +366,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour log.warn("Skipping annotation based validation of sink config as classloading is disabled"); } } + if (!isEmpty(sourceConfig.getLogLevel())) { + if (!VALID_LOG_LEVELS.contains(sourceConfig.getLogLevel().toUpperCase())) { + throw new IllegalArgumentException( + String.format("LogLevel %s is invalid", sourceConfig.getLogLevel())); + } + } return new ExtractedSourceDetails(sourceClassName, typeArg.asErasure().getTypeName()); } @@ -425,6 +441,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon if (newConfig.getProducerConfig() != null) { mergedConfig.setProducerConfig(newConfig.getProducerConfig()); } + if (!StringUtils.isEmpty(newConfig.getLogLevel())) { + mergedConfig.setLogLevel(newConfig.getLogLevel()); + } return mergedConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index cf4e7dd92a8f7..b78174b303841 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -596,6 +596,7 @@ public void testFunctionConfigConvertFromDetails() { .build(); boolean autoAck = true; String logTopic = "log-topic1"; + String logLevel = "debug"; Function.Resources resources = Function.Resources.newBuilder().setCpu(1.5).setDisk(1024 * 20).setRam(1024 * 10).build(); String packageUrl = "http://package.url"; Map secretsMap = new HashMap<>(); @@ -616,6 +617,7 @@ public void testFunctionConfigConvertFromDetails() { .setSource(sourceSpec) .setAutoAck(autoAck) .setLogTopic(logTopic) + .setLogLevel(logLevel) .setResources(resources) .setPackageUrl(packageUrl) .setSecretsMap(new Gson().toJson(secretsMap)) @@ -629,6 +631,7 @@ public void testFunctionConfigConvertFromDetails() { assertEquals(functionConfig.getName(), name); assertEquals(functionConfig.getClassName(), classname); assertEquals(functionConfig.getLogTopic(), logTopic); + assertEquals(functionConfig.getLogLevel(), logLevel); assertEquals((Object) functionConfig.getResources().getCpu(), resources.getCpu()); assertEquals(functionConfig.getResources().getDisk().longValue(), resources.getDisk()); assertEquals(functionConfig.getResources().getRam().longValue(), resources.getRam());