1414
1515package org .apache .zeppelin .hbase ;
1616
17- import org .jruby .embed .LocalContextScope ;
18- import org .jruby .embed .ScriptingContainer ;
19- import org .slf4j .Logger ;
20- import org .slf4j .LoggerFactory ;
21-
17+ import java .io .ByteArrayOutputStream ;
2218import java .io .File ;
23- import java .io .FileInputStream ;
2419import java .io .IOException ;
25- import java .io . StringWriter ;
20+ import java .nio . file . Files ;
2621import java .nio .file .Path ;
2722import java .nio .file .Paths ;
28- import java .util .List ;
23+ import java .util .HashMap ;
24+ import java .util .Map ;
2925import java .util .Properties ;
3026
27+ import org .apache .commons .exec .CommandLine ;
28+ import org .apache .commons .exec .DefaultExecutor ;
29+ import org .apache .commons .exec .ExecuteException ;
30+ import org .apache .commons .exec .ExecuteWatchdog ;
31+ import org .apache .commons .exec .Executor ;
32+ import org .apache .commons .exec .PumpStreamHandler ;
33+ import org .apache .commons .io .FileUtils ;
34+ import org .apache .commons .lang3 .StringUtils ;
3135import org .apache .zeppelin .interpreter .Interpreter ;
3236import org .apache .zeppelin .interpreter .InterpreterContext ;
3337import org .apache .zeppelin .interpreter .InterpreterException ;
3438import org .apache .zeppelin .interpreter .InterpreterResult ;
35- import org .apache .zeppelin .interpreter .thrift .InterpreterCompletion ;
3639import org .apache .zeppelin .scheduler .Scheduler ;
3740import org .apache .zeppelin .scheduler .SchedulerFactory ;
41+ import org .slf4j .Logger ;
42+ import org .slf4j .LoggerFactory ;
3843
3944/**
40- * Support for HBase Shell. All the commands documented here
41- * http://hbase.apache.org/book.html#shell is supported.
42- *
43- * Requirements:
44- * HBase Shell should be installed on the same machine. To be more specific, the following dir.
45- * should be available: https://github.com/apache/hbase/tree/master/hbase-shell/src/main/ruby
46- * HBase Shell should be able to connect to the HBase cluster from terminal. This makes sure
47- * that the client is configured properly.
48- *
49- * The interpreter takes 3 config parameters:
50- * hbase.home: Root directory where HBase is installed. Default is /usr/lib/hbase/
51- * hbase.ruby.sources: Dir where shell ruby code is installed.
52- * Path is relative to hbase.home. Default: lib/ruby
53- * zeppelin.hbase.test.mode: (Testing only) Disable checks for unit and manual tests. Default: false
45+ * HBase interpreter. It uses the hbase shell to interpret the commands.
5446 */
5547public class HbaseInterpreter extends Interpreter {
48+ private static final Logger LOGGER = LoggerFactory .getLogger (HbaseInterpreter .class );
49+
5650 public static final String HBASE_HOME = "hbase.home" ;
57- public static final String HBASE_RUBY_SRC = "hbase.ruby.sources" ;
58- public static final String HBASE_TEST_MODE = "zeppelin.hbase.test.mode" ;
5951
60- private static final Logger LOGGER = LoggerFactory . getLogger ( HbaseInterpreter . class );
61- private ScriptingContainer scriptingContainer ;
52+ private static final Path TEMP_FOLDER = Paths . get ( System . getProperty ( "java.io.tmpdir" ),
53+ "zeppelin-hbase-scripts" ) ;
6254
63- private StringWriter writer ;
55+ private Map < String , Executor > runningProcesses = new HashMap <>() ;
6456
65- public HbaseInterpreter (Properties property ) {
66- super (property );
57+ private Map <String , File > tempFiles = new HashMap <>();
58+
59+ private static final int SIGTERM_CODE = 143 ;
60+
61+ private long commandTimeout = 60000 ;
62+
63+ public HbaseInterpreter (Properties properties ) {
64+ super (properties );
6765 }
6866
6967 @ Override
7068 public void open () throws InterpreterException {
71- this .scriptingContainer = new ScriptingContainer (LocalContextScope .SINGLETON );
72- this .writer = new StringWriter ();
73- scriptingContainer .setOutput (this .writer );
74-
75- if (!Boolean .parseBoolean (getProperty (HBASE_TEST_MODE ))) {
76- String hbaseHome = getProperty (HBASE_HOME );
77- String rubySrc = getProperty (HBASE_RUBY_SRC );
78- Path absRubySrc = Paths .get (hbaseHome , rubySrc ).toAbsolutePath ();
79-
80- LOGGER .info ("Home:" + hbaseHome );
81- LOGGER .info ("Ruby Src:" + rubySrc );
82-
83- File f = absRubySrc .toFile ();
84- if (!f .exists () || !f .isDirectory ()) {
85- throw new InterpreterException ("HBase ruby sources is not available at '" + absRubySrc
86- + "'" );
87- }
88-
89- LOGGER .info ("Absolute Ruby Source:" + absRubySrc .toString ());
90- // hirb.rb:41 requires the following system properties to be set.
91- Properties sysProps = System .getProperties ();
92- sysProps .setProperty (HBASE_RUBY_SRC , absRubySrc .toString ());
93-
94- Path absHirbPath = Paths .get (hbaseHome , "bin/hirb.rb" );
95- try {
96- FileInputStream fis = new FileInputStream (absHirbPath .toFile ());
97- this .scriptingContainer .runScriptlet (fis , "hirb.rb" );
98- fis .close ();
99- } catch (IOException e ) {
100- throw new InterpreterException (e .getCause ());
101- }
102- }
69+ // Do nothing
10370 }
10471
10572 @ Override
10673 public void close () {
107- if (this .scriptingContainer != null ) {
108- this .scriptingContainer .terminate ();
109- }
74+ runningProcesses .clear ();
75+ runningProcesses = null ;
76+ tempFiles .clear ();
77+ tempFiles = null ;
11078 }
11179
11280 @ Override
113- public InterpreterResult interpret (String cmd , InterpreterContext interpreterContext ) {
81+ public InterpreterResult interpret (String st , InterpreterContext context ) {
82+ LOGGER .debug ("Run HBase shell script: {}" , st );
83+
84+ if (StringUtils .isEmpty (st )) {
85+ return new InterpreterResult (InterpreterResult .Code .SUCCESS );
86+ }
87+
88+ String paragraphId = context .getParagraphId ();
89+ final File scriptFile ;
90+ try {
91+ // Write script in a temporary file
92+ // The script is enriched with extensions
93+ scriptFile = createTempFile (paragraphId );
94+ FileUtils .write (scriptFile , st + "\n exit" );
95+ } catch (IOException e ) {
96+ LOGGER .error ("Can not write script in temp file" , e );
97+ return new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
98+ }
99+
100+ InterpreterResult result = new InterpreterResult (InterpreterResult .Code .SUCCESS );
101+
102+ final DefaultExecutor executor = new DefaultExecutor ();
103+ final ByteArrayOutputStream errorStream = new ByteArrayOutputStream ();
104+
105+ executor .setStreamHandler (new PumpStreamHandler (context .out , errorStream ));
106+ executor .setWatchdog (new ExecuteWatchdog (commandTimeout ));
107+
108+ String hbaseCmdPath = Paths .get (getProperty (HBASE_HOME ), "bin" , "hbase" ).toString ();
109+ final CommandLine cmdLine = CommandLine .parse (hbaseCmdPath );
110+ cmdLine .addArgument ("shell" , false );
111+ cmdLine .addArgument (scriptFile .getAbsolutePath (), false );
112+
114113 try {
115- LOGGER .info (cmd );
116- this .writer .getBuffer ().setLength (0 );
117- this .scriptingContainer .runScriptlet (cmd );
118- this .writer .flush ();
119- LOGGER .debug (writer .toString ());
120- return new InterpreterResult (InterpreterResult .Code .SUCCESS , writer .getBuffer ().toString ());
121- } catch (Throwable t ) {
122- LOGGER .error ("Can not run '" + cmd + "'" , t );
123- return new InterpreterResult (InterpreterResult .Code .ERROR , t .getMessage ());
114+ executor .execute (cmdLine );
115+ runningProcesses .put (paragraphId , executor );
116+ } catch (ExecuteException e ) {
117+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
118+
119+ final int exitValue = e .getExitValue ();
120+ InterpreterResult .Code code = InterpreterResult .Code .ERROR ;
121+ String msg = errorStream .toString ();
122+
123+ if (exitValue == SIGTERM_CODE ) {
124+ code = InterpreterResult .Code .INCOMPLETE ;
125+ msg = msg + "Paragraph received a SIGTERM.\n " ;
126+ LOGGER .info ("The paragraph {} stopped executing: {}" , paragraphId , msg );
127+ }
128+
129+ msg += "ExitValue: " + exitValue ;
130+ result = new InterpreterResult (code , msg );
131+ } catch (IOException e ) {
132+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
133+ result = new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
134+ } finally {
135+ deleteTempFile (paragraphId );
136+ stopProcess (paragraphId );
124137 }
138+ return result ;
125139 }
126140
127141 @ Override
128- public void cancel (InterpreterContext context ) {}
142+ public void cancel (InterpreterContext context ) {
143+ stopProcess (context .getParagraphId ());
144+ deleteTempFile (context .getParagraphId ());
145+ }
129146
130147 @ Override
131148 public FormType getFormType () {
@@ -143,30 +160,27 @@ public Scheduler getScheduler() {
143160 HbaseInterpreter .class .getName () + this .hashCode ());
144161 }
145162
146- @ Override
147- public List <InterpreterCompletion > completion (String buf , int cursor ,
148- InterpreterContext interpreterContext ) {
149- return null ;
163+ private void stopProcess (String paragraphId ) {
164+ Executor executor = runningProcesses .remove (paragraphId );
165+ if (null != executor ) {
166+ final ExecuteWatchdog watchdog = executor .getWatchdog ();
167+ watchdog .destroyProcess ();
168+ }
150169 }
151170
152- private static String getSystemDefault (
153- String envName ,
154- String propertyName ,
155- String defaultValue ) {
156-
157- if (envName != null && !envName .isEmpty ()) {
158- String envValue = System .getenv ().get (envName );
159- if (envValue != null ) {
160- return envValue ;
161- }
171+ private File createTempFile (String paragraphId ) throws IOException {
172+ if (!Files .exists (TEMP_FOLDER )) {
173+ Files .createDirectory (TEMP_FOLDER );
162174 }
175+ File temp = Files .createTempFile (TEMP_FOLDER , paragraphId , ".txt" ).toFile ();
176+ tempFiles .put (paragraphId , temp );
177+ return temp ;
178+ }
163179
164- if (propertyName != null && !propertyName .isEmpty ()) {
165- String propValue = System .getProperty (propertyName );
166- if (propValue != null ) {
167- return propValue ;
168- }
180+ private void deleteTempFile (String paragraphId ) {
181+ File tmpFile = tempFiles .remove (paragraphId );
182+ if (null != tmpFile ) {
183+ FileUtils .deleteQuietly (tmpFile );
169184 }
170- return defaultValue ;
171185 }
172186}
0 commit comments