Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hdfs/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM openjdk:8-jre-alpine

WORKDIR /data

ADD target/hdfs-agent.jar .
ADD entry.sh .

ENTRYPOINT ["./entry.sh"]
16 changes: 16 additions & 0 deletions hdfs/ci/publish_docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

# build multiple times
# see https://github.com/GoogleContainerTools/jib/issues/802

echo "Publishing hdfs-agent to docker"

cd hdfs

export TAG=`if [ "$TRAVIS_BRANCH" == "master" ]; then echo "latest"; else echo ${TRAVIS_BRANCH} ; fi`
mvn -q -Ddocker.tag=${TAG} -Djib.to.auth.username=${DOCKER_USER} -Djib.to.auth.password=${DOCKER_PASS} jib:build

mvn -q -Ddocker.tag=${COMMIT} -Djib.to.auth.username=${DOCKER_USER} -Djib.to.auth.password=${DOCKER_PASS} jib:build
mvn -q -Ddocker.tag=travis-${TRAVIS_BUILD_NUMBER} -Djib.to.auth.username=${DOCKER_USER} -Djib.to.auth.password=${DOCKER_PASS} jib:build

cd ..
6 changes: 6 additions & 0 deletions hdfs/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/sh

set -x

# Run the service
java -jar hdfs-agent.jar
104 changes: 104 additions & 0 deletions hdfs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017 University of Ulm
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>project</artifactId>
<groupId>io.github.cloudiator.deployment</groupId>
<version>0.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hdfs</artifactId>

<properties>
<docker.tag>manual</docker.tag>
</properties>

<dependencies>
<dependency>
<groupId>io.github.cloudiator.common</groupId>
<artifactId>common-messaging</artifactId>
</dependency>
<dependency>
<groupId>io.github.cloudiator.iaas</groupId>
<artifactId>iaas-common</artifactId>
<version>0.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.github.cloudiator.deployment</groupId>
<artifactId>deployment-common</artifactId>
<version>0.3.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>


</dependencies>

<build>
<plugins>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<configuration>
<from>
<image>gcr.io/distroless/java:debug</image>
</from>
<to>
registry.hub.docker.com/cloudiator/hdfs-agent:${docker.tag}
</to>
<container>
<useCurrentTimestamp>true</useCurrentTimestamp>
</container>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>io.github.cloudiator.deployment.hdfs.HdfsAgent</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>hdfs-agent</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</plugin>
</plugins>
</build>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.github.cloudiator.deployment.hdfs;

import com.google.inject.Inject;
import io.github.cloudiator.domain.Node;
import io.github.cloudiator.messaging.NodeToNodeMessageConverter;
import java.util.Set;
import java.util.stream.Collectors;
import org.cloudiator.messages.Process.SparkClusterCreatedResponse;
import org.cloudiator.messaging.MessageInterface;
import org.cloudiator.messaging.services.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.cloudiator.messages.General.Error;


public class CreateHdfsClusterSubscriber implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(CreateHdfsClusterSubscriber.class);
private final ProcessService processService;
private final MessageInterface messageInterface;
private final CreateHdfsProcessStrategy createHdfsProcessStrategy;
private static final NodeToNodeMessageConverter NODE_MESSAGE_CONVERTER = NodeToNodeMessageConverter.INSTANCE;

@Inject
public CreateHdfsClusterSubscriber(
ProcessService processService,
CreateHdfsProcessStrategy createHdfsProcessStrategy,
MessageInterface messageInterface) {
this.processService = processService;
this.createHdfsProcessStrategy = createHdfsProcessStrategy;
this.messageInterface = messageInterface;
}

