|
1 | 1 | package com.rules.android.lint.worker |
2 | 2 |
|
3 | | -import io.reactivex.rxjava3.core.BackpressureStrategy |
4 | | -import io.reactivex.rxjava3.core.Flowable |
5 | | -import io.reactivex.rxjava3.core.Scheduler |
6 | | -import io.reactivex.rxjava3.schedulers.Schedulers |
7 | | -import java.io.BufferedOutputStream |
8 | | -import java.io.ByteArrayOutputStream |
| 3 | +import com.google.devtools.build.lib.worker.ProtoWorkerMessageProcessor |
| 4 | +import com.google.devtools.build.lib.worker.WorkRequestHandler |
9 | 5 | import java.io.IOException |
10 | 6 | import java.io.PrintStream |
| 7 | +import java.time.Duration |
11 | 8 |
|
12 | 9 | internal class PersistentWorker( |
13 | | - /** |
14 | | - * WorkerIO instance wrapping the standard output streams |
15 | | - */ |
16 | | - private val workerIO: WorkerIO, |
17 | | - /** |
18 | | - * Rxjava Scheduler to execute work requests on. |
19 | | - */ |
20 | | - private val scheduler: Scheduler, |
21 | | - /** |
22 | | - * Instance of CpuTimeBasedGcScheduler that will run periodically |
23 | | - */ |
24 | | - private val persistentWorkerCpuTimeBasedGcScheduler: PersistentWorkerCpuTimeBasedGcScheduler, |
25 | | - /** |
26 | | - * Instance of CpuTimeBasedGcScheduler that will run periodically |
27 | | - */ |
28 | | - private val workRequestProcessor: Worker.WorkerMessageProcessor, |
29 | | - /** |
30 | | - * Instance of CpuTimeBasedGcScheduler that will run periodically |
31 | | - */ |
32 | 10 | private val workerWorkRequestCallback: Worker.WorkRequestCallback, |
33 | 11 | ) : Worker { |
34 | | - constructor( |
35 | | - workerMessageProcessor: Worker.WorkRequestCallback, |
36 | | - ) : this( |
37 | | - workerIO = WorkerIO(), |
38 | | - scheduler = Schedulers.io(), |
39 | | - persistentWorkerCpuTimeBasedGcScheduler = PersistentWorkerCpuTimeBasedGcScheduler(), |
40 | | - workRequestProcessor = |
41 | | - WorkerJsonMessageProcessor( |
42 | | - System.`in`, |
43 | | - System.out, |
44 | | - ), |
45 | | - workerWorkRequestCallback = workerMessageProcessor, |
46 | | - ) |
47 | | - |
48 | | - /** |
49 | | - * Initiate the worker and begin processing work requests |
50 | | - */ |
51 | 12 | override fun processRequests(): Int { |
52 | | - return workerIO.use { io -> |
53 | | - // Start by redirecting the system streams so that nothing |
54 | | - // corrupts the streams that the worker uses |
55 | | - io.redirectSystemStreams() |
| 13 | + val realStdErr: PrintStream = System.err |
56 | 14 |
|
57 | | - // Process requests as they come in using RxJava |
58 | | - Flowable |
59 | | - .create( |
60 | | - { emitter -> |
61 | | - while (!emitter.isCancelled) { |
62 | | - try { |
63 | | - val request: WorkRequest = workRequestProcessor.readWorkRequest() |
64 | | - emitter.onNext(request) |
65 | | - } catch (e: IOException) { |
66 | | - emitter.onError(e) |
67 | | - } |
68 | | - } |
69 | | - }, |
70 | | - BackpressureStrategy.BUFFER, |
71 | | - ).subscribeOn(scheduler) |
72 | | - .parallel() |
73 | | - .runOn(scheduler) |
74 | | - // Execute the work and map the result to a work response |
75 | | - .map { request -> return@map this.respondToRequest(request) } |
76 | | - // Run the garbage collector periodically so that we are a good responsible worker |
77 | | - .doOnNext { persistentWorkerCpuTimeBasedGcScheduler.maybePerformGc() } |
78 | | - .doOnError { it.printStackTrace() } |
79 | | - .sequential() |
80 | | - .observeOn(scheduler) |
81 | | - .blockingSubscribe { response -> |
82 | | - workRequestProcessor.writeWorkResponse(response) |
83 | | - } |
84 | | - return@use 0 |
| 15 | + try { |
| 16 | + val workerHandler: WorkRequestHandler = |
| 17 | + WorkRequestHandler |
| 18 | + .WorkRequestHandlerBuilder( |
| 19 | + WorkRequestHandler.WorkRequestCallback { request, pw -> |
| 20 | + return@WorkRequestCallback workerWorkRequestCallback.processWorkRequest( |
| 21 | + request.argumentsList.toList(), |
| 22 | + System.err, |
| 23 | + ) |
| 24 | + }, |
| 25 | + realStdErr, |
| 26 | + ProtoWorkerMessageProcessor(System.`in`, System.out), |
| 27 | + ).setCpuUsageBeforeGc(Duration.ofSeconds(10)) |
| 28 | + .build() |
| 29 | + workerHandler.processRequests() |
| 30 | + } catch (e: IOException) { |
| 31 | + e.printStackTrace(realStdErr) |
| 32 | + return 1 |
85 | 33 | } |
86 | | - } |
87 | 34 |
|
88 | | - private fun respondToRequest(request: WorkRequest): WorkResponse { |
89 | | - ByteArrayOutputStream().use { baos -> |
90 | | - // Create a print stream that the execution can write logs to |
91 | | - val printStream = PrintStream(BufferedOutputStream(ByteArrayOutputStream())) |
92 | | - var exitCode: Int |
93 | | - try { |
94 | | - // Sanity check the work request arguments |
95 | | - val arguments = |
96 | | - requireNotNull(request.arguments) { |
97 | | - "Request with id ${request.requestId} " + |
98 | | - "does not have arguments!" |
99 | | - } |
100 | | - require(arguments.isNotEmpty()) { |
101 | | - "Request with id ${request.requestId} " + |
102 | | - "does not have arguments!" |
103 | | - } |
104 | | - exitCode = workerWorkRequestCallback.processWorkRequest(arguments, printStream) |
105 | | - } catch (e: Exception) { |
106 | | - e.printStackTrace(printStream) |
107 | | - exitCode = 1 |
108 | | - } finally { |
109 | | - printStream.flush() |
110 | | - } |
111 | | - |
112 | | - val output = |
113 | | - arrayOf(baos.toString()) |
114 | | - .asSequence() |
115 | | - .map { it.trim() } |
116 | | - .filter { it.isNotEmpty() } |
117 | | - .joinToString("\n") |
118 | | - return WorkResponse( |
119 | | - exitCode = exitCode, |
120 | | - output = output, |
121 | | - requestId = request.requestId, |
122 | | - ) |
123 | | - } |
| 35 | + return 0 |
124 | 36 | } |
125 | 37 | } |
0 commit comments