Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,22 @@
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.stream.Stream;
import java.nio.file.Files;
import java.nio.file.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -57,7 +53,47 @@ public class JavaTextFileSource extends TextFileSource implements JavaExecutionO

private static final Logger logger = LoggerFactory.getLogger(JavaTextFileSource.class);

public JavaTextFileSource(String inputUrl) {
/**
 * Issues an HTTP {@code GET} against {@code sourceUrl} and exposes the response
 * body as a lazily-read stream of lines.
 *
 * <p>The returned stream owns the underlying reader and connection: closing the
 * stream closes the reader and then disconnects the connection. If this method
 * fails before a stream is returned, the connection is disconnected here.
 *
 * <p>The body is decoded as UTF-8, which matches the decoding that
 * {@code Files.lines(Path)} applies in {@code streamFromFs}.
 *
 * @param sourceUrl the HTTP or HTTPS URL to read from
 * @return the lines of the response body
 * @throws WayangException if the connection cannot be established or read, or if
 *                         the server answers with a status other than 200
 */
public static Stream<String> streamFromURL(final URL sourceUrl) {
    HttpURLConnection connection = null;
    // Tracks whether responsibility for the connection has moved to the returned stream.
    boolean streamHandedOver = false;
    try {
        connection = (HttpURLConnection) sourceUrl.openConnection();
        connection.setRequestMethod("GET");

        // Only HTTP status code 200 is treated as success.
        final int responseCode = connection.getResponseCode();
        if (responseCode != HttpURLConnection.HTTP_OK) {
            throw new WayangException(String.format(
                    "Connection with Http failed: %s answered with status code %d.",
                    sourceUrl, responseCode));
        }

        logger.info(">>> Ready to stream the data from URL: {}", sourceUrl);

        // Read the data line by line; fully qualified charset avoids a new file-level import.
        final BufferedReader reader = new BufferedReader(new InputStreamReader(
                connection.getInputStream(), java.nio.charset.StandardCharsets.UTF_8));
        final HttpURLConnection openConnection = connection;
        final Stream<String> lines = reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (final IOException e) {
                logger.warn("Could not close the reader for URL: {}", sourceUrl, e);
            } finally {
                // Disconnect last, and even if closing the reader failed.
                openConnection.disconnect();
            }
        });
        streamHandedOver = true;
        return lines;
    } catch (final WayangException e) {
        // Already carries a meaningful message; do not wrap it a second time.
        throw e;
    } catch (final Exception e) {
        throw new WayangException(e);
    } finally {
        if (!streamHandedOver && connection != null) {
            connection.disconnect();
        }
    }
}

/**
 * Lazily reads the file addressed by {@code path} as a stream of lines.
 *
 * <p>{@code path} is parsed with {@link URI#create(String)} and resolved with
 * {@link Path#of(URI)}, so it must be a URI string (for example
 * {@code file:///tmp/data.csv}) rather than a bare file-system path.
 *
 * <p>The stream is produced by {@link Files#lines(Path)} and keeps the file open;
 * the caller should close the stream once it has been consumed.
 *
 * @param path URI string of the file to read
 * @return the lines of the file
 * @throws WayangException if {@code path} cannot be converted to a path or the
 *                         file cannot be opened; the message names the path
 */
public static Stream<String> streamFromFs(final String path) {
    try {
        return Files.lines(Path.of(URI.create(path)));
    } catch (final Exception e) {
        // Keep the offending path in the message so failures can be diagnosed.
        throw new WayangException(String.format("Reading %s failed.", path), e);
    }
}

/**
 * Creates a text file source for the given input URL by delegating to the
 * {@code TextFileSource} constructor.
 *
 * @param inputUrl the URL string handed to the superclass
 */
public JavaTextFileSource(final String inputUrl) {
    super(inputUrl);
}

Expand All @@ -66,64 +102,44 @@ public JavaTextFileSource(String inputUrl) {
*
* @param that that should be copied
*/
public JavaTextFileSource(TextFileSource that) {
public JavaTextFileSource(final TextFileSource that) {
super(that);
}

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
ChannelInstance[] inputs,
ChannelInstance[] outputs,
JavaExecutor javaExecutor,
OptimizationContext.OperatorContext operatorContext) {
final ChannelInstance[] inputs,
final ChannelInstance[] outputs,
final JavaExecutor javaExecutor,
final OptimizationContext.OperatorContext operatorContext) {

assert inputs.length == this.getNumInputs();
assert outputs.length == this.getNumOutputs();


String urlStr = this.getInputUrl().trim();
URL sourceUrl = null;
final String urlStr = this.getInputUrl().trim();
final URL sourceUrl;

try {
sourceUrl = new URL(urlStr);
String protocol = sourceUrl.getProtocol();
if ( protocol.startsWith("https") || protocol.startsWith("http") ) {
HttpURLConnection connection2 = (HttpURLConnection) sourceUrl.openConnection();
connection2.setRequestMethod("GET");

// Check if the response code indicates success (HTTP status code 200)
if (connection2.getResponseCode() == HttpURLConnection.HTTP_OK) {
logger.info(">>> Ready to stream the data from URL: " + urlStr);
// Read the data line by line and process it in the StreamChannel
Stream<String> lines2 = new BufferedReader(new InputStreamReader(connection2.getInputStream())).lines();
((StreamChannel.Instance) outputs[0]).accept(lines2);
}
}
else {
FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
() -> new WayangException(String.format("Cannot access file system of %s.", urlStr))
);

final InputStream inputStream = fs.open(urlStr);
Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
((StreamChannel.Instance) outputs[0]).accept(lines);
}
} catch (MalformedURLException e) {
throw new RuntimeException(e);
} catch (ProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new WayangException(String.format("Reading %s failed.", urlStr), e);
} catch (final Exception e) {
throw new WayangException("Could not create URL from string: " + urlStr, e);
}

ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
final String protocol = sourceUrl.getProtocol();

final Stream<String> lines = (protocol.startsWith("https") || protocol.startsWith("http"))
? JavaTextFileSource.streamFromURL(sourceUrl)
: JavaTextFileSource.streamFromFs(urlStr);

((StreamChannel.Instance) outputs[0]).accept(lines);

final ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
"wayang.java.textfilesource.load.prepare", javaExecutor.getConfiguration()
));
ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
"wayang.java.textfilesource.load.prepare", javaExecutor.getConfiguration()));

final ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
"wayang.java.textfilesource.load.main", javaExecutor.getConfiguration()
));
"wayang.java.textfilesource.load.main", javaExecutor.getConfiguration()));

outputs[0].getLineage().addPredecessor(mainLineageNode);

Expand All @@ -141,12 +157,12 @@ public JavaTextFileSource copy() {
}

@Override
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
public List<ChannelDescriptor> getSupportedInputChannels(final int index) {
    // This operator rejects every request for input channels, whatever the index.
    final String message = String.format("%s does not have input channels.", this);
    throw new UnsupportedOperationException(message);
}

@Override
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
public List<ChannelDescriptor> getSupportedOutputChannels(final int index) {
    // Sanity check on the requested output slot; only evaluated when assertions are enabled.
    assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);

    // The only channel offered for the output is the Java stream channel.
    final ChannelDescriptor streamDescriptor = StreamChannel.DESCRIPTOR;
    return Collections.singletonList(streamDescriptor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@

import org.apache.wayang.java.channels.JavaChannelInstance;
import org.apache.wayang.java.execution.JavaExecutor;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;

import java.util.List;
import java.util.Locale;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
Expand All @@ -57,28 +56,23 @@ void teardownTest() {
}

@Test
void testReadLocalFile() throws IOException, URISyntaxException {
void testReadLocalFile() {
final String testFileName = "/banking-tx-small.csv";

JavaExecutor javaExecutor = null;
try {
// Prepare the source.
final URL inputUrl = this.getClass().getResource(testFileName);
System.out.println( "* " + inputUrl + " *");
JavaTextFileSource source = new JavaTextFileSource(
inputUrl.toString() );
// Prepare the source.
final URL inputUrl = this.getClass().getResource(testFileName);
System.out.println("* " + inputUrl + " *");
final JavaTextFileSource source = new JavaTextFileSource(
inputUrl.toString());

// Execute.
JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
evaluate(source, inputs, outputs);
// Execute.
final JavaChannelInstance[] inputs = new JavaChannelInstance[] {};
final JavaChannelInstance[] outputs = new JavaChannelInstance[] { createStreamChannelInstance() };
evaluate(source, inputs, outputs);

// Verify the outcome.
final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList());
assertEquals(63, result.size());
} finally {
if (javaExecutor != null) javaExecutor.dispose();
}
// Verify the outcome.
final List<String> result = outputs[0].<String>provideStream().toList();
assertEquals(63, result.size());
}

/**
Expand All @@ -89,54 +83,50 @@ void testReadLocalFile() throws IOException, URISyntaxException {
*/
@Disabled
@Test
void testReadRemoteFileHTTP() throws IOException, URISyntaxException {
void testReadRemoteFileHTTP() throws Exception {
final String testFileURL = "http://localhost:8000/LICENSE";

JavaExecutor javaExecutor = null;
final JavaExecutor javaExecutor = null;
try {
// Prepare the source.
final URL inputUrl = new URL(testFileURL);
System.out.println( "** " + inputUrl + " **");
JavaTextFileSource source = new JavaTextFileSource(
inputUrl.toString() );
System.out.println("** " + inputUrl + " **");
final JavaTextFileSource source = new JavaTextFileSource(
inputUrl.toString());

// Execute.
JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
final JavaChannelInstance[] inputs = new JavaChannelInstance[] {};
final JavaChannelInstance[] outputs = new JavaChannelInstance[] { createStreamChannelInstance() };
evaluate(source, inputs, outputs);

// Verify the outcome.
final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList());
final List<String> result = outputs[0].<String>provideStream().toList();
assertEquals(225, result.size());
} finally {
if (javaExecutor != null) javaExecutor.dispose();
if (javaExecutor != null)
javaExecutor.dispose();
}
}

@Disabled
@Test
void testReadRemoteFileHTTPS() throws IOException, URISyntaxException {
final String testFileURL = "https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile/profile2.ttl";

JavaExecutor javaExecutor = null;
void testReadRemoteFileHTTPS() throws Exception {
try {
final String testFileURL = "https://downloads.apache.org/incubator/wayang/1.0.0/RELEASE_NOTES";

// Prepare the source.
final URL inputUrl = new URL(testFileURL);
System.out.println( "*** " + inputUrl + " ***");
JavaTextFileSource source = new JavaTextFileSource(
inputUrl.toString() );
final JavaTextFileSource source = new JavaTextFileSource(inputUrl.toString());

// Execute.
JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
final JavaChannelInstance[] inputs = new JavaChannelInstance[] {};
final JavaChannelInstance[] outputs = new JavaChannelInstance[] { createStreamChannelInstance() };
evaluate(source, inputs, outputs);

// Verify the outcome.
final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList());
assertEquals(23, result.size());
} finally {
if (javaExecutor != null) javaExecutor.dispose();
final List<String> result = outputs[0].<String>provideStream().toList();
assertEquals(64, result.size());
} catch (final Exception e) {
Assumptions.assumeTrue(false, "Skipping test due to possible network error: " + e.getMessage());
}

}
}
Loading