Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions docs/interpreter/hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,7 @@ limitations under the License.
To get started with HBase, please see [HBase Quickstart](https://hbase.apache.org/book.html#quickstart).

## HBase release supported
By default, Zeppelin is built against HBase 1.0.x releases. To work with HBase 1.1.x releases, use the following build command:

```bash
# HBase 1.1.4
./mvnw clean package -DskipTests -Phadoop-2.6 -Dhadoop.version=2.6.0 -P build-distr -Dhbase.hbase.version=1.1.4 -Dhbase.hadoop.version=2.6.0
```

To work with HBase 1.2.0+, use the following build command:

```bash
# HBase 1.2.0
./mvnw clean package -DskipTests -Phadoop-2.6 -Dhadoop.version=2.6.0 -P build-distr -Dhbase.hbase.version=1.2.0 -Dhbase.hadoop.version=2.6.0
```
Zeppelin is built against HBase 1.x and 2.x releases.

## Configuration

Expand All @@ -55,16 +43,6 @@ To work with HBase 1.2.0+, use the following build command:
<td>/usr/lib/hbase</td>
<td>Installation directory of HBase, defaults to HBASE_HOME in environment</td>
</tr>
<tr>
<td>hbase.ruby.sources</td>
<td>lib/ruby</td>
<td>Path to Ruby scripts relative to 'hbase.home'</td>
</tr>
<tr>
<td>zeppelin.hbase.test.mode</td>
<td>false</td>
<td>Disable checks for unit and manual tests</td>
</tr>
</table>

If you want to connect to HBase running on a cluster, you'll need to follow the next step.
Expand Down
10 changes: 6 additions & 4 deletions hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@
<properties>
<!--library versions-->
<interpreter.name>hbase</interpreter.name>
<jruby.version>1.6.8</jruby.version>
</properties>

<dependencies>
<dependency>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
<version>${jruby.version}</version>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>

Expand Down
206 changes: 110 additions & 96 deletions hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,118 +14,135 @@

package org.apache.zeppelin.hbase;

import org.jruby.embed.LocalContextScope;
import org.jruby.embed.ScriptingContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Support for HBase Shell. All the commands documented here
* http://hbase.apache.org/book.html#shell is supported.
*
* Requirements:
* HBase Shell should be installed on the same machine. To be more specific, the following dir.
* should be available: https://github.com/apache/hbase/tree/master/hbase-shell/src/main/ruby
* HBase Shell should be able to connect to the HBase cluster from terminal. This makes sure
* that the client is configured properly.
*
* The interpreter takes 3 config parameters:
* hbase.home: Root directory where HBase is installed. Default is /usr/lib/hbase/
* hbase.ruby.sources: Dir where shell ruby code is installed.
* Path is relative to hbase.home. Default: lib/ruby
* zeppelin.hbase.test.mode: (Testing only) Disable checks for unit and manual tests. Default: false
* HBase interpreter. It uses the hbase shell to interpret the commands.
*/
public class HbaseInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(HbaseInterpreter.class);

public static final String HBASE_HOME = "hbase.home";
public static final String HBASE_RUBY_SRC = "hbase.ruby.sources";
public static final String HBASE_TEST_MODE = "zeppelin.hbase.test.mode";

private static final Logger LOGGER = LoggerFactory.getLogger(HbaseInterpreter.class);
private ScriptingContainer scriptingContainer;
private static final Path TEMP_FOLDER = Paths.get(System.getProperty("java.io.tmpdir"),
"zeppelin-hbase-scripts");

private StringWriter writer;
private Map<String, Executor> runningProcesses = new HashMap<>();

public HbaseInterpreter(Properties property) {
super(property);
private Map<String, File> tempFiles = new HashMap<>();

private static final int SIGTERM_CODE = 143;

private long commandTimeout = 60000;

/**
 * Creates the interpreter and passes the given properties on to the {@link Interpreter}
 * base class. No other work is done at construction time.
 *
 * @param properties interpreter settings handed to the superclass
 */
public HbaseInterpreter(Properties properties) {
super(properties);
}

/**
 * Prepares the interpreter for use.
 *
 * <p>Intentionally a no-op: each paragraph starts its own {@code hbase shell} process from
 * {@link #interpret(String, InterpreterContext)}, so there is nothing to load or validate up
 * front. No embedded scripting container is created here.
 *
 * @throws InterpreterException never thrown by this implementation; declared to match the
 *     overridden method
 */
@Override
public void open() throws InterpreterException {
  // Nothing to initialise.
}

/**
 * Releases everything this interpreter still holds.
 *
 * <p>Destroys every shell process that is still registered and deletes every script file that
 * has not been cleaned up yet, so closing the interpreter does not leave orphan processes or
 * stale files behind. The maps are emptied rather than set to {@code null}, so a repeated
 * {@code close()} or a late {@code cancel()} does not fail with a
 * {@link NullPointerException}.
 */
@Override
public void close() {
  for (Executor executor : runningProcesses.values()) {
    final ExecuteWatchdog watchdog = executor.getWatchdog();
    if (watchdog != null) {
      watchdog.destroyProcess();
    }
  }
  runningProcesses.clear();

  for (File scriptFile : tempFiles.values()) {
    FileUtils.deleteQuietly(scriptFile);
  }
  tempFiles.clear();
}

@Override
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
public InterpreterResult interpret(String st, InterpreterContext context) {
LOGGER.debug("Run HBase shell script: {}", st);

if (StringUtils.isEmpty(st)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
}

String paragraphId = context.getParagraphId();
final File scriptFile;
try {
// Write script in a temporary file
// The script is enriched with extensions
scriptFile = createTempFile(paragraphId);
FileUtils.write(scriptFile, st + "\nexit");
} catch (IOException e) {
LOGGER.error("Can not write script in temp file", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}

InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS);

final DefaultExecutor executor = new DefaultExecutor();
final ByteArrayOutputStream errorStream = new ByteArrayOutputStream();

executor.setStreamHandler(new PumpStreamHandler(context.out, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeout));

String hbaseCmdPath = Paths.get(getProperty(HBASE_HOME), "bin", "hbase").toString();
final CommandLine cmdLine = CommandLine.parse(hbaseCmdPath);
cmdLine.addArgument("shell", false);
cmdLine.addArgument(scriptFile.getAbsolutePath(), false);

try {
LOGGER.info(cmd);
this.writer.getBuffer().setLength(0);
this.scriptingContainer.runScriptlet(cmd);
this.writer.flush();
LOGGER.debug(writer.toString());
return new InterpreterResult(InterpreterResult.Code.SUCCESS, writer.getBuffer().toString());
} catch (Throwable t) {
LOGGER.error("Can not run '" + cmd + "'", t);
return new InterpreterResult(InterpreterResult.Code.ERROR, t.getMessage());
executor.execute(cmdLine);
runningProcesses.put(paragraphId, executor);
} catch (ExecuteException e) {
LOGGER.error("Can not run script in paragraph {}", paragraphId, e);

final int exitValue = e.getExitValue();
InterpreterResult.Code code = InterpreterResult.Code.ERROR;
String msg = errorStream.toString();

if (exitValue == SIGTERM_CODE) {
code = InterpreterResult.Code.INCOMPLETE;
msg = msg + "Paragraph received a SIGTERM.\n";
LOGGER.info("The paragraph {} stopped executing: {}", paragraphId, msg);
}

msg += "ExitValue: " + exitValue;
result = new InterpreterResult(code, msg);
} catch (IOException e) {
LOGGER.error("Can not run script in paragraph {}", paragraphId, e);
result = new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
} finally {
deleteTempFile(paragraphId);
stopProcess(paragraphId);
}
return result;
}

/**
 * Aborts the paragraph identified by {@code context}: stops its registered shell process, if
 * there is one, and removes its temporary script file.
 *
 * @param context the context of the paragraph to cancel
 */
@Override
public void cancel(InterpreterContext context) {
  final String paragraphId = context.getParagraphId();
  stopProcess(paragraphId);
  deleteTempFile(paragraphId);
}

@Override
public FormType getFormType() {
Expand All @@ -143,30 +160,27 @@ public Scheduler getScheduler() {
HbaseInterpreter.class.getName() + this.hashCode());
}

@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
return null;
/**
 * Unregisters the executor recorded for the given paragraph and asks its watchdog to destroy
 * the underlying process. Does nothing when no executor is registered for that paragraph.
 *
 * @param paragraphId id of the paragraph whose process should be stopped
 */
private void stopProcess(String paragraphId) {
  final Executor registered = runningProcesses.remove(paragraphId);
  if (registered == null) {
    return;
  }
  registered.getWatchdog().destroyProcess();
}

private static String getSystemDefault(
String envName,
String propertyName,
String defaultValue) {

if (envName != null && !envName.isEmpty()) {
String envValue = System.getenv().get(envName);
if (envValue != null) {
return envValue;
}
/**
 * Creates an empty script file for the given paragraph inside {@code TEMP_FOLDER} and records
 * it in {@code tempFiles} so it can be deleted later.
 *
 * @param paragraphId id of the paragraph; used as the file name prefix and as the map key
 * @return the newly created file
 * @throws IOException if the folder or the file cannot be created
 */
private File createTempFile(String paragraphId) throws IOException {
  // createDirectories() succeeds when the folder already exists and also creates missing
  // parents, which avoids the race between a separate exists() check and createDirectory().
  Files.createDirectories(TEMP_FOLDER);
  final File temp = Files.createTempFile(TEMP_FOLDER, paragraphId, ".txt").toFile();
  tempFiles.put(paragraphId, temp);
  return temp;
}

if (propertyName != null && !propertyName.isEmpty()) {
String propValue = System.getProperty(propertyName);
if (propValue != null) {
return propValue;
}
/**
 * Forgets the script file recorded for the given paragraph and deletes it from disk. Does
 * nothing when no file is recorded; a failed deletion is ignored.
 *
 * @param paragraphId id of the paragraph whose script file should be removed
 */
private void deleteTempFile(String paragraphId) {
  final File scriptFile = tempFiles.remove(paragraphId);
  if (scriptFile != null) {
    FileUtils.deleteQuietly(scriptFile);
  }
}
}
12 changes: 0 additions & 12 deletions hbase/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,6 @@
"defaultValue": "/usr/lib/hbase/",
"description": "Installation directory of HBase",
"type": "string"
},
"hbase.ruby.sources": {
"propertyName": "hbase.ruby.sources",
"defaultValue": "lib/ruby",
"description": "Path to Ruby scripts relative to 'hbase.home'",
"type": "string"
},
"zeppelin.hbase.test.mode": {
"propertyName": "zeppelin.hbase.test.mode",
"defaultValue": false,
"description": "Disable checks for unit and manual tests",
"type": "checkbox"
}
},
"editor": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@

package org.apache.zeppelin.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.Properties;

import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
* Tests for HBase Interpreter.
*/
Expand All @@ -34,8 +32,6 @@ public class HbaseInterpreterTest {
public static void setUp() throws NullPointerException, InterpreterException {
Properties properties = new Properties();
properties.put("hbase.home", "");
properties.put("hbase.ruby.sources", "");
properties.put("zeppelin.hbase.test.mode", "true");

hbaseInterpreter = new HbaseInterpreter(properties);
hbaseInterpreter.open();
Expand All @@ -45,28 +41,4 @@ public static void setUp() throws NullPointerException, InterpreterException {
// Smoke test: checks that the shared hbaseInterpreter instance is not null.
void newObject() {
assertNotNull(hbaseInterpreter);
}

@Test
void putsTest() {
InterpreterResult result = hbaseInterpreter.interpret("puts \"Hello World\"", null);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals("Hello World\n", result.message().get(0).getData());
}

public void putsLoadPath() {
InterpreterResult result = hbaseInterpreter.interpret(
"require 'two_power'; puts twoToThePowerOf(4)", null);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals("16\n", result.message().get(0).getData());
}

@Test
void testException() {
InterpreterResult result = hbaseInterpreter.interpret("plot practical joke", null);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals("(NameError) undefined local variable or method `joke' for main:Object",
result.message().get(0).getData());
}
}
Loading
Loading