 import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.mapper.RoutingPathFields;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -81,7 +82,7 @@ public void testSimple() {
         int numSamplesPerTS = 10;
         long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
         int maxPageSize = between(1, 1024);
-        List<Page> results = runDriver(1024, maxPageSize, randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart);
+        List<Page> results = runDriver(1024, maxPageSize, true, numTimeSeries, numSamplesPerTS, timestampStart);
         // for now we emit at most one time series each page
         int offset = 0;
         for (Page page : results) {
@@ -155,9 +156,12 @@ record Doc(int host, long timestamp, long metric) {}
         }
         int maxPageSize = between(1, 1024);
         int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
+        var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
         var timeSeriesFactory = createTimeSeriesSourceOperator(
             directory,
             r -> this.reader = r,
+            true,
+            List.of(new ExtractField(metricField, ElementType.LONG)),
             limit,
             maxPageSize,
             randomBoolean(),
@@ -171,12 +175,11 @@ record Doc(int host, long timestamp, long metric) {}
         );
         DriverContext driverContext = driverContext();
         List<Page> results = new ArrayList<>();
-        var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
         OperatorTestCase.runDriver(
             TestDriverFactory.create(
                 driverContext,
                 timeSeriesFactory.get(driverContext),
-                List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)),
+                List.of(),
                 new TestResultPageSinkOperator(results::add)
             )
         );
@@ -240,7 +243,9 @@ public void testMatchNone() throws Exception {
             Integer.MAX_VALUE,
             randomIntBetween(1, 1024),
             1,
+            randomBoolean(),
             List.of(ctx),
+            new ValuesSourceReaderOperator.FieldInfo[] {},
             unused -> query
         );
         var driverContext = driverContext();
@@ -260,7 +265,7 @@ public void testMatchNone() throws Exception {
 
     @Override
     protected Operator.OperatorFactory simple() {
-        return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> {
+        return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> {
             long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
             writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
             return 1;
@@ -279,9 +284,13 @@ protected Matcher<String> expectedToStringOfSimple() {
 
     List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
         var ctx = driverContext();
+        var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
+        var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
         var timeSeriesFactory = createTimeSeriesSourceOperator(
             directory,
             indexReader -> this.reader = indexReader,
+            true,
+            List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)),
             limit,
             maxPageSize,
             forceMerge,
@@ -300,18 +309,8 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
         );
 
         List<Page> results = new ArrayList<>();
-        var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
-        var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
         OperatorTestCase.runDriver(
-            TestDriverFactory.create(
-                ctx,
-                timeSeriesFactory.get(ctx),
-                List.of(
-                    ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx),
-                    ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx)
-                ),
-                new TestResultPageSinkOperator(results::add)
-            )
+            TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
         );
         OperatorTestCase.assertDriverContext(ctx);
         for (Page result : results) {
@@ -321,9 +320,15 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
         return results;
     }
 
+    public record ExtractField(MappedFieldType ft, ElementType elementType) {
+
+    }
+
     public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
         Directory directory,
         Consumer<IndexReader> readerConsumer,
+        boolean emitDocIds,
+        List<ExtractField> extractFields,
         int limit,
         int maxPageSize,
         boolean forceMerge,
@@ -354,7 +359,18 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat
         }
         var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
         Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
-        return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
+
+        var fieldInfos = extractFields.stream()
+            .map(
+                f -> new ValuesSourceReaderOperator.FieldInfo(
+                    f.ft.name(),
+                    f.elementType,
+                    n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
+                )
+            )
+            .toArray(ValuesSourceReaderOperator.FieldInfo[]::new);
+
+        return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
     }
 
     public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
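
For context, a minimal sketch of how a test drives the reworked helper after this change: field extraction is configured on the source operator itself via `ExtractField`, so the driver's intermediate operator list stays empty rather than carrying a separate `ValuesSourceReaderOperator` stage. The sketch reuses only names that appear in the diff; the concrete limit, page size, and indexing lambda are illustrative values, not part of the change.

```java
// Usage sketch based on the helper signature introduced in this diff (illustrative values).
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
var factory = createTimeSeriesSourceOperator(
    directory,
    r -> this.reader = r,
    true,                                                       // emitDocIds
    List.of(new ExtractField(voltageField, ElementType.LONG)),  // fields extracted by the source operator
    Integer.MAX_VALUE,                                          // limit
    256,                                                        // maxPageSize
    false,                                                      // forceMerge
    writer -> {
        long ts = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
        writeTS(writer, ts, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
        return 1; // number of indexed documents
    }
);

var ctx = driverContext();
List<Page> results = new ArrayList<>();
// No ValuesSourceReaderOperator in the pipeline: extracted values arrive in the source operator's pages.
OperatorTestCase.runDriver(
    TestDriverFactory.create(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
);
```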