66 "errors"
77 "fmt"
88 "io"
9+ "log"
910 "math"
1011 "os"
1112 "path/filepath"
@@ -15,6 +16,11 @@ import (
1516 "github.com/google/codesearch/sparse"
1617)
1718
19+ // defaultFlushThreshold is the maximum number of trigram entries that we
20+ // accumulate in memory before flushing an intermediate index to disk and
21+ // starting a new index. Each trigram [entry] takes 8 bytes.
22+ const defaultFlushThreshold = 100_000_000 // ≈800 MB
23+
1824type countingWriter struct {
1925 offset uint64
2026 f * os.File
@@ -50,11 +56,12 @@ type entry struct {
5056}
5157
// Writer accumulates trigram entries in memory and writes them out as index
// files below dir.
type Writer struct {
	// dir is the directory that Flush writes the final index into;
	// intermediate indexes are placed in intermediate.*.tmp directories
	// below it.
	dir string
	// index holds the in-memory entries accumulated so far, keyed by trigram.
	index map[Trigram][]entry
	// indexEntries is the total number of entries in [index]. It is compared
	// against defaultFlushThreshold to decide when to flush an intermediate
	// index.
	indexEntries int
	// docs is handed to writeDocidMap as the list of file names.
	// NOTE(review): presumably indexed by docid — confirm against AddFile.
	docs []string
	set  *sparse.Set // efficiently reset across AddFile calls
	// NOTE(review): usage of inbuf is not visible in this excerpt —
	// presumably a reusable input buffer; confirm against AddFile.
	inbuf []byte
}
5966
6067func Create (dir string ) (* Writer , error ) {
@@ -216,11 +223,84 @@ func (w *Writer) AddFile(fn, name string) error {
216223 t := Trigram (e >> 32 )
217224 w .index [t ] = append (w .index [t ], entry {docid : docid , position : uint32 (e )})
218225 }
226+ w .indexEntries += len (entries )
227+ if w .indexEntries >= defaultFlushThreshold {
228+ if err := w .intermediateFlush (); err != nil {
229+ return fmt .Errorf ("intermediate flush: %w" , err )
230+ }
231+ }
232+ return nil
233+ }
234+
235+ // intermediateFlush flushes the current index to disk and starts a fresh one.
236+ // The intermediate index files will be merged before indexing completes.
237+ func (w * Writer ) intermediateFlush () error {
238+ if len (w .docs ) == 0 {
239+ return nil // nothing to flush
240+ }
241+
242+ // Create a temp directory for this intermediate index
243+ intermediate , err := filepath .Glob (filepath .Join (w .dir , "intermediate.*.tmp" ))
244+ if err != nil {
245+ return err
246+ }
247+ tmpDir := fmt .Sprintf ("%s/intermediate.%d.tmp" , w .dir , len (intermediate ))
248+ log .Printf ("flushing to intermediate index %s" , tmpDir )
249+ if err := os .MkdirAll (tmpDir , 0755 ); err != nil {
250+ return err
251+ }
252+
253+ // Write the current index to the temp directory
254+ if err := w .flushTo (tmpDir ); err != nil {
255+ os .RemoveAll (tmpDir )
256+ return err
257+ }
258+
259+ // Clear memory
260+ w .index = make (map [Trigram ][]entry )
261+ w .docs = nil
262+ w .indexEntries = 0
263+
219264 return nil
220265}
221266
222267func (w * Writer ) Flush () error {
223- if err := w .writeDocidMap (w .docs ); err != nil {
268+ intermediate , err := filepath .Glob (filepath .Join (w .dir , "intermediate.*.tmp" ))
269+ if err != nil {
270+ return err
271+ }
272+
273+ log .Printf ("found %d intermediate index files in %s" , len (intermediate ), w .dir )
274+
275+ if len (intermediate ) == 0 {
276+ // Easy case: write from memory to disk directly.
277+ return w .flushTo (w .dir )
278+ }
279+
280+ if len (w .docs ) > 0 {
281+ // Flush remaining data before merging.
282+ if err := w .intermediateFlush (); err != nil {
283+ return err
284+ }
285+ }
286+
287+ // Merge all intermediate indexes into the final directory.
288+ if err := ConcatN (w .dir , intermediate ); err != nil {
289+ return fmt .Errorf ("merging intermediate indexes: %w" , err )
290+ }
291+
292+ for _ , tmpDir := range intermediate {
293+ if err := os .RemoveAll (tmpDir ); err != nil {
294+ return err
295+ }
296+ }
297+
298+ return nil
299+ }
300+
301+ // flushTo writes the current in-memory index to the specified directory.
302+ func (w * Writer ) flushTo (dir string ) error {
303+ if err := w .writeDocidMap (dir , w .docs ); err != nil {
224304 return err
225305 }
226306
@@ -231,15 +311,15 @@ func (w *Writer) Flush() error {
231311 }
232312 sort .Slice (trigrams , func (i , j int ) bool { return trigrams [i ] < trigrams [j ] })
233313
234- if err := w .writeDocid (trigrams ); err != nil {
314+ if err := w .writeDocid (dir , trigrams ); err != nil {
235315 return err
236316 }
237317
238- if err := w .writePos (trigrams ); err != nil {
318+ if err := w .writePos (dir , trigrams ); err != nil {
239319 return err
240320 }
241321
242- if err := w .writePosrel (trigrams ); err != nil {
322+ if err := w .writePosrel (dir , trigrams ); err != nil {
243323 return err
244324 }
245325
@@ -250,8 +330,8 @@ func (w *Writer) Flush() error {
250330// \n-separated strings (for easy printing by humans using less(1) or
251331// strings(1)), followed by the byte offsets of each entry and, lastly, the
252332// offset of the byte offsets (for fast lookup).
253- func (w * Writer ) writeDocidMap (filenames []string ) error {
254- f , err := os .Create (filepath .Join (w . dir , "docid.map" ))
333+ func (w * Writer ) writeDocidMap (dir string , filenames []string ) error {
334+ f , err := os .Create (filepath .Join (dir , "docid.map" ))
255335 if err != nil {
256336 return err
257337 }
@@ -272,14 +352,14 @@ func (w *Writer) writeDocidMap(filenames []string) error {
272352 return cw .Close ()
273353}
274354
275- func (w * Writer ) writeDocid (trigrams []Trigram ) error {
276- f , err := os .Create (filepath .Join (w . dir , "posting.docid.meta" ))
355+ func (w * Writer ) writeDocid (dir string , trigrams []Trigram ) error {
356+ f , err := os .Create (filepath .Join (dir , "posting.docid.meta" ))
277357 if err != nil {
278358 return err
279359 }
280360 defer f .Close ()
281361 bufw := bufio .NewWriter (f )
282- dw , err := newPForWriter (w . dir , "docid" )
362+ dw , err := newPForWriter (dir , "docid" )
283363 if err != nil {
284364 return err
285365 }
@@ -330,14 +410,14 @@ func (w *Writer) writeDocid(trigrams []Trigram) error {
330410 return nil
331411}
332412
333- func (w * Writer ) writePos (trigrams []Trigram ) error {
334- f , err := os .Create (filepath .Join (w . dir , "posting.pos.meta" ))
413+ func (w * Writer ) writePos (dir string , trigrams []Trigram ) error {
414+ f , err := os .Create (filepath .Join (dir , "posting.pos.meta" ))
335415 if err != nil {
336416 return err
337417 }
338418 defer f .Close ()
339419 bufw := bufio .NewWriter (f )
340- dw , err := newPForWriter (w . dir , "pos" )
420+ dw , err := newPForWriter (dir , "pos" )
341421 if err != nil {
342422 return err
343423 }
@@ -385,14 +465,14 @@ func (w *Writer) writePos(trigrams []Trigram) error {
385465 return nil
386466}
387467
388- func (w * Writer ) writePosrel (trigrams []Trigram ) error {
389- f , err := os .Create (filepath .Join (w . dir , "posting.posrel.meta" ))
468+ func (w * Writer ) writePosrel (dir string , trigrams []Trigram ) error {
469+ f , err := os .Create (filepath .Join (dir , "posting.posrel.meta" ))
390470 if err != nil {
391471 return err
392472 }
393473 defer f .Close ()
394474 bufw := bufio .NewWriter (f )
395- df , err := os .Create (filepath .Join (w . dir , "posting.posrel.data" ))
475+ df , err := os .Create (filepath .Join (dir , "posting.posrel.data" ))
396476 if err != nil {
397477 return err
398478 }
0 commit comments