@@ -91,6 +91,41 @@ void opaqueURI(@TempDir @NotNull Path tempDir) throws IOException {
9191 "'schema-specific part', which is just example/123.xml." );
9292 }
9393
+    /**
+     * Verifies that streaming documents to a zip file "just works" because streaming of archive files is already
+     * supported. The same ZipFileWriter is used for both; the only difference with archive files is that the writer
+     * also checks each Spark row for metadata and includes a metadata entry in the archive file.
+     *
+     * @param tempDir
+     * @throws Exception
+     */
+    @Test
+    void streamZipFile(@TempDir Path tempDir) throws Exception {
+        Dataset<Row> dataset = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, 1)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
+            .option(Options.STREAM_FILES, true)
+            .load();
+
+        assertEquals(15, dataset.count(), "Should have 1 row per author document.");
+        dataset.collectAsList().forEach(row -> {
+            assertFalse(row.isNullAt(0), "The URI column should be non-null.");
+            assertTrue(row.isNullAt(1), "The content column should be empty. The document will be read during the " +
+                "writer phase instead.");
+        });
+
+        dataset.write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_FILES_COMPRESSION, "zip")
+            .mode(SaveMode.Append)
+            .save(tempDir.toFile().getAbsolutePath());
+
+        verifyZipFilesHaveExpectedFilenames(tempDir);
+        verifyZipFilesContainFifteenAuthors(tempDir);
+    }
 
     private Dataset<Row> readAuthorCollection() {
         return newSparkSession().read()
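
Note: the verification helpers called at the end of the new test, verifyZipFilesHaveExpectedFilenames and verifyZipFilesContainFifteenAuthors, are defined elsewhere in this test class and are not shown in this hunk. Below is a minimal sketch of what a check like verifyZipFilesContainFifteenAuthors might look like using plain java.util.zip, assuming each file written to tempDir is a zip whose entries are the streamed author documents; this is an illustration, not the PR's actual implementation.

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipFile;

// Hypothetical helper; the real verifyZipFilesContainFifteenAuthors lives elsewhere in the test class.
private void verifyZipFilesContainFifteenAuthors(Path tempDir) throws IOException {
    List<Path> zipFiles;
    try (Stream<Path> files = Files.list(tempDir)) {
        zipFiles = files.filter(p -> p.toString().endsWith(".zip")).collect(Collectors.toList());
    }

    long entryCount = 0;
    for (Path zipPath : zipFiles) {
        try (ZipFile zipFile = new ZipFile(zipPath.toFile())) {
            // Each entry is expected to correspond to one streamed author document.
            entryCount += zipFile.stream().count();
        }
    }

    assertEquals(15, entryCount, "Expecting one zip entry per streamed author document.");
}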