@@ -39,6 +39,7 @@ import (
3939 v1 "k8s.io/api/batch/v1"
4040 corev1 "k8s.io/api/core/v1"
4141 k8serrors "k8s.io/apimachinery/pkg/api/errors"
42+ "k8s.io/apimachinery/pkg/api/resource"
4243 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4344 "k8s.io/apimachinery/pkg/types"
4445 "k8s.io/apimachinery/pkg/util/intstr"
@@ -436,13 +437,13 @@ func makePodTemplate(container *corev1.Container, filebeatContainer *corev1.Cont
436437}
437438
438439func MakeJavaFunctionCommand (downloadPath , packageFile , name , clusterName , generateLogConfigCommand , logLevel , details , extraDependenciesDir , uid string ,
439- javaOpts []string , hasPulsarctl , hasWget , authProvided , tlsProvided bool , secretMaps map [string ]v1alpha1.SecretRef ,
440+ memory * resource. Quantity , javaOpts []string , hasPulsarctl , hasWget , authProvided , tlsProvided bool , secretMaps map [string ]v1alpha1.SecretRef ,
440441 state * v1alpha1.Stateful ,
441442 tlsConfig TLSConfig , authConfig * v1alpha1.AuthConfig ,
442443 maxPendingAsyncRequests * int32 , logConfigFileName string ) []string {
443444 processCommand := setShardIDEnvironmentVariableCommand () + " && " + generateLogConfigCommand +
444445 strings .Join (getProcessJavaRuntimeArgs (name , packageFile , clusterName , logLevel , details ,
445- extraDependenciesDir , uid , javaOpts , authProvided , tlsProvided , secretMaps , state , tlsConfig ,
446+ extraDependenciesDir , uid , memory , javaOpts , authProvided , tlsProvided , secretMaps , state , tlsConfig ,
446447 authConfig , maxPendingAsyncRequests , logConfigFileName ), " " )
447448 if downloadPath != "" && ! utils .EnableInitContainers {
448449 // prepend download command if the downPath is provided
@@ -1184,7 +1185,7 @@ func setShardIDEnvironmentVariableCommand() string {
11841185}
11851186
11861187func getProcessJavaRuntimeArgs (name , packageName , clusterName , logLevel , details , extraDependenciesDir , uid string ,
1187- javaOpts []string , authProvided , tlsProvided bool , secretMaps map [string ]v1alpha1.SecretRef ,
1188+ memory * resource. Quantity , javaOpts []string , authProvided , tlsProvided bool , secretMaps map [string ]v1alpha1.SecretRef ,
11881189 state * v1alpha1.Stateful ,
11891190 tlsConfig TLSConfig , authConfig * v1alpha1.AuthConfig ,
11901191 maxPendingAsyncRequests * int32 , logConfigFileName string ) []string {
@@ -1206,6 +1207,8 @@ func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details
12061207 },
12071208 " " )
12081209 }
1210+ // maxDirectMemory takes 20% of the total memory, while MaxRamPercentage is 70%, the rest 10% is for misc usage
1211+ maxDirectMemory := resource .NewScaledQuantity (memory .Value ()/ 5 , 0 )
12091212 args := []string {
12101213 "exec" ,
12111214 "java" ,
@@ -1218,6 +1221,7 @@ func getProcessJavaRuntimeArgs(name, packageName, clusterName, logLevel, details
12181221 "-Dpulsar.allocator.exit_on_oom=true" ,
12191222 setLogLevel ,
12201223 "-XX:MaxRAMPercentage=70" ,
1224+ "-XX:MaxDirectMemorySize=" + getDecimalSIMemory (maxDirectMemory ),
12211225 "-XX:+UseG1GC" ,
12221226 "-XX:+HeapDumpOnOutOfMemoryError" ,
12231227 "-XX:HeapDumpPath=/pulsar/tmp/heapdump-%p.hprof" ,
@@ -2027,6 +2031,14 @@ func getPythonSecretProviderArgs(secretMaps map[string]v1alpha1.SecretRef) []str
20272031 return ret
20282032}
20292033
2034+ // Java command requires memory values in resource.DecimalSI format
2035+ func getDecimalSIMemory (quantity * resource.Quantity ) string {
2036+ if quantity .Format == resource .DecimalSI {
2037+ return quantity .String ()
2038+ }
2039+ return resource .NewQuantity (quantity .Value (), resource .DecimalSI ).String ()
2040+ }
2041+
20302042func getGenericSecretProviderArgs (secretMaps map [string ]v1alpha1.SecretRef , language string ) []string {
20312043 var ret []string
20322044 if len (secretMaps ) > 0 {
0 commit comments