@@ -813,6 +813,38 @@ public void testKafkaWithDelayedStopReadingFunction() {
813813 runWithStopReadingFn (checkStopReadingFn , "delayed-stop-reading" , sourceOptions .numRecords );
814814 }
815815
816+ @ Test
817+ public void testKafkaWithStopReadTime () throws IOException {
818+ writePipeline
819+ .apply ("Generate records" , Read .from (new SyntheticBoundedSource (sourceOptions )))
820+ .apply ("Measure write time" , ParDo .of (new TimeMonitor <>(NAMESPACE , WRITE_TIME_METRIC_NAME )))
821+ .apply (
822+ "Write to Kafka" ,
823+ writeToKafka ().withTopic (options .getKafkaTopic () + "-stop-read-time" ));
824+
825+ PipelineResult writeResult = writePipeline .run ();
826+ PipelineResult .State writeState = writeResult .waitUntilFinish ();
827+ cancelIfTimeouted (writeResult , writeState );
828+ assertNotEquals (PipelineResult .State .FAILED , writeState );
829+
830+ sdfReadPipeline .getOptions ().as (Options .class ).setStreaming (false );
831+ PCollection <KafkaRecord <byte [], byte []>> rows =
832+ sdfReadPipeline .apply (
833+ "Read from bounded Kafka" ,
834+ readFromKafka ()
835+ .withTopic (options .getKafkaTopic () + "-stop-read-time" )
836+ .withStopReadTime (
837+ org .joda .time .Instant .ofEpochMilli (
838+ new MetricsReader (writeResult , NAMESPACE )
839+ .getEndTimeMetric (WRITE_TIME_METRIC_NAME ))));
840+
841+ PipelineResult readResult = sdfReadPipeline .run ();
842+ PipelineResult .State readState =
843+ readResult .waitUntilFinish (Duration .standardSeconds (options .getReadTimeout ()));
844+ cancelIfTimeouted (readResult , readState );
845+ assertNotEquals (PipelineResult .State .FAILED , readState );
846+ }
847+
816848 public static final Schema KAFKA_TOPIC_SCHEMA =
817849 Schema .builder ()
818850 .addStringField ("name" )
0 commit comments