Skip to content

Commit 689fa69

Browse files
committed
fix(worker): properly handle WorkerJobfetcher bean lifecycle
1 parent 455f093 commit 689fa69

File tree

2 files changed

+15
-18
lines changed

2 files changed

+15
-18
lines changed

worker/src/main/java/io/kestra/worker/GrpcStubFactory.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,43 +22,40 @@
2222
*/
2323
@Factory
2424
public class GrpcStubFactory {
25-
26-
@Inject
27-
GrpcChannelManager grpcChannelManager;
28-
25+
2926
@Bean
3027
@Singleton
31-
public WorkerControllerServiceBlockingStub blockingWorkerServiceStub() {
32-
return WorkerControllerServiceGrpc.newBlockingStub(grpcChannelManager.getDefaultChannel());
28+
public WorkerControllerServiceBlockingStub blockingWorkerServiceStub(GrpcChannelManager manager) {
29+
return WorkerControllerServiceGrpc.newBlockingStub(manager.getDefaultChannel());
3330
}
3431

3532
@Bean
3633
@Singleton
37-
public WorkerControllerServiceStub asyncWorkerServiceStub() {
38-
return WorkerControllerServiceGrpc.newStub(grpcChannelManager.getDefaultChannel());
34+
public WorkerControllerServiceStub asyncWorkerServiceStub(GrpcChannelManager manager) {
35+
return WorkerControllerServiceGrpc.newStub(manager.getDefaultChannel());
3936
}
4037

4138
@Bean
4239
@Singleton
43-
public LivenessControllerServiceBlockingStub workerServiceStub() {
44-
return LivenessControllerServiceGrpc.newBlockingStub(grpcChannelManager.getDefaultChannel());
40+
public LivenessControllerServiceBlockingStub workerServiceStub(GrpcChannelManager manager) {
41+
return LivenessControllerServiceGrpc.newBlockingStub(manager.getDefaultChannel());
4542
}
4643

4744
@Bean
4845
@Singleton
49-
public ConnectControllerServiceBlockingStub connectControllerServiceBlockingStub() {
50-
return ConnectControllerServiceGrpc.newBlockingStub(grpcChannelManager.getDefaultChannel());
46+
public ConnectControllerServiceBlockingStub connectControllerServiceBlockingStub(GrpcChannelManager manager) {
47+
return ConnectControllerServiceGrpc.newBlockingStub(manager.getDefaultChannel());
5148
}
5249

5350
@Bean
5451
@Singleton
55-
public WorkerFlowMetaStoreServiceBlockingStub workerFlowMetaStoreServiceBlockingStub() {
56-
return WorkerFlowMetaStoreServiceGrpc.newBlockingStub(grpcChannelManager.getDefaultChannel());
52+
public WorkerFlowMetaStoreServiceBlockingStub workerFlowMetaStoreServiceBlockingStub(GrpcChannelManager manager) {
53+
return WorkerFlowMetaStoreServiceGrpc.newBlockingStub(manager.getDefaultChannel());
5754
}
5855

5956
@Bean
6057
@Singleton
61-
public KVMetadataServiceBlockingStub kvMetadataServiceBlockingStub() {
62-
return KVMetadataServiceGrpc.newBlockingStub(grpcChannelManager.getDefaultChannel());
58+
public KVMetadataServiceBlockingStub kvMetadataServiceBlockingStub(GrpcChannelManager manager) {
59+
return KVMetadataServiceGrpc.newBlockingStub(manager.getDefaultChannel());
6360
}
6461
}

worker/src/main/java/io/kestra/worker/fetchers/WorkerJobFetcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import io.kestra.worker.WorkerLoop;
1919
import io.kestra.worker.queues.WorkerQueue;
2020
import io.kestra.worker.queues.WorkerQueueRegistry;
21-
import io.micronaut.context.annotation.Prototype;
2221
import jakarta.inject.Inject;
22+
import jakarta.inject.Singleton;
2323
import lombok.extern.slf4j.Slf4j;
2424

2525
import java.time.Duration;
@@ -47,7 +47,7 @@
4747
* ACKs are "receipt ACKs" (not completion ACKs) - they signal that the job was received and
4848
* is queued locally, allowing the controller to clean up its in-memory tracking.
4949
*/
50-
@Prototype
50+
@Singleton
5151
@Slf4j
5252
public class WorkerJobFetcher extends WorkerLoop {
5353

0 commit comments

Comments
 (0)