Skip to content

Commit 9a4d2a3

Browse files
KyleAMathewsclaude
andcommitted
Fix test-ui registry and stream handling
- Fix registry hook to use DurableStream writer client for JSON mode - Changed from manual Buffer append to using writer's append() method - Registry events now properly stored as individual JSON objects - Updated createRegistryHooks to accept serverUrl parameter - Added @durable-streams/writer as dependency to server package - Fix test-ui to handle JSON array responses from registry - Updated __root.tsx to parse both array and individual event formats - Remove empty placeholder on home route - Changed index route to return null instead of rendering empty div - Add redirect for missing streams - Stream route loader now redirects to home if stream doesn't exist 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 774c9b6 commit 9a4d2a3

File tree

6 files changed

+48
-39
lines changed

6 files changed

+48
-39
lines changed

packages/cli/example-server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ const server = new DurableStreamTestServer({
1515
host: `0.0.0.0`,
1616
})
1717

18+
const url = await server.start()
19+
1820
// Add hooks to maintain a __registry__ stream for observability
19-
const hooks = createRegistryHooks(server.store)
21+
const hooks = createRegistryHooks(server.store, url)
2022
;(server as any).options.onStreamCreated = hooks.onStreamCreated
2123
;(server as any).options.onStreamDeleted = hooks.onStreamDeleted
22-
23-
const url = await server.start()
2424
console.log(`✓ Durable Streams server running at ${url}`)
2525
console.log(`\nYou can now use the CLI to interact with streams:`)
2626
console.log(` export STREAM_URL=${url}`)

packages/server/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"typecheck": "tsc --noEmit"
2020
},
2121
"dependencies": {
22+
"@durable-streams/writer": "workspace:*",
2223
"@neophi/sieve-cache": "^1.0.0",
2324
"lmdb": "^3.3.0"
2425
},

packages/server/src/registry-hook.ts

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* This stream records all create/delete events for observability.
44
*/
55

6+
import { DurableStream } from "@durable-streams/writer"
67
import type { StreamLifecycleHook } from "./types"
78
import type { StreamStore } from "./store"
89
import type { FileBackedStreamStore } from "./file-store"
@@ -14,14 +15,20 @@ const REGISTRY_PATH = `/v1/stream/__registry__`
1415
* Any client can read this stream to discover all streams and their lifecycle events.
1516
*/
1617
export function createRegistryHooks(
17-
store: StreamStore | FileBackedStreamStore
18+
store: StreamStore | FileBackedStreamStore,
19+
serverUrl: string
1820
): {
1921
onStreamCreated: StreamLifecycleHook
2022
onStreamDeleted: StreamLifecycleHook
2123
} {
22-
const ensureRegistryExists = () => {
24+
const registryStream = new DurableStream({
25+
url: `${serverUrl}${REGISTRY_PATH}`,
26+
})
27+
28+
const ensureRegistryExists = async () => {
2329
if (!store.has(REGISTRY_PATH)) {
24-
store.create(REGISTRY_PATH, {
30+
await DurableStream.create({
31+
url: `${serverUrl}${REGISTRY_PATH}`,
2532
contentType: `application/json`,
2633
})
2734
}
@@ -35,36 +42,28 @@ export function createRegistryHooks(
3542

3643
return {
3744
onStreamCreated: async (event) => {
38-
ensureRegistryExists()
45+
await ensureRegistryExists()
3946

4047
const streamName = extractStreamName(event.path)
4148

42-
const record = JSON.stringify({
49+
await registryStream.append({
4350
type: event.type,
4451
path: streamName,
4552
contentType: event.contentType,
4653
timestamp: event.timestamp,
4754
})
48-
49-
await Promise.resolve(
50-
store.append(REGISTRY_PATH, Buffer.from(record + `\n`))
51-
)
5255
},
5356

5457
onStreamDeleted: async (event) => {
55-
ensureRegistryExists()
58+
await ensureRegistryExists()
5659

5760
const streamName = extractStreamName(event.path)
5861

59-
const record = JSON.stringify({
62+
await registryStream.append({
6063
type: event.type,
6164
path: streamName,
6265
timestamp: event.timestamp,
6366
})
64-
65-
await Promise.resolve(
66-
store.append(REGISTRY_PATH, Buffer.from(record + `\n`))
67-
)
6867
},
6968
}
7069
}

packages/test-ui/src/routes/__root.tsx

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,24 @@ function RootLayout() {
4949

5050
for (const line of lines) {
5151
try {
52-
const event = JSON.parse(line)
53-
console.log(`Registry event:`, event)
54-
if (event.type === `created`) {
55-
loadedStreams.push({
56-
path: event.path,
57-
contentType: event.contentType,
58-
})
59-
} else if (event.type === `deleted`) {
60-
const index = loadedStreams.findIndex(
61-
(s) => s.path === event.path
62-
)
63-
if (index !== -1) {
64-
loadedStreams.splice(index, 1)
52+
const parsed = JSON.parse(line)
53+
console.log(`Registry event:`, parsed)
54+
55+
const events = Array.isArray(parsed) ? parsed : [parsed]
56+
57+
for (const event of events) {
58+
if (event.type === `created`) {
59+
loadedStreams.push({
60+
path: event.path,
61+
contentType: event.contentType,
62+
})
63+
} else if (event.type === `deleted`) {
64+
const index = loadedStreams.findIndex(
65+
(s) => s.path === event.path
66+
)
67+
if (index !== -1) {
68+
loadedStreams.splice(index, 1)
69+
}
6570
}
6671
}
6772
} catch (e) {

packages/test-ui/src/routes/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ export const Route = createFileRoute(`/`)({
55
})
66

77
function Index() {
8-
return <div className="placeholder" />
8+
return null
99
}

packages/test-ui/src/routes/stream.$streamPath.tsx

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1-
import { createFileRoute } from "@tanstack/react-router"
1+
import { createFileRoute, redirect } from "@tanstack/react-router"
22
import { useEffect, useRef, useState } from "react"
33
import { DurableStream } from "@durable-streams/writer"
44

55
const SERVER_URL = `http://${typeof window !== `undefined` ? window.location.hostname : `localhost`}:8787`
66

77
export const Route = createFileRoute(`/stream/$streamPath`)({
88
loader: async ({ params }) => {
9-
const stream = new DurableStream({
10-
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
11-
})
12-
const metadata = await stream.head()
13-
return {
14-
contentType: metadata.contentType || null,
9+
try {
10+
const stream = new DurableStream({
11+
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
12+
})
13+
const metadata = await stream.head()
14+
return {
15+
contentType: metadata.contentType || null,
16+
}
17+
} catch {
18+
throw redirect({ to: `/` })
1519
}
1620
},
1721
component: StreamViewer,

0 commit comments

Comments
 (0)