Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ protected static Property property(String local)
public static final Property has_output_content = property("has-output-content");
public static final Property has_script = property("has-script");

public static final Property has_function = property("has-function");
public static final Property has_script_path = property("has-script-path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -26,10 +27,29 @@ public ExecutionContext executePipeline(final Module module, final ExecutionCont
log.info("Executing script {} with context {}.", module.getResource(), inputContext.toSimpleString());
final long pipelineExecutionId = Instant.now().toEpochMilli()*1000+(i++);

fire((l) -> {l.pipelineExecutionStarted(pipelineExecutionId); return null;});
ExecutionContext outputContext = _executePipeline(pipelineExecutionId, module, inputContext, null);
fire((l) -> {l.pipelineExecutionFinished(pipelineExecutionId); return null;});
return outputContext;
String function = inputContext.getId();
String scriptPath;
if (inputContext.getScriptUri() != null) {
scriptPath = inputContext.getScriptUri();
} else {
scriptPath = null;
}

String script = new File(module.getResource().getURI()).getParent();

fire((l) -> {l.pipelineExecutionStarted(pipelineExecutionId, function, scriptPath, script); return null;});
try {
ExecutionContext outputContext = _executePipeline(pipelineExecutionId, module, inputContext, null);
fire((l) -> {l.pipelineExecutionFinished(pipelineExecutionId); return null;});
return outputContext;
} catch (Exception e) {
log.error("Pipeline execution failed", e);
fire((l) -> {
l.pipelineExecutionFailed(pipelineExecutionId);
return null;
});
throw e;
}
}

private void fire(final Function<ProgressListener,Void> function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@ public class LoggingProgressListener implements ProgressListener {
private static final Logger LOG = LoggerFactory.getLogger(LoggingProgressListener.class);

@Override
public void pipelineExecutionStarted(long pipelineExecutionId) {
LOG.debug("pipelineExecutionStarted - pipelineExecutionId: {}", pipelineExecutionId);
public void pipelineExecutionStarted(long pipelineExecutionId, final String function, final String scriptPath, final String script) {
LOG.debug("pipelineExecutionStarted - pipelineExecutionId: {}, function: {}, scriptPath: {}, script: {}", pipelineExecutionId, function, scriptPath, script);
}

@Override
public void pipelineExecutionFinished(long pipelineExecutionId) {
LOG.debug("pipelineExecutionFinished - pipelineExecutionId: {}", pipelineExecutionId);
}

@Override
public void pipelineExecutionFailed(long pipelineExecutionId) {
LOG.debug("pipelineExecutionFailed - pipelineExecutionId: {}", pipelineExecutionId);
}

@Override
public void moduleExecutionStarted(long pipelineExecutionId, String moduleExecutionId, Module outputModule, ExecutionContext inputContext, String predecessorModuleExecutionId) {
LOG.debug("moduleExecutionStarted - pipelineExecutionId: {}, moduleExecutionId: {}, inputContext: {}, predecessorModuleExecutionId: {}", pipelineExecutionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ public interface ProgressListener {
* Triggers when execution of a pipeline starts.
*
* @param pipelineExecutionId execution id of the pipeline
* @param function the function being executed
* @param scriptPath path to the script being executed
* @param script the script being executed
*/
void pipelineExecutionStarted(long pipelineExecutionId);
void pipelineExecutionStarted(long pipelineExecutionId, final String function, final String scriptPath, final String script);

/**
* Triggers when execution of a pipeline finishes.
Expand All @@ -25,6 +28,13 @@ public interface ProgressListener {
*/
void pipelineExecutionFinished(long pipelineExecutionId);

/**
* Triggers when execution of a pipeline fails.
*
* @param pipelineExecutionId execution id of the pipeline
*/
void pipelineExecutionFailed(long pipelineExecutionId);

/**
* Triggers when execution of a module within a pipeline starts.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private static Property getParameter(final String name) {
}

@Override
public void pipelineExecutionStarted(final long pipelineExecutionId) {
public void pipelineExecutionStarted(final long pipelineExecutionId, String function, String scriptPath, String script) {
PipelineExecution pipelineExecution = new PipelineExecution();
pipelineExecution.setId(getPipelineExecutionIri(pipelineExecutionId));
pipelineExecution.setTypes(Collections.singleton(Vocabulary.s_c_transformation));
Expand All @@ -113,6 +113,11 @@ public void pipelineExecutionStarted(final long pipelineExecutionId) {
pipelineExecutionDir.toFile().mkdir();
logDir.put(pipelineExecutionId, pipelineExecutionDir);

metadataMap.clear();
metadataMap.put(SPIPES.has_function.toString(), URI.create(function));
metadataMap.put(SPIPES.has_script_path.toString(), scriptPath);
metadataMap.put(SPIPES.has_script.toString(), URI.create(script));

final EntityManager metadataEM = getMetadataEmf().createEntityManager();
synchronized (metadataEM) {
persistPipelineExecutionStarted(metadataEM, pipelineExecutionId, pipelineExecution);
Expand All @@ -133,7 +138,8 @@ private void persistPipelineExecutionStarted(final EntityManager em, long pipeli
Date startDate = new Date();
addProperty(pipelineExecution, SPIPES.has_pipeline_execution_start_date, startDate);
addProperty(pipelineExecution, SPIPES.has_pipeline_execution_start_date_unix, startDate.getTime());

addProperty(pipelineExecution, SPIPES.has_function, getURIFromMetadataMap(SPIPES.has_function));
addProperty(pipelineExecution, SPIPES.has_script_path, metadataMap.get(SPIPES.has_script_path.toString()));
if (pipelineExecutionGroupId != null) {
addProperty(pipelineExecution, PIPELINE_EXECUTION_GROUP_ID, pipelineExecutionGroupId);
}
Expand Down Expand Up @@ -182,14 +188,29 @@ private void persistPipelineExecutionFinished(final EntityManager em, final long
final PipelineExecution pipelineExecution =
em.find(PipelineExecution.class, pipelineExecutionIri, pd);

String pipelineName = metadataMap.get(SPIPES.has_pipeline_name.toString()).toString();
addProperty(pipelineExecution, SPIPES.has_script, getURIFromMetadataMap(SPIPES.has_script));
// new
Date startDate = pipelineExecution.getHas_pipepline_execution_date();
Date startDate = pipelineExecution.getHas_pipepline_execution_start_date();
addProperty(pipelineExecution, SPIPES.has_pipeline_execution_finish_date, finishDate);
addProperty(pipelineExecution, SPIPES.has_pipeline_execution_finish_date_unix, finishDate.getTime());
addProperty(pipelineExecution, SPIPES.has_pipeline_execution_duration, computeDuration(startDate, finishDate));
addProperty(pipelineExecution, SPIPES.has_pipeline_name, pipelineName);
// addScript(pipelineExecution, scriptManager.getScriptByContextId(pipelineName));
pipelineExecution.addTypes(Collections.singleton(Vocabulary.s_c_finished_pipeline_execution));
// addScript(pipelineExecution, scriptManager.getScriptByContextId(script));
em.getTransaction().commit();
em.close();
}
}

private void persistPipelineExecutionFailed(final EntityManager em, final long pipelineExecutionId) {
if (em.isOpen()) {
em.getTransaction().begin();

String pipelineExecutionIri = getPipelineExecutionIri(pipelineExecutionId);
final EntityDescriptor pd = new EntityDescriptor(URI.create(pipelineExecutionIri));
final PipelineExecution pipelineExecution =
em.find(PipelineExecution.class, pipelineExecutionIri, pd);
addProperty(pipelineExecution, SPIPES.has_script, getURIFromMetadataMap(SPIPES.has_script));
pipelineExecution.addTypes(Collections.singleton(Vocabulary.s_c_failed_pipeline_execution));
em.getTransaction().commit();
em.close();
}
Expand Down Expand Up @@ -220,6 +241,16 @@ public void pipelineExecutionFinished(final long pipelineExecutionId) {
}
}

public void pipelineExecutionFailed(final long pipelineExecutionId) {
final EntityManager em = entityManagerMap.get(getPipelineExecutionIri(pipelineExecutionId));

synchronized (em) {
persistPipelineExecutionFailed(em, pipelineExecutionId);
entityManagerMap.remove(em);
executionMap.remove(getPipelineExecutionIri(pipelineExecutionId));
}
}

@Override
public void moduleExecutionStarted(final long pipelineExecutionId, final String moduleExecutionId,
final Module outputModule,
Expand Down Expand Up @@ -314,9 +345,10 @@ public void moduleExecutionFinished(long pipelineExecutionId, final String modul
addProperty(moduleExecution, SPIPES.has_module_execution_duration, computeDuration(startDate, finishDate));
addProperty(moduleExecution, SPIPES.has_output_model_triple_count, module.getOutputContext().getDefaultModel().size());
addContentProperty(moduleExecution, module.getOutputContext(), "output");
addProperty(moduleExecution, SPIPES.has_pipeline_name, module.getResource().toString().replaceAll("\\/[^.]*$", ""));
if(!metadataMap.containsKey(SPIPES.has_pipeline_name.toString())){
metadataMap.put(SPIPES.has_pipeline_name.toString(), module.getResource().toString().replaceAll("\\/[^.]*$", ""));
String script = module.getResource().toString().replaceAll("\\/[^.]*$", "");
addProperty(moduleExecution, SPIPES.has_script, URI.create(script));
if(!metadataMap.containsKey(SPIPES.has_script.toString())){
metadataMap.put(SPIPES.has_script.toString(), URI.create(script));
}

// input binding
Expand Down Expand Up @@ -567,5 +599,16 @@ public String toString() {
}
}


private URI getURIFromMetadataMap(Property property){
URI uri;
try {
if (!metadataMap.containsKey(property.toString())) {
throw new IllegalStateException("Metadata map does not contain property: " + property);
}
uri = URI.create((metadataMap.get(property.toString()).toString()));
return uri;
} catch (NullPointerException e) {
throw new IllegalStateException("Metadata map is null or does not contain property: " + property, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public SemanticLoggingProgressListener() {
public SemanticLoggingProgressListener(Resource configResource) {
}

@Override public void pipelineExecutionStarted(final long pipelineExecutionId) {
@Override public void pipelineExecutionStarted(final long pipelineExecutionId, final String function, final String scriptPath, final String script) {
Thing pipelineExecution = new Thing();
pipelineExecution.setId(getPipelineExecutionIri(pipelineExecutionId));
pipelineExecution.setTypes(Collections.singleton(Vocabulary.s_c_pipeline_execution));
Expand Down Expand Up @@ -115,6 +115,34 @@ public SemanticLoggingProgressListener(Resource configResource) {
}
}

@Override
public void pipelineExecutionFailed(final long pipelineExecutionId) {
final EntityManager em = entityManagerMap.get(getPipelineExecutionIri(pipelineExecutionId));
synchronized (em) {
if (em.isOpen()) {
final TurtleWriterFactory factory = new TurtleWriterFactory();
try (FileOutputStream fos = new FileOutputStream(
Files.createFile(getDir(pipelineExecutionId).resolve("log.ttl")).toFile())) {
final RDFWriter writer = factory.getWriter(fos);
writer.startRDF();
RepositoryConnection con = em.unwrap(SailRepository.class).getConnection();
final ValueFactory f = con.getValueFactory();
final RepositoryResult<Statement> res = con
.getStatements(null, null, null, true, f.createIRI(getPipelineExecutionIri(pipelineExecutionId)));
while (res.hasNext()) {
writer.handleStatement(res.next());
}
writer.endRDF();
} catch (IOException e) {
log.error("Error during failed pipeline execution logging.", e);
}
entityManagerMap.remove(em);
em.close();
logDir.remove(pipelineExecutionId);
}
}
}

@Override public void moduleExecutionStarted(final long pipelineExecutionId, final String moduleExecutionId,
final Module outputModule,
final ExecutionContext inputContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -153,10 +154,11 @@ public String getFunctionLocation(final String functionId) {
/**
* @implNote Based on jena's OntDocumentManager.
* @param ontologyIRI
* @return file path at which <code>ontologyIRI</code> is loaded
* @return an absolute URI string at which <code>ontologyIRI</code> is loaded
*/
public String getLocation(String ontologyIRI) {
return OntDocumentManager.getInstance().doAltURLMapping(ontologyIRI);
String scriptPath = OntDocumentManager.getInstance().getFileManager().mapURI(ontologyIRI);
return Paths.get(scriptPath).toUri().toString();
}
// id -> contexts
// function id-s
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,73 @@
package cz.cvut.spipes.debug.dto;

import cz.cvut.kbss.jopa.model.annotations.*;
import cz.cvut.spipes.Vocabulary;

import java.net.URI;
import java.util.Date;
import java.util.List;

import cz.cvut.kbss.jopa.model.annotations.OWLClass;
import cz.cvut.kbss.jopa.model.annotations.OWLDataProperty;
import cz.cvut.kbss.jopa.model.annotations.OWLObjectProperty;
import cz.cvut.kbss.jopa.model.annotations.ParticipationConstraint;
import cz.cvut.kbss.jopa.model.annotations.ParticipationConstraints;
import cz.cvut.spipes.Vocabulary;

@OWLClass(iri = Vocabulary.s_c_pipeline_execution)
public class PipelineExecutionDto extends ExecutionThing {

@OWLDataProperty(iri = Vocabulary.s_p_has_execution_start_date)
private Date has_pipepline_execution_date;
private Date has_pipepline_execution_start_date;

@OWLObjectProperty(iri = Vocabulary.s_p_has_module_execution)
@ParticipationConstraints({
@ParticipationConstraint(owlObjectIRI = Vocabulary.s_c_module_execution)
})
private List<ModuleExecutionDto> has_module_executions;

public Date getHas_pipepline_execution_date() {
return has_pipepline_execution_date;
@OWLDataProperty(iri = Vocabulary.s_p_has_pipeline_execution_finish_date)
private Date has_pipeline_execution_finish_date;

@OWLObjectProperty(iri = Vocabulary.s_p_has_script)
private URI has_script;

@OWLObjectProperty(iri = Vocabulary.s_p_has_function)
private URI has_function;

@OWLDataProperty(iri = Vocabulary.s_p_has_script_path)
private String has_script_path;

public Date getHas_pipepline_execution_start_date() {
return has_pipepline_execution_start_date;
}

public Date getHas_pipeline_execution_finish_date() {
return has_pipeline_execution_finish_date;
}

public void setHas_pipeline_execution_finish_date(Date has_pipeline_execution_finish_date) {
this.has_pipeline_execution_finish_date = has_pipeline_execution_finish_date;
}

public void setHas_pipepline_execution_date(Date has_pipepline_execution_date) {
this.has_pipepline_execution_date = has_pipepline_execution_date;
public void setHas_pipepline_execution_start_date(Date has_pipepline_execution_start_date) {
this.has_pipepline_execution_start_date = has_pipepline_execution_start_date;
}
public URI getHas_script() {
return has_script;
}

public void setHas_script(URI has_script) {
this.has_script = has_script;
}

public URI getHas_function() {
return has_function;
}

public void setHas_function(URI has_function) {
this.has_function = has_function;
}

public String getHas_script_path() {
return has_script_path;
}

public void setHas_script_path(String has_script_path) {
this.has_script_path = has_script_path;
}

public List<ModuleExecutionDto> getHas_module_executions() {
Expand All @@ -37,6 +77,5 @@ public List<ModuleExecutionDto> getHas_module_executions() {
public void setHas_module_executions(List<ModuleExecutionDto> has_module_executions) {
this.has_module_executions = has_module_executions;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@
public interface PipelineExecutionMapper {

@Mapping(source = "has_part", target = "has_module_executions")
@Mapping(source = "has_pipeline_execution_finish_date", target = "has_pipeline_execution_finish_date")
@Mapping(source = "has_script", target = "has_script")
@Mapping(source = "has_function", target = "has_function")
@Mapping(source = "has_script_path", target = "has_script_path")
PipelineExecutionDto toDto(PipelineExecution pipelineExecution);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DebugService(
public List<PipelineExecutionDto> getAllPipelineExecutions() {
List<PipelineExecution> pipelineExecutionDtos = pipelineExecutionDao.findAll();
return pipelineExecutionDtos.stream()
.sorted(comparing(PipelineExecution::getHas_pipepline_execution_date, Comparator.reverseOrder()))
.sorted(comparing(PipelineExecution::getHas_pipepline_execution_start_date, Comparator.reverseOrder()))
.map(pipelineExecutionMapper::toDto)
.collect(Collectors.toList());
}
Expand Down
Loading