@@ -52,25 +52,27 @@ type AssemblyOperation struct {
5252
5353// LocalAssemblyConfig is the config for the local assembly job
5454type LocalAssemblyConfig struct {
55- Name string
56- Namespace string
57- StreamID string
58- JobName string
59- NodeName string
60- TimeoutSeconds int64
61- VineyardSockPath string
55+ Name string
56+ Namespace string
57+ StreamID string
58+ JobName string
59+ NodeName string
60+ TimeoutSeconds int64
61+ VineyardSockPath string
62+ LocalAssemblyImage string
6263}
6364
6465// DistributedAssemblyConfig is the config for the distributed assembly job
6566type DistributedAssemblyConfig struct {
66- Name string
67- Namespace string
68- StreamID string
69- JobName string
70- GlobalObjectID string
71- OldObjectToNewObject string
72- TimeoutSeconds int64
73- VineyardSockPath string
67+ Name string
68+ Namespace string
69+ StreamID string
70+ JobName string
71+ GlobalObjectID string
72+ OldObjectToNewObject string
73+ TimeoutSeconds int64
74+ VineyardSockPath string
75+ DistributedAssemblyImage string
7476}
7577
7678// LocalAssemblyConfigTemplate is the template config for the assembly job
@@ -146,6 +148,18 @@ func (ao *AssemblyOperation) findNeedAssemblyPodByLocalObject(ctx context.Contex
146148// buildLocalAssemblyJob build all configuration for the local assembly job
147149func (ao * AssemblyOperation ) buildLocalAssemblyJob (ctx context.Context , localObject * v1alpha1.LocalObject , pod * corev1.Pod , timeout int64 ) error {
148150 podLabels := pod .Labels
151+
152+ vineyarddName := podLabels [labels .VineyarddName ]
153+ vineyarddNamespace := podLabels [labels .VineyarddNamespace ]
154+ // get vineyardd cluster info
155+ vineyardd := & v1alpha1.Vineyardd {}
156+ if err := ao .Client .Get (ctx , client.ObjectKey {
157+ Name : vineyarddName ,
158+ Namespace : vineyarddNamespace ,
159+ }, vineyardd ); err != nil {
160+ return fmt .Errorf ("failed to get the vineyardd: %v" , err )
161+ }
162+
149163 // When the pod which generated the stream is annotated, the assembly job will be created in the same pod
150164 if _ , ok := podLabels [NeedInjectedAssemblyKey ]; ok {
151165 if strings .Contains (strings .ToLower (localObject .Spec .Typename ), "stream" ) {
@@ -155,6 +169,7 @@ func (ao *AssemblyOperation) buildLocalAssemblyJob(ctx context.Context, localObj
155169 LocalAssemblyConfigTemplate .NodeName = localObject .Spec .Hostname
156170 LocalAssemblyConfigTemplate .JobName = podLabels [labels .VineyardJobName ]
157171 LocalAssemblyConfigTemplate .TimeoutSeconds = timeout
172+ LocalAssemblyConfigTemplate .LocalAssemblyImage = vineyardd .Spec .PluginConfig .LocalAssemblyImage
158173 if socket , err := ao .ResolveRequiredVineyarddSocket (
159174 ctx ,
160175 podLabels [labels .VineyarddName ],
@@ -286,6 +301,17 @@ func (ao *AssemblyOperation) buildDistributedAssemblyJob(
286301 }
287302 }
288303
304+ vineyarddName := podLabels [labels .VineyarddName ]
305+ vineyarddNamespace := podLabels [labels .VineyarddNamespace ]
306+ // get vineyardd cluster info
307+ vineyardd := & v1alpha1.Vineyardd {}
308+ if err := ao .Client .Get (ctx , client.ObjectKey {
309+ Name : vineyarddName ,
310+ Namespace : vineyarddNamespace ,
311+ }, vineyardd ); err != nil {
312+ return false , fmt .Errorf ("failed to get the vineyardd: %v" , err )
313+ }
314+
289315 // build the distributed assembly job
290316 if len (sigToID ) == len (oldObjectToNewObject ) {
291317 str := `'{`
@@ -299,6 +325,7 @@ func (ao *AssemblyOperation) buildDistributedAssemblyJob(
299325 DistributedAssemblyConfigTemplate .OldObjectToNewObject = str
300326 DistributedAssemblyConfigTemplate .JobName = podLabels [labels .VineyardJobName ]
301327 DistributedAssemblyConfigTemplate .TimeoutSeconds = timeout
328+ DistributedAssemblyConfigTemplate .DistributedAssemblyImage = vineyardd .Spec .PluginConfig .DistributedAssemblyImage
302329 if socket , err := ao .ResolveRequiredVineyarddSocket (
303330 ctx ,
304331 podLabels [labels .VineyarddName ],
0 commit comments