3232import java .util .List ;
3333
3434import static org .apache .iotdb .db .queryengine .metric .QueryExecutionMetricSet .QUERY_RESOURCE_INIT ;
35+ import static org .apache .iotdb .db .storageengine .dataregion .VirtualDataRegion .UNFINISHED_QUERY_DATA_SOURCE ;
3536
3637/**
3738 * One {@link DataDriver} is responsible for one {@link FragmentInstance} which is for data query,
4041@ NotThreadSafe
4142public class DataDriver extends Driver {
4243
43- private boolean init ;
44+ private boolean init = false ;
4445
4546 // Unit : Byte
4647 private final long estimatedMemorySize ;
@@ -55,7 +56,13 @@ public DataDriver(Operator root, DriverContext driverContext, long estimatedMemo
5556 protected boolean init (SettableFuture <?> blockedFuture ) {
5657 if (!init ) {
5758 try {
58- initialize ();
59+ if (!initialize ()) { // failed to init this time, but now exception thrown, possibly failed
60+ // to acquire lock within the specific time
61+ blockedFuture .set (null );
62+ return false ;
63+ } else {
64+ return true ;
65+ }
5966 } catch (Throwable t ) {
6067 LOGGER .error (
6168 "Failed to do the initialization for driver {} " , driverContext .getDriverTaskID (), t );
@@ -78,8 +85,7 @@ protected boolean init(SettableFuture<?> blockedFuture) {
7885 * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} is null after
7986 * initialization, IllegalStateException will be thrown
8087 */
81- private void initialize () throws QueryProcessException {
82- long startTime = System .nanoTime ();
88+ private boolean initialize () throws QueryProcessException {
8389 try {
8490 List <DataSourceOperator > sourceOperators =
8591 ((DataDriverContext ) driverContext ).getSourceOperators ();
@@ -90,20 +96,30 @@ private void initialize() throws QueryProcessException {
9096 // for some reasons, we may get null QueryDataSource here.
9197 // And it's safe for us to throw this exception here in such case.
9298 throw new IllegalStateException ("QueryDataSource should never be null!" );
99+ } else if (dataSource == UNFINISHED_QUERY_DATA_SOURCE ) {
100+ // init query data source timeout. Maybe failed to acquire the read lock within the
101+ // specified time
102+ // do nothing, wait for next try
103+ } else {
104+ sourceOperators .forEach (
105+ sourceOperator -> {
106+ // Construct QueryDataSource for source operator
107+ sourceOperator .initQueryDataSource (dataSource .clone ());
108+ });
109+ this .init = true ;
93110 }
94- sourceOperators .forEach (
95- sourceOperator -> {
96- // Construct QueryDataSource for source operator
97- sourceOperator .initQueryDataSource (dataSource .clone ());
98- });
111+ } else {
112+ this .init = true ;
99113 }
100-
101- this .init = true ;
102114 } finally {
103- ((DataDriverContext ) driverContext ).clearSourceOperators ();
104- QUERY_EXECUTION_METRICS .recordExecutionCost (
105- QUERY_RESOURCE_INIT , System .nanoTime () - startTime );
115+ if (this .init ) {
116+ ((DataDriverContext ) driverContext ).clearSourceOperators ();
117+ QUERY_EXECUTION_METRICS .recordExecutionCost (
118+ QUERY_RESOURCE_INIT ,
119+ driverContext .getFragmentInstanceContext ().getInitQueryDataSourceCost ());
120+ }
106121 }
122+ return this .init ;
107123 }
108124
109125 @ Override
0 commit comments