2323import com .google .common .util .concurrent .MoreExecutors ;
2424
2525import java .io .IOException ;
26+ import java .lang .reflect .Modifier ;
2627import java .util .Map ;
28+ import java .util .Set ;
2729import java .util .UUID ;
2830import java .util .concurrent .CancellationException ;
2931import java .util .concurrent .ConcurrentHashMap ;
3537import java .util .concurrent .TimeUnit ;
3638import java .util .concurrent .TimeoutException ;
3739import java .util .concurrent .atomic .AtomicBoolean ;
40+ import java .util .concurrent .atomic .AtomicLong ;
3841
42+ import org .apache .commons .lang3 .StringUtils ;
3943import org .apache .hadoop .hive .metastore .IHMSHandler ;
40- import org .apache .hadoop .hive .metastore .api .AddPartitionsRequest ;
4144import org .apache .hadoop .hive .metastore .api .AsyncOperationResp ;
42- import org .apache .hadoop .hive .metastore .api .DropDatabaseRequest ;
43- import org .apache .hadoop .hive .metastore .api .DropPartitionsRequest ;
44- import org .apache .hadoop .hive .metastore .api .DropTableRequest ;
4545import org .apache .hadoop .hive .metastore .api .MetaException ;
4646import org .apache .hadoop .hive .metastore .conf .MetastoreConf ;
4747import org .apache .hadoop .hive .metastore .metrics .Metrics ;
4848import org .apache .hadoop .hive .metastore .metrics .MetricsConstants ;
4949import org .apache .thrift .TBase ;
5050import org .apache .thrift .TException ;
51+ import org .reflections .Reflections ;
5152import org .slf4j .Logger ;
5253import org .slf4j .LoggerFactory ;
5354
5455import static org .apache .hadoop .hive .metastore .ExceptionHandler .handleException ;
5556import static org .apache .hadoop .hive .metastore .conf .MetastoreConf .ConfVars .HIVE_IN_TEST ;
57+ import static org .apache .hadoop .hive .metastore .utils .JavaUtils .getField ;
58+ import static org .apache .hadoop .hive .metastore .utils .JavaUtils .newInstance ;
5659
57- public abstract class AbstractOperationHandler <T extends TBase , A extends AbstractOperationHandler .Result > {
58-
59- private static final Logger LOG = LoggerFactory . getLogger ( AbstractOperationHandler . class );
60- private static final Map < String , AbstractOperationHandler > OPID_TO_HANDLER = new ConcurrentHashMap <>( );
60+ public abstract class AbstractRequestHandler <T extends TBase , A extends AbstractRequestHandler .Result > {
61+ private static final Logger LOG = LoggerFactory . getLogger ( AbstractRequestHandler . class );
62+ private static final Map < String , AbstractRequestHandler > OPID_TO_HANDLER = new ConcurrentHashMap <>( );
63+ private static final AtomicLong ID = new AtomicLong ( 0 );
6164 private static final ScheduledExecutorService OPID_CLEANER = Executors .newScheduledThreadPool (1 , r -> {
6265 Thread thread = new Thread (r );
6366 thread .setDaemon (true );
@@ -67,33 +70,31 @@ public abstract class AbstractOperationHandler<T extends TBase, A extends Abstra
6770
6871 private static final Map <Class <? extends TBase >, HandlerFactory > REQ_FACTORIES = new ConcurrentHashMap <>();
6972 static {
70- REQ_FACTORIES . put ( DropTableRequest . class , ( base , request ) -> {
71- DropTableRequest req = ( DropTableRequest ) request ;
72- AbstractOperationHandler opHandler = ofCache ( req . getId (), req . isCancel ());
73- if (opHandler == null ) {
74- opHandler = new DropTableHandler ( base , req ) ;
73+ Set < Class <? extends AbstractRequestHandler >> handlerClasses =
74+ new Reflections ( "org.apache.hadoop.hive.metastore.handler" ). getSubTypesOf ( AbstractRequestHandler . class ) ;
75+ for ( Class <? extends AbstractRequestHandler > clz : handlerClasses ) {
76+ if (Modifier . isAbstract ( clz . getModifiers ()) ) {
77+ continue ;
7578 }
76- return opHandler ;
77- });
78-
79- REQ_FACTORIES .put (DropDatabaseRequest .class , (base , request ) -> {
80- DropDatabaseRequest req = (DropDatabaseRequest ) request ;
81- AbstractOperationHandler opHandler = ofCache (req .getId (), req .isCancel ());
82- if (opHandler == null ) {
83- opHandler = new DropDatabaseHandler (base , req );
79+ RequestHandler handler = clz .getAnnotation (RequestHandler .class );
80+ Class <? extends TBase > requestBody ;
81+ if (handler == null || (requestBody = handler .requestBody ()) == null ) {
82+ continue ;
8483 }
85- return opHandler ;
86- });
87-
88- REQ_FACTORIES .put (DropPartitionsRequest .class , (base , request ) -> {
89- DropPartitionsRequest req = (DropPartitionsRequest ) request ;
90- return new DropPartitionsHandler (base , req );
91- });
92-
93- REQ_FACTORIES .put (AddPartitionsRequest .class , (base , request ) -> {
94- AddPartitionsRequest req = (AddPartitionsRequest ) request ;
95- return new AddPartitionsHandler (base , req );
96- });
84+ validateHandler (clz , handler );
85+ REQ_FACTORIES .put (requestBody , (base , request ) -> {
86+ AbstractRequestHandler opHandler = null ;
87+ if (handler .supportAsync ()) {
88+ opHandler =
89+ ofCache (getField (request , handler .id ()), getField (request , handler .cancel ()));
90+ }
91+ if (opHandler == null ) {
92+ opHandler =
93+ newInstance (clz , new Class []{IHMSHandler .class , requestBody }, new Object []{base , request });
94+ }
95+ return opHandler ;
96+ });
97+ }
9798 }
9899
99100 private Result result ;
@@ -107,19 +108,19 @@ public abstract class AbstractOperationHandler<T extends TBase, A extends Abstra
107108 protected final String id ;
108109 private long timeout ;
109110
110- private AbstractOperationHandler (String id ) {
111+ private AbstractRequestHandler (String id ) {
111112 this .id = id ;
112113 }
113114
114- AbstractOperationHandler (IHMSHandler handler , boolean async , T request ) {
115- this .id = UUID .randomUUID (). toString ();
115+ AbstractRequestHandler (IHMSHandler handler , boolean async , T request ) {
116+ this .id = UUID .randomUUID () + "-" + ID . incrementAndGet ();
116117 this .handler = handler ;
117118 this .request = request ;
118119 this .async = async ;
119120 this .timeout = MetastoreConf .getBoolVar (handler .getConf (), HIVE_IN_TEST ) ? 10 : 5000 ;
120121 final Timer .Context timerContext ;
121- if (getHandlerAlias () != null ) {
122- Timer timer = Metrics .getOrCreateTimer (MetricsConstants .API_PREFIX + getHandlerAlias ());
122+ if (StringUtils . isNotEmpty ( getMetricAlias ()) ) {
123+ Timer timer = Metrics .getOrCreateTimer (MetricsConstants .API_PREFIX + getMetricAlias ());
123124 timerContext = timer != null ? timer .time () : null ;
124125 } else {
125126 timerContext = null ;
@@ -159,9 +160,9 @@ private AbstractOperationHandler(String id) {
159160 this .executor .shutdown ();
160161 }
161162
162- private static <T extends TBase , A extends Result > AbstractOperationHandler <T , A >
163+ private static <T extends TBase , A extends Result > AbstractRequestHandler <T , A >
163164 ofCache (String opId , boolean shouldCancel ) throws TException {
164- AbstractOperationHandler <T , A > opHandler = null ;
165+ AbstractRequestHandler <T , A > opHandler = null ;
165166 if (opId != null ) {
166167 opHandler = OPID_TO_HANDLER .get (opId );
167168 if (opHandler == null && !shouldCancel ) {
@@ -171,7 +172,7 @@ private AbstractOperationHandler(String id) {
171172 if (opHandler != null ) {
172173 opHandler .cancelOperation ();
173174 } else {
174- opHandler = new AbstractOperationHandler <>(opId ) {
175+ opHandler = new AbstractRequestHandler <>(opId ) {
175176 @ Override
176177 public OperationStatus getOperationStatus () throws TException {
177178 OperationStatus resp = new OperationStatus (opId );
@@ -198,7 +199,7 @@ public String getProgress() {
198199 return opHandler ;
199200 }
200201
201- public static <T extends AbstractOperationHandler > T offer (IHMSHandler handler , TBase req )
202+ public static <T extends AbstractRequestHandler > T offer (IHMSHandler handler , TBase req )
202203 throws TException , IOException {
203204 HandlerFactory factory = REQ_FACTORIES .get (req .getClass ());
204205 if (factory != null ) {
@@ -242,37 +243,22 @@ public OperationStatus getOperationStatus() throws TException {
242243 }
243244
244245 public static class OperationStatus {
245- private final String id ;
246- private String message ;
247- private boolean finished ;
246+ final String id ;
247+ String message ;
248+ boolean finished ;
248249 OperationStatus (String id ) {
249250 this .id = id ;
250251 }
251-
252- public String getMessage () {
253- return message ;
254- }
255-
256252 public void setMessage (String message ) {
257253 this .message = message ;
258254 }
259-
260- public String getId () {
261- return id ;
262- }
263-
264- public boolean isFinished () {
265- return finished ;
266- }
267-
268255 public void setFinished (boolean finished ) {
269256 this .finished = finished ;
270257 }
271-
272258 public AsyncOperationResp toAsyncOperationResp () {
273- AsyncOperationResp resp = new AsyncOperationResp (getId () );
274- resp .setFinished (isFinished () );
275- resp .setMessage (getMessage () );
259+ AsyncOperationResp resp = new AsyncOperationResp (id );
260+ resp .setFinished (finished );
261+ resp .setMessage (message );
276262 return resp ;
277263 }
278264 }
@@ -294,7 +280,7 @@ public void cancelOperation() {
294280 */
295281 public final A getResult () throws TException {
296282 OperationStatus resp = getOperationStatus ();
297- if (!resp .isFinished () ) {
283+ if (!resp .finished ) {
298284 throw new IllegalStateException ("Result is un-available as " +
299285 getMessagePrefix () + " is still running" );
300286 }
@@ -335,15 +321,16 @@ protected void beforeExecute() throws TException, IOException {
335321
336322 public boolean success () throws TException {
337323 OperationStatus status = getOperationStatus ();
338- return status .isFinished () && result != null && result .success ();
324+ return status .finished && result != null && result .success ();
339325 }
340326
341327 /**
342328 * Get the alias of this handler for metrics.
343- * @return the alias, null if no need to measure the operation.
329+ * @return the alias, null or empty if no need to measure the operation.
344330 */
345- protected String getHandlerAlias () {
346- return null ;
331+ private String getMetricAlias () {
332+ RequestHandler handler = getClass ().getAnnotation (RequestHandler .class );
333+ return handler != null ? handler .metricAlias () : null ;
347334 }
348335
349336 public void checkInterrupted () throws MetaException {
@@ -366,8 +353,8 @@ public static boolean containsOp(String opId) {
366353 return OPID_TO_HANDLER .containsKey (opId );
367354 }
368355
369- public interface HandlerFactory {
370- AbstractOperationHandler create (IHMSHandler base , TBase req ) throws TException , IOException ;
356+ interface HandlerFactory {
357+ AbstractRequestHandler create (IHMSHandler base , TBase req ) throws TException , IOException ;
371358 }
372359
373360 public interface Result {
@@ -386,4 +373,19 @@ default Result shrinkIfNecessary() {
386373 return this ;
387374 }
388375 }
376+
377+ private static void validateHandler (Class <? extends AbstractRequestHandler > clz ,
378+ RequestHandler handler ) {
379+ try {
380+ Class <? extends TBase > requestBody = handler .requestBody ();
381+ // Check the constructor
382+ clz .getDeclaredConstructor (IHMSHandler .class , requestBody );
383+ if (handler .supportAsync ()) {
384+ requestBody .getMethod (handler .id ());
385+ requestBody .getMethod (handler .cancel ());
386+ }
387+ } catch (Exception e ) {
388+ throw new RuntimeException (clz + " is not a satisfied handler as it's declared to be" , e );
389+ }
390+ }
389391}
0 commit comments