Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
import com.marklogic.spark.ConnectorException;
import com.marklogic.spark.ContextSupport;
import com.marklogic.spark.Options;
import com.marklogic.spark.Util;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -27,13 +25,11 @@

public class ZipFileWriter implements DataWriter<InternalRow> {

private static final Logger logger = LoggerFactory.getLogger(ZipFileWriter.class);

private final ContextSupport context;
private final SerializableConfiguration hadoopConfiguration;

private final String path;
private final String zipFilePath;
private final Path zipPath;

// These can be instantiated lazily depending on which constructor is used.
private ContentWriter contentWriter;
Expand All @@ -48,7 +44,7 @@ public class ZipFileWriter implements DataWriter<InternalRow> {
public ZipFileWriter(String path, Map<String, String> properties, SerializableConfiguration hadoopConfiguration,
int partitionId, boolean createZipFileImmediately) {
this.path = path;
this.zipFilePath = makeFilePath(path, partitionId);
this.zipPath = makeFilePath(path, partitionId);
this.context = new ContextSupport(properties);
this.hadoopConfiguration = hadoopConfiguration;
if (createZipFileImmediately) {
Expand Down Expand Up @@ -81,7 +77,7 @@ public void close() {

@Override
public WriterCommitMessage commit() {
return new ZipCommitMessage(path, zipFilePath, zipEntryCounter);
return new ZipCommitMessage(path, zipPath.toString(), zipEntryCounter);
}

@Override
Expand All @@ -90,15 +86,14 @@ public void abort() {
}

private void createZipFileAndContentWriter() {
Path filePath = new Path(zipFilePath);
if (logger.isDebugEnabled()) {
logger.debug("Will write to: {}", filePath);
if (Util.MAIN_LOGGER.isDebugEnabled()) {
Util.MAIN_LOGGER.debug("Will write file at: {}", zipPath);
}
this.contentWriter = new ContentWriter(context.getProperties());
try {
FileSystem fileSystem = filePath.getFileSystem(hadoopConfiguration.value());
FileSystem fileSystem = zipPath.getFileSystem(hadoopConfiguration.value());
fileSystem.setWriteChecksum(false);
zipOutputStream = new ZipOutputStream(fileSystem.create(filePath, true));
zipOutputStream = new ZipOutputStream(fileSystem.create(zipPath, true));
} catch (IOException e) {
throw new ConnectorException("Unable to create stream for writing zip file: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -132,12 +127,16 @@ private boolean hasMetadata(InternalRow row) {
* @param partitionId
* @return
*/
private String makeFilePath(String path, int partitionId) {
private Path makeFilePath(String path, int partitionId) {
final String timestamp = new SimpleDateFormat("yyyyMMddHHmmssZ").format(new Date());
return String.format("%s%s%s-%d.zip", path, File.separator, timestamp, partitionId);
final String zipFilename = String.format("%s-%d.zip", timestamp, partitionId);
// Fixed a bug in 1.x (fixed in 2.0) by using a Path here instead of string concatenation with File.separator.
// Using File.separator on Windows would result in a "\" in an S3 URL, which is awkward for a user to work with,
// and appears buggy and unexpected.
return new Path(path, zipFilename);
}

public String getZipFilePath() {
return zipFilePath;
return zipPath.toString();
}
}