@@ -61,7 +61,7 @@ class KnnIndexer {
6161 static final String ID_FIELD = "id" ;
6262 static final String VECTOR_FIELD = "vector" ;
6363
64- private final Path docsPath ;
64+ private final List < Path > docsPath ;
6565 private final Path indexPath ;
6666 private final VectorEncoding vectorEncoding ;
6767 private int dim ;
@@ -71,7 +71,7 @@ class KnnIndexer {
7171 private final int numIndexThreads ;
7272
7373 KnnIndexer (
74- Path docsPath ,
74+ List < Path > docsPath ,
7575 Path indexPath ,
7676 Codec codec ,
7777 int numIndexThreads ,
@@ -127,57 +127,70 @@ public boolean isEnabled(String component) {
127127 }
128128
129129 long start = System .nanoTime ();
130- try (
131- FSDirectory dir = FSDirectory .open (indexPath );
132- IndexWriter iw = new IndexWriter (dir , iwc );
133- FileChannel in = FileChannel .open (docsPath )
134- ) {
135- long docsPathSizeInBytes = in .size ();
136- int offsetByteSize = 0 ;
137- if (dim == -1 ) {
138- offsetByteSize = 4 ;
139- ByteBuffer preamble = ByteBuffer .allocate (4 ).order (ByteOrder .LITTLE_ENDIAN );
140- int bytesRead = Channels .readFromFileChannel (in , 0 , preamble );
141- if (bytesRead < 4 ) {
142- throw new IllegalArgumentException (
143- "docsPath \" " + docsPath + "\" does not contain a valid dims? size=" + docsPathSizeInBytes
130+ AtomicInteger numDocsIndexed = new AtomicInteger ();
131+ try (FSDirectory dir = FSDirectory .open (indexPath ); IndexWriter iw = new IndexWriter (dir , iwc );) {
132+ for (Path docsPath : this .docsPath ) {
133+ int dim = this .dim ;
134+ try (FileChannel in = FileChannel .open (docsPath )) {
135+ long docsPathSizeInBytes = in .size ();
136+ int offsetByteSize = 0 ;
137+ if (dim == -1 ) {
138+ offsetByteSize = 4 ;
139+ ByteBuffer preamble = ByteBuffer .allocate (4 ).order (ByteOrder .LITTLE_ENDIAN );
140+ int bytesRead = Channels .readFromFileChannel (in , 0 , preamble );
141+ if (bytesRead < 4 ) {
142+ throw new IllegalArgumentException (
143+ "docsPath \" " + docsPath + "\" does not contain a valid dims? size=" + docsPathSizeInBytes
144+ );
145+ }
146+ dim = preamble .getInt (0 );
147+ if (dim <= 0 ) {
148+ throw new IllegalArgumentException ("docsPath \" " + docsPath + "\" has invalid dimension: " + dim );
149+ }
150+ }
151+ FieldType fieldType = switch (vectorEncoding ) {
152+ case BYTE -> KnnByteVectorField .createFieldType (dim , similarityFunction );
153+ case FLOAT32 -> KnnFloatVectorField .createFieldType (dim , similarityFunction );
154+ };
155+ if (docsPathSizeInBytes % (((long ) dim * vectorEncoding .byteSize + offsetByteSize )) != 0 ) {
156+ throw new IllegalArgumentException (
157+ "docsPath \" " + docsPath + "\" does not contain a whole number of vectors? size=" + docsPathSizeInBytes
158+ );
159+ }
160+ int numDocs = (int ) (docsPathSizeInBytes / ((long ) dim * vectorEncoding .byteSize + offsetByteSize ));
161+ numDocs = Math .min (this .numDocs - numDocsIndexed .get (), numDocs );
162+ if (numDocs <= 0 ) {
163+ break ;
164+ }
165+ logger .info (
166+ "path={}, docsPathSizeInBytes={}, numDocs={}, dim={}, vectorEncoding={}, byteSize={}" ,
167+ docsPath ,
168+ docsPathSizeInBytes ,
169+ numDocs ,
170+ dim ,
171+ vectorEncoding ,
172+ vectorEncoding .byteSize
144173 );
145- }
146- dim = preamble .getInt (0 );
147- if (dim <= 0 ) {
148- throw new IllegalArgumentException ("docsPath \" " + docsPath + "\" has invalid dimension: " + dim );
149- }
150- }
151- FieldType fieldType = switch (vectorEncoding ) {
152- case BYTE -> KnnByteVectorField .createFieldType (dim , similarityFunction );
153- case FLOAT32 -> KnnFloatVectorField .createFieldType (dim , similarityFunction );
154- };
155- if (docsPathSizeInBytes % (((long ) dim * vectorEncoding .byteSize + offsetByteSize )) != 0 ) {
156- throw new IllegalArgumentException (
157- "docsPath \" " + docsPath + "\" does not contain a whole number of vectors? size=" + docsPathSizeInBytes
158- );
159- }
160- logger .info (
161- "docsPathSizeInBytes={}, dim={}, vectorEncoding={}, byteSize={}" ,
162- docsPathSizeInBytes ,
163- dim ,
164- vectorEncoding ,
165- vectorEncoding .byteSize
166- );
174+ // adjust numDocs to account for the number of documents already indexed
175+ // numDocsIndexed tracks the total docs read in order and is used for docIds
176+ // numDocs is the total number of docs to index from this file
177+ numDocs += numDocsIndexed .get ();
167178
168- VectorReader inReader = VectorReader .create (in , dim , vectorEncoding , offsetByteSize );
169- try (ExecutorService exec = Executors .newFixedThreadPool (numIndexThreads , r -> new Thread (r , "KnnIndexer-Thread" ))) {
170- AtomicInteger numDocsIndexed = new AtomicInteger ();
171- List <Future <?>> threads = new ArrayList <>();
172- for (int i = 0 ; i < numIndexThreads ; i ++) {
173- Thread t = new IndexerThread (iw , inReader , dim , vectorEncoding , fieldType , numDocsIndexed , numDocs );
174- t .setDaemon (true );
175- threads .add (exec .submit (t ));
176- }
177- for (Future <?> t : threads ) {
178- t .get ();
179+ VectorReader inReader = VectorReader .create (in , dim , vectorEncoding , offsetByteSize );
180+ try (ExecutorService exec = Executors .newFixedThreadPool (numIndexThreads , r -> new Thread (r , "KnnIndexer-Thread" ))) {
181+ List <Future <?>> threads = new ArrayList <>();
182+ for (int i = 0 ; i < numIndexThreads ; i ++) {
183+ Thread t = new IndexerThread (iw , inReader , dim , vectorEncoding , fieldType , numDocsIndexed , numDocs );
184+ t .setDaemon (true );
185+ threads .add (exec .submit (t ));
186+ }
187+ for (Future <?> t : threads ) {
188+ t .get ();
189+ }
190+ }
179191 }
180192 }
193+ logger .info ("KnnIndexer: indexed {} documents of desired {} numDocs" , numDocsIndexed , numDocs );
181194 logger .debug ("all indexing threads finished, now IndexWriter.commit()" );
182195 iw .commit ();
183196 ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler ) iwc .getMergeScheduler ();
0 commit comments