Skip to content

Commit 59809a5

Browse files
authored
Merge pull request #23 from supermemoryai/12-21-use_effect_concurrency
use effect concurrency
2 parents a092f57 + 8f8519d commit 59809a5

File tree

4 files changed

+243
-1080
lines changed

4 files changed

+243
-1080
lines changed

packages/code-chunk/src/batch.ts

Lines changed: 157 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { Effect, Queue, Stream } from 'effect'
1+
import {
2+
Effect,
3+
Chunk as EffectChunk,
4+
Exit,
5+
Option,
6+
Ref,
7+
Scope,
8+
Stream,
9+
} from 'effect'
210
import { ChunkingError, UnsupportedLanguageError } from './chunk'
311
import { chunk as chunkInternal } from './chunking'
412
import { extractEntities } from './extract'
@@ -18,32 +26,156 @@ import type {
1826

1927
const DEFAULT_CONCURRENCY = 10
2028

21-
const chunkFileEffect = (
22-
file: FileInput,
23-
batchOptions: ChunkOptions = {},
24-
): Effect.Effect<BatchResult, never> => {
25-
const mergedOptions = { ...batchOptions, ...file.options }
29+
/**
30+
* Type for a function that chunks a single file and returns an Effect
31+
*/
32+
export type ChunkFileFunction = (
33+
filepath: string,
34+
code: string,
35+
options: ChunkOptions,
36+
) => Effect.Effect<Chunk[], unknown>
37+
38+
/**
39+
* Core batch stream processor - takes a chunk function and returns a stream of results
40+
* Used by both native and WASM implementations
41+
*/
42+
export const batchStreamEffect = (
43+
chunkFile: ChunkFileFunction,
44+
files: FileInput[],
45+
options: BatchOptions = {},
46+
): Stream.Stream<BatchResult, never> => {
47+
const {
48+
concurrency = DEFAULT_CONCURRENCY,
49+
onProgress,
50+
...chunkOptions
51+
} = options
52+
const total = files.length
53+
54+
if (total === 0) {
55+
return Stream.empty
56+
}
57+
58+
const processFile = (file: FileInput): Effect.Effect<BatchResult, never> => {
59+
const mergedOptions = { ...chunkOptions, ...file.options }
60+
return chunkFile(file.filepath, file.code, mergedOptions).pipe(
61+
Effect.map(
62+
(chunks) =>
63+
({
64+
filepath: file.filepath,
65+
chunks,
66+
error: null,
67+
}) satisfies BatchFileResult,
68+
),
69+
Effect.catchAll((error) =>
70+
Effect.succeed({
71+
filepath: file.filepath,
72+
chunks: null,
73+
error: error instanceof Error ? error : new Error(String(error)),
74+
} satisfies BatchFileError),
75+
),
76+
)
77+
}
78+
79+
return Stream.unwrap(
80+
Effect.gen(function* () {
81+
const completedRef = yield* Ref.make(0)
82+
83+
return Stream.fromIterable(files).pipe(
84+
Stream.mapEffect(
85+
(file) =>
86+
processFile(file).pipe(
87+
Effect.tap((result) =>
88+
Ref.updateAndGet(completedRef, (n) => n + 1).pipe(
89+
Effect.andThen((completed) =>
90+
Effect.sync(() =>
91+
onProgress?.(
92+
completed,
93+
total,
94+
file.filepath,
95+
result.error === null,
96+
),
97+
),
98+
),
99+
),
100+
),
101+
),
102+
{ concurrency },
103+
),
104+
)
105+
}),
106+
)
107+
}
108+
109+
/**
110+
* Core batch processor - collects stream results into array
111+
*/
112+
export const batchEffect = (
113+
chunkFile: ChunkFileFunction,
114+
files: FileInput[],
115+
options: BatchOptions = {},
116+
): Effect.Effect<BatchResult[], never> => {
117+
return Stream.runCollect(batchStreamEffect(chunkFile, files, options)).pipe(
118+
Effect.map((chunk) => Array.from(chunk)),
119+
)
120+
}
121+
122+
/**
123+
* Core batch processor - Promise API
124+
*/
125+
export async function batch(
126+
chunkFile: ChunkFileFunction,
127+
files: FileInput[],
128+
options?: BatchOptions,
129+
): Promise<BatchResult[]> {
130+
return Effect.runPromise(batchEffect(chunkFile, files, options))
131+
}
26132

133+
/**
134+
* Core batch stream processor - AsyncGenerator API
135+
*/
136+
export async function* batchStream(
137+
chunkFile: ChunkFileFunction,
138+
files: FileInput[],
139+
options?: BatchOptions,
140+
): AsyncGenerator<BatchResult> {
141+
const scope = Effect.runSync(Scope.make())
142+
143+
try {
144+
const pull = await Effect.runPromise(
145+
Stream.toPull(batchStreamEffect(chunkFile, files, options)).pipe(
146+
Scope.extend(scope),
147+
),
148+
)
149+
150+
while (true) {
151+
const result = await Effect.runPromise(Effect.option(pull))
152+
if (Option.isNone(result)) break
153+
for (const item of EffectChunk.toReadonlyArray(result.value)) {
154+
yield item
155+
}
156+
}
157+
} finally {
158+
await Effect.runPromise(Scope.close(scope, Exit.void))
159+
}
160+
}
161+
162+
const nativeChunkFile: ChunkFileFunction = (filepath, code, options) => {
27163
return Effect.gen(function* () {
28164
const language: Language | null =
29-
mergedOptions.language ?? detectLanguage(file.filepath)
165+
options.language ?? detectLanguage(filepath)
30166

31167
if (!language) {
32-
return {
33-
filepath: file.filepath,
34-
chunks: null,
35-
error: new UnsupportedLanguageError(file.filepath),
36-
} satisfies BatchFileError
168+
return yield* Effect.fail(new UnsupportedLanguageError(filepath))
37169
}
38170

39171
const parseResult = yield* Effect.tryPromise({
40-
try: () => parseCode(file.code, language),
172+
try: () => parseCode(code, language),
41173
catch: (error: unknown) =>
42174
new ChunkingError('Failed to parse code', error),
43175
})
44176

45177
const entities = yield* Effect.mapError(
46-
extractEntities(parseResult.tree.rootNode, language, file.code),
178+
extractEntities(parseResult.tree.rootNode, language, code),
47179
(error: unknown) =>
48180
new ChunkingError('Failed to extract entities', error),
49181
)
@@ -57,163 +189,46 @@ const chunkFileEffect = (
57189
const chunks = yield* Effect.mapError(
58190
chunkInternal(
59191
parseResult.tree.rootNode,
60-
file.code,
192+
code,
61193
scopeTree,
62194
language,
63-
mergedOptions,
64-
file.filepath,
195+
options,
196+
filepath,
65197
),
66198
(error: unknown) => new ChunkingError('Failed to chunk code', error),
67199
)
68200

69-
const finalChunks: Chunk[] = parseResult.error
201+
return parseResult.error
70202
? chunks.map((c: Chunk) => ({
71203
...c,
72204
context: { ...c.context, parseError: parseResult.error ?? undefined },
73205
}))
74206
: chunks
75-
76-
return {
77-
filepath: file.filepath,
78-
chunks: finalChunks,
79-
error: null,
80-
} satisfies BatchFileResult
81-
}).pipe(
82-
Effect.catchAll((error) =>
83-
Effect.succeed({
84-
filepath: file.filepath,
85-
chunks: null,
86-
error: error instanceof Error ? error : new Error(String(error)),
87-
} satisfies BatchFileError),
88-
),
89-
)
207+
})
90208
}
91209

92210
export const chunkBatchStreamEffect = (
93211
files: FileInput[],
94212
options: BatchOptions = {},
95-
): Stream.Stream<BatchResult, never> => {
96-
const {
97-
concurrency = DEFAULT_CONCURRENCY,
98-
onProgress,
99-
...chunkOptions
100-
} = options
101-
const total = files.length
102-
103-
if (total === 0) {
104-
return Stream.empty
105-
}
106-
107-
return Stream.unwrap(
108-
Effect.gen(function* () {
109-
const queue = yield* Queue.unbounded<FileInput>()
110-
const resultsQueue = yield* Queue.unbounded<BatchResult | null>()
111-
112-
yield* Effect.forEach(files, (file) => Queue.offer(queue, file), {
113-
discard: true,
114-
})
115-
116-
let completed = 0
117-
118-
const worker = Effect.gen(function* () {
119-
while (true) {
120-
const maybeFile = yield* Queue.poll(queue)
121-
if (maybeFile._tag === 'None') {
122-
break
123-
}
124-
const file = maybeFile.value
125-
const result = yield* chunkFileEffect(file, chunkOptions)
126-
completed++
127-
if (onProgress) {
128-
onProgress(completed, total, file.filepath, result.error === null)
129-
}
130-
yield* Queue.offer(resultsQueue, result)
131-
}
132-
})
133-
134-
yield* Effect.fork(
135-
Effect.gen(function* () {
136-
yield* Effect.all(
137-
Array.from({ length: Math.min(concurrency, total) }, () => worker),
138-
{ concurrency: 'unbounded' },
139-
)
140-
yield* Queue.offer(resultsQueue, null)
141-
}),
142-
)
143-
144-
return Stream.fromQueue(resultsQueue).pipe(
145-
Stream.takeWhile((result): result is BatchResult => result !== null),
146-
)
147-
}),
148-
)
149-
}
213+
): Stream.Stream<BatchResult, never> =>
214+
batchStreamEffect(nativeChunkFile, files, options)
150215

