66import com .marklogic .spark .ConnectorException ;
77import com .marklogic .spark .ContextSupport ;
88import com .marklogic .spark .Options ;
9+ import com .marklogic .spark .Util ;
910import org .apache .commons .io .IOUtils ;
1011import org .apache .hadoop .fs .FileSystem ;
1112import org .apache .hadoop .fs .Path ;
1213import org .apache .spark .sql .catalyst .InternalRow ;
1314import org .apache .spark .sql .connector .write .DataWriter ;
1415import org .apache .spark .sql .connector .write .WriterCommitMessage ;
1516import org .apache .spark .util .SerializableConfiguration ;
16- import org .slf4j .Logger ;
17- import org .slf4j .LoggerFactory ;
1817
19- import java .io .File ;
2018import java .io .IOException ;
2119import java .text .SimpleDateFormat ;
2220import java .util .Date ;
2725
2826public class ZipFileWriter implements DataWriter <InternalRow > {
2927
30- private static final Logger logger = LoggerFactory .getLogger (ZipFileWriter .class );
31-
3228 private final ContextSupport context ;
3329 private final SerializableConfiguration hadoopConfiguration ;
3430
3531 private final String path ;
36- private final String zipFilePath ;
32+ private final Path zipPath ;
3733
3834 // These can be instantiated lazily depending on which constructor is used.
3935 private ContentWriter contentWriter ;
@@ -48,7 +44,7 @@ public class ZipFileWriter implements DataWriter<InternalRow> {
4844 public ZipFileWriter (String path , Map <String , String > properties , SerializableConfiguration hadoopConfiguration ,
4945 int partitionId , boolean createZipFileImmediately ) {
5046 this .path = path ;
51- this .zipFilePath = makeFilePath (path , partitionId );
47+ this .zipPath = makeFilePath (path , partitionId );
5248 this .context = new ContextSupport (properties );
5349 this .hadoopConfiguration = hadoopConfiguration ;
5450 if (createZipFileImmediately ) {
@@ -81,7 +77,7 @@ public void close() {
8177
8278 @ Override
8379 public WriterCommitMessage commit () {
84- return new ZipCommitMessage (path , zipFilePath , zipEntryCounter );
80+ return new ZipCommitMessage (path , zipPath . toString () , zipEntryCounter );
8581 }
8682
8783 @ Override
@@ -90,15 +86,14 @@ public void abort() {
9086 }
9187
9288 private void createZipFileAndContentWriter () {
93- Path filePath = new Path (zipFilePath );
94- if (logger .isDebugEnabled ()) {
95- logger .debug ("Will write to: {}" , filePath );
89+ if (Util .MAIN_LOGGER .isDebugEnabled ()) {
90+ Util .MAIN_LOGGER .debug ("Will write file at: {}" , zipPath );
9691 }
9792 this .contentWriter = new ContentWriter (context .getProperties ());
9893 try {
99- FileSystem fileSystem = filePath .getFileSystem (hadoopConfiguration .value ());
94+ FileSystem fileSystem = zipPath .getFileSystem (hadoopConfiguration .value ());
10095 fileSystem .setWriteChecksum (false );
101- zipOutputStream = new ZipOutputStream (fileSystem .create (filePath , true ));
96+ zipOutputStream = new ZipOutputStream (fileSystem .create (zipPath , true ));
10297 } catch (IOException e ) {
10398 throw new ConnectorException ("Unable to create stream for writing zip file: " + e .getMessage (), e );
10499 }
@@ -132,12 +127,16 @@ private boolean hasMetadata(InternalRow row) {
132127 * @param partitionId
133128 * @return
134129 */
135- private String makeFilePath (String path , int partitionId ) {
130+ private Path makeFilePath (String path , int partitionId ) {
136131 final String timestamp = new SimpleDateFormat ("yyyyMMddHHmmssZ" ).format (new Date ());
137- return String .format ("%s%s%s-%d.zip" , path , File .separator , timestamp , partitionId );
132+ final String zipFilename = String .format ("%s-%d.zip" , timestamp , partitionId );
133+ // Fixed a bug in 2.0 by using a Path here instead of string concatenation with File.separator.
134+ // Using File.separator on Windows would result in a "\" in an S3 URL, which is awkward for a user to work with,
135+ // and appears buggy and unexpected.
136+ return new Path (path , zipFilename );
138137 }
139138
140139 public String getZipFilePath () {
141- return zipFilePath ;
140+ return zipPath . toString () ;
142141 }
143142}
0 commit comments