2020package org .apache .iotdb .confignode .procedure ;
2121
2222import org .apache .iotdb .confignode .procedure .env .ConfigNodeProcedureEnv ;
23- import org .apache .iotdb .confignode .procedure .exception .ProcedureAbortedException ;
2423import org .apache .iotdb .confignode .procedure .exception .ProcedureException ;
25- import org .apache .iotdb .confignode .procedure .exception .ProcedureSuspendedException ;
26- import org .apache .iotdb .confignode .procedure .exception .ProcedureTimeoutException ;
27- import org .apache .iotdb .confignode .procedure .exception .ProcedureYieldException ;
2824import org .apache .iotdb .confignode .procedure .state .ProcedureLockState ;
2925import org .apache .iotdb .confignode .procedure .state .ProcedureState ;
3026import org .apache .iotdb .confignode .procedure .store .IProcedureStore ;
3430
3531import java .io .DataOutputStream ;
3632import java .io .IOException ;
37- import java .lang .reflect .InvocationTargetException ;
3833import java .nio .ByteBuffer ;
3934import java .nio .charset .StandardCharsets ;
4035import java .util .ArrayList ;
@@ -83,14 +78,9 @@ public final boolean hasLock() {
8378 * @param env the environment passed to the ProcedureExecutor
8479 * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
8580 * procedure is done.
86- * @throws ProcedureYieldException the procedure will be added back to the queue and retried
87- * later.
8881 * @throws InterruptedException the procedure will be added back to the queue and retried later.
89- * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
90- * and has set itself up waiting for an external event to wake it back up again.
9182 */
92- protected abstract Procedure <Env >[] execute (Env env )
93- throws ProcedureYieldException , ProcedureSuspendedException , InterruptedException ;
83+ protected abstract Procedure <Env >[] execute (Env env ) throws InterruptedException ;
9484
9585 /**
9686 * The code to undo what was done by the execute() code. It is called when the procedure or one of
@@ -130,11 +120,14 @@ public void serialize(DataOutputStream stream) throws IOException {
130120
131121 // exceptions
132122 if (hasException ()) {
123+ // symbol of exception
133124 stream .write ((byte ) 1 );
125+ // exception's name
134126 String exceptionClassName = exception .getClass ().getName ();
135127 byte [] exceptionClassNameBytes = exceptionClassName .getBytes (StandardCharsets .UTF_8 );
136128 stream .writeInt (exceptionClassNameBytes .length );
137129 stream .write (exceptionClassNameBytes );
130+ // exception's message
138131 String message = this .exception .getMessage ();
139132 if (message != null ) {
140133 byte [] messageBytes = message .getBytes (StandardCharsets .UTF_8 );
@@ -144,6 +137,7 @@ public void serialize(DataOutputStream stream) throws IOException {
144137 stream .writeInt (-1 );
145138 }
146139 } else {
140+ // symbol of no exception
147141 stream .write ((byte ) 0 );
148142 }
149143
@@ -181,29 +175,18 @@ public void deserialize(ByteBuffer byteBuffer) {
181175 }
182176 this .setStackIndexes (indexList );
183177 }
184- // exceptions
178+
179+ // exception
185180 if (byteBuffer .get () == 1 ) {
186- Class <?> exceptionClass = deserializeTypeInfo (byteBuffer );
181+ deserializeTypeInfoForCompatibility (byteBuffer );
187182 int messageBytesLength = byteBuffer .getInt ();
188183 String errMsg = null ;
189184 if (messageBytesLength > 0 ) {
190185 byte [] messageBytes = new byte [messageBytesLength ];
191186 byteBuffer .get (messageBytes );
192187 errMsg = new String (messageBytes , StandardCharsets .UTF_8 );
193188 }
194- ProcedureException exception ;
195- try {
196- exception =
197- (ProcedureException ) exceptionClass .getConstructor (String .class ).newInstance (errMsg );
198- } catch (InstantiationException
199- | IllegalAccessException
200- | InvocationTargetException
201- | NoSuchMethodException e ) {
202- LOG .warn ("Instantiation exception class failed" , e );
203- exception = new ProcedureException (errMsg );
204- }
205-
206- setFailure (exception );
189+ setFailure (new ProcedureException (errMsg ));
207190 }
208191
209192 // result
@@ -224,18 +207,11 @@ public void deserialize(ByteBuffer byteBuffer) {
224207 * @param byteBuffer bytebuffer
225208 * @return Procedure
226209 */
227- public static Class <?> deserializeTypeInfo (ByteBuffer byteBuffer ) {
210+ @ Deprecated
211+ public static void deserializeTypeInfoForCompatibility (ByteBuffer byteBuffer ) {
228212 int classNameBytesLen = byteBuffer .getInt ();
229213 byte [] classNameBytes = new byte [classNameBytesLen ];
230214 byteBuffer .get (classNameBytes );
231- String className = new String (classNameBytes , StandardCharsets .UTF_8 );
232- Class <?> clazz ;
233- try {
234- clazz = Class .forName (className );
235- } catch (ClassNotFoundException e ) {
236- throw new RuntimeException ("Invalid procedure class" , e );
237- }
238- return clazz ;
239215 }
240216
241217 /**
@@ -284,8 +260,7 @@ protected boolean isYieldAfterExecution(Env env) {
284260 * @param env execute environment
285261 * @return sub procedures
286262 */
287- protected Procedure <Env >[] doExecute (Env env )
288- throws ProcedureYieldException , ProcedureSuspendedException , InterruptedException {
263+ protected Procedure <Env >[] doExecute (Env env ) throws InterruptedException {
289264 try {
290265 updateTimestamp ();
291266 return execute (env );
@@ -676,20 +651,9 @@ protected synchronized void setFailure(final ProcedureException exception) {
676651 }
677652 }
678653
679- protected void setAbortFailure (final String source , final String msg ) {
680- setFailure (source , new ProcedureAbortedException (msg ));
681- }
682-
683654 /**
684655 * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
685656 *
686- * <p>Another usage for this method is to implement retrying. A procedure can set the state to
687- * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a {@link
688- * ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a call
689- * {@link #setTimeout(long)} method to set the timeout. And you should also override this method
690- * to wake up the procedure, and also return false to tell the ProcedureExecutor that the timeout
691- * event has been handled.
692- *
693657 * @return true to let the framework handle the timeout as abort, false in case the procedure
694658 * handled the timeout itself.
695659 */
@@ -698,7 +662,7 @@ protected synchronized boolean setTimeoutFailure(Env env) {
698662 long timeDiff = System .currentTimeMillis () - lastUpdate ;
699663 setFailure (
700664 "ProcedureExecutor" ,
701- new ProcedureTimeoutException ("Operation timed out after " + timeDiff + " ms." ));
665+ new ProcedureException ("Operation timed out after " + timeDiff + " ms." ));
702666 return true ;
703667 }
704668 return false ;
0 commit comments