1616import reactor .core .publisher .Flux ;
1717
1818import java .io .ByteArrayInputStream ;
19+ import java .io .ByteArrayOutputStream ;
20+ import java .nio .ByteBuffer ;
1921
22+ import static org .junit .jupiter .api .Assertions .assertArrayEquals ;
2023import static org .junit .jupiter .api .Assertions .assertDoesNotThrow ;
2124
2225public class DataLakeMessageEncoderUploadTests extends DataLakeTestBase {
@@ -32,103 +35,121 @@ public void uploadBinaryDataFullCRCHeader() {
3235 FileParallelUploadOptions options = new FileParallelUploadOptions (DATA .getDefaultBinaryData ())
3336 .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
3437
35- // viewed crc64 header through httptoolkit, unable to retrieve header through the response object
36- // due to the response object containing the request and response of the flush operation, not the append
37- // Response<PathInfo> response = fc.uploadWithResponse(options, null, Context.NONE);
38- // assertNotNull(response.getRequest().getHeaders().getValue(CONTENT_CRC64_HEADER_NAME));
38+ // viewed crc header through httptoolkit, unable to retrieve it through the response object
3939 assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
4040 }
4141
4242 @ Test
4343 public void uploadBinaryDataFullStructMess () {
44- FileParallelUploadOptions options
45- = new FileParallelUploadOptions (BinaryData .fromBytes (getRandomByteArray (Constants .MB * 5 )))
46- .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
44+ byte [] data = getRandomByteArray (Constants .MB * 5 );
4745
48- // viewed structured body type header through httptoolkit, unable to retrieve it through the response object
49- // Response<PathInfo> response = fc.uploadWithResponse(options, null, Context.NONE);
50- // assertEquals(STRUCTUED_BODY_TYPE, response.getRequest().getHeaders().getValue(STRUCTURED_BODY_TYPE_HEADER_NAME));
51- assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
46+ FileParallelUploadOptions options = new FileParallelUploadOptions (BinaryData .fromBytes (data ))
47+ .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
48+
49+ fc .uploadWithResponse (options , null , Context .NONE );
50+
51+ ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
52+ fc .read (outStream );
53+
54+ assertArrayEquals (data , outStream .toByteArray ());
5255 }
5356
5457 @ Test
5558 public void uploadBinaryDataChunkedStructMess () {
56- FileParallelUploadOptions options
57- = new FileParallelUploadOptions (BinaryData .fromBytes (getRandomByteArray (Constants .MB * 8 )))
58- .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO )
59- .setParallelTransferOptions (
60- new ParallelTransferOptions ().setMaxSingleUploadSizeLong ((long ) Constants .MB * 4 ));
59+ byte [] data = getRandomByteArray (Constants .MB * 10 );
6160
62- assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
61+ FileParallelUploadOptions options = new FileParallelUploadOptions (BinaryData .fromBytes (data ))
62+ .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO )
63+ .setParallelTransferOptions (
64+ new ParallelTransferOptions ().setMaxSingleUploadSizeLong ((long ) Constants .MB * 2 )
65+ .setBlockSizeLong ((long ) Constants .MB * 2 ));
66+
67+ fc .uploadWithResponse (options , null , Context .NONE );
68+
69+ ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
70+ fc .read (outStream );
71+
72+ assertArrayEquals (data , outStream .toByteArray ());
6373 }
6474
6575 @ Test
6676 public void uploadInputStreamFullCRCHeader () {
6777 FileParallelUploadOptions options = new FileParallelUploadOptions (DATA .getDefaultInputStream ())
6878 .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
6979
70- // viewed structured body type header through httptoolkit, unable to retrieve it through the response object
71- //Response<PathInfo> response = fc.uploadWithResponse(options, null, Context.NONE);
72- //assertNotNull(response.getRequest().getHeaders().getValue(CONTENT_CRC64_HEADER_NAME));
80+ // viewed crc header through httptoolkit, unable to retrieve it through the response object
7381 assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
7482 }
7583
7684 @ Test
7785 public void uploadInputStreamFullStructMess () {
78- byte [] randomData = getRandomByteArray (Constants .MB * 5 );
79- ByteArrayInputStream input = new ByteArrayInputStream (randomData );
80- FileParallelUploadOptions options
81- = new FileParallelUploadOptions (input ).setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
86+ byte [] data = getRandomByteArray (Constants .MB * 5 );
87+ FileParallelUploadOptions options = new FileParallelUploadOptions (new ByteArrayInputStream (data ))
88+ .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
8289
83- // viewed structured body type header through httptoolkit, unable to retrieve it through the response object
84- //Response<PathInfo> response = fc.uploadWithResponse(options, null, Context.NONE);
85- //assertEquals(STRUCTUED_BODY_TYPE,
86- // response.getRequest().getHeaders().getValue(STRUCTURED_BODY_TYPE_HEADER_NAME));
87- assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
90+ fc .uploadWithResponse (options , null , Context .NONE );
91+
92+ ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
93+ fc .read (outStream );
94+
95+ assertArrayEquals (data , outStream .toByteArray ());
8896 }
8997
9098 @ Test
9199 public void uploadInputStreamChunkedStructMess () {
92- byte [] randomData = getRandomByteArray (Constants .MB * 8 );
93- ByteArrayInputStream input = new ByteArrayInputStream (randomData );
100+ byte [] data = getRandomByteArray (Constants .MB * 10 );
101+ ByteArrayInputStream input = new ByteArrayInputStream (data );
94102 FileParallelUploadOptions options
95103 = new FileParallelUploadOptions (input ).setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO )
96104 .setParallelTransferOptions (
97- new ParallelTransferOptions ().setMaxSingleUploadSizeLong ((long ) Constants .MB * 4 ));
105+ new ParallelTransferOptions ().setMaxSingleUploadSizeLong ((long ) Constants .MB * 2 )
106+ .setBlockSizeLong ((long ) Constants .MB * 2 ));
98107
99- assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
108+ fc .uploadWithResponse (options , null , Context .NONE );
109+
110+ ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
111+ fc .read (outStream );
112+
113+ assertArrayEquals (data , outStream .toByteArray ());
100114 }
101115
102116 @ Test
103117 public void uploadFluxFullCRCHeader () {
104118 FileParallelUploadOptions options = new FileParallelUploadOptions (DATA .getDefaultFlux ())
105119 .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
106120
107- // viewed structured body type header through httptoolkit, unable to retrieve it through the response object
108- // Response<PathInfo> response = fc.uploadWithResponse(options, null, Context.NONE);
109- // assertNotNull(response.getRequest().getHeaders().getValue(CONTENT_CRC64_HEADER_NAME));
121+ // viewed crc header through httptoolkit, unable to retrieve it through the response object
110122 assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
111123 }
112124
113125 @ Test
114126 public void uploadFluxFullStructMess () {
115- FileParallelUploadOptions options = new FileParallelUploadOptions (Flux .just (getRandomData (Constants .MB * 5 )))
127+ byte [] data = getRandomByteArray (Constants .MB * 5 );
128+ FileParallelUploadOptions options = new FileParallelUploadOptions (Flux .just (ByteBuffer .wrap (data )))
116129 .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO );
117130
118- // viewed structured body type header through httptoolkit, unable to retrieve it through the response object
119- //Response<PathInfo> response = fc.uploadWithResponse(options, null, Context.NONE);
120- //assertEquals(STRUCTUED_BODY_TYPE,
121- //response.getRequest().getHeaders().getValue(STRUCTURED_BODY_TYPE_HEADER_NAME));
122- assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
131+ fc .uploadWithResponse (options , null , Context .NONE );
132+
133+ ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
134+ fc .read (outStream );
135+
136+ assertArrayEquals (data , outStream .toByteArray ());
123137 }
124138
125139 @ Test
126140 public void uploadFluxChunkedStructMess () {
127- FileParallelUploadOptions options = new FileParallelUploadOptions (Flux .just (getRandomData (Constants .MB * 8 )))
141+ byte [] data = getRandomByteArray (Constants .MB * 10 );
142+ FileParallelUploadOptions options = new FileParallelUploadOptions (Flux .just (ByteBuffer .wrap (data )))
128143 .setStorageChecksumAlgorithm (StorageChecksumAlgorithm .AUTO )
129144 .setParallelTransferOptions (
130- new ParallelTransferOptions ().setMaxSingleUploadSizeLong ((long ) Constants .MB * 4 ));
145+ new ParallelTransferOptions ().setMaxSingleUploadSizeLong ((long ) Constants .MB * 2 )
146+ .setBlockSizeLong ((long ) Constants .MB * 2 ));
131147
132- assertDoesNotThrow (() -> fc .uploadWithResponse (options , null , Context .NONE ));
148+ fc .uploadWithResponse (options , null , Context .NONE );
149+
150+ ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
151+ fc .read (outStream );
152+
153+ assertArrayEquals (data , outStream .toByteArray ());
133154 }
134155}
0 commit comments