|
21 | 21 | import static com.google.common.base.Preconditions.checkArgument; |
22 | 22 | import static com.google.common.base.Preconditions.checkState; |
23 | 23 | import static java.util.concurrent.Executors.newSingleThreadExecutor; |
24 | | -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; |
25 | 24 | import static java.util.concurrent.TimeUnit.SECONDS; |
26 | 25 | import static java.util.logging.Level.INFO; |
27 | 26 | import static java.util.logging.Level.SEVERE; |
|
63 | 62 | import build.buildfarm.worker.resources.LocalResourceSetUtils; |
64 | 63 | import com.google.common.cache.LoadingCache; |
65 | 64 | import com.google.common.collect.Lists; |
66 | | -import com.google.common.util.concurrent.SettableFuture; |
67 | | -import com.google.devtools.common.options.OptionsParsingException; |
68 | 65 | import com.google.longrunning.Operation; |
69 | 66 | import com.google.protobuf.ByteString; |
70 | 67 | import com.google.protobuf.Duration; |
|
90 | 87 | import java.util.UUID; |
91 | 88 | import java.util.concurrent.Executor; |
92 | 89 | import java.util.concurrent.ExecutorService; |
93 | | -import java.util.concurrent.ScheduledExecutorService; |
94 | | -import java.util.concurrent.ScheduledFuture; |
95 | 90 | import java.util.concurrent.atomic.AtomicBoolean; |
96 | 91 | import java.util.logging.Level; |
97 | 92 | import javax.annotation.Nullable; |
98 | 93 | import javax.naming.ConfigurationException; |
99 | 94 | import lombok.extern.java.Log; |
100 | | -import org.springframework.beans.factory.annotation.Autowired; |
101 | | -import org.springframework.boot.SpringApplication; |
102 | | -import org.springframework.boot.autoconfigure.SpringBootApplication; |
103 | | -import org.springframework.context.ApplicationContext; |
104 | | -import org.springframework.context.annotation.ComponentScan; |
105 | 95 |
|
106 | 96 | @Log |
107 | 97 | public final class Worker extends LoggingMain { |
@@ -146,7 +136,6 @@ public final class Worker extends LoggingMain { |
146 | 136 | private LoadingCache<String, Instance> workerStubs; |
147 | 137 | private AtomicBoolean released = new AtomicBoolean(true); |
148 | 138 |
|
149 | | - @Autowired private ApplicationContext springContext; |
150 | 139 | /** |
151 | 140 | * The method will prepare the worker for graceful shutdown when the worker is ready. Note on |
152 | 141 | * using stderr here instead of log. By the time this is called in PreDestroy, the log is no |
@@ -196,43 +185,6 @@ private Worker() { |
196 | 185 | super("BuildFarmShardWorker"); |
197 | 186 | } |
198 | 187 |
|
199 | | - private void exitPostPipelineFailure() { |
200 | | - // Shutdown the worker if a pipeline fails. By means of the spring lifecycle |
201 | | - // hooks - e.g. the `PreDestroy` hook here - it will attempt to gracefully |
202 | | - // spin down the pipeline |
203 | | - |
204 | | - // By calling these spring shutdown facilities; we're open to the risk that |
205 | | - // a subsystem may be hanging a criticial thread indeffinitly. Deadline the |
206 | | - // shutdown workflow to ensure we don't leave a zombie worker in this |
207 | | - // situation |
208 | | - ScheduledExecutorService shutdownDeadlineExecutor = newSingleThreadScheduledExecutor(); |
209 | | - |
210 | | - // This may be shorter than the action timeout; assume we have interrupted |
211 | | - // actions in a fatal uncaught exception. |
212 | | - int forceShutdownDeadline = 60; |
213 | | - ScheduledFuture<?> termFuture = |
214 | | - shutdownDeadlineExecutor.schedule( |
215 | | - new Runnable() { |
216 | | - public void run() { |
217 | | - log.log( |
218 | | - Level.SEVERE, |
219 | | - String.format( |
220 | | - "Force terminating due to shutdown deadline exceeded (%d seconds)", |
221 | | - forceShutdownDeadline)); |
222 | | - System.exit(1); |
223 | | - } |
224 | | - }, |
225 | | - forceShutdownDeadline, |
226 | | - SECONDS); |
227 | | - |
228 | | - // Consider defining exit codes to better afford out of band instance |
229 | | - // recovery |
230 | | - int code = SpringApplication.exit(springContext, () -> 1); |
231 | | - termFuture.cancel(false); |
232 | | - shutdownDeadlineExecutor.shutdown(); |
233 | | - System.exit(code); |
234 | | - } |
235 | | - |
236 | 188 | private Operation stripOperation(Operation operation) { |
237 | 189 | return instance.stripOperation(operation); |
238 | 190 | } |
@@ -674,22 +626,9 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep |
674 | 626 | healthStatusManager.setStatus( |
675 | 627 | HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING); |
676 | 628 | PrometheusPublisher.startHttpServer(configs.getPrometheusPort()); |
| 629 | + startFailsafeRegistration(); |
677 | 630 |
|
678 | | - // An executor can also be used as storage worker as scheduler treats all new workers as storage workers |
679 | | - // TODO (Congt) Fix it upstream and revert it. |
680 | | - if (configs.getWorker().getCapabilities().isCas()) { |
681 | | - startFailsafeRegistration(); |
682 | | - } else { |
683 | | - log.log(INFO, "Skipping worker registration"); |
684 | | - } |
685 | | - |
686 | | - // Listen for pipeline unhandled exceptions |
687 | | - ExecutorService pipelineExceptionExecutor = newSingleThreadExecutor(); |
688 | | - SettableFuture<Void> pipelineExceptionFuture = SettableFuture.create(); |
689 | | - pipelineExceptionFuture.addListener(this::exitPostPipelineFailure, pipelineExceptionExecutor); |
690 | | - |
691 | | - pipeline.start(pipelineExceptionFuture); |
692 | | - |
| 631 | + pipeline.start(null); |
693 | 632 | healthCheckMetric.labels("start").inc(); |
694 | 633 | executionSlotsTotal.set(configs.getWorker().getExecuteStageWidth()); |
695 | 634 | inputFetchSlotsTotal.set(configs.getWorker().getInputFetchStageWidth()); |
|
0 commit comments