@@ -890,15 +890,26 @@ private boolean hasNextOverlappedPage() throws IOException {
890890 return true ;
891891 }
892892
893- tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader ();
893+ // init the merge reader for current call
894+ // The original process is changed to lazy loading because different mem page readers
895+ // belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is
896+ // necessary to ensure that these mem page readers cannot coexist in the mergeReader at the
897+ // same time.
898+ // The initial endPointTime is calculated as follows:
899+ // 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped
900+ // unseq pages and take the end point.
901+ // 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping
902+ // unseq pages and take the end point.
903+ long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader ();
894904
895905 while (true ) {
896906
897907 // may has overlapped data
898908 if (mergeReader .hasNextTimeValuePair ()) {
899909
900910 TsBlockBuilder builder = new TsBlockBuilder (getTsDataTypeList ());
901- long currentPageEndPointTime = mergeReader .getCurrentReadStopTime ();
911+ long currentPageEndPointTime =
912+ Math .max (mergeReader .getCurrentReadStopTime (), initialEndPointTime );
902913 while (mergeReader .hasNextTimeValuePair ()) {
903914
904915 /*
@@ -928,7 +939,7 @@ private boolean hasNextOverlappedPage() throws IOException {
928939 unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata (
929940 timeValuePair .getTimestamp (), false );
930941 unpackAllOverlappedChunkMetadataToPageReaders (timeValuePair .getTimestamp (), false );
931- unpackAllOverlappedUnseqPageReadersToMergeReader (timeValuePair . getTimestamp () );
942+ unpackAllOverlappedUnseqPageReadersToMergeReader ();
932943
933944 // update if there are unpacked unSeqPageReaders
934945 timeValuePair = mergeReader .currentTimeValuePair ();
@@ -1017,33 +1028,59 @@ private long updateEndPointTime(long currentPageEndPointTime, IVersionPageReader
10171028 }
10181029 }
10191030
1020- private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader () throws IOException {
1031+ private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader () throws IOException {
1032+ do {
1033+ /*
1034+ * no cached page readers
1035+ */
1036+ if (firstPageReader == null && unSeqPageReaders .isEmpty () && seqPageReaders .isEmpty ()) {
1037+ return mergeReader .getCurrentReadStopTime ();
1038+ }
10211039
1022- /*
1023- * no cached page readers
1024- */
1025- if (firstPageReader == null && unSeqPageReaders .isEmpty () && seqPageReaders .isEmpty ()) {
1026- return ;
1027- }
1040+ /*
1041+ * init firstPageReader
1042+ */
1043+ if (firstPageReader == null ) {
1044+ initFirstPageReader ();
1045+ }
1046+ putPageReaderToMergeReader (firstPageReader );
1047+ firstPageReader = null ;
1048+ } while (!mergeReader .hasNextTimeValuePair ());
10281049
10291050 /*
1030- * init firstPageReader
1051+ * put all currently directly overlapped unseq page reader to merge reader
10311052 */
1032- if (firstPageReader == null ) {
1033- initFirstPageReader ();
1034- }
1053+ long mergeReaderStopTime = mergeReader .getCurrentReadStopTime ();
1054+ unpackAllOverlappedUnseqPageReadersToMergeReader ();
10351055
1036- long currentPageEndpointTime ;
1037- if (mergeReader .hasNextTimeValuePair ()) {
1038- currentPageEndpointTime = mergeReader .getCurrentReadStopTime ();
1039- } else {
1040- currentPageEndpointTime = orderUtils .getOverlapCheckTime (firstPageReader .getStatistics ());
1041- }
1056+ return calculateInitialEndPointTime (mergeReaderStopTime );
1057+ }
10421058
1043- /*
1044- * put all currently directly overlapped unseq page reader to merge reader
1045- */
1046- unpackAllOverlappedUnseqPageReadersToMergeReader (currentPageEndpointTime );
1059+ private long calculateInitialEndPointTime (long currentReadStopTime ) {
1060+ if (firstPageReader != null
1061+ && !firstPageReader .isSeq ()
1062+ && orderUtils .isOverlapped (currentReadStopTime , firstPageReader .getStatistics ())) {
1063+ if (orderUtils .getAscending ()) {
1064+ currentReadStopTime =
1065+ Math .max (currentReadStopTime , firstPageReader .getStatistics ().getEndTime ());
1066+ } else {
1067+ currentReadStopTime =
1068+ Math .min (currentReadStopTime , firstPageReader .getStatistics ().getEndTime ());
1069+ }
1070+ }
1071+ for (IVersionPageReader unSeqPageReader : unSeqPageReaders ) {
1072+ if (orderUtils .isOverlapped (currentReadStopTime , unSeqPageReader .getStatistics ())) {
1073+ if (orderUtils .getAscending ()) {
1074+ currentReadStopTime =
1075+ Math .max (currentReadStopTime , firstPageReader .getStatistics ().getEndTime ());
1076+ } else {
1077+ currentReadStopTime =
1078+ Math .min (currentReadStopTime , firstPageReader .getStatistics ().getEndTime ());
1079+ }
1080+ }
1081+ break ;
1082+ }
1083+ return currentReadStopTime ;
10471084 }
10481085
10491086 private void addTimeValuePairToResult (TimeValuePair timeValuePair , TsBlockBuilder builder ) {
@@ -1135,17 +1172,26 @@ private IVersionPageReader getFirstPageReaderFromCachedReaders() {
11351172 return firstPageReader ;
11361173 }
11371174
1138- private void unpackAllOverlappedUnseqPageReadersToMergeReader (long endpointTime )
1139- throws IOException {
1140- while (!unSeqPageReaders .isEmpty ()
1141- && orderUtils .isOverlapped (endpointTime , unSeqPageReaders .peek ().getStatistics ())) {
1142- putPageReaderToMergeReader (unSeqPageReaders .poll ());
1143- }
1175+ // This process loads overlapped unseq pages based on the current time value pair of the
1176+ // mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq
1177+ // page is added.
1178+ // The current time obtained from mergeReader each time is not necessarily the minimum among all
1179+ // the actual unseq data, so it is necessary to repeatedly calculate and include potentially
1180+ // overlapping unseq pages.
1181+ private void unpackAllOverlappedUnseqPageReadersToMergeReader () throws IOException {
1182+ long actualFirstTimeOfMergeReader = mergeReader .currentTimeValuePair ().getTimestamp ();
11441183 if (firstPageReader != null
11451184 && !firstPageReader .isSeq ()
1146- && orderUtils .isOverlapped (endpointTime , firstPageReader .getStatistics ())) {
1185+ && orderUtils .isOverlapped (actualFirstTimeOfMergeReader , firstPageReader .getStatistics ())) {
11471186 putPageReaderToMergeReader (firstPageReader );
11481187 firstPageReader = null ;
1188+ actualFirstTimeOfMergeReader = mergeReader .currentTimeValuePair ().getTimestamp ();
1189+ }
1190+ while (!unSeqPageReaders .isEmpty ()
1191+ && orderUtils .isOverlapped (
1192+ actualFirstTimeOfMergeReader , unSeqPageReaders .peek ().getStatistics ())) {
1193+ putPageReaderToMergeReader (unSeqPageReaders .poll ());
1194+ actualFirstTimeOfMergeReader = mergeReader .currentTimeValuePair ().getTimestamp ();
11491195 }
11501196 }
11511197
0 commit comments