Skip to content

Commit c88ffe9

Browse files
committed
refactor: wip
1 parent bf0ecb9 commit c88ffe9

File tree

2 files changed

+58
-60
lines changed

2 files changed

+58
-60
lines changed

packages/utils/src/lib/file-sink.ts

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import fs from 'node:fs';
22
import path from 'node:path';
3-
import type { Decoder, Encoder } from './sink-source.type';
3+
import type { Decoder, Encoder, RecoverResult } from './sink-source.type';
44

55
export type AppendOptions = {
66
filePath: string;
@@ -162,3 +162,60 @@ export const StringCodec = {
162162
encode: (v: string) => v,
163163
decode: (v: string) => v,
164164
};
165+
166+
export abstract class RecoverableEventSink<
167+
Raw extends Record<string, unknown>,
168+
Domain,
169+
> {
170+
protected readonly sink: JsonlFile<Raw>;
171+
private finalized = false;
172+
173+
constructor(sink: JsonlFile<Raw>) {
174+
this.sink = sink;
175+
}
176+
177+
open() {
178+
this.sink.open();
179+
}
180+
181+
write(event: Domain) {
182+
this.sink.write(this.encode(event));
183+
}
184+
185+
close() {
186+
this.finalize();
187+
}
188+
189+
recover(): RecoverResult<Domain> {
190+
const { records, errors, partialTail } = this.sink.recover();
191+
const out: Domain[] = [];
192+
const errs = [...errors];
193+
194+
records.forEach((r, i) => {
195+
try {
196+
out.push(this.decode(r));
197+
} catch (error) {
198+
errs.push({
199+
lineNo: i + 1,
200+
line: JSON.stringify(r),
201+
error: error as Error,
202+
});
203+
}
204+
});
205+
206+
return { records: out, errors: errs, partialTail };
207+
}
208+
209+
finalize() {
210+
if (this.finalized) {
211+
return;
212+
}
213+
this.finalized = true;
214+
this.sink.close();
215+
this.onFinalize();
216+
}
217+
218+
protected abstract encode(domain: Domain): Raw;
219+
protected abstract decode(raw: Raw): Domain;
220+
protected abstract onFinalize(): void;
221+
}

packages/utils/src/lib/sink-source.type.ts

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import type { JsonlFile } from './file-sink.js';
2-
31
export type Encoder<T> = (value: T) => string;
42
export type Decoder<T> = (line: string) => T;
53

@@ -44,63 +42,6 @@ export type RecoverResult<T = unknown> = {
4442
partialTail: string | null;
4543
};
4644

47-
export abstract class RecoverableEventSink<
48-
Raw extends Record<string, unknown>,
49-
Domain,
50-
> {
51-
protected readonly sink: JsonlFile<Raw>;
52-
private finalized = false;
53-
54-
constructor(sink: JsonlFile<Raw>) {
55-
this.sink = sink;
56-
}
57-
58-
open() {
59-
this.sink.open();
60-
}
61-
62-
write(event: Domain) {
63-
this.sink.write(this.encode(event));
64-
}
65-
66-
close() {
67-
this.finalize();
68-
}
69-
70-
recover(): RecoverResult<Domain> {
71-
const { records, errors, partialTail } = this.sink.recover();
72-
const out: Domain[] = [];
73-
const errs = [...errors];
74-
75-
records.forEach((r, i) => {
76-
try {
77-
out.push(this.decode(r));
78-
} catch (error) {
79-
errs.push({
80-
lineNo: i + 1,
81-
line: JSON.stringify(r),
82-
error: error as Error,
83-
});
84-
}
85-
});
86-
87-
return { records: out, errors: errs, partialTail };
88-
}
89-
90-
finalize() {
91-
if (this.finalized) {
92-
return;
93-
}
94-
this.finalized = true;
95-
this.sink.close();
96-
this.onFinalize();
97-
}
98-
99-
protected abstract encode(domain: Domain): Raw;
100-
protected abstract decode(raw: Raw): Domain;
101-
protected abstract onFinalize(): void;
102-
}
103-
10445
export type RecoverOptions = {
10546
keepInvalid?: boolean;
10647
};

0 commit comments

Comments
 (0)