Skip to content

Commit 539beb6

Browse files
Adding a CoroutineFileObserver on which to build an inotify client and server to replace Speakeasy.
PiperOrigin-RevId: 533179252
1 parent bd0fcc3 commit 539beb6

File tree

2 files changed

+201
-0
lines changed

2 files changed

+201
-0
lines changed

services/shellexecutor/java/androidx/test/services/shellexecutor/BUILD

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
11
load("@build_bazel_rules_android//android:rules.bzl", "android_library")
2+
load("@io_bazel_rules_kotlin//kotlin:android.bzl", "kt_android_library")
23

34
# A shell command execution server to allow shell commands to be run at elevated permissions
45

56
package(default_applicable_licenses = ["//services:license"])
67

78
licenses(["notice"])
89

10+
kt_android_library(
11+
name = "coroutine_file_observer",
12+
srcs = [
13+
"CoroutineFileObserver.kt",
14+
],
15+
visibility = ["//visibility:private"],
16+
deps = [
17+
"@maven//:org_jetbrains_kotlinx_kotlinx_coroutines_core",
18+
],
19+
)
20+
921
android_library(
1022
name = "exec_server",
1123
srcs = [
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright (C) 2023 The Android Open Source Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package androidx.test.services.shellexecutor
18+
19+
import android.os.Build
20+
import android.os.FileObserver
21+
import android.util.Log
22+
import java.io.File
23+
import java.util.concurrent.LinkedBlockingQueue
24+
import kotlinx.coroutines.channels.Channel
25+
import kotlinx.coroutines.channels.ClosedSendChannelException
26+
import kotlinx.coroutines.flow.receiveAsFlow
27+
import kotlinx.coroutines.runBlocking
28+
29+
/**
30+
* A FileObserver that is friendly with Kotlin coroutines.
31+
*
32+
* Note that the documentation on FileObserver is wrong: it doesn't see events from subdirectories.
33+
*/
34+
@Suppress("DEPRECATION") // the non-deprecated constructor needs API 29
35+
open class CoroutineFileObserver(public val watch: File) :
36+
FileObserver(watch.toString(), FileObserver.ALL_EVENTS) {
37+
private data class Event(val event: Int, val file: File)
38+
39+
private val eventChannel: EventChannel
40+
protected var logLevel: Int = 0 // by default, don't log at all; derived classes can override
41+
protected var logTag = "CoroutineFileObserver"
42+
43+
init {
44+
// On API 21 and 22, about 1% of the time, Channel code will deadlock and, thirty seconds later,
45+
// crash the application with 'art/runtime/thread_list.cc:170] Thread suspend timeout'. In that
46+
// case, we resort to Java.
47+
if (
48+
Build.VERSION.SDK_INT < Build.VERSION_CODES.LOLLIPOP ||
49+
Build.VERSION.SDK_INT > Build.VERSION_CODES.LOLLIPOP_MR1
50+
) {
51+
eventChannel = CoroutineEventChannel()
52+
} else {
53+
eventChannel = WorkaroundEventChannel()
54+
}
55+
}
56+
57+
// This runs on a special FileObserver thread provided by Android.
58+
final override fun onEvent(event: Int, path: String?) {
59+
val file =
60+
when {
61+
path == null -> watch
62+
path.startsWith("/") -> File(path)
63+
else -> File(watch, path)
64+
}
65+
eventChannel.send(Event(event, file))
66+
}
67+
68+
final fun stop(): Unit = eventChannel.stop()
69+
70+
// Events are processed in order by run(). If you do nontrivial work in one of the handlers,
71+
// launch it in another job.
72+
final suspend fun run() {
73+
startWatching()
74+
try {
75+
onWatching()
76+
eventChannel.receive { event -> handleEvent(event) }
77+
} catch (x: Exception) {
78+
Log.w(logTag, "Error while processing events", x)
79+
} finally {
80+
log("stopWatching")
81+
stopWatching()
82+
log("stoppedWatching")
83+
}
84+
}
85+
86+
private suspend fun handleEvent(event: Event) =
87+
when (event.event) {
88+
FileObserver.ACCESS -> onAccess(event.file)
89+
FileObserver.ATTRIB -> onAttrib(event.file)
90+
FileObserver.CLOSE_NOWRITE -> onCloseNoWrite(event.file)
91+
FileObserver.CLOSE_WRITE -> onCloseWrite(event.file)
92+
FileObserver.CREATE -> onCreate(event.file)
93+
FileObserver.DELETE -> onDelete(event.file)
94+
FileObserver.DELETE_SELF -> onDeleteSelf(event.file)
95+
FileObserver.MODIFY -> onModify(event.file)
96+
FileObserver.MOVED_FROM -> onMovedFrom(event.file)
97+
FileObserver.MOVED_TO -> onMovedTo(event.file)
98+
FileObserver.MOVE_SELF -> onMoveSelf(event.file)
99+
FileObserver.OPEN -> onOpen(event.file)
100+
else -> Unit
101+
}
102+
103+
protected final fun log(message: String): Unit {
104+
if (logLevel >= Log.VERBOSE) Log.println(logLevel, logTag, message)
105+
}
106+
107+
protected open suspend fun onAccess(file: File) = log("ACCESS $file")
108+
109+
protected open suspend fun onAttrib(file: File) = log("ATTRIB $file")
110+
111+
protected open suspend fun onCloseNoWrite(file: File) = log("CLOSE_NOWRITE $file")
112+
113+
protected open suspend fun onCloseWrite(file: File) = log("CLOSE_WRITE $file")
114+
115+
protected open suspend fun onCreate(file: File) = log("CREATE $file")
116+
117+
protected open suspend fun onDelete(file: File) = log("DELETE $file")
118+
119+
protected open suspend fun onDeleteSelf(file: File) = log("DELETE_SELF $file")
120+
121+
protected open suspend fun onModify(file: File) = log("MODIFY $file")
122+
123+
protected open suspend fun onMovedFrom(file: File) = log("MOVED_FROM $file")
124+
125+
protected open suspend fun onMovedTo(file: File) = log("MOVED_TO $file")
126+
127+
protected open suspend fun onMoveSelf(file: File) = log("MOVE_SELF $file")
128+
129+
protected open suspend fun onOpen(file: File) = log("OPEN $file")
130+
131+
/** Called in run() after startWatching(). Override as needed. */
132+
protected open fun onWatching() = Unit
133+
134+
private companion object {
135+
private interface EventChannel {
136+
/** Send one event. */
137+
fun send(event: Event)
138+
139+
/** Receive events until stop() is called. */
140+
suspend fun receive(handler: suspend (Event) -> Unit)
141+
142+
/** Stops the channel. */
143+
fun stop()
144+
}
145+
146+
private class CoroutineEventChannel : EventChannel {
147+
private val channel: Channel<Event> = Channel(Channel.UNLIMITED)
148+
149+
override fun send(event: Event) {
150+
if (channel.isClosedForSend) return
151+
runBlocking {
152+
try {
153+
channel.send(event)
154+
} catch (x: ClosedSendChannelException) {
155+
// Just in case the channel was closed after the previous call
156+
}
157+
}
158+
}
159+
160+
override suspend fun receive(handler: suspend (Event) -> Unit) {
161+
channel.receiveAsFlow().collect(handler)
162+
}
163+
164+
override fun stop() {
165+
channel.close()
166+
}
167+
}
168+
169+
private class WorkaroundEventChannel : EventChannel {
170+
private val queue = LinkedBlockingQueue<Event>()
171+
172+
override fun send(event: Event) {
173+
queue.put(event)
174+
}
175+
176+
override suspend fun receive(handler: suspend (Event) -> Unit) {
177+
while (true) {
178+
val event = queue.take()
179+
if (event.event < 0) return
180+
handler(event)
181+
}
182+
}
183+
184+
override fun stop() {
185+
send(Event(-1, File(".")))
186+
}
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)