Skip to content

Commit 7d97c5e

Browse files
committed
recreate pint client
1 parent b5667df commit 7d97c5e

File tree

5 files changed

+405
-8
lines changed

5 files changed

+405
-8
lines changed

src/PintClient/index.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import { Port } from "../pitcher-protocol/messages/port";
2+
import { Emitter, EmitterSubscription, Event } from "../utils/event";
3+
import { SandboxSession } from "../types";
4+
import { Disposable } from "../utils/disposable";
5+
import { Client, createClient, createConfig } from "../api-clients/pint/client";
6+
import {
7+
IAgentClient,
8+
IAgentClientPorts,
9+
IAgentClientShells,
10+
IAgentClientState,
11+
IAgentClientFS,
12+
IAgentClientSetup,
13+
IAgentClientTasks,
14+
IAgentClientSystem,
15+
PickRawFsResult,
16+
} from "../agent-client-interface";
17+
import {
18+
listPorts,
19+
PortInfo,
20+
streamPortsList,
21+
} from "../api-clients/pint";
22+
23+
24+
25+
function parseStreamEvent<T>(evt: unknown): T {
26+
if (typeof evt !== "string") {
27+
return evt as T;
28+
}
29+
30+
const evtWithoutDataPrefix = evt.substring(5);
31+
32+
return JSON.parse(evtWithoutDataPrefix);
33+
}
34+
35+
class PintPortsClient implements IAgentClientPorts {
36+
private onPortsUpdatedEmitter = new EmitterSubscription<Port[]>((fire) => {
37+
const abortController = new AbortController();
38+
39+
streamPortsList({
40+
signal: abortController.signal,
41+
headers: {
42+
headers: { Accept: "text/event-stream" },
43+
},
44+
}).then(async ({ stream }) => {
45+
for await (const evt of stream) {
46+
const data = parseStreamEvent<PortInfo[]>(evt);
47+
48+
fire(
49+
data.map((pintPort) => ({
50+
port: pintPort.port,
51+
url: pintPort.address,
52+
}))
53+
);
54+
}
55+
});
56+
57+
return Disposable.create(() => {
58+
abortController.abort();
59+
});
60+
});
61+
onPortsUpdated = this.onPortsUpdatedEmitter.event;
62+
63+
constructor(private apiClient: Client, private sandboxId: string) {}
64+
65+
async getPorts(): Promise<Port[]> {
66+
const ports = await listPorts({
67+
client: this.apiClient,
68+
});
69+
70+
return (
71+
ports.data?.ports.map((port) => ({
72+
port: port.port,
73+
url: `https://${this.sandboxId}-${port.port}.csb.app`,
74+
})) ?? []
75+
);
76+
}
77+
}
78+
79+
80+
export class PintClient implements IAgentClient {
81+
static async create(session: SandboxSession) {
82+
return new PintClient(session);
83+
}
84+
85+
readonly type = "pint" as const;
86+
87+
// Since there is no websocket connection or internal hibernation, the state
88+
// will always be CONNECTED. No state change events will be triggered
89+
readonly state = "CONNECTED";
90+
private onStateChangeEmitter = new Emitter<IAgentClientState>();
91+
onStateChange = this.onStateChangeEmitter.event;
92+
93+
sandboxId: string;
94+
workspacePath: string;
95+
isUpToDate: boolean;
96+
97+
ports: IAgentClientPorts;
98+
shells: IAgentClientShells;
99+
fs: IAgentClientFS;
100+
setup: IAgentClientSetup;
101+
tasks: IAgentClientTasks;
102+
system: IAgentClientSystem;
103+
104+
constructor(session: SandboxSession) {
105+
this.sandboxId = session.sandboxId;
106+
this.workspacePath = session.workspacePath;
107+
this.isUpToDate = true;
108+
109+
const apiClient = createClient(
110+
createConfig({
111+
baseUrl: session.pitcherURL,
112+
headers: {
113+
Authorization: `Bearer ${session.pitcherToken}`,
114+
},
115+
})
116+
);
117+
118+
this.ports = new PintPortsClient(apiClient, this.sandboxId);
119+
this.shells = {} as IAgentClientShells; // Not implemented for Pint
120+
this.fs = {} as IAgentClientFS; // Not implemented for Pint
121+
this.tasks = {} as IAgentClientTasks; // Not implemented for Pint
122+
this.setup = {} as IAgentClientSetup; // Not implemented for Pint
123+
this.system = {} as IAgentClientSystem; // Not implemented for Pint
124+
}
125+
126+
ping(): void {}
127+
async reconnect(): Promise<void> {}
128+
async disconnect(): Promise<void> {}
129+
dispose(): void {}
130+
}