151216
export const chunkBatchEffect = (
152217
files: FileInput[],
153218
options: BatchOptions = {},
154-
): Effect.Effect<BatchResult[], never> => {
155-
return Stream.runCollect(chunkBatchStreamEffect(files, options)).pipe(
156-
Effect.map((chunk) => Array.from(chunk)),
157-
)
158-
}
219+
): Effect.Effect<BatchResult[], never> =>
220+
batchEffect(nativeChunkFile, files, options)
159221

160222
export async function chunkBatch(
161223
files: FileInput[],
162224
options?: BatchOptions,
163225
): Promise<BatchResult[]> {
164-
return Effect.runPromise(chunkBatchEffect(files, options))
226+
return batch(nativeChunkFile, files, options)
165227
}
166228

167229
export async function* chunkBatchStream(
168230
files: FileInput[],
169231
options?: BatchOptions,
170232
): AsyncGenerator<BatchResult> {
171-
const results: BatchResult[] = []
172-
let resolveNext: ((value: IteratorResult<BatchResult>) => void) | null = null
173-
let done = false
174-
175-
const streamEffect = chunkBatchStreamEffect(files, options).pipe(
176-
Stream.runForEach((result) =>
177-
Effect.sync(() => {
178-
if (resolveNext) {
179-
const resolve = resolveNext
180-
resolveNext = null
181-
resolve({ value: result, done: false })
182-
} else {
183-
results.push(result)
184-
}
185-
}),
186-
),
187-
Effect.tap(() =>
188-
Effect.sync(() => {
189-
done = true
190-
if (resolveNext) {
191-
resolveNext({ value: undefined as never, done: true })
192-
}
193-
}),
194-
),
195-
)
196-
197-
const runPromise = Effect.runPromise(streamEffect)
198-
199-
try {
200-
while (true) {
201-
const buffered = results.shift()
202-
if (buffered !== undefined) {
203-
yield buffered
204-
} else if (done) {
205-
break
206-
} else {
207-
const result = await new Promise<IteratorResult<BatchResult>>(
208-
(resolve) => {
209-
resolveNext = resolve
210-
},
211-
)
212-
if (result.done) break
213-
yield result.value
214-
}
215-
}
216-
} finally {
217-
await runPromise.catch(() => {})
218-
}
233+
yield* batchStream(nativeChunkFile, files, options)
219234
}

0 commit comments

Comments
 (0)