2020package org .apache .iotdb .db .queryengine .execution .fragment ;
2121
2222import org .apache .iotdb .commons .concurrent .IoTDBThreadPoolFactory ;
23+ import org .apache .iotdb .commons .memory .AtomicLongMemoryBlock ;
24+ import org .apache .iotdb .db .conf .IoTDBDescriptor ;
2325import org .apache .iotdb .db .queryengine .common .FragmentInstanceId ;
2426import org .apache .iotdb .db .queryengine .common .PlanFragmentId ;
2527import org .apache .iotdb .db .queryengine .exception .CpuNotEnoughException ;
2931import org .apache .iotdb .db .queryengine .execution .exchange .sink .ISink ;
3032import org .apache .iotdb .db .queryengine .execution .schedule .IDriverScheduler ;
3133import org .apache .iotdb .db .storageengine .dataregion .DataRegion ;
34+ import org .apache .iotdb .db .utils .datastructure .AlignedTVList ;
35+ import org .apache .iotdb .db .utils .datastructure .TVList ;
3236
37+ import ch .qos .logback .classic .Level ;
38+ import ch .qos .logback .classic .Logger ;
39+ import ch .qos .logback .classic .spi .ILoggingEvent ;
40+ import ch .qos .logback .core .read .ListAppender ;
41+ import com .google .common .collect .ImmutableMap ;
42+ import org .apache .tsfile .enums .TSDataType ;
3343import org .junit .Test ;
3444import org .mockito .Mockito ;
45+ import org .slf4j .LoggerFactory ;
3546
47+ import java .util .ArrayList ;
3648import java .util .Collections ;
3749import java .util .List ;
50+ import java .util .concurrent .Executor ;
3851import java .util .concurrent .ExecutorService ;
3952
4053import static org .apache .iotdb .db .queryengine .common .QueryId .MOCK_QUERY_ID ;
@@ -50,32 +63,11 @@ public void testFragmentInstanceExecution() {
5063 ExecutorService instanceNotificationExecutor =
5164 IoTDBThreadPoolFactory .newFixedThreadPool (1 , "test-instance-notification" );
5265 try {
53- IDriverScheduler scheduler = Mockito .mock (IDriverScheduler .class );
54- FragmentInstanceId instanceId =
55- new FragmentInstanceId (new PlanFragmentId (MOCK_QUERY_ID , 0 ), "0" );
56- FragmentInstanceStateMachine stateMachine =
57- new FragmentInstanceStateMachine (instanceId , instanceNotificationExecutor );
58- DataRegion dataRegion = Mockito .mock (DataRegion .class );
59- FragmentInstanceContext fragmentInstanceContext =
60- createFragmentInstanceContext (instanceId , stateMachine );
61- fragmentInstanceContext .initializeNumOfDrivers (1 );
62- fragmentInstanceContext .setMayHaveTmpFile (true );
63- fragmentInstanceContext .setDataRegion (dataRegion );
64- List <IDriver > drivers = Collections .emptyList ();
65- ISink sinkHandle = Mockito .mock (ISink .class );
66- long timeOut = -1 ;
67- MPPDataExchangeManager exchangeManager = Mockito .mock (MPPDataExchangeManager .class );
6866 FragmentInstanceExecution execution =
69- FragmentInstanceExecution .createFragmentInstanceExecution (
70- scheduler ,
71- instanceId ,
72- fragmentInstanceContext ,
73- drivers ,
74- sinkHandle ,
75- stateMachine ,
76- timeOut ,
77- false ,
78- exchangeManager );
67+ createFragmentInstanceExecution (0 , instanceNotificationExecutor );
68+ FragmentInstanceContext fragmentInstanceContext = execution .getFragmentInstanceContext ();
69+ FragmentInstanceStateMachine stateMachine = execution .getStateMachine ();
70+
7971 assertEquals (FragmentInstanceState .RUNNING , execution .getInstanceState ());
8072 FragmentInstanceInfo instanceInfo = execution .getInstanceInfo ();
8173 assertEquals (FragmentInstanceState .RUNNING , instanceInfo .getState ());
@@ -84,7 +76,7 @@ public void testFragmentInstanceExecution() {
8476 assertEquals (fragmentInstanceContext .getFailureInfoList (), instanceInfo .getFailureInfoList ());
8577
8678 assertEquals (fragmentInstanceContext .getStartTime (), execution .getStartTime ());
87- assertEquals (timeOut , execution .getTimeoutInMs ());
79+ assertEquals (- 1 , execution .getTimeoutInMs ());
8880 assertEquals (stateMachine , execution .getStateMachine ());
8981
9082 fragmentInstanceContext .decrementNumOfUnClosedDriver ();
@@ -107,4 +99,111 @@ public void testFragmentInstanceExecution() {
10799 instanceNotificationExecutor .shutdown ();
108100 }
109101 }
102+
103+ @ Test
104+ public void testTVListOwnerTransfer () throws InterruptedException {
105+ Logger logger = (Logger ) LoggerFactory .getLogger (AtomicLongMemoryBlock .class );
106+ ListAppender <ILoggingEvent > listAppender = new ListAppender <>();
107+ listAppender .start ();
108+
109+ // set log level
110+ logger .setLevel (Level .WARN );
111+ logger .setAdditive (false );
112+ logger .addAppender (listAppender );
113+
114+ try {
115+ IoTDBDescriptor .getInstance ().getConfig ().setDataNodeId (1 );
116+
117+ ExecutorService instanceNotificationExecutor =
118+ IoTDBThreadPoolFactory .newFixedThreadPool (1 , "test-instance-notification" );
119+ try {
120+ // TVList
121+ TVList tvList = buildTVList ();
122+
123+ // FragmentInstance Context & Execution
124+ FragmentInstanceExecution execution1 =
125+ createFragmentInstanceExecution (1 , instanceNotificationExecutor );
126+ FragmentInstanceContext fragmentInstanceContext1 = execution1 .getFragmentInstanceContext ();
127+ fragmentInstanceContext1 .addTVListToSet (ImmutableMap .of (tvList , 0 ));
128+ tvList .getQueryContextSet ().add (fragmentInstanceContext1 );
129+
130+ FragmentInstanceExecution execution2 =
131+ createFragmentInstanceExecution (2 , instanceNotificationExecutor );
132+ FragmentInstanceContext fragmentInstanceContext2 = execution2 .getFragmentInstanceContext ();
133+ fragmentInstanceContext2 .addTVListToSet (ImmutableMap .of (tvList , 0 ));
134+ tvList .getQueryContextSet ().add (fragmentInstanceContext2 );
135+
136+ // mock flush's behavior
137+ fragmentInstanceContext1
138+ .getMemoryReservationContext ()
139+ .reserveMemoryCumulatively (tvList .calculateRamSize ());
140+ tvList .setOwnerQuery (fragmentInstanceContext1 );
141+
142+ fragmentInstanceContext1 .decrementNumOfUnClosedDriver ();
143+ fragmentInstanceContext2 .decrementNumOfUnClosedDriver ();
144+
145+ fragmentInstanceContext1 .getStateMachine ().finished ();
146+ Thread .sleep (100 );
147+ fragmentInstanceContext2 .getStateMachine ().finished ();
148+
149+ assertTrue (execution1 .getInstanceState ().isDone ());
150+ assertTrue (execution2 .getInstanceState ().isDone ());
151+ Thread .sleep (100 );
152+ } catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e ) {
153+ fail (e .getMessage ());
154+ } finally {
155+ instanceNotificationExecutor .shutdown ();
156+ }
157+ } finally {
158+ logger .detachAppender (listAppender );
159+ // should not contain warn message: "The memory cost to be released is larger than the memory
160+ // cost of memory block"
161+ assertEquals (0 , listAppender .list .size ());
162+ }
163+ }
164+
165+ private FragmentInstanceExecution createFragmentInstanceExecution (int id , Executor executor )
166+ throws CpuNotEnoughException {
167+ IDriverScheduler scheduler = Mockito .mock (IDriverScheduler .class );
168+ FragmentInstanceId instanceId =
169+ new FragmentInstanceId (new PlanFragmentId (MOCK_QUERY_ID , id ), String .valueOf (id ));
170+ FragmentInstanceStateMachine stateMachine =
171+ new FragmentInstanceStateMachine (instanceId , executor );
172+ DataRegion dataRegion = Mockito .mock (DataRegion .class );
173+ FragmentInstanceContext fragmentInstanceContext =
174+ createFragmentInstanceContext (instanceId , stateMachine );
175+ fragmentInstanceContext .initializeNumOfDrivers (1 );
176+ fragmentInstanceContext .setMayHaveTmpFile (true );
177+ fragmentInstanceContext .setDataRegion (dataRegion );
178+ List <IDriver > drivers = Collections .emptyList ();
179+ ISink sinkHandle = Mockito .mock (ISink .class );
180+ long timeOut = -1 ;
181+ MPPDataExchangeManager exchangeManager = Mockito .mock (MPPDataExchangeManager .class );
182+ return FragmentInstanceExecution .createFragmentInstanceExecution (
183+ scheduler ,
184+ instanceId ,
185+ fragmentInstanceContext ,
186+ drivers ,
187+ sinkHandle ,
188+ stateMachine ,
189+ timeOut ,
190+ false ,
191+ exchangeManager );
192+ }
193+
194+ private TVList buildTVList () {
195+ int columns = 200 ;
196+ int rows = 1000 ;
197+ List <TSDataType > dataTypes = new ArrayList <>();
198+ Object [] values = new Object [columns ];
199+ for (int i = 0 ; i < columns ; i ++) {
200+ dataTypes .add (TSDataType .INT64 );
201+ values [i ] = 1L ;
202+ }
203+ AlignedTVList tvList = AlignedTVList .newAlignedList (dataTypes );
204+ for (long t = 1 ; t < rows ; t ++) {
205+ tvList .putAlignedValue (t , values );
206+ }
207+ return tvList ;
208+ }
110209}
0 commit comments