src/Sandbox.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,13 @@ export class Sandbox {
151151
return `export ${key}='${safe}'`;
152152
})
153153
.join("\n");
154-
commands.push(
155-
[
156-
`cat << 'EOF' > "$HOME/.private/.env"`,
157-
envStrings,
158-
`EOF`,
159-
].join("\n")
160-
);
154+
const cmd = [
155+
`mkdir -p "$HOME/.private"`,
156+
`cat << 'EOF' > "$HOME/.private/.env"`,
157+
envStrings,
158+
`EOF`,
159+
].join("\n");
160+
await client.commands.run(cmd);
161161
}
162162

163163
if (customSession.git) {
@@ -188,8 +188,12 @@ export class Sandbox {
188188
pitcherManagerResponse: PitcherManagerResponse,
189189
customSession?: SessionCreateOptions
190190
): Promise<SandboxSession> {
191+
// HACK: we currently do not get a flag for pint, but this is a check we can use for now
192+
const isPint = false;
193+
191194
if (!customSession || !customSession.id) {
192195
return {
196+
isPint: isPint,
193197
sandboxId: this.id,
194198
bootupType: this.bootupType,
195199
hostToken: customSession?.hostToken,
@@ -214,6 +218,7 @@ export class Sandbox {
214218
});
215219

216220
return {
221+
isPint: isPint,
217222
sandboxId: this.id,
218223
sessionId: customSession?.id,
219224
hostToken: customSession?.hostToken,

src/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { VMTier } from "./VMTier";
22
import { HostToken } from "./HostTokens";
3-
import { Config } from "@hey-api/client-fetch";
3+
import { Config } from "./api-clients/client/client";
44
import { Tracer } from "@opentelemetry/api";
55

66
export interface PitcherManagerResponse {
@@ -235,6 +235,7 @@ export type SandboxOpts = {
235235
};
236236

237237
export type SandboxSession = PitcherManagerResponse & {
238+
isPint: boolean;
238239
sandboxId: string;
239240
sessionId?: string;
240241
hostToken?: HostToken;

src/utils/event.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,53 @@ export class AsyncEmitter<T> implements IDisposable {
130130
this.registeredListeners = new Set();
131131
}
132132
}
133+
134+
/**
135+
* EmitterSubscription provides an abstraction that manages a subscription lifecycle
136+
* tied to the number of listeners on an emitter. The subscription is created when
137+
* the first listener is added and disposed when the last listener is removed.
138+
*/
139+
export class EmitterSubscription<T> implements IDisposable {
140+
private emitter = new Emitter<T>();
141+
private subscription: IDisposable | undefined;
142+
private listenerCount = 0;
143+
144+
constructor(
145+
private createSubscription: (fire: (value: T) => void) => IDisposable
146+
) {}
147+
148+
get event(): Event<T> {
149+
return (listener: (e: T) => void) => {
150+
// Add listener to emitter
151+
const listenerDisposable = this.emitter.event(listener);
152+
153+
// Create subscription if this is the first listener
154+
if (this.listenerCount === 0) {
155+
this.subscription = this.createSubscription((value) =>
156+
this.emitter.fire(value)
157+
);
158+
}
159+
160+
this.listenerCount++;
161+
162+
// Return disposable that removes listener and cleans up subscription if needed
163+
return Disposable.create(() => {
164+
listenerDisposable.dispose();
165+
this.listenerCount--;
166+
167+
// Dispose subscription when last listener is removed
168+
if (this.listenerCount === 0 && this.subscription) {
169+
this.subscription.dispose();
170+
this.subscription = undefined;
171+
}
172+
});
173+
};
174+
}
175+
176+
dispose(): void {
177+
this.subscription?.dispose();
178+
this.subscription = undefined;
179+
this.emitter.dispose();
180+
this.listenerCount = 0;
181+
}
182+
}

0 commit comments

Comments
 (0)