@@ -205,14 +205,15 @@ func MakeHeadlessServiceName(serviceName string) string {
205205 return fmt .Sprintf ("%s-headless" , serviceName )
206206}
207207
208- func MakeStatefulSet (objectMeta * metav1.ObjectMeta , replicas * int32 , downloaderImage string ,
209- container * corev1.Container , filebeatContainer * corev1.Container ,
210- volumes []corev1.Volume , labels map [string ]string , policy v1alpha1.PodPolicy , pulsar v1alpha1.PulsarMessaging ,
211- javaRuntime * v1alpha1.JavaRuntime , pythonRuntime * v1alpha1.PythonRuntime ,
212- goRuntime * v1alpha1.GoRuntime , definedVolumeMounts []corev1.VolumeMount ,
213- volumeClaimTemplates []corev1.PersistentVolumeClaim ,
208+ func MakeStatefulSet (objectMeta * metav1.ObjectMeta , replicas * int32 , downloaderImage string , container * corev1.Container ,
209+ volumes []corev1.Volume , labels map [string ]string , policy v1alpha1.PodPolicy , authConfig * v1alpha1.AuthConfig ,
210+ tlsConfig TLSConfig , pulsarConfig , authSecret , tlsSecret string , javaRuntime * v1alpha1.JavaRuntime ,
211+ pythonRuntime * v1alpha1.PythonRuntime , goRuntime * v1alpha1.GoRuntime , env []corev1.EnvVar , name , logTopic , filebeatImage string ,
212+ logTopicAgent v1alpha1.LogTopicAgent , definedVolumeMounts []corev1.VolumeMount , volumeClaimTemplates []corev1.PersistentVolumeClaim ,
214213 persistentVolumeClaimRetentionPolicy * appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy ) * appsv1.StatefulSet {
215214
215+ filebeatContainer := makeFilebeatContainer (definedVolumeMounts , env , name , logTopic , logTopicAgent , tlsConfig , authConfig , pulsarConfig , tlsSecret , authSecret , filebeatImage )
216+
216217 volumeMounts := generateDownloaderVolumeMountsForDownloader (javaRuntime , pythonRuntime , goRuntime )
217218 var downloaderContainer * corev1.Container
218219 var podVolumes = volumes
@@ -232,12 +233,12 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI
232233
233234 // mount auth and tls related VolumeMounts when download package from pulsar
234235 if ! hasHTTPPrefix (downloadPath ) {
235- if pulsar . AuthConfig != nil && pulsar . AuthConfig .OAuth2Config != nil {
236- volumeMounts = append (volumeMounts , generateVolumeMountFromOAuth2Config (pulsar . AuthConfig .OAuth2Config ))
236+ if authConfig != nil && authConfig .OAuth2Config != nil {
237+ volumeMounts = append (volumeMounts , generateVolumeMountFromOAuth2Config (authConfig .OAuth2Config ))
237238 }
238239
239- if ! reflect .ValueOf (pulsar . TLSConfig ).IsNil () && pulsar . TLSConfig .HasSecretVolume () {
240- volumeMounts = append (volumeMounts , generateVolumeMountFromTLSConfig (pulsar . TLSConfig ))
240+ if ! reflect .ValueOf (tlsConfig ).IsNil () && tlsConfig .HasSecretVolume () {
241+ volumeMounts = append (volumeMounts , generateVolumeMountFromTLSConfig (tlsConfig ))
241242 }
242243 }
243244 volumeMounts = append (volumeMounts , definedVolumeMounts ... )
@@ -253,15 +254,15 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI
253254 Name : DownloaderName ,
254255 Image : image ,
255256 Command : []string {"sh" , "-c" ,
256- strings .Join (getDownloadCommand (downloadPath , componentPackage , true , true ,
257- pulsar . AuthSecret != "" , pulsar . TLSSecret != "" , pulsar . TLSConfig , pulsar . AuthConfig ), " " )},
257+ strings .Join (GetDownloadCommand (downloadPath , componentPackage , true , true ,
258+ authSecret != "" , tlsSecret != "" , tlsConfig , authConfig ), " " )},
258259 VolumeMounts : volumeMounts ,
259260 ImagePullPolicy : corev1 .PullIfNotPresent ,
260261 Env : []corev1.EnvVar {{
261262 Name : "HOME" ,
262263 Value : "/tmp" ,
263264 }},
264- EnvFrom : generateContainerEnvFrom ( pulsar . PulsarConfig , pulsar . AuthSecret , pulsar . TLSSecret ),
265+ EnvFrom : GenerateContainerEnvFrom ( pulsarConfig , authSecret , tlsSecret ),
265266 }
266267 podVolumes = append (podVolumes , corev1.Volume {
267268 Name : DownloaderVolume ,
@@ -415,7 +416,7 @@ func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, gener
415416 authConfig , maxPendingAsyncRequests , logConfigFileName ), " " )
416417 if downloadPath != "" && ! utils .EnableInitContainers {
417418 // prepend download command if the downPath is provided
418- downloadCommand := strings .Join (getDownloadCommand (downloadPath , packageFile , hasPulsarctl , hasWget ,
419+ downloadCommand := strings .Join (GetDownloadCommand (downloadPath , packageFile , hasPulsarctl , hasWget ,
419420 authProvided , tlsProvided , tlsConfig , authConfig ), " " )
420421 processCommand = downloadCommand + " && " + processCommand
421422 }
@@ -431,7 +432,7 @@ func MakePythonFunctionCommand(downloadPath, packageFile, name, clusterName, gen
431432 details , uid , authProvided , tlsProvided , secretMaps , state , tlsConfig , authConfig ), " " )
432433 if downloadPath != "" && ! utils .EnableInitContainers {
433434 // prepend download command if the downPath is provided
434- downloadCommand := strings .Join (getDownloadCommand (downloadPath , packageFile , hasPulsarctl , hasWget ,
435+ downloadCommand := strings .Join (GetDownloadCommand (downloadPath , packageFile , hasPulsarctl , hasWget ,
435436 authProvided ,
436437 tlsProvided , tlsConfig , authConfig ), " " )
437438 processCommand = downloadCommand + " && " + processCommand
@@ -450,7 +451,7 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph
450451 hasPulsarctl = true
451452 hasWget = true
452453 }
453- downloadCommand := strings .Join (getDownloadCommand (downloadPath , goExecFilePath ,
454+ downloadCommand := strings .Join (GetDownloadCommand (downloadPath , goExecFilePath ,
454455 hasPulsarctl , hasWget , function .Spec .Pulsar .AuthSecret != "" ,
455456 function .Spec .Pulsar .TLSSecret != "" , function .Spec .Pulsar .TLSConfig , function .Spec .Pulsar .AuthConfig ), " " )
456457 processCommand = downloadCommand + " && ls -al && pwd &&" + processCommand
@@ -466,7 +467,7 @@ func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterNam
466467 details , uid , authProvided , tlsProvided , secretMaps , state , tlsConfig , authConfig ), " " )
467468 if downloadPath != "" && ! utils .EnableInitContainers {
468469 // prepend download command if the downPath is provided
469- downloadCommand := strings .Join (getDownloadCommand (downloadPath , functionFile , true , true ,
470+ downloadCommand := strings .Join (GetDownloadCommand (downloadPath , functionFile , true , true ,
470471 authProvided ,
471472 tlsProvided , tlsConfig , authConfig ), " " )
472473 processCommand = downloadCommand + " && " + processCommand
@@ -486,7 +487,7 @@ func MakeLivenessProbe(liveness *v1alpha1.Liveness) *corev1.Probe {
486487 ProbeHandler : corev1.ProbeHandler {
487488 HTTPGet : & corev1.HTTPGetAction {
488489 Path : "/" ,
489- Port : intstr .FromInt ( int ( MetricsPort .ContainerPort ) ),
490+ Port : intstr .FromInt32 ( MetricsPort .ContainerPort ),
490491 },
491492 },
492493 InitialDelaySeconds : initialDelay ,
@@ -788,7 +789,7 @@ func getCleanUpCommand(hasPulsarctl, authProvided, tlsProvided bool, tlsConfig T
788789 " " )}
789790}
790791
791- func getDownloadCommand (downloadPath , componentPackage string , hasPulsarctl , hasWget , authProvided , tlsProvided bool ,
792+ func GetDownloadCommand (downloadPath , componentPackage string , hasPulsarctl , hasWget , authProvided , tlsProvided bool ,
792793 tlsConfig TLSConfig ,
793794 authConfig * v1alpha1.AuthConfig ) []string {
794795 var args []string
@@ -822,7 +823,7 @@ func getDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, has
822823 return args
823824}
824825
825- func generateJavaLogConfigCommand (runtime * v1alpha1.JavaRuntime , agent v1alpha1.LogTopicAgent ) string {
826+ func GenerateJavaLogConfigCommand (runtime * v1alpha1.JavaRuntime , agent v1alpha1.LogTopicAgent ) string {
826827 if runtime == nil || (runtime .Log != nil && runtime .Log .LogConfig != nil ) {
827828 return ""
828829 }
@@ -857,7 +858,7 @@ func generateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1.
857858 return ""
858859}
859860
860- func generateJavaLogConfigFileName (runtime * v1alpha1.JavaRuntime ) string {
861+ func GenerateJavaLogConfigFileName (runtime * v1alpha1.JavaRuntime ) string {
861862 if runtime == nil || (runtime .Log != nil && runtime .Log .LogConfig != nil ) {
862863 return DefaultJavaLogConfigPath
863864 }
@@ -1524,12 +1525,15 @@ func generateBasicContainerEnv(secrets map[string]v1alpha1.SecretRef, env []core
15241525 return vars
15251526}
15261527
1527- func generateContainerEnvFrom (messagingConfig string , authSecret string , tlsSecret string ) []corev1.EnvFromSource {
1528- envs := []corev1.EnvFromSource {{
1529- ConfigMapRef : & corev1.ConfigMapEnvSource {
1530- LocalObjectReference : corev1.LocalObjectReference {Name : messagingConfig },
1531- },
1532- }}
1528+ func GenerateContainerEnvFrom (messagingConfig string , authSecret string , tlsSecret string ) []corev1.EnvFromSource {
1529+ var envs []corev1.EnvFromSource
1530+ if messagingConfig != "" {
1531+ envs = append (envs , corev1.EnvFromSource {
1532+ ConfigMapRef : & corev1.ConfigMapEnvSource {
1533+ LocalObjectReference : corev1.LocalObjectReference {Name : messagingConfig },
1534+ },
1535+ })
1536+ }
15331537
15341538 if authSecret != "" {
15351539 envs = append (envs , corev1.EnvFromSource {
@@ -1814,7 +1818,7 @@ func generateContainerVolumeMountsFromProducerConf(conf *v1alpha1.ProducerConfig
18141818 return mounts
18151819}
18161820
1817- func generateContainerVolumeMounts (volumeMounts []corev1.VolumeMount , producerConf * v1alpha1.ProducerConfig ,
1821+ func GenerateContainerVolumeMounts (volumeMounts []corev1.VolumeMount , producerConf * v1alpha1.ProducerConfig ,
18181822 consumerConfs map [string ]v1alpha1.ConsumerConfig , tlsConfig TLSConfig , authConfig * v1alpha1.AuthConfig ,
18191823 logConfigs map [int32 ]* v1alpha1.RuntimeLogConfig , agent v1alpha1.LogTopicAgent ) []corev1.VolumeMount {
18201824 mounts := []corev1.VolumeMount {}
@@ -1839,7 +1843,7 @@ func generateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerCo
18391843 return mounts
18401844}
18411845
1842- func generatePodVolumes (podVolumes []corev1.Volume , producerConf * v1alpha1.ProducerConfig ,
1846+ func GeneratePodVolumes (podVolumes []corev1.Volume , producerConf * v1alpha1.ProducerConfig ,
18431847 consumerConfs map [string ]v1alpha1.ConsumerConfig , tlsConfig TLSConfig , authConfig * v1alpha1.AuthConfig ,
18441848 logConfigs map [int32 ]* v1alpha1.RuntimeLogConfig ,
18451849 agent v1alpha1.LogTopicAgent ) []corev1.Volume {
@@ -1999,7 +2003,7 @@ const (
19992003 golangRuntimeLog
20002004)
20012005
2002- func getRuntimeLogConfigNames (java * v1alpha1.JavaRuntime , python * v1alpha1.PythonRuntime ,
2006+ func GetRuntimeLogConfigNames (java * v1alpha1.JavaRuntime , python * v1alpha1.PythonRuntime ,
20032007 golang * v1alpha1.GoRuntime ) map [int32 ]* v1alpha1.RuntimeLogConfig {
20042008
20052009 var configs = map [int32 ]* v1alpha1.RuntimeLogConfig {}
@@ -2249,7 +2253,7 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
22492253 }
22502254 imagePullPolicy := corev1 .PullIfNotPresent
22512255 allowPrivilegeEscalation := false
2252- mounts := generateContainerVolumeMounts (volumeMounts , nil , nil , tlsConfig , authConfig , nil , agent )
2256+ mounts := GenerateContainerVolumeMounts (volumeMounts , nil , nil , tlsConfig , authConfig , nil , agent )
22532257
22542258 var uid int64 = 1000
22552259
@@ -2332,7 +2336,7 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
23322336 Command : []string {"/bin/sh" , "-c" , "--" , "echo " + fmt .Sprintf ("\" %s\" " , tpl .String ()) + " > " + DefaultFilebeatConfig + " && /usr/share/filebeat/filebeat -e -c " + DefaultFilebeatConfig },
23332337 Env : envs ,
23342338 ImagePullPolicy : imagePullPolicy ,
2335- EnvFrom : generateContainerEnvFrom (pulsarConfig , authSecret , tlsSecret ),
2339+ EnvFrom : GenerateContainerEnvFrom (pulsarConfig , authSecret , tlsSecret ),
23362340 VolumeMounts : mounts ,
23372341 SecurityContext : & corev1.SecurityContext {
23382342 Capabilities : & corev1.Capabilities {
0 commit comments