Skip to content

Commit 00f2c7c

Browse files
Adding a ShellCommandLocalSocketClient for the ShellExecutor to talk to the ShellMain.
PiperOrigin-RevId: 689248060
1 parent 02ce11c commit 00f2c7c

File tree

7 files changed

+539
-0
lines changed

7 files changed

+539
-0
lines changed

services/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111

1212
**New Features**
1313

14+
* Adding a LocalSocket-based protocol for the ShellExecutor to talk to the
15+
ShellMain. This obsoletes SpeakEasy; if androidx.test.services is killed
16+
(e.g. by the low memory killer) between the start of the app_process that
17+
invokes LocalSocketShellMain and the start of the test, the test is still able
18+
to talk to LocalSocketShellMain.
19+
1420
**Breaking Changes**
1521

1622
**API Changes**

services/shellexecutor/java/androidx/test/services/shellexecutor/BUILD

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,32 @@ kt_android_library(
2929
],
3030
)
3131

32+
proto_library(
33+
name = "local_socket_protocol_pb",
34+
srcs = ["local_socket_protocol.proto"],
35+
)
36+
37+
java_lite_proto_library(
38+
name = "local_socket_protocol_pb_java_proto_lite",
39+
visibility = [
40+
"//services/shellexecutor/javatests/androidx/test/services/shellexecutor:__subpackages__",
41+
],
42+
deps = [":local_socket_protocol_pb"],
43+
)
44+
45+
kt_android_library(
46+
name = "local_socket_protocol",
47+
srcs = ["LocalSocketProtocol.kt"],
48+
visibility = [
49+
"//services/shellexecutor/javatests/androidx/test/services/shellexecutor:__subpackages__",
50+
],
51+
deps = [
52+
":local_socket_protocol_pb_java_proto_lite",
53+
"@com_google_protobuf//:protobuf_javalite",
54+
"@maven//:org_jetbrains_kotlinx_kotlinx_coroutines_core",
55+
],
56+
)
57+
3258
kt_android_library(
3359
name = "exec_server",
3460
srcs = [
@@ -62,6 +88,7 @@ kt_android_library(
6288
"ShellCommand.java",
6389
"ShellCommandClient.java",
6490
"ShellCommandFileObserverClient.kt",
91+
"ShellCommandLocalSocketClient.kt",
6592
"ShellExecSharedConstants.java",
6693
"ShellExecutor.java",
6794
"ShellExecutorFactory.java",
@@ -73,6 +100,8 @@ kt_android_library(
73100
deps = [
74101
":coroutine_file_observer",
75102
":file_observer_protocol",
103+
":local_socket_protocol",
104+
":local_socket_protocol_pb_java_proto_lite",
76105
"//services/speakeasy/java/androidx/test/services/speakeasy:protocol",
77106
"//services/speakeasy/java/androidx/test/services/speakeasy/client",
78107
"//services/speakeasy/java/androidx/test/services/speakeasy/client:tool_connection",
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright (C) 2024 The Android Open Source Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package androidx.test.services.shellexecutor
18+
19+
import android.net.LocalSocket
20+
import android.net.LocalSocketAddress
21+
import android.util.Log
22+
import androidx.test.services.shellexecutor.LocalSocketProtocolProto.RunCommandRequest
23+
import androidx.test.services.shellexecutor.LocalSocketProtocolProto.RunCommandResponse
24+
import com.google.protobuf.ByteString
25+
import java.io.IOException
26+
import java.net.URLDecoder
27+
import java.net.URLEncoder
28+
import kotlin.time.Duration
29+
30+
/**
31+
* Protocol for ShellCommandLocalSocketClient to talk to ShellCommandLocalSocketExecutorServer.
32+
*
33+
* Since androidx.test.services already includes the protobuf runtime, we aren't paying much extra
34+
* for adding some more protos to ship back and forth, which is vastly easier to deal with than
35+
* PersistableBundles (which don't even support ByteArray types).
36+
*
37+
* A conversation consists of a single RunCommandRequest from the client followed by a stream of
38+
* RunCommandResponses from the server; the final response has an exit code.
39+
*/
40+
object LocalSocketProtocol {
41+
/**
42+
* Composes a RunCommandRequest and sends it over the LocalSocket.
43+
*
44+
* @param secret The secret to authenticate the request.
45+
* @param argv The argv of the command line to run.
46+
* @param env The environment variables to provide to the process.
47+
* @param timeout The timeout for the command; infinite or nonpositive values mean no timeout.
48+
*/
49+
fun LocalSocket.sendRequest(
50+
secret: String,
51+
argv: List<String>,
52+
env: Map<String, String>? = null,
53+
timeout: Duration,
54+
) {
55+
val builder = RunCommandRequest.newBuilder().setSecret(secret).addAllArgv(argv)
56+
env?.forEach { (k, v) -> builder.putEnvironment(k, v) }
57+
if (timeout.isInfinite() || timeout.isNegative() || timeout == Duration.ZERO) {
58+
builder.setTimeoutMs(0) // <= 0 means no timeout
59+
} else {
60+
builder.setTimeoutMs(timeout.inWholeMilliseconds)
61+
}
62+
builder.build().writeDelimitedTo(outputStream)
63+
}
64+
65+
/** Reads a RunCommandRequest from the LocalSocket. */
66+
fun LocalSocket.readRequest(): RunCommandRequest {
67+
return RunCommandRequest.parseDelimitedFrom(inputStream)!!
68+
}
69+
70+
/** Composes a RunCommandResponse and sends it over the LocalSocket. */
71+
fun LocalSocket.sendResponse(
72+
buffer: ByteArray? = null,
73+
size: Int = 0,
74+
exitCode: Int? = null,
75+
): Boolean {
76+
val builder = RunCommandResponse.newBuilder()
77+
buffer?.let {
78+
val bufferSize = if (size > 0) size else it.size
79+
builder.buffer = ByteString.copyFrom(it, 0, bufferSize)
80+
}
81+
// Since we're currently stuck on a version of protobuf where we don't have hasExitCode(), we
82+
// use a magic value to indicate that exitCode is not set. When we upgrade to a newer version
83+
// of protobuf, we can obsolete this.
84+
if (exitCode != null) {
85+
builder.exitCode = exitCode
86+
} else {
87+
builder.exitCode = HAS_NOT_EXITED
88+
}
89+
90+
try {
91+
builder.build().writeDelimitedTo(outputStream)
92+
} catch (x: IOException) {
93+
// Sadly, the only way to discover that the client cut the connection is an exception that
94+
// can only be distinguished by its text.
95+
if (x.message.equals("Broken pipe")) {
96+
Log.i(TAG, "LocalSocket stream closed early")
97+
} else {
98+
Log.w(TAG, "LocalSocket write failed", x)
99+
}
100+
return false
101+
}
102+
return true
103+
}
104+
105+
/** Reads a RunCommandResponse from the LocalSocket. */
106+
fun LocalSocket.readResponse(): RunCommandResponse? {
107+
return RunCommandResponse.parseDelimitedFrom(inputStream)
108+
}
109+
110+
/**
111+
* Is this the end of the stream?
112+
*
113+
* Once we upgrade to a newer version of protobuf, we can switch to hasExitCode().
114+
*/
115+
fun RunCommandResponse.hasExited() = exitCode != HAS_NOT_EXITED
116+
117+
/**
118+
* Builds a "binder key", given the server address and secret. (We are not actually using a Binder
119+
* here, but the ShellExecutor interface calls the secret for connecting client to server a
120+
* "binder key", so we stick with that nomenclature.) Binder keys should be opaque outside
121+
* this directory.
122+
*
123+
* The address can contain spaces, and since it gets passed through a command line, we need to
124+
* encode it so it doesn't get split by argv. java.net.URLEncoder is conveniently available on all
125+
* SDK versions.
126+
*/
127+
@JvmStatic
128+
fun LocalSocketAddress.asBinderKey(secret: String) = buildString {
129+
append(":")
130+
append(URLEncoder.encode(name, "UTF-8")) // Will convert any : to %3A
131+
append(":")
132+
append(URLEncoder.encode(secret, "UTF-8"))
133+
append(":")
134+
}
135+
136+
/** Extracts the address from a binder key. */
137+
@JvmStatic
138+
fun addressFromBinderKey(binderKey: String) =
139+
LocalSocketAddress(URLDecoder.decode(binderKey.split(":")[1], "UTF-8"))
140+
141+
/** Extracts the secret from a binder key. */
142+
@JvmStatic
143+
fun secretFromBinderKey(binderKey: String) = URLDecoder.decode(binderKey.split(":")[2], "UTF-8")
144+
145+
/** Is this a valid binder key? */
146+
@JvmStatic
147+
fun isBinderKey(maybeKey: String) =
148+
maybeKey.startsWith(':') && maybeKey.endsWith(':') && maybeKey.split(":").size == 4
149+
150+
const val TAG = "LocalSocketProtocol"
151+
private const val HAS_NOT_EXITED = 0xCA7F00D
152+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (C) 2024 The Android Open Source Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package androidx.test.services.shellexecutor
18+
19+
import android.net.LocalSocket
20+
import android.net.LocalSocketAddress
21+
import android.os.Build
22+
import android.util.Log
23+
import androidx.test.services.shellexecutor.LocalSocketProtocol.addressFromBinderKey
24+
import androidx.test.services.shellexecutor.LocalSocketProtocol.hasExited
25+
import androidx.test.services.shellexecutor.LocalSocketProtocol.readResponse
26+
import androidx.test.services.shellexecutor.LocalSocketProtocol.secretFromBinderKey
27+
import androidx.test.services.shellexecutor.LocalSocketProtocol.sendRequest
28+
import java.io.IOException
29+
import java.io.InputStream
30+
import java.io.PipedInputStream
31+
import java.io.PipedOutputStream
32+
import java.util.concurrent.Executors
33+
import kotlin.time.Duration
34+
import kotlin.time.Duration.Companion.milliseconds
35+
import kotlin.time.measureTime
36+
import kotlin.time.toKotlinDuration
37+
import kotlinx.coroutines.CoroutineScope
38+
import kotlinx.coroutines.asCoroutineDispatcher
39+
import kotlinx.coroutines.delay
40+
import kotlinx.coroutines.launch
41+
import kotlinx.coroutines.runBlocking
42+
import kotlinx.coroutines.runInterruptible
43+
import kotlinx.coroutines.withTimeout
44+
45+
/**
46+
* Client that sends requests to the ShellCommandLocalSocketExecutorServer.
47+
*
48+
* This client is designed to be callable from Java.
49+
*/
50+
class ShellCommandLocalSocketClient(binderKey: String) {
51+
private val address: LocalSocketAddress = addressFromBinderKey(binderKey)
52+
private val secret: String = secretFromBinderKey(binderKey)
53+
private lateinit var socket: LocalSocket
54+
55+
/**
56+
* Composes a request and sends it to the server, and streams the resulting output.
57+
* @param command The command to run.
58+
* @param parameters The parameters to the command. command + parameters = argv
59+
* @param shellEnv The environment variables to provide to the process.
60+
* @param executeThroughShell Whether to execute the command through a shell, making the argv
61+
* "sh" "-c" "command parameters".
62+
* @param timeout The timeout for the command; infinite or nonpositive values mean no timeout.
63+
* @return An InputStream that can be used to read the output of the command.
64+
*/
65+
@kotlin.time.ExperimentalTime
66+
fun request(
67+
command: String?,
68+
parameters: List<String>?,
69+
shellEnv: Map<String, String>?,
70+
executeThroughShell: Boolean,
71+
timeout: Duration,
72+
): InputStream {
73+
if (command == null || command.isEmpty()) {
74+
throw IllegalArgumentException("Null or empty command")
75+
}
76+
77+
lateinit var result: InputStream
78+
79+
// The call to runBlocking causes Android to emit "art: Note: end time exceeds epoch:". This is
80+
// in InitTimeSpec in runtime/utils.cc. I don't see a way to invoke it in such a way that it
81+
// doesn't clutter the logcat.
82+
runBlocking(scope.coroutineContext) {
83+
withTimeout(timeout) {
84+
runInterruptible {
85+
socket = LocalSocket(LocalSocket.SOCKET_STREAM)
86+
// While there *is* a timeout option on connect(), in the Android source, it throws
87+
// UnsupportedOperationException! So we leave the timeout up to withTimeout +
88+
// runInterruptible. Capture the time taken to connect so we can subtract it from the
89+
// overall timeout. (Calling socket.setSoTimeout() before connect() throws IOException
90+
// "socket not created".)
91+
val connectTime = measureTime { socket.connect(address) }
92+
93+
val argv = mutableListOf<String>()
94+
if (executeThroughShell) {
95+
argv.addAll(listOf("sh", "-c"))
96+
argv.add((listOf(command) + (parameters ?: emptyList())).joinToString(" "))
97+
} else {
98+
argv.add(command)
99+
parameters?.let { argv.addAll(it) }
100+
}
101+
102+
socket.sendRequest(secret, argv, shellEnv, timeout - connectTime)
103+
socket.shutdownOutput()
104+
105+
// We read responses off the socket, write buffers to the pipe, and close the pipe when we
106+
// get an exit code. The existing ShellExecutor API doesn't provide for *returning* that
107+
// exit code, but it's useful as a way to know when to close the stream. By using the pipe
108+
// as an intermediary, we can respond to exceptions sensibly.
109+
val upstream = PipedOutputStream()
110+
val downstream = PipedInputStream(upstream)
111+
112+
scope.launch {
113+
try {
114+
socket.inputStream.use {
115+
while (true) {
116+
val response = socket.readResponse()
117+
if (response == null) break // EOF
118+
if (response.buffer.size() > 0) response.buffer.writeTo(upstream)
119+
if (response.hasExited()) {
120+
Log.i(TAG, "Process ${argv[0]} exited with code ${response.exitCode}")
121+
break
122+
}
123+
}
124+
}
125+
} catch (x: IOException) {
126+
if (x.isPipeClosed()) {
127+
Log.i(TAG, "LocalSocket relay for ${argv[0]} closed early")
128+
} else {
129+
Log.w(TAG, "LocalSocket relay for ${argv[0]} failed", x)
130+
}
131+
} finally {
132+
upstream.flush()
133+
upstream.close()
134+
}
135+
}
136+
137+
result = downstream
138+
}
139+
}
140+
}
141+
return result
142+
}
143+
144+
/** Java-friendly wrapper for the above. */
145+
@kotlin.time.ExperimentalTime
146+
fun request(
147+
command: String?,
148+
parameters: List<String>?,
149+
shellEnv: Map<String, String>?,
150+
executeThroughShell: Boolean,
151+
timeout: java.time.Duration,
152+
): InputStream =
153+
request(command, parameters, shellEnv, executeThroughShell, timeout.toKotlinDuration())
154+
155+
private companion object {
156+
private const val TAG = "SCLSClient" // up to 23 characters
157+
158+
// Keep this around for all clients; if you create a new one with every object, you can wind up
159+
// running out of threads.
160+
private val scope = CoroutineScope(Executors.newCachedThreadPool().asCoroutineDispatcher())
161+
}
162+
}
163+
164+
// Sadly, the only way to distinguish the downstream pipe being closed is the text
165+
// of the exception thrown when you try to write to it. Which varies by API level.
166+
private fun IOException.isPipeClosed() =
167+
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
168+
message.equals("Pipe closed")
169+
} else {
170+
message.equals("Pipe is closed")
171+
}

0 commit comments

Comments
 (0)