Skip to content

Commit c5ec6ac

Browse files
authored
Merge pull request durable-streams#22 from durable-streams/test1
Fix test-ui registry and stream handling
2 parents cc3b6d8 + b0f87d5 commit c5ec6ac

File tree

9 files changed

+97
-88
lines changed

9 files changed

+97
-88
lines changed

.claude/settings.local.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"Bash(pnpm --dir /Users/kylemathews/programs/ds-worktrees/conformance-tests/packages/test-ui add @durable-streams/writer @durable-streams/client)",
2828
"Bash(pnpm --filter @durable-streams/test-ui build:*)",
2929
"Bash(git add:*)",
30-
"Bash(git commit:*)"
30+
"Bash(git commit:*)",
31+
"Bash(pnpm --filter @durable-streams/writer build:*)"
3132
],
3233
"deny": [],
3334
"ask": []

packages/cli/example-server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ const server = new DurableStreamTestServer({
1515
host: `0.0.0.0`,
1616
})
1717

18+
const url = await server.start()
19+
1820
// Add hooks to maintain a __registry__ stream for observability
19-
const hooks = createRegistryHooks(server.store)
21+
const hooks = createRegistryHooks(server.store, url)
2022
;(server as any).options.onStreamCreated = hooks.onStreamCreated
2123
;(server as any).options.onStreamDeleted = hooks.onStreamDeleted
22-
23-
const url = await server.start()
2424
console.log(`✓ Durable Streams server running at ${url}`)
2525
console.log(`\nYou can now use the CLI to interact with streams:`)
2626
console.log(` export STREAM_URL=${url}`)

packages/server/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"typecheck": "tsc --noEmit"
2020
},
2121
"dependencies": {
22+
"@durable-streams/writer": "workspace:*",
2223
"@neophi/sieve-cache": "^1.0.0",
2324
"lmdb": "^3.3.0"
2425
},

packages/server/src/registry-hook.ts

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* This stream records all create/delete events for observability.
44
*/
55

6+
import { DurableStream } from "@durable-streams/writer"
67
import type { StreamLifecycleHook } from "./types"
78
import type { StreamStore } from "./store"
89
import type { FileBackedStreamStore } from "./file-store"
@@ -14,14 +15,21 @@ const REGISTRY_PATH = `/v1/stream/__registry__`
1415
* Any client can read this stream to discover all streams and their lifecycle events.
1516
*/
1617
export function createRegistryHooks(
17-
store: StreamStore | FileBackedStreamStore
18+
store: StreamStore | FileBackedStreamStore,
19+
serverUrl: string
1820
): {
1921
onStreamCreated: StreamLifecycleHook
2022
onStreamDeleted: StreamLifecycleHook
2123
} {
22-
const ensureRegistryExists = () => {
24+
const registryStream = new DurableStream({
25+
url: `${serverUrl}${REGISTRY_PATH}`,
26+
contentType: `application/json`,
27+
})
28+
29+
const ensureRegistryExists = async () => {
2330
if (!store.has(REGISTRY_PATH)) {
24-
store.create(REGISTRY_PATH, {
31+
await DurableStream.create({
32+
url: `${serverUrl}${REGISTRY_PATH}`,
2533
contentType: `application/json`,
2634
})
2735
}
@@ -35,36 +43,28 @@ export function createRegistryHooks(
3543

3644
return {
3745
onStreamCreated: async (event) => {
38-
ensureRegistryExists()
46+
await ensureRegistryExists()
3947

4048
const streamName = extractStreamName(event.path)
4149

42-
const record = JSON.stringify({
50+
await registryStream.append({
4351
type: event.type,
4452
path: streamName,
4553
contentType: event.contentType,
4654
timestamp: event.timestamp,
4755
})
48-
49-
await Promise.resolve(
50-
store.append(REGISTRY_PATH, Buffer.from(record + `\n`))
51-
)
5256
},
5357

5458
onStreamDeleted: async (event) => {
55-
ensureRegistryExists()
59+
await ensureRegistryExists()
5660

5761
const streamName = extractStreamName(event.path)
5862

59-
const record = JSON.stringify({
63+
await registryStream.append({
6064
type: event.type,
6165
path: streamName,
6266
timestamp: event.timestamp,
6367
})
64-
65-
await Promise.resolve(
66-
store.append(REGISTRY_PATH, Buffer.from(record + `\n`))
67-
)
6868
},
6969
}
7070
}

packages/test-ui/src/routes/__root.tsx

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ interface Stream {
88
contentType?: string
99
}
1010

11+
interface RegistryEvent {
12+
type: `created` | `deleted`
13+
path: string
14+
contentType?: string
15+
timestamp: number
16+
}
17+
1118
function RootLayout() {
1219
const [streams, setStreams] = useState<Array<Stream>>([])
1320
const [newStreamPath, setNewStreamPath] = useState(``)
@@ -36,40 +43,30 @@ function RootLayout() {
3643
})
3744
}
3845

39-
// Read all chunks from the registry
46+
// Read all events from the registry
4047
const loadedStreams: Array<Stream> = []
4148

4249
try {
43-
for await (const chunk of registryStream.read({ offset: `-1` })) {
44-
console.log(`Registry chunk:`, chunk)
45-
if (chunk.data.length > 0) {
46-
const text = new TextDecoder().decode(chunk.data)
47-
console.log(`Registry text:`, text)
48-
const lines = text.trim().split(`\n`).filter(Boolean)
49-
50-
for (const line of lines) {
51-
try {
52-
const event = JSON.parse(line)
53-
console.log(`Registry event:`, event)
54-
if (event.type === `created`) {
55-
loadedStreams.push({
56-
path: event.path,
57-
contentType: event.contentType,
58-
})
59-
} else if (event.type === `deleted`) {
60-
const index = loadedStreams.findIndex(
61-
(s) => s.path === event.path
62-
)
63-
if (index !== -1) {
64-
loadedStreams.splice(index, 1)
65-
}
66-
}
67-
} catch (e) {
68-
console.error(`Error parsing registry line:`, line, e)
50+
for await (const chunk of registryStream.json<
51+
RegistryEvent | Array<RegistryEvent>
52+
>()) {
53+
const events = Array.isArray(chunk) ? chunk : [chunk]
54+
55+
for (const event of events) {
56+
if (event.type === `created`) {
57+
loadedStreams.push({
58+
path: event.path,
59+
contentType: event.contentType,
60+
})
61+
} else {
62+
const index = loadedStreams.findIndex(
63+
(s) => s.path === event.path
64+
)
65+
if (index !== -1) {
66+
loadedStreams.splice(index, 1)
6967
}
7068
}
7169
}
72-
console.log(`Loaded streams:`, loadedStreams)
7370
setStreams(loadedStreams)
7471
}
7572
} catch (readErr) {
@@ -106,7 +103,7 @@ function RootLayout() {
106103
const deleteStream = async (path: string) => {
107104
if (
108105
!window.confirm(
109-
`Delete stream "${path}"?\n\nThis action cannot be undone.`
106+
`Delete stream "${decodeURIComponent(path)}"?\n\nThis action cannot be undone.`
110107
)
111108
) {
112109
return
@@ -140,7 +137,7 @@ function RootLayout() {
140137
placeholder="New stream path"
141138
value={newStreamPath}
142139
onChange={(e) => setNewStreamPath(e.target.value)}
143-
onKeyPress={(e) => e.key === `Enter` && void createStream()}
140+
onKeyDown={(e) => e.key === `Enter` && void createStream()}
144141
/>
145142
<select
146143
value={newStreamContentType}
@@ -163,14 +160,16 @@ function RootLayout() {
163160
onClick={() => setSidebarOpen(false)}
164161
>
165162
<div>
166-
<div className="stream-path">{stream.path}</div>
163+
<div className="stream-path">
164+
{decodeURIComponent(stream.path)}
165+
</div>
167166
<div className="stream-type">
168-
{stream.contentType || `unknown`}
167+
{stream.contentType?.toLowerCase() || `unknown`}
169168
</div>
170169
</div>
171170
<button
172171
className="delete-btn"
173-
title={`Delete stream: ${stream.path}`}
172+
title={`Delete stream: ${decodeURIComponent(stream.path)}`}
174173
onClick={(e) => {
175174
e.preventDefault()
176175
e.stopPropagation()

packages/test-ui/src/routes/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ export const Route = createFileRoute(`/`)({
55
})
66

77
function Index() {
8-
return <div className="placeholder" />
8+
return null
99
}

packages/test-ui/src/routes/stream.$streamPath.tsx

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,34 @@
1-
import { createFileRoute } from "@tanstack/react-router"
1+
import { createFileRoute, redirect } from "@tanstack/react-router"
22
import { useEffect, useRef, useState } from "react"
33
import { DurableStream } from "@durable-streams/writer"
44

55
const SERVER_URL = `http://${typeof window !== `undefined` ? window.location.hostname : `localhost`}:8787`
66

77
export const Route = createFileRoute(`/stream/$streamPath`)({
88
loader: async ({ params }) => {
9-
const stream = new DurableStream({
10-
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
11-
})
12-
const metadata = await stream.head()
13-
return {
14-
contentType: metadata.contentType || null,
9+
try {
10+
const streamMetadata = new DurableStream({
11+
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
12+
})
13+
const metadata = await streamMetadata.head()
14+
const stream = new DurableStream({
15+
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
16+
contentType: metadata.contentType || undefined,
17+
})
18+
return {
19+
contentType: metadata.contentType || undefined,
20+
stream,
21+
}
22+
} catch {
23+
throw redirect({ to: `/` })
1524
}
1625
},
1726
component: StreamViewer,
1827
})
1928

2029
function StreamViewer() {
2130
const { streamPath } = Route.useParams()
22-
const { contentType } = Route.useLoaderData()
31+
const { contentType, stream } = Route.useLoaderData()
2332
const [messages, setMessages] = useState<
2433
Array<{ offset: string; data: string }>
2534
>([])
@@ -43,10 +52,6 @@ function StreamViewer() {
4352

4453
const followStream = async () => {
4554
try {
46-
const stream = new DurableStream({
47-
url: `${SERVER_URL}/v1/stream/${streamPath}`,
48-
})
49-
5055
for await (const chunk of stream.read({
5156
offset: `-1`,
5257
live: `long-poll`,
@@ -80,9 +85,6 @@ function StreamViewer() {
8085

8186
try {
8287
setError(null)
83-
const stream = new DurableStream({
84-
url: `${SERVER_URL}/v1/stream/${streamPath}`,
85-
})
8688
await stream.append(writeInput + `\n`)
8789
setWriteInput(``)
8890
} catch (err: any) {
@@ -112,17 +114,19 @@ function StreamViewer() {
112114
Listening for new messages...
113115
</div>
114116
)}
115-
{isJsonStream ? (
116-
messages.map((msg, i) => (
117-
<div key={i} className="message">
118-
<pre>{msg.data}</pre>
117+
{messages.length !== 0 ? (
118+
isJsonStream ? (
119+
messages.map((msg, i) => (
120+
<div key={i} className="message">
121+
<pre>{msg.data}</pre>
122+
</div>
123+
))
124+
) : (
125+
<div className="message">
126+
<pre>{messages.map((msg) => msg.data).join(``)}</pre>
119127
</div>
120-
))
121-
) : (
122-
<div className="message">
123-
<pre>{messages.map((msg) => msg.data).join(``)}</pre>
124-
</div>
125-
)}
128+
)
129+
) : null}
126130
<div ref={messagesEndRef} />
127131
</div>
128132
{!isRegistryStream && (

packages/writer/src/writer.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ export class DurableStream extends BaseStream {
119119

120120
if (!response.ok) {
121121
const text = await response.text()
122-
const headers: Record<string, string> = {}
122+
const responseHeaders: Record<string, string> = {}
123123
response.headers.forEach((v, k) => {
124-
headers[k] = v
124+
responseHeaders[k] = v
125125
})
126126
throw new FetchError(
127127
response.status,
128128
text,
129129
undefined,
130-
headers,
130+
responseHeaders,
131131
opts.url,
132132
response.status === 409
133133
? `Stream already exists`
@@ -252,7 +252,8 @@ export class DurableStream extends BaseStream {
252252
}
253253
}
254254

255-
const isJson = normalizeContentType(this.contentType) === `application/json`
255+
const isJson =
256+
normalizeContentType(this.options.contentType) === `application/json`
256257

257258
// Batch data
258259
let batchedBody: BodyInit
@@ -286,8 +287,8 @@ export class DurableStream extends BaseStream {
286287

287288
// Send
288289
const headers: Record<string, string> = {}
289-
if (this.contentType) {
290-
headers[`content-type`] = this.contentType
290+
if (this.options.contentType) {
291+
headers[`content-type`] = this.options.contentType
291292
}
292293
if (highestSeq) {
293294
headers[STREAM_SEQ_HEADER] = highestSeq
@@ -301,9 +302,9 @@ export class DurableStream extends BaseStream {
301302

302303
if (!response.ok) {
303304
const text = await response.text()
304-
const headers: Record<string, string> = {}
305+
const responseHeaders: Record<string, string> = {}
305306
response.headers.forEach((v, k) => {
306-
headers[k] = v
307+
responseHeaders[k] = v
307308
})
308309

309310
let message: string
@@ -312,7 +313,7 @@ export class DurableStream extends BaseStream {
312313
} else if (response.status === 409) {
313314
message = `Sequence conflict`
314315
} else if (response.status === 400) {
315-
message = `Bad request (possibly content-type mismatch)`
316+
message = `Bad request (possibly content-type mismatch), ${text}`
316317
} else {
317318
message = `Failed to append`
318319
}
@@ -321,7 +322,7 @@ export class DurableStream extends BaseStream {
321322
response.status,
322323
text,
323324
undefined,
324-
headers,
325+
responseHeaders,
325326
this.url,
326327
message
327328
)

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)