1- import { Effect , Queue , Stream } from 'effect'
1+ import {
2+ Effect ,
3+ Chunk as EffectChunk ,
4+ Exit ,
5+ Option ,
6+ Ref ,
7+ Scope ,
8+ Stream ,
9+ } from 'effect'
210import { ChunkingError , UnsupportedLanguageError } from './chunk'
311import { chunk as chunkInternal } from './chunking'
412import { extractEntities } from './extract'
@@ -18,32 +26,156 @@ import type {
1826
1927const DEFAULT_CONCURRENCY = 10
2028
21- const chunkFileEffect = (
22- file : FileInput ,
23- batchOptions : ChunkOptions = { } ,
24- ) : Effect . Effect < BatchResult , never > => {
25- const mergedOptions = { ...batchOptions , ...file . options }
/**
 * Signature of a per-file chunker that the batch helpers delegate to.
 *
 * Receives the file path, the file's source text and the chunk options for
 * that file, and returns an Effect that succeeds with the file's chunks.
 * The failure channel is `unknown`, so callers must narrow any error value
 * before using it.
 */
export type ChunkFileFunction = (
  filepath: string,
  code: string,
  options: ChunkOptions,
) => Effect.Effect<Chunk[], unknown>
37+
/**
 * Core batch stream processor shared by the different chunker backends.
 *
 * Runs `chunkFile` over every entry of `files` and emits one `BatchResult`
 * per file. The returned stream never fails: a failure of `chunkFile` is
 * converted into a result with `chunks: null` and an `Error` (non-Error
 * failure values are wrapped with `new Error(String(error))`).
 *
 * @param chunkFile - Chunker invoked once per file.
 * @param files - Files to process; an empty array yields an empty stream.
 * @param options - Batch options. `concurrency` (default
 *   `DEFAULT_CONCURRENCY`) and `onProgress` are consumed here; all remaining
 *   keys are forwarded to `chunkFile`, with per-file `file.options` taking
 *   precedence over them.
 * @returns A stream of per-file results.
 */
export const batchStreamEffect = (
  chunkFile: ChunkFileFunction,
  files: FileInput[],
  options: BatchOptions = {},
): Stream.Stream<BatchResult, never> => {
  const {
    concurrency = DEFAULT_CONCURRENCY,
    onProgress,
    ...sharedOptions
  } = options
  const fileCount = files.length

  if (fileCount === 0) {
    return Stream.empty
  }

  // Chunk one file and fold success/failure into a BatchResult value.
  const toBatchResult = (
    input: FileInput,
  ): Effect.Effect<BatchResult, never> => {
    // Later spread wins: per-file options override the batch-level ones.
    const effectiveOptions = { ...sharedOptions, ...input.options }
    const succeeded = Effect.map(
      chunkFile(input.filepath, input.code, effectiveOptions),
      (chunks) =>
        ({
          filepath: input.filepath,
          chunks,
          error: null,
        }) satisfies BatchFileResult,
    )
    return Effect.catchAll(succeeded, (error) =>
      Effect.succeed({
        filepath: input.filepath,
        chunks: null,
        error: error instanceof Error ? error : new Error(String(error)),
      } satisfies BatchFileError),
    )
  }

  return Stream.unwrap(
    // The Ref holds the number of files finished so far.
    Effect.map(Ref.make(0), (finishedRef) =>
      Stream.fromIterable(files).pipe(
        Stream.mapEffect(
          (input) =>
            Effect.gen(function* () {
              const outcome = yield* toBatchResult(input)
              const finished = yield* Ref.updateAndGet(
                finishedRef,
                (n) => n + 1,
              )
              // Report progress after the file's result is known.
              yield* Effect.sync(() =>
                onProgress?.(
                  finished,
                  fileCount,
                  input.filepath,
                  outcome.error === null,
                ),
              )
              return outcome
            }),
          { concurrency },
        ),
      ),
    ),
  )
}
108+
/**
 * Core batch processor: runs the batch stream to completion and collects
 * every emitted result into a plain array.
 *
 * @param chunkFile - Chunker invoked once per file.
 * @param files - Files to process.
 * @param options - Batch options, passed through to `batchStreamEffect`.
 * @returns An Effect that succeeds with all per-file results.
 */
export const batchEffect = (
  chunkFile: ChunkFileFunction,
  files: FileInput[],
  options: BatchOptions = {},
): Effect.Effect<BatchResult[], never> => {
  const results = batchStreamEffect(chunkFile, files, options)
  // runCollect yields an Effect Chunk; copy it into a regular array.
  return Effect.map(Stream.runCollect(results), (collected) =>
    Array.from(collected),
  )
}
121+
/**
 * Core batch processor, Promise API: runs `batchEffect` and resolves with
 * the collected per-file results.
 *
 * @param chunkFile - Chunker invoked once per file.
 * @param files - Files to process.
 * @param options - Optional batch options.
 * @returns A promise of all per-file results.
 */
export async function batch(
  chunkFile: ChunkFileFunction,
  files: FileInput[],
  options?: BatchOptions,
): Promise<BatchResult[]> {
  const program = batchEffect(chunkFile, files, options)
  return Effect.runPromise(program)
}
26132
/**
 * Core batch stream processor, AsyncGenerator API.
 *
 * Pulls from the stream built by `batchStreamEffect` inside a manually
 * created scope and yields every result of each pulled chunk. The scope is
 * closed in `finally`, so it is also closed when the consumer stops
 * iterating early or an error propagates.
 *
 * @param chunkFile - Chunker invoked once per file.
 * @param files - Files to process.
 * @param options - Optional batch options.
 * @yields One `BatchResult` per processed file.
 */
export async function* batchStream(
  chunkFile: ChunkFileFunction,
  files: FileInput[],
  options?: BatchOptions,
): AsyncGenerator<BatchResult> {
  const scope = Effect.runSync(Scope.make())

  try {
    const pullChunk = await Effect.runPromise(
      Scope.extend(
        Stream.toPull(batchStreamEffect(chunkFile, files, options)),
        scope,
      ),
    )
    // Effect.option maps a failed pull to None, which ends the loop below.
    const pullOnce = () => Effect.runPromise(Effect.option(pullChunk))

    let next = await pullOnce()
    while (Option.isSome(next)) {
      for (const item of EffectChunk.toReadonlyArray(next.value)) {
        yield item
      }
      next = await pullOnce()
    }
  } finally {
    await Effect.runPromise(Scope.close(scope, Exit.void))
  }
}
161+
162+ const nativeChunkFile : ChunkFileFunction = ( filepath , code , options ) => {
27163 return Effect . gen ( function * ( ) {
28164 const language : Language | null =
29- mergedOptions . language ?? detectLanguage ( file . filepath )
165+ options . language ?? detectLanguage ( filepath )
30166
31167 if ( ! language ) {
32- return {
33- filepath : file . filepath ,
34- chunks : null ,
35- error : new UnsupportedLanguageError ( file . filepath ) ,
36- } satisfies BatchFileError
168+ return yield * Effect . fail ( new UnsupportedLanguageError ( filepath ) )
37169 }
38170
39171 const parseResult = yield * Effect . tryPromise ( {
40- try : ( ) => parseCode ( file . code , language ) ,
172+ try : ( ) => parseCode ( code , language ) ,
41173 catch : ( error : unknown ) =>
42174 new ChunkingError ( 'Failed to parse code' , error ) ,
43175 } )
44176
45177 const entities = yield * Effect . mapError (
46- extractEntities ( parseResult . tree . rootNode , language , file . code ) ,
178+ extractEntities ( parseResult . tree . rootNode , language , code ) ,
47179 ( error : unknown ) =>
48180 new ChunkingError ( 'Failed to extract entities' , error ) ,
49181 )
@@ -57,163 +189,46 @@ const chunkFileEffect = (
57189 const chunks = yield * Effect . mapError (
58190 chunkInternal (
59191 parseResult . tree . rootNode ,
60- file . code ,
192+ code ,
61193 scopeTree ,
62194 language ,
63- mergedOptions ,
64- file . filepath ,
195+ options ,
196+ filepath ,
65197 ) ,
66198 ( error : unknown ) => new ChunkingError ( 'Failed to chunk code' , error ) ,
67199 )
68200
69- const finalChunks : Chunk [ ] = parseResult . error
201+ return parseResult . error
70202 ? chunks . map ( ( c : Chunk ) => ( {
71203 ...c ,
72204 context : { ...c . context , parseError : parseResult . error ?? undefined } ,
73205 } ) )
74206 : chunks
75-
76- return {
77- filepath : file . filepath ,
78- chunks : finalChunks ,
79- error : null ,
80- } satisfies BatchFileResult
81- } ) . pipe (
82- Effect . catchAll ( ( error ) =>
83- Effect . succeed ( {
84- filepath : file . filepath ,
85- chunks : null ,
86- error : error instanceof Error ? error : new Error ( String ( error ) ) ,
87- } satisfies BatchFileError ) ,
88- ) ,
89- )
207+ } )
90208}
91209
/**
 * Stream of per-file chunking results using the native chunker.
 *
 * Delegates to `batchStreamEffect` with `nativeChunkFile`.
 *
 * @param files - Files to chunk.
 * @param options - Batch options, passed through unchanged.
 * @returns A stream of per-file results.
 */
export const chunkBatchStreamEffect = (
  files: FileInput[],
  options: BatchOptions = {},
): Stream.Stream<BatchResult, never> => {
  return batchStreamEffect(nativeChunkFile, files, options)
}
150215
/**
 * Effect that chunks all files with the native chunker and succeeds with
 * the collected per-file results.
 *
 * Delegates to `batchEffect` with `nativeChunkFile`.
 *
 * @param files - Files to chunk.
 * @param options - Batch options, passed through unchanged.
 * @returns An Effect of all per-file results.
 */
export const chunkBatchEffect = (
  files: FileInput[],
  options: BatchOptions = {},
): Effect.Effect<BatchResult[], never> => {
  return batchEffect(nativeChunkFile, files, options)
}
159221
/**
 * Promise API: chunks all files with the native chunker.
 *
 * Delegates to `batch` with `nativeChunkFile`.
 *
 * @param files - Files to chunk.
 * @param options - Optional batch options.
 * @returns A promise of all per-file results.
 */
export async function chunkBatch(
  files: FileInput[],
  options?: BatchOptions,
): Promise<BatchResult[]> {
  const pending = batch(nativeChunkFile, files, options)
  return pending
}
166228
/**
 * AsyncGenerator API: chunks files with the native chunker and yields each
 * per-file result.
 *
 * Delegates to `batchStream` with `nativeChunkFile`.
 *
 * @param files - Files to chunk.
 * @param options - Optional batch options.
 * @yields One `BatchResult` per processed file.
 */
export async function* chunkBatchStream(
  files: FileInput[],
  options?: BatchOptions,
): AsyncGenerator<BatchResult> {
  const results = batchStream(nativeChunkFile, files, options)
  yield* results
}
0 commit comments