2121
2222import org .apache .iotdb .commons .path .AlignedFullPath ;
2323import org .apache .iotdb .commons .schema .table .column .TsTableColumnCategory ;
24- import org .apache .iotdb .db .queryengine .execution .MemoryEstimationHelper ;
2524import org .apache .iotdb .db .queryengine .execution .aggregation .timerangeiterator .ITableTimeRangeIterator ;
2625import org .apache .iotdb .db .queryengine .execution .operator .OperatorContext ;
2726import org .apache .iotdb .db .queryengine .execution .operator .source .AbstractDataSourceOperator ;
3130import org .apache .iotdb .db .queryengine .execution .operator .window .TimeWindow ;
3231import org .apache .iotdb .db .queryengine .plan .planner .plan .node .PlanNodeId ;
3332import org .apache .iotdb .db .queryengine .plan .planner .plan .parameter .SeriesScanOptions ;
33+ import org .apache .iotdb .db .queryengine .plan .relational .metadata .AlignedDeviceEntry ;
3434import org .apache .iotdb .db .queryengine .plan .relational .metadata .ColumnSchema ;
3535import org .apache .iotdb .db .queryengine .plan .relational .metadata .DeviceEntry ;
3636import org .apache .iotdb .db .queryengine .plan .statement .component .Ordering ;
5252import org .apache .tsfile .read .common .block .column .RunLengthEncodedColumn ;
5353import org .apache .tsfile .utils .Binary ;
5454import org .apache .tsfile .utils .Pair ;
55- import org .apache .tsfile .utils .RamUsageEstimator ;
5655import org .apache .tsfile .write .schema .IMeasurementSchema ;
5756
5857import java .io .IOException ;
6160import java .util .List ;
6261import java .util .Optional ;
6362import java .util .Set ;
64- import java .util .concurrent .TimeUnit ;
6563import java .util .stream .Collectors ;
6664
6765import static org .apache .iotdb .db .queryengine .execution .operator .AggregationUtil .satisfiedTimeRange ;
7371
7472public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOperator {
7573
76- private static final long INSTANCE_SIZE =
77- RamUsageEstimator .shallowSizeOfInstance (AbstractAggTableScanOperator .class );
78-
7974 private boolean finished = false ;
8075 private TsBlock inputTsBlock ;
8176
@@ -116,53 +111,39 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe
116111
117112 private boolean allAggregatorsHasFinalResult = false ;
118113
119- public AbstractAggTableScanOperator (
120- PlanNodeId sourceId ,
121- OperatorContext context ,
122- List <ColumnSchema > aggColumnSchemas ,
123- int [] aggColumnsIndexArray ,
124- List <DeviceEntry > deviceEntries ,
125- int deviceCount ,
126- SeriesScanOptions seriesScanOptions ,
127- List <String > measurementColumnNames ,
128- Set <String > allSensors ,
129- List <IMeasurementSchema > measurementSchemas ,
130- List <TableAggregator > tableAggregators ,
131- List <ColumnSchema > groupingKeySchemas ,
132- int [] groupingKeyIndex ,
133- ITableTimeRangeIterator tableTimeRangeIterator ,
134- boolean ascending ,
135- boolean canUseStatistics ,
136- List <Integer > aggregatorInputChannels ) {
137-
138- this .sourceId = sourceId ;
139- this .operatorContext = context ;
140- this .canUseStatistics = canUseStatistics ;
141- this .tableAggregators = tableAggregators ;
142- this .groupingKeySchemas = groupingKeySchemas ;
143- this .groupingKeyIndex = groupingKeyIndex ;
144- this .groupingKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas .size ();
145- this .aggColumnSchemas = aggColumnSchemas ;
146- this .aggColumnsIndexArray = aggColumnsIndexArray ;
147- this .deviceEntries = deviceEntries ;
148- this .deviceCount = deviceCount ;
114+ public AbstractAggTableScanOperator (AbstractAggTableScanOperatorParameter parameter ) {
115+
116+ this .sourceId = parameter .sourceId ;
117+ this .operatorContext = parameter .context ;
118+ this .canUseStatistics = parameter .canUseStatistics ;
119+ this .tableAggregators = parameter .tableAggregators ;
120+ this .groupingKeySchemas = parameter .groupingKeySchemas ;
121+ this .groupingKeyIndex = parameter .groupingKeyIndex ;
122+ this .groupingKeySize =
123+ parameter .groupingKeySchemas == null ? 0 : parameter .groupingKeySchemas .size ();
124+ this .aggColumnSchemas = parameter .aggColumnSchemas ;
125+ this .aggColumnsIndexArray = parameter .aggColumnsIndexArray ;
126+ this .deviceEntries = parameter .deviceEntries ;
127+ this .deviceCount = parameter .deviceCount ;
149128 this .operatorContext .recordSpecifiedInfo (DEVICE_NUMBER , Integer .toString (this .deviceCount ));
150- this .ascending = ascending ;
151- this .scanOrder = ascending ? Ordering .ASC : Ordering .DESC ;
152- this .seriesScanOptions = seriesScanOptions ;
153- this .measurementColumnNames = measurementColumnNames ;
129+ this .ascending = parameter . ascending ;
130+ this .scanOrder = parameter . ascending ? Ordering .ASC : Ordering .DESC ;
131+ this .seriesScanOptions = parameter . seriesScanOptions ;
132+ this .measurementColumnNames = parameter . measurementColumnNames ;
154133 this .measurementCount = measurementColumnNames .size ();
155134 this .cachedRawDataSize =
156135 (1L + this .measurementCount )
157136 * TSFileDescriptor .getInstance ().getConfig ().getPageSizeInByte ();
158- this .allSensors = allSensors ;
159- this .measurementSchemas = measurementSchemas ;
137+ this .allSensors = parameter . allSensors ;
138+ this .measurementSchemas = parameter . measurementSchemas ;
160139 this .measurementColumnTSDataTypes =
161- measurementSchemas .stream ().map (IMeasurementSchema ::getType ).collect (Collectors .toList ());
140+ parameter .measurementSchemas .stream ()
141+ .map (IMeasurementSchema ::getType )
142+ .collect (Collectors .toList ());
162143 this .currentDeviceIndex = 0 ;
163144 this .operatorContext .recordSpecifiedInfo (CURRENT_DEVICE_INDEX_STRING , Integer .toString (0 ));
164- this .aggregatorInputChannels = aggregatorInputChannels ;
165- this .timeIterator = tableTimeRangeIterator ;
145+ this .aggregatorInputChannels = parameter . aggregatorInputChannels ;
146+ this .timeIterator = parameter . tableTimeRangeIterator ;
166147 this .dateBinSize =
167148 timeIterator .getType () == ITableTimeRangeIterator .TimeIteratorType .DATE_BIN_TIME_ITERATOR
168149 ? 1
@@ -179,56 +160,6 @@ public boolean isFinished() throws Exception {
179160 return finished ;
180161 }
181162
182- @ Override
183- public boolean hasNext () throws Exception {
184- if (retainedTsBlock != null ) {
185- return true ;
186- }
187-
188- return timeIterator .hasCachedTimeRange () || timeIterator .hasNextTimeRange ();
189- }
190-
191- @ Override
192- public TsBlock next () throws Exception {
193- if (retainedTsBlock != null ) {
194- return getResultFromRetainedTsBlock ();
195- }
196-
197- // optimize for sql: select count(*) from (select count(s1), sum(s1) from table)
198- if (tableAggregators .isEmpty ()
199- && timeIterator .getType () == ITableTimeRangeIterator .TimeIteratorType .SINGLE_TIME_ITERATOR
200- && resultTsBlockBuilder .getValueColumnBuilders ().length == 0 ) {
201- resultTsBlockBuilder .reset ();
202- currentDeviceIndex = deviceCount ;
203- timeIterator .setFinished ();
204- Column [] valueColumns = new Column [0 ];
205- return new TsBlock (1 , new RunLengthEncodedColumn (TIME_COLUMN_TEMPLATE , 1 ), valueColumns );
206- }
207-
208- // start stopwatch, reset leftRuntimeOfOneNextCall
209- long start = System .nanoTime ();
210- leftRuntimeOfOneNextCall = 1000 * operatorContext .getMaxRunTime ().roundTo (TimeUnit .NANOSECONDS );
211- long maxRuntime = leftRuntimeOfOneNextCall ;
212-
213- while (System .nanoTime () - start < maxRuntime
214- && (timeIterator .hasCachedTimeRange () || timeIterator .hasNextTimeRange ())
215- && !resultTsBlockBuilder .isFull ()) {
216-
217- // calculate aggregation result on current time window
218- // return true if current time window is calc finished
219- if (calculateAggregationResultForCurrentTimeRange ()) {
220- timeIterator .resetCurTimeRange ();
221- }
222- }
223-
224- if (resultTsBlockBuilder .isEmpty ()) {
225- return null ;
226- }
227-
228- buildResultTsBlock ();
229- return checkTsBlockSizeAndGetResult ();
230- }
231-
232163 protected abstract void updateResultTsBlock ();
233164
234165 protected void buildResultTsBlock () {
@@ -244,7 +175,7 @@ protected void constructAlignedSeriesScanUtil() {
244175
245176 if (this .deviceEntries .isEmpty () || this .deviceEntries .get (this .currentDeviceIndex ) == null ) {
246177 // for device which is not exist
247- deviceEntry = new DeviceEntry (new StringArrayDeviceID ("" ), Collections .emptyList ());
178+ deviceEntry = new AlignedDeviceEntry (new StringArrayDeviceID ("" ), Collections .emptyList ());
248179 } else {
249180 deviceEntry = this .deviceEntries .get (this .currentDeviceIndex );
250181 }
@@ -422,12 +353,9 @@ private Column buildValueColumn(
422353 case TIME :
423354 return inputRegion .getTimeColumn ();
424355 case TAG :
425- // TODO avoid create deviceStatics multi times; count, sum can use time statistics
426356 String id =
427- (String )
428- deviceEntries
429- .get (currentDeviceIndex )
430- .getNthSegment (aggColumnsIndexArray [columnIdx ] + 1 );
357+ getNthIdColumnValue (
358+ deviceEntries .get (currentDeviceIndex ), aggColumnsIndexArray [columnIdx ]);
431359 return getIdOrAttrColumn (
432360 inputRegion .getTimeColumn ().getPositionCount (),
433361 id == null ? null : new Binary (id , TSFileConfig .STRING_CHARSET ));
@@ -493,12 +421,9 @@ private Statistics buildStatistics(
493421 case TIME :
494422 return timeStatistics ;
495423 case TAG :
496- // TODO avoid create deviceStatics multi times; count, sum can use time statistics
497424 String id =
498- (String )
499- deviceEntries
500- .get (currentDeviceIndex )
501- .getNthSegment (aggColumnsIndexArray [columnIdx ] + 1 );
425+ getNthIdColumnValue (
426+ deviceEntries .get (currentDeviceIndex ), aggColumnsIndexArray [columnIdx ]);
502427 return getStatistics (
503428 timeStatistics , id == null ? null : new Binary (id , TSFileConfig .STRING_CHARSET ));
504429 case ATTRIBUTE :
@@ -723,7 +648,7 @@ protected void appendGroupKeysToResult(List<DeviceEntry> deviceEntries, int devi
723648 ColumnBuilder [] columnBuilders = resultTsBlockBuilder .getValueColumnBuilders ();
724649 for (int i = 0 ; i < groupingKeySize ; i ++) {
725650 if (TsTableColumnCategory .TAG == groupingKeySchemas .get (i ).getColumnCategory ()) {
726- String id = ( String ) deviceEntries .get (deviceIndex ). getNthSegment ( groupingKeyIndex [i ] + 1 );
651+ String id = getNthIdColumnValue ( deviceEntries .get (deviceIndex ), groupingKeyIndex [i ]);
727652 if (id == null ) {
728653 columnBuilders [i ].appendNull ();
729654 } else {
@@ -839,19 +764,101 @@ public long calculateRetainedSizeAfterCallingNext() {
839764 : 0 ;
840765 }
841766
842- @ Override
843- public long ramBytesUsed () {
844- return INSTANCE_SIZE
845- + MemoryEstimationHelper .getEstimatedSizeOfAccountableObject (seriesScanUtil )
846- + MemoryEstimationHelper .getEstimatedSizeOfAccountableObject (operatorContext )
847- + MemoryEstimationHelper .getEstimatedSizeOfAccountableObject (sourceId )
848- + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder .getRetainedSizeInBytes ())
849- + RamUsageEstimator .sizeOfCollection (deviceEntries );
850- }
851-
852767 @ Override
853768 public void close () throws Exception {
854769 super .close ();
855770 tableAggregators .forEach (TableAggregator ::close );
856771 }
772+
773+ abstract String getNthIdColumnValue (DeviceEntry deviceEntry , int idColumnIndex );
774+
775+ public static class AbstractAggTableScanOperatorParameter {
776+ private final String timeColumnName ;
777+ protected final PlanNodeId sourceId ;
778+ protected final OperatorContext context ;
779+ protected final List <ColumnSchema > aggColumnSchemas ;
780+ protected final int [] aggColumnsIndexArray ;
781+ protected final SeriesScanOptions seriesScanOptions ;
782+ protected final List <String > measurementColumnNames ;
783+ protected final Set <String > allSensors ;
784+ protected final List <IMeasurementSchema > measurementSchemas ;
785+ protected final List <TableAggregator > tableAggregators ;
786+ protected final List <ColumnSchema > groupingKeySchemas ;
787+ protected final int [] groupingKeyIndex ;
788+ protected final ITableTimeRangeIterator tableTimeRangeIterator ;
789+ protected final boolean ascending ;
790+ protected final boolean canUseStatistics ;
791+ protected final List <Integer > aggregatorInputChannels ;
792+
793+ protected List <DeviceEntry > deviceEntries ;
794+ protected int deviceCount ;
795+
796+ public AbstractAggTableScanOperatorParameter (
797+ PlanNodeId sourceId ,
798+ OperatorContext context ,
799+ List <ColumnSchema > aggColumnSchemas ,
800+ int [] aggColumnsIndexArray ,
801+ List <DeviceEntry > deviceEntries ,
802+ int deviceCount ,
803+ SeriesScanOptions seriesScanOptions ,
804+ List <String > measurementColumnNames ,
805+ Set <String > allSensors ,
806+ List <IMeasurementSchema > measurementSchemas ,
807+ List <TableAggregator > tableAggregators ,
808+ List <ColumnSchema > groupingKeySchemas ,
809+ int [] groupingKeyIndex ,
810+ ITableTimeRangeIterator tableTimeRangeIterator ,
811+ boolean ascending ,
812+ boolean canUseStatistics ,
813+ List <Integer > aggregatorInputChannels ,
814+ String timeColumnName ) {
815+ this .sourceId = sourceId ;
816+ this .context = context ;
817+ this .aggColumnSchemas = aggColumnSchemas ;
818+ this .aggColumnsIndexArray = aggColumnsIndexArray ;
819+ this .deviceEntries = deviceEntries ;
820+ this .deviceCount = deviceCount ;
821+ this .seriesScanOptions = seriesScanOptions ;
822+ this .measurementColumnNames = measurementColumnNames ;
823+ this .allSensors = allSensors ;
824+ this .measurementSchemas = measurementSchemas ;
825+ this .tableAggregators = tableAggregators ;
826+ this .groupingKeySchemas = groupingKeySchemas ;
827+ this .groupingKeyIndex = groupingKeyIndex ;
828+ this .tableTimeRangeIterator = tableTimeRangeIterator ;
829+ this .ascending = ascending ;
830+ this .canUseStatistics = canUseStatistics ;
831+ this .aggregatorInputChannels = aggregatorInputChannels ;
832+ this .timeColumnName = timeColumnName ;
833+ }
834+
835+ public List <TableAggregator > getTableAggregators () {
836+ return tableAggregators ;
837+ }
838+
839+ public SeriesScanOptions getSeriesScanOptions () {
840+ return seriesScanOptions ;
841+ }
842+
843+ public Set <String > getAllSensors () {
844+ return allSensors ;
845+ }
846+
847+ public List <String > getMeasurementColumnNames () {
848+ return measurementColumnNames ;
849+ }
850+
851+ public List <IMeasurementSchema > getMeasurementSchemas () {
852+ return measurementSchemas ;
853+ }
854+
855+ public String getTimeColumnName () {
856+ return timeColumnName ;
857+ }
858+
859+ public void setDeviceEntries (List <DeviceEntry > deviceEntries ) {
860+ this .deviceEntries = deviceEntries ;
861+ this .deviceCount = deviceEntries .size ();
862+ }
863+ }
857864}
0 commit comments