2424import io .functionmesh .compute .sinks .models .V1alpha1SinkSpecPod ;
2525import io .functionmesh .compute .sinks .models .V1alpha1SinkSpecPodVolumeMounts ;
2626import io .functionmesh .compute .sinks .models .V1alpha1SinkSpecPodVolumes ;
27+ import io .functionmesh .compute .util .CommonUtil ;
2728import io .functionmesh .compute .util .KubernetesUtils ;
2829import io .functionmesh .compute .util .SinksUtil ;
2930import io .functionmesh .compute .MeshWorkerService ;
3031import io .functionmesh .compute .sinks .models .V1alpha1SinkStatus ;
32+ import io .kubernetes .client .openapi .apis .CustomObjectsApi ;
3133import lombok .extern .slf4j .Slf4j ;
3234import okhttp3 .Call ;
3335import org .apache .commons .lang3 .StringUtils ;
@@ -123,6 +125,7 @@ public void registerSink(
123125 clientAuthenticationDataHttps ,
124126 ComponentTypeUtils .toString (componentType ));
125127 this .validateTenantIsExist (tenant , namespace , sinkName , clientRole );
128+ String cluster = worker ().getWorkerConfig ().getPulsarFunctionsCluster ();
126129 V1alpha1Sink v1alpha1Sink =
127130 SinksUtil .createV1alpha1SkinFromSinkConfig (
128131 kind ,
@@ -133,7 +136,7 @@ public void registerSink(
133136 uploadedInputStream ,
134137 sinkConfig ,
135138 this .meshWorkerServiceSupplier .get ().getConnectorsManager (),
136- worker ().getWorkerConfig ().getFunctionsWorkerServiceCustomConfigs ());
139+ worker ().getWorkerConfig ().getFunctionsWorkerServiceCustomConfigs (), cluster );
137140 // override namesapce by configuration
138141 v1alpha1Sink .getMetadata ().setNamespace (KubernetesUtils .getNamespace (worker ().getFactoryConfig ()));
139142 try {
@@ -147,6 +150,7 @@ public void registerSink(
147150 customLabels .putAll (worker ().getFactoryConfig ().getCustomLabels ());
148151 }
149152 pod .setLabels (customLabels );
153+ v1alpha1Sink .getMetadata ().setLabels (customLabels );
150154 v1alpha1Sink .getSpec ().setPod (pod );
151155 this .upsertSink (tenant , namespace , sinkName , sinkConfig , v1alpha1Sink , clientAuthenticationDataHttps );
152156 Call call =
@@ -193,13 +197,8 @@ public void updateSink(
193197 clientRole ,
194198 clientAuthenticationDataHttps ,
195199 ComponentTypeUtils .toString (componentType ));
200+ String cluster = worker ().getWorkerConfig ().getPulsarFunctionsCluster ();
196201 try {
197- Call getCall =
198- worker ().getCustomObjectsApi ()
199- .getNamespacedCustomObjectCall (
200- group , version , namespace , plural , sinkName , null );
201- V1alpha1Sink oldRes = executeCall (getCall , V1alpha1Sink .class );
202-
203202 V1alpha1Sink v1alpha1Sink =
204203 SinksUtil .createV1alpha1SkinFromSinkConfig (
205204 kind ,
@@ -209,20 +208,33 @@ public void updateSink(
209208 sinkPkgUrl ,
210209 uploadedInputStream ,
211210 sinkConfig , this .meshWorkerServiceSupplier .get ().getConnectorsManager (),
212- worker ().getWorkerConfig ().getFunctionsWorkerServiceCustomConfigs ());
211+ worker ().getWorkerConfig ().getFunctionsWorkerServiceCustomConfigs (), cluster );
212+ CustomObjectsApi customObjectsApi = worker ().getCustomObjectsApi ();
213+ Call getCall =
214+ customObjectsApi .getNamespacedCustomObjectCall (
215+ group , version ,
216+ KubernetesUtils .getNamespace (worker ().getFactoryConfig ()), plural ,
217+ v1alpha1Sink .getMetadata ().getName (), null );
218+ V1alpha1Sink oldRes = executeCall (getCall , V1alpha1Sink .class );
219+ if (oldRes .getMetadata () == null || oldRes .getMetadata ().getLabels () == null ) {
220+ throw new RestException (Response .Status .NOT_FOUND , "This sink resource was not found" );
221+ }
222+ Map <String , String > labels = oldRes .getMetadata ().getLabels ();
223+ V1alpha1SinkSpecPod pod = new V1alpha1SinkSpecPod ();
224+ pod .setLabels (labels );
225+ v1alpha1Sink .getMetadata ().setLabels (labels );
226+ v1alpha1Sink .getSpec ().setPod (pod );
213227 this .upsertSink (tenant , namespace , sinkName , sinkConfig , v1alpha1Sink , clientAuthenticationDataHttps );
214228 v1alpha1Sink .getMetadata ().setNamespace (KubernetesUtils .getNamespace (worker ().getFactoryConfig ()));
215229 v1alpha1Sink
216230 .getMetadata ()
217231 .setResourceVersion (oldRes .getMetadata ().getResourceVersion ());
218- Call replaceCall =
219- worker ().getCustomObjectsApi ()
220- .replaceNamespacedCustomObjectCall (
232+ Call replaceCall = customObjectsApi .replaceNamespacedCustomObjectCall (
221233 group ,
222234 version ,
223235 KubernetesUtils .getNamespace (worker ().getFactoryConfig ()),
224236 plural ,
225- sinkName ,
237+ v1alpha1Sink . getMetadata (). getName () ,
226238 v1alpha1Sink ,
227239 null ,
228240 null ,
@@ -270,11 +282,12 @@ public SinkStatus getSinkStatus(
270282 clientAuthenticationDataHttps ,
271283 ComponentTypeUtils .toString (componentType ));
272284 try {
285+ String hashName = CommonUtil .generateObjectName (worker (), tenant , namespace , componentName );
273286 Call call =
274287 worker ().getCustomObjectsApi ()
275288 .getNamespacedCustomObjectCall (
276289 group , version , KubernetesUtils .getNamespace (worker ().getFactoryConfig ()),
277- plural , componentName , null );
290+ plural , hashName , null );
278291 V1alpha1Sink v1alpha1Sink = executeCall (call , V1alpha1Sink .class );
279292 SinkStatus .SinkInstanceStatus sinkInstanceStatus = new SinkStatus .SinkInstanceStatus ();
280293 SinkStatus .SinkInstanceStatus .SinkInstanceStatusData sinkInstanceStatusData =
@@ -314,11 +327,12 @@ public SinkConfig getSinkInfo(
314327 validateSinkEnabled ();
315328 this .validateGetInfoRequestParams (tenant , namespace , componentName , kind );
316329 try {
330+ String hashName = CommonUtil .generateObjectName (worker (), tenant , namespace , componentName );
317331 Call call =
318332 worker ().getCustomObjectsApi ()
319333 .getNamespacedCustomObjectCall (
320334 group , version , KubernetesUtils .getNamespace (worker ().getFactoryConfig ()),
321- plural , componentName , null );
335+ plural , hashName , null );
322336
323337 V1alpha1Sink v1alpha1Sink = executeCall (call , V1alpha1Sink .class );
324338 return SinksUtil .createSinkConfigFromV1alpha1Sink (
0 commit comments