Skip to content

Commit 067e4f4

Browse files
committed
Move to the official Bazel worker API
1 parent b2bbb8b commit 067e4f4

File tree

9 files changed

+33
-449
lines changed

9 files changed

+33
-449
lines changed

MODULE.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ bazel_dep(name = "rules_java", version = "8.9.0")
1515
bazel_dep(name = "rules_python", version = "0.23.1")
1616
bazel_dep(name = "rules_android", version = "0.6.1")
1717
bazel_dep(name = "bazel_features", version = "1.25.0")
18+
bazel_dep(name = "bazel_worker_api", version = "0.0.4")
19+
bazel_dep(name = "bazel_worker_java", version = "0.0.4")
1820

1921
rules_java_toolchains = use_extension("@rules_java//java:extensions.bzl", "toolchains")
2022
use_repo(rules_java_toolchains, "remote_java_tools")

src/main/kotlin/io/bazel/worker/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ kt_bootstrap_library(
2121
],
2222
deps = [
2323
":worker_proto",
24+
"@bazel_worker_java//src/main/java/com/google/devtools/build/lib/worker:work_request_handlers",
2425
],
2526
)

src/main/kotlin/io/bazel/worker/CpuTimeBasedGcScheduler.kt

Lines changed: 0 additions & 42 deletions
This file was deleted.

src/main/kotlin/io/bazel/worker/GcScheduler.kt

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/main/kotlin/io/bazel/worker/IO.kt

Lines changed: 0 additions & 91 deletions
This file was deleted.

src/main/kotlin/io/bazel/worker/PersistentWorker.kt

Lines changed: 30 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -17,111 +17,51 @@
1717

1818
package io.bazel.worker
1919

20+
import com.google.devtools.build.lib.worker.ProtoWorkerMessageProcessor
21+
import com.google.devtools.build.lib.worker.WorkRequestHandler.WorkRequestCallback
22+
import com.google.devtools.build.lib.worker.WorkRequestHandler.WorkRequestHandlerBuilder
2023
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest
21-
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse
22-
import src.main.kotlin.io.bazel.worker.GcScheduler
23-
import java.io.InputStream
24+
import java.io.IOException
25+
import java.io.PrintWriter
2426
import java.time.Duration
25-
import java.util.concurrent.ExecutorCompletionService
26-
import java.util.concurrent.ExecutorService
27-
import java.util.concurrent.Executors
28-
import java.util.concurrent.TimeUnit
29-
import java.util.concurrent.atomic.AtomicLong
3027

3128
/**
3229
* PersistentWorker satisfies Bazel persistent worker protocol for executing work.
3330
*
3431
* Supports multiplex (https://docs.bazel.build/versions/master/multiplex-worker.html) provided
3532
* the work is thread/coroutine safe.
36-
*
37-
* @param executor thread pool for executing tasks.
38-
* @param captureIO to avoid writing stdout and stderr while executing.
39-
* @param cpuTimeBasedGcScheduler to trigger gc cleanup.
4033
*/
41-
class PersistentWorker(
42-
private val captureIO: () -> IO,
43-
private val executor: ExecutorService,
44-
private val cpuTimeBasedGcScheduler: GcScheduler,
45-
) : Worker {
46-
constructor(
47-
executor: ExecutorService,
48-
captureIO: () -> IO,
49-
) : this(
50-
captureIO,
51-
executor,
52-
GcScheduler {},
53-
)
54-
55-
constructor() : this(
56-
IO.Companion::capture,
57-
Executors.newCachedThreadPool(),
58-
CpuTimeBasedGcScheduler(Duration.ofSeconds(10)),
59-
)
60-
61-
override fun start(execute: Work) =
62-
WorkerContext.run {
63-
captureIO().use { io ->
64-
val running = AtomicLong(0)
65-
val completion = ExecutorCompletionService<WorkResponse>(executor)
66-
val producer =
67-
executor.submit {
68-
io.input.readRequestAnd { request ->
69-
running.incrementAndGet()
70-
completion.submit {
71-
doTask(
72-
name = "request ${request.requestId}",
73-
task = request.workTo(execute),
74-
).asResponseTo(request.requestId, io)
75-
}
76-
}
77-
}
78-
val consumer =
79-
executor.submit {
80-
while (!producer.isDone || running.get() > 0) {
81-
// poll time is how long before checking producer liveliness. Too long, worker hangs
82-
// when being shutdown -- too short, and it starves the process.
83-
completion.poll(1, TimeUnit.SECONDS)?.run {
84-
running.decrementAndGet()
85-
get().writeDelimitedTo(io.output)
86-
io.output.flush()
87-
}
88-
cpuTimeBasedGcScheduler.maybePerformGc()
89-
}
90-
}
91-
producer.get()
92-
consumer.get()
93-
io.output.close()
34+
class PersistentWorker : Worker {
35+
override fun start(execute: Work): Int {
36+
return WorkerContext.run {
37+
val realStdErr = System.err
38+
try {
39+
val workerHandler =
40+
WorkRequestHandlerBuilder(
41+
WorkRequestCallback { request: WorkRequest, pw: PrintWriter ->
42+
return@WorkRequestCallback doTask(
43+
name = "request ${request.requestId}",
44+
task = request.workTo(execute),
45+
).asResponse(pw)
46+
},
47+
realStdErr,
48+
ProtoWorkerMessageProcessor(System.`in`, System.out),
49+
).setCpuUsageBeforeGc(Duration.ofSeconds(10)).build()
50+
workerHandler.processRequests()
51+
} catch (e: IOException) {
52+
this.error(e, { "Unknown IO exception" })
53+
e.printStackTrace(realStdErr)
54+
return@run 1
9455
}
9556
return@run 0
9657
}
58+
}
9759

9860
private fun WorkRequest.workTo(execute: Work): (sub: WorkerContext.TaskContext) -> Status =
9961
{ ctx -> execute(ctx, argumentsList.toList()) }
10062

101-
private fun InputStream.readRequestAnd(action: (WorkRequest) -> Unit) {
102-
while (true) {
103-
WorkRequest
104-
.parseDelimitedFrom(this)
105-
?.run(action)
106-
?: return
107-
}
63+
private fun TaskResult.asResponse(pw: PrintWriter): Int {
64+
pw.print(log.out.toString())
65+
return status.exit
10866
}
109-
110-
private fun TaskResult.asResponseTo(
111-
id: Int,
112-
io: IO,
113-
): WorkResponse =
114-
WorkResponse
115-
.newBuilder()
116-
.apply {
117-
val cap = io.readCapturedAsUtf8String()
118-
// append whatever falls through standard out.
119-
output =
120-
listOf(
121-
log.out.toString(),
122-
cap,
123-
).joinToString("\n").trim()
124-
exitCode = status.exit
125-
requestId = id
126-
}.build()
12767
}

src/test/kotlin/io/bazel/worker/BUILD.bazel

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
load("//kotlin:jvm.bzl", "kt_jvm_library", "kt_jvm_test")
22

3-
kt_jvm_test(
4-
name = "IOTest",
5-
srcs = [
6-
"IOTest.kt",
7-
],
8-
test_class = "io.bazel.worker.IOTest",
9-
deps = [
10-
"//src/main/kotlin/io/bazel/worker",
11-
"@kotlin_rules_maven//:com_google_truth_truth",
12-
],
13-
)
14-
153
kt_jvm_test(
164
name = "WorkerContextTest",
175
srcs = [
@@ -58,29 +46,11 @@ kt_jvm_test(
5846
],
5947
)
6048

61-
kt_jvm_test(
62-
name = "JavaPersistentWorkerTest",
63-
srcs = [
64-
"JavaPersistentWorkerTest.kt",
65-
],
66-
test_class = "io.bazel.worker.JavaPersistentWorkerTest",
67-
deps = [
68-
":WorkerEnvironment",
69-
"//src/main/kotlin/io/bazel/worker",
70-
"//src/main/protobuf:worker_protocol_java_proto",
71-
"@kotlin_rules_maven//:com_google_truth_truth",
72-
],
73-
)
74-
7549
test_suite(
7650
name = "worker_tests",
7751
tests = [
78-
":IOTest",
7952
":InvocationWorkerTest",
80-
":JavaPersistentWorkerTest",
8153
":WorkerContextTest",
82-
# TODO(restingbull): Re-enable when not flaky.
83-
#":WorkerEnvironmentTest",
8454
],
8555
)
8656

0 commit comments

Comments
 (0)