1414
1515package org .apache .zeppelin .hbase ;
1616
17- import org .jruby .embed .LocalContextScope ;
18- import org .jruby .embed .ScriptingContainer ;
17+ import org .apache .commons .exec .*;
18+ import org .apache .commons .io .FileUtils ;
19+ import org .apache .commons .lang3 .StringUtils ;
20+ import org .apache .zeppelin .interpreter .Interpreter ;
21+ import org .apache .zeppelin .interpreter .InterpreterContext ;
22+ import org .apache .zeppelin .interpreter .InterpreterException ;
23+ import org .apache .zeppelin .interpreter .InterpreterResult ;
24+ import org .apache .zeppelin .scheduler .Scheduler ;
25+ import org .apache .zeppelin .scheduler .SchedulerFactory ;
1926import org .slf4j .Logger ;
2027import org .slf4j .LoggerFactory ;
2128
29+ import java .io .ByteArrayOutputStream ;
2230import java .io .File ;
23- import java .io .FileInputStream ;
2431import java .io .IOException ;
25- import java .io .StringWriter ;
26- import java .nio .file .Path ;
2732import java .nio .file .Paths ;
28- import java .util .List ;
33+ import java .util .HashMap ;
34+ import java .util .Map ;
2935import java .util .Properties ;
3036
31- import org .apache .zeppelin .interpreter .Interpreter ;
32- import org .apache .zeppelin .interpreter .InterpreterContext ;
33- import org .apache .zeppelin .interpreter .InterpreterException ;
34- import org .apache .zeppelin .interpreter .InterpreterResult ;
35- import org .apache .zeppelin .interpreter .thrift .InterpreterCompletion ;
36- import org .apache .zeppelin .scheduler .Scheduler ;
37- import org .apache .zeppelin .scheduler .SchedulerFactory ;
38-
3937/**
40- * Support for HBase Shell. All the commands documented here
41- * http://hbase.apache.org/book.html#shell is supported.
42- *
43- * Requirements:
44- * HBase Shell should be installed on the same machine. To be more specific, the following dir.
45- * should be available: https://github.com/apache/hbase/tree/master/hbase-shell/src/main/ruby
46- * HBase Shell should be able to connect to the HBase cluster from terminal. This makes sure
47- * that the client is configured properly.
48- *
49- * The interpreter takes 3 config parameters:
50- * hbase.home: Root directory where HBase is installed. Default is /usr/lib/hbase/
51- * hbase.ruby.sources: Dir where shell ruby code is installed.
52- * Path is relative to hbase.home. Default: lib/ruby
53- * zeppelin.hbase.test.mode: (Testing only) Disable checks for unit and manual tests. Default: false
38+ * HBase interpreter. It uses the hbase shell to interpret the commands.
5439 */
5540public class HbaseInterpreter extends Interpreter {
41+ private static final Logger LOGGER = LoggerFactory .getLogger (HbaseInterpreter .class );
42+
5643 public static final String HBASE_HOME = "hbase.home" ;
57- public static final String HBASE_RUBY_SRC = "hbase.ruby.sources" ;
58- public static final String HBASE_TEST_MODE = "zeppelin.hbase.test.mode" ;
5944
60- private static final Logger LOGGER = LoggerFactory .getLogger (HbaseInterpreter .class );
61- private ScriptingContainer scriptingContainer ;
45+ private Map <String , Executor > runningProcesses = new HashMap <>();
46+
47+ private static final int SIGTERM_CODE = 143 ;
6248
63- private StringWriter writer ;
49+ private long commandTimeout = 60000 ;
6450
65- public HbaseInterpreter (Properties property ) {
66- super (property );
51+ public HbaseInterpreter (Properties properties ) {
52+ super (properties );
6753 }
6854
6955 @ Override
7056 public void open () throws InterpreterException {
71- this .scriptingContainer = new ScriptingContainer (LocalContextScope .SINGLETON );
72- this .writer = new StringWriter ();
73- scriptingContainer .setOutput (this .writer );
74-
75- if (!Boolean .parseBoolean (getProperty (HBASE_TEST_MODE ))) {
76- String hbaseHome = getProperty (HBASE_HOME );
77- String rubySrc = getProperty (HBASE_RUBY_SRC );
78- Path absRubySrc = Paths .get (hbaseHome , rubySrc ).toAbsolutePath ();
79-
80- LOGGER .info ("Home:" + hbaseHome );
81- LOGGER .info ("Ruby Src:" + rubySrc );
82-
83- File f = absRubySrc .toFile ();
84- if (!f .exists () || !f .isDirectory ()) {
85- throw new InterpreterException ("HBase ruby sources is not available at '" + absRubySrc
86- + "'" );
87- }
88-
89- LOGGER .info ("Absolute Ruby Source:" + absRubySrc .toString ());
90- // hirb.rb:41 requires the following system properties to be set.
91- Properties sysProps = System .getProperties ();
92- sysProps .setProperty (HBASE_RUBY_SRC , absRubySrc .toString ());
93-
94- Path absHirbPath = Paths .get (hbaseHome , "bin/hirb.rb" );
95- try {
96- FileInputStream fis = new FileInputStream (absHirbPath .toFile ());
97- this .scriptingContainer .runScriptlet (fis , "hirb.rb" );
98- fis .close ();
99- } catch (IOException e ) {
100- throw new InterpreterException (e .getCause ());
101- }
102- }
57+ // Do nothing
10358 }
10459
10560 @ Override
10661 public void close () {
107- if (this .scriptingContainer != null ) {
108- this .scriptingContainer .terminate ();
109- }
62+ runningProcesses .clear ();
63+ runningProcesses = null ;
11064 }
11165
11266 @ Override
113- public InterpreterResult interpret (String cmd , InterpreterContext interpreterContext ) {
67+ public InterpreterResult interpret (String st , InterpreterContext context ) {
68+ LOGGER .debug ("Run HBase shell script: {}" , st );
69+
70+ if (StringUtils .isEmpty (st )) {
71+ return new InterpreterResult (InterpreterResult .Code .SUCCESS );
72+ }
73+
74+ String paragraphId = context .getParagraphId ();
75+ // Write script in a temporary file
76+ // The script is enriched with extensions
77+ final File scriptFile = new File (getScriptFileName (paragraphId ));
11478 try {
115- LOGGER .info (cmd );
116- this .writer .getBuffer ().setLength (0 );
117- this .scriptingContainer .runScriptlet (cmd );
118- this .writer .flush ();
119- LOGGER .debug (writer .toString ());
120- return new InterpreterResult (InterpreterResult .Code .SUCCESS , writer .getBuffer ().toString ());
121- } catch (Throwable t ) {
122- LOGGER .error ("Can not run '" + cmd + "'" , t );
123- return new InterpreterResult (InterpreterResult .Code .ERROR , t .getMessage ());
79+ FileUtils .write (scriptFile , st + "\n exit" );
80+ } catch (IOException e ) {
81+ LOGGER .error ("Can not write script in temp file" , e );
82+ return new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
12483 }
84+
85+ InterpreterResult result = new InterpreterResult (InterpreterResult .Code .SUCCESS );
86+
87+ final DefaultExecutor executor = new DefaultExecutor ();
88+ final ByteArrayOutputStream errorStream = new ByteArrayOutputStream ();
89+
90+ executor .setStreamHandler (new PumpStreamHandler (context .out , errorStream ));
91+ executor .setWatchdog (new ExecuteWatchdog (commandTimeout ));
92+
93+ String hbaseCmdPath = Paths .get (getProperty (HBASE_HOME ), "bin" , "hbase" ).toString ();
94+ final CommandLine cmdLine = CommandLine .parse (hbaseCmdPath );
95+ cmdLine .addArgument ("shell" , false );
96+ cmdLine .addArgument (scriptFile .getAbsolutePath (), false );
97+
98+ try {
99+ executor .execute (cmdLine );
100+ runningProcesses .put (paragraphId , executor );
101+ } catch (ExecuteException e ) {
102+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
103+
104+ final int exitValue = e .getExitValue ();
105+ InterpreterResult .Code code = InterpreterResult .Code .ERROR ;
106+ String msg = errorStream .toString ();
107+
108+ if (exitValue == SIGTERM_CODE ) {
109+ code = InterpreterResult .Code .INCOMPLETE ;
110+ msg = msg + "Paragraph received a SIGTERM.\n " ;
111+ LOGGER .info ("The paragraph {} stopped executing: {}" , paragraphId , msg );
112+ }
113+
114+ msg += "ExitValue: " + exitValue ;
115+ result = new InterpreterResult (code , msg );
116+ } catch (IOException e ) {
117+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
118+ result = new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
119+ } finally {
120+ FileUtils .deleteQuietly (scriptFile );
121+ stopProcess (paragraphId );
122+ }
123+ return result ;
125124 }
126125
127126 @ Override
128- public void cancel (InterpreterContext context ) {}
127+ public void cancel (InterpreterContext context ) {
128+ stopProcess (context .getParagraphId ());
129+ FileUtils .deleteQuietly (new File (getScriptFileName (context .getParagraphId ())));
130+ }
129131
130132 @ Override
131133 public FormType getFormType () {
@@ -143,30 +145,25 @@ public Scheduler getScheduler() {
143145 HbaseInterpreter .class .getName () + this .hashCode ());
144146 }
145147
146- @ Override
147- public List <InterpreterCompletion > completion (String buf , int cursor ,
148- InterpreterContext interpreterContext ) {
149- return null ;
148+ private String getScriptFileName (String paragraphId ) {
149+ return String .format ("%s%s.txt" , getScriptDir (), paragraphId );
150150 }
151151
152- private static String getSystemDefault (
153- String envName ,
154- String propertyName ,
155- String defaultValue ) {
156-
157- if (envName != null && !envName .isEmpty ()) {
158- String envValue = System .getenv ().get (envName );
159- if (envValue != null ) {
160- return envValue ;
161- }
152+ private String getScriptDir () {
153+ String tmpProperty = System .getProperty ("java.io.tmpdir" );
154+ if (!tmpProperty .endsWith (File .separator )) {
155+ tmpProperty += File .separator ;
162156 }
163157
164- if (propertyName != null && !propertyName .isEmpty ()) {
165- String propValue = System .getProperty (propertyName );
166- if (propValue != null ) {
167- return propValue ;
168- }
158+ return tmpProperty + "zeppelin-hbase-scripts" + File .separator ;
159+ }
160+
161+ private void stopProcess (String paragraphId ) {
162+ if (runningProcesses .containsKey (paragraphId )) {
163+ final Executor executor = runningProcesses .get (paragraphId );
164+ final ExecuteWatchdog watchdog = executor .getWatchdog ();
165+ watchdog .destroyProcess ();
166+ runningProcesses .remove (paragraphId );
169167 }
170- return defaultValue ;
171168 }
172169}
0 commit comments