Skip to content

Commit f7fc500

Browse files
committed
Fixes
1 parent 9a4d2a3 commit f7fc500

File tree

6 files changed

+69
-69
lines changed

6 files changed

+69
-69
lines changed

.claude/settings.local.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"Bash(pnpm --dir /Users/kylemathews/programs/ds-worktrees/conformance-tests/packages/test-ui add @durable-streams/writer @durable-streams/client)",
2828
"Bash(pnpm --filter @durable-streams/test-ui build:*)",
2929
"Bash(git add:*)",
30-
"Bash(git commit:*)"
30+
"Bash(git commit:*)",
31+
"Bash(pnpm --filter @durable-streams/writer build:*)"
3132
],
3233
"deny": [],
3334
"ask": []

packages/server/src/registry-hook.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export function createRegistryHooks(
2323
} {
2424
const registryStream = new DurableStream({
2525
url: `${serverUrl}${REGISTRY_PATH}`,
26+
contentType: `application/json`,
2627
})
2728

2829
const ensureRegistryExists = async () => {

packages/test-ui/src/routes/__root.tsx

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ interface Stream {
88
contentType?: string
99
}
1010

11+
interface RegistryEvent {
12+
type: `created` | `deleted`
13+
path: string
14+
contentType?: string
15+
timestamp: number
16+
}
17+
1118
function RootLayout() {
1219
const [streams, setStreams] = useState<Array<Stream>>([])
1320
const [newStreamPath, setNewStreamPath] = useState(``)
@@ -36,45 +43,30 @@ function RootLayout() {
3643
})
3744
}
3845

39-
// Read all chunks from the registry
46+
// Read all events from the registry
4047
const loadedStreams: Array<Stream> = []
4148

4249
try {
43-
for await (const chunk of registryStream.read({ offset: `-1` })) {
44-
console.log(`Registry chunk:`, chunk)
45-
if (chunk.data.length > 0) {
46-
const text = new TextDecoder().decode(chunk.data)
47-
console.log(`Registry text:`, text)
48-
const lines = text.trim().split(`\n`).filter(Boolean)
49-
50-
for (const line of lines) {
51-
try {
52-
const parsed = JSON.parse(line)
53-
console.log(`Registry event:`, parsed)
54-
55-
const events = Array.isArray(parsed) ? parsed : [parsed]
56-
57-
for (const event of events) {
58-
if (event.type === `created`) {
59-
loadedStreams.push({
60-
path: event.path,
61-
contentType: event.contentType,
62-
})
63-
} else if (event.type === `deleted`) {
64-
const index = loadedStreams.findIndex(
65-
(s) => s.path === event.path
66-
)
67-
if (index !== -1) {
68-
loadedStreams.splice(index, 1)
69-
}
70-
}
71-
}
72-
} catch (e) {
73-
console.error(`Error parsing registry line:`, line, e)
50+
for await (const chunk of registryStream.json<
51+
RegistryEvent | Array<RegistryEvent>
52+
>()) {
53+
const events = Array.isArray(chunk) ? chunk : [chunk]
54+
55+
for (const event of events) {
56+
if (event.type === `created`) {
57+
loadedStreams.push({
58+
path: event.path,
59+
contentType: event.contentType,
60+
})
61+
} else {
62+
const index = loadedStreams.findIndex(
63+
(s) => s.path === event.path
64+
)
65+
if (index !== -1) {
66+
loadedStreams.splice(index, 1)
7467
}
7568
}
7669
}
77-
console.log(`Loaded streams:`, loadedStreams)
7870
setStreams(loadedStreams)
7971
}
8072
} catch (readErr) {
@@ -111,7 +103,7 @@ function RootLayout() {
111103
const deleteStream = async (path: string) => {
112104
if (
113105
!window.confirm(
114-
`Delete stream "${path}"?\n\nThis action cannot be undone.`
106+
`Delete stream "${decodeURIComponent(path)}"?\n\nThis action cannot be undone.`
115107
)
116108
) {
117109
return
@@ -145,7 +137,7 @@ function RootLayout() {
145137
placeholder="New stream path"
146138
value={newStreamPath}
147139
onChange={(e) => setNewStreamPath(e.target.value)}
148-
onKeyPress={(e) => e.key === `Enter` && void createStream()}
140+
onInput={(e) => e.key === `Enter` && void createStream()}
149141
/>
150142
<select
151143
value={newStreamContentType}
@@ -168,14 +160,16 @@ function RootLayout() {
168160
onClick={() => setSidebarOpen(false)}
169161
>
170162
<div>
171-
<div className="stream-path">{stream.path}</div>
163+
<div className="stream-path">
164+
{decodeURIComponent(stream.path)}
165+
</div>
172166
<div className="stream-type">
173-
{stream.contentType || `unknown`}
167+
{stream.contentType?.toLowerCase() || `unknown`}
174168
</div>
175169
</div>
176170
<button
177171
className="delete-btn"
178-
title={`Delete stream: ${stream.path}`}
172+
title={`Delete stream: ${decodeURIComponent(stream.path)}`}
179173
onClick={(e) => {
180174
e.preventDefault()
181175
e.stopPropagation()

packages/test-ui/src/routes/stream.$streamPath.tsx

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@ const SERVER_URL = `http://${typeof window !== `undefined` ? window.location.hos
77
export const Route = createFileRoute(`/stream/$streamPath`)({
88
loader: async ({ params }) => {
99
try {
10+
const streamMetadata = new DurableStream({
11+
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
12+
})
13+
const metadata = await streamMetadata.head()
1014
const stream = new DurableStream({
1115
url: `${SERVER_URL}/v1/stream/${params.streamPath}`,
16+
contentType: metadata.contentType || undefined,
1217
})
13-
const metadata = await stream.head()
1418
return {
15-
contentType: metadata.contentType || null,
19+
contentType: metadata.contentType || undefined,
20+
stream,
1621
}
1722
} catch {
1823
throw redirect({ to: `/` })
@@ -23,7 +28,7 @@ export const Route = createFileRoute(`/stream/$streamPath`)({
2328

2429
function StreamViewer() {
2530
const { streamPath } = Route.useParams()
26-
const { contentType } = Route.useLoaderData()
31+
const { contentType, stream } = Route.useLoaderData()
2732
const [messages, setMessages] = useState<
2833
Array<{ offset: string; data: string }>
2934
>([])
@@ -47,10 +52,6 @@ function StreamViewer() {
4752

4853
const followStream = async () => {
4954
try {
50-
const stream = new DurableStream({
51-
url: `${SERVER_URL}/v1/stream/${streamPath}`,
52-
})
53-
5455
for await (const chunk of stream.read({
5556
offset: `-1`,
5657
live: `long-poll`,
@@ -84,9 +85,6 @@ function StreamViewer() {
8485

8586
try {
8687
setError(null)
87-
const stream = new DurableStream({
88-
url: `${SERVER_URL}/v1/stream/${streamPath}`,
89-
})
9088
await stream.append(writeInput + `\n`)
9189
setWriteInput(``)
9290
} catch (err: any) {
@@ -116,17 +114,19 @@ function StreamViewer() {
116114
Listening for new messages...
117115
</div>
118116
)}
119-
{isJsonStream ? (
120-
messages.map((msg, i) => (
121-
<div key={i} className="message">
122-
<pre>{msg.data}</pre>
117+
{messages.length !== 0 ? (
118+
isJsonStream ? (
119+
messages.map((msg, i) => (
120+
<div key={i} className="message">
121+
<pre>{msg.data}</pre>
122+
</div>
123+
))
124+
) : (
125+
<div className="message">
126+
<pre>{messages.map((msg) => msg.data).join(``)}</pre>
123127
</div>
124-
))
125-
) : (
126-
<div className="message">
127-
<pre>{messages.map((msg) => msg.data).join(``)}</pre>
128-
</div>
129-
)}
128+
)
129+
) : null}
130130
<div ref={messagesEndRef} />
131131
</div>
132132
{!isRegistryStream && (

packages/writer/src/writer.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ export class DurableStream extends BaseStream {
119119

120120
if (!response.ok) {
121121
const text = await response.text()
122-
const headers: Record<string, string> = {}
122+
const responseHeaders: Record<string, string> = {}
123123
response.headers.forEach((v, k) => {
124-
headers[k] = v
124+
responseHeaders[k] = v
125125
})
126126
throw new FetchError(
127127
response.status,
128128
text,
129129
undefined,
130-
headers,
130+
responseHeaders,
131131
opts.url,
132132
response.status === 409
133133
? `Stream already exists`
@@ -252,7 +252,8 @@ export class DurableStream extends BaseStream {
252252
}
253253
}
254254

255-
const isJson = normalizeContentType(this.contentType) === `application/json`
255+
const isJson =
256+
normalizeContentType(this.options.contentType) === `application/json`
256257

257258
// Batch data
258259
let batchedBody: BodyInit
@@ -286,8 +287,8 @@ export class DurableStream extends BaseStream {
286287

287288
// Send
288289
const headers: Record<string, string> = {}
289-
if (this.contentType) {
290-
headers[`content-type`] = this.contentType
290+
if (this.options.contentType) {
291+
headers[`content-type`] = this.options.contentType
291292
}
292293
if (highestSeq) {
293294
headers[STREAM_SEQ_HEADER] = highestSeq
@@ -301,9 +302,9 @@ export class DurableStream extends BaseStream {
301302

302303
if (!response.ok) {
303304
const text = await response.text()
304-
const headers: Record<string, string> = {}
305+
const responseHeaders: Record<string, string> = {}
305306
response.headers.forEach((v, k) => {
306-
headers[k] = v
307+
responseHeaders[k] = v
307308
})
308309

309310
let message: string
@@ -312,7 +313,7 @@ export class DurableStream extends BaseStream {
312313
} else if (response.status === 409) {
313314
message = `Sequence conflict`
314315
} else if (response.status === 400) {
315-
message = `Bad request (possibly content-type mismatch)`
316+
message = `Bad request (possibly content-type mismatch), ${text}`
316317
} else {
317318
message = `Failed to append`
318319
}
@@ -321,7 +322,7 @@ export class DurableStream extends BaseStream {
321322
response.status,
322323
text,
323324
undefined,
324-
headers,
325+
responseHeaders,
325326
this.url,
326327
message
327328
)

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)