diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/file/ZipFileWriter.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/file/ZipFileWriter.java index df5a5fd2..7a8edc7d 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/file/ZipFileWriter.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/file/ZipFileWriter.java @@ -6,6 +6,7 @@ import com.marklogic.spark.ConnectorException; import com.marklogic.spark.ContextSupport; import com.marklogic.spark.Options; +import com.marklogic.spark.Util; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -13,10 +14,7 @@ import org.apache.spark.sql.connector.write.DataWriter; import org.apache.spark.sql.connector.write.WriterCommitMessage; import org.apache.spark.util.SerializableConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; @@ -27,13 +25,11 @@ public class ZipFileWriter implements DataWriter { - private static final Logger logger = LoggerFactory.getLogger(ZipFileWriter.class); - private final ContextSupport context; private final SerializableConfiguration hadoopConfiguration; private final String path; - private final String zipFilePath; + private final Path zipPath; // These can be instantiated lazily depending on which constructor is used. private ContentWriter contentWriter; @@ -48,7 +44,7 @@ public class ZipFileWriter implements DataWriter { public ZipFileWriter(String path, Map properties, SerializableConfiguration hadoopConfiguration, int partitionId, boolean createZipFileImmediately) { this.path = path; - this.zipFilePath = makeFilePath(path, partitionId); + this.zipPath = makeFilePath(path, partitionId); this.context = new ContextSupport(properties); this.hadoopConfiguration = hadoopConfiguration; if (createZipFileImmediately) { @@ -81,7 +77,7 @@ public void close() { @Override public WriterCommitMessage commit() { - return new ZipCommitMessage(path, zipFilePath, zipEntryCounter); + return new ZipCommitMessage(path, zipPath.toString(), zipEntryCounter); } @Override @@ -90,15 +86,14 @@ public void abort() { } private void createZipFileAndContentWriter() { - Path filePath = new Path(zipFilePath); - if (logger.isDebugEnabled()) { - logger.debug("Will write to: {}", filePath); + if (Util.MAIN_LOGGER.isDebugEnabled()) { + Util.MAIN_LOGGER.debug("Will write file at: {}", zipPath); } this.contentWriter = new ContentWriter(context.getProperties()); try { - FileSystem fileSystem = filePath.getFileSystem(hadoopConfiguration.value()); + FileSystem fileSystem = zipPath.getFileSystem(hadoopConfiguration.value()); fileSystem.setWriteChecksum(false); - zipOutputStream = new ZipOutputStream(fileSystem.create(filePath, true)); + zipOutputStream = new ZipOutputStream(fileSystem.create(zipPath, true)); } catch (IOException e) { throw new ConnectorException("Unable to create stream for writing zip file: " + e.getMessage(), e); } @@ -132,12 +127,16 @@ private boolean hasMetadata(InternalRow row) { * @param partitionId * @return */ - private String makeFilePath(String path, int partitionId) { + private Path makeFilePath(String path, int partitionId) { final String timestamp = new SimpleDateFormat("yyyyMMddHHmmssZ").format(new Date()); - return String.format("%s%s%s-%d.zip", path, File.separator, timestamp, partitionId); + final String zipFilename = String.format("%s-%d.zip", timestamp, partitionId); + // Fixed a bug in 1.x (fixed in 2.0) by using a Path here instead of string concatenation with File.separator. + // Using File.separator on Windows would result in a "\" in an S3 URL, which is awkward for a user to work with, + // and appears buggy and unexpected. + return new Path(path, zipFilename); } public String getZipFilePath() { - return zipFilePath; + return zipPath.toString(); } }