@Override
public void run() {

LOGGER.debug("Create HdfsClusterSubscriber started and waiting for requests...");

processService.subscribeCreateHdfsClusterRequest(
(id, content) -> {

try {

final String userId = content.getUserId();


Set<Node> nodes = content.getNodes().getNodesList().stream()
.map(NODE_MESSAGE_CONVERTER::applyBack).collect(
Collectors.toSet());



createHdfsProcessStrategy.executeClusterDeployment(userId,nodes);

final hdfsClusterCreatedResponse hdfsClusterCreatedResponse = hdfsClusterCreatedResponse.newBuilder().build();

messageInterface.reply(id, hdfsClusterCreatedResponse);


} catch (Exception e) {
final String errorMessage = String
.format("Exception %s while processing request %s with id %s.", e.getMessage(),
content, id);

LOGGER.error(errorMessage, e);

messageInterface.reply(HdfsClusterCreatedResponse.class, id,
Error.newBuilder().setMessage(errorMessage).setCode(500).build());

}


});

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package io.github.cloudiator.deployment.hdfs;

import com.google.gson.Gson;
import com.google.inject.Inject;
import de.uniulm.omi.cloudiator.domain.Identifiable;
import de.uniulm.omi.cloudiator.util.configuration.Configuration;
import io.github.cloudiator.deployment.config.Constants;
import io.github.cloudiator.deployment.domain.CloudiatorClusterProcessBuilder;
import io.github.cloudiator.deployment.domain.CloudiatorProcess;
import io.github.cloudiator.deployment.domain.CloudiatorProcess.ProcessState;
import io.github.cloudiator.deployment.domain.CloudiatorProcess.Type;
import io.github.cloudiator.deployment.domain.HdfsInterface;
import io.github.cloudiator.deployment.domain.Task;
import io.github.cloudiator.domain.Node;
import io.github.cloudiator.messaging.NodeToNodeMessageConverter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Named;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.cloudiator.messages.Installation.InstallationRequest;
import org.cloudiator.messages.Installation.InstallationResponse;
import org.cloudiator.messages.InstallationEntities.Installation;
import org.cloudiator.messages.InstallationEntities.Installation.Builder;
import org.cloudiator.messages.InstallationEntities.Tool;
import org.cloudiator.messaging.SettableFutureResponseCallback;
import org.cloudiator.messaging.services.InstallationRequestService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CreateHdfsProcessStrategy {

private static final Logger LOGGER = LoggerFactory.getLogger(CreateHdfsProcessStrategy.class);

private static final NodeToNodeMessageConverter NODE_MESSAGE_CONVERTER = NodeToNodeMessageConverter.INSTANCE;

private static final String HDFS_ARGUMENT_DELIMITER = ",";


/**
* Hdfs Default Settings
* currently adding no default settings
*/


private InstallationRequestService installationRequestService;

@Named(Constants.INSTALL_MELODIC_TOOLS)
@Inject(optional = true)
boolean installMelodicTools = false;

@Inject
CreateHdfsProcessStrategy(InstallationRequestService installationRequestService) {
this.installationRequestService = installationRequestService;
}


private void installHdfsDataNodes(String userId, Set<Node> nodes) {

for (Node node : nodes) {

LOGGER.debug("Installing Docker and Hdfs data node on node: " + node.id());

final Builder builder = Installation.newBuilder()
.setNode(NODE_MESSAGE_CONVERTER.apply(node))
.addTool(Tool.DOCKER)
.addTool(Tool.HDFS_DATA);

if (installMelodicTools) {
builder
.addTool(Tool.ALLUXIO_CLIENT)
.addTool(Tool.DLMS_AGENT);
}

final InstallationRequest installationRequest = InstallationRequest.newBuilder()
.setUserId(userId).setInstallation(builder.build()).build();

final SettableFutureResponseCallback<InstallationResponse, InstallationResponse> futureResponseCallback = SettableFutureResponseCallback
.create();

installationRequestService
.createInstallationRequestAsync(installationRequest, futureResponseCallback);
try {
futureResponseCallback.get();
} catch (InterruptedException e) {
throw new IllegalStateException(
"Docker and Hdfs data node installation was interrupted during installation request.",
e);
} catch (ExecutionException e) {
throw new IllegalStateException("Error during Docker and Hdfd data node installation",
e.getCause());
}

LOGGER.debug("Finished Docker and HDFS data node installation on node: " + node.id());
}


}



public void executeClusterDeployment(String userId, Set<Node> nodes) {

LOGGER.info(String
.format("Deploying a new Hdfs cluster for user: %s on nodes %s",
userId, nodes));

try {

LOGGER.debug("Triggering Hdfs data node installations...");
this.installHdfsDataNodes(userId, nodes);


LOGGER.debug("Successfully deployed HDFS cluster!");



} catch (Exception e) {
throw new IllegalStateException("Could not deploy HDFS cluster on nodes " + nodes, e);
}

}


public CloudiatorProcess executeJobSubmission(String userId, String schedule, Task task,
HdfsInterface hdfsInterface,Set<Node> nodes) {

LOGGER.info(String
.format("Submitting new HdfsJobSubmission for user: %s, schedule %s, task %s on nodes %s",
userId, schedule, task, nodes));

try {

UUID uuid = UUID.randomUUID();
String temporaryHdfsProcessUid = uuid.toString();

return CloudiatorClusterProcessBuilder.create().id(temporaryHdfsProcessUid)
.originId(temporaryHdfsProcessUid)
.userId(userId)
.type(Type.HDFS)
.taskInterface(HdfsInterface.class.getCanonicalName())
.state(ProcessState.RUNNING)
.addAllNodes(nodes.stream().map(Identifiable::id).collect(Collectors.toList()))
.taskName(task.name()).scheduleId(schedule).startNow().build();

} catch (Exception e) {
throw new IllegalStateException("Could not deploy task " + task, e);
}

}

}
Loading