|
9 | 9 | import com.azure.core.http.HttpPipelineNextSyncPolicy; |
10 | 10 | import com.azure.core.http.HttpResponse; |
11 | 11 | import com.azure.core.http.policy.HttpPipelinePolicy; |
12 | | -import com.azure.storage.common.implementation.BufferAggregator; |
13 | 12 | import com.azure.storage.common.implementation.BufferStagingArea; |
14 | 13 | import com.azure.storage.common.implementation.StorageCrc64Calculator; |
15 | 14 | import com.azure.storage.common.implementation.structuredmessage.StructuredMessageEncoder; |
@@ -112,7 +111,42 @@ private void applyStructuredMessage(HttpPipelineCallContext context) { |
112 | 111 | .getBody() |
113 | 112 | .flatMapSequential(stagingArea::write, 1, 1) |
114 | 113 | .concatWith(Flux.defer(stagingArea::flush)) |
115 | | - .flatMap(bufferAggregator -> bufferAggregator.asFlux().flatMap(structuredMessageEncoder::encode)); |
| 114 | + .concatMap(bufferAggregator -> bufferAggregator.asFlux().concatMap(structuredMessageEncoder::encode)); |
| 115 | + // |
| 116 | + // // For test purposes, write the encoded body to a file. |
| 117 | + // final AtomicReference<FileChannel> fileChannelRef = new AtomicReference<>(); |
| 118 | + // encodedBody = encodedBody.doOnSubscribe(subscription -> { |
| 119 | + // try { |
| 120 | + // Path path = Paths.get("encoded-output-bad.bin"); |
| 121 | + // fileChannelRef.set(FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, |
| 122 | + // StandardOpenOption.TRUNCATE_EXISTING)); |
| 123 | + // } catch (IOException e) { |
| 124 | + // throw new UncheckedIOException(e); |
| 125 | + // } |
| 126 | + // }).doOnNext(buffer -> { |
| 127 | + // try { |
| 128 | + // // Duplicate buffer to avoid affecting the original buffer's position |
| 129 | + // fileChannelRef.get().write(buffer.duplicate()); |
| 130 | + // } catch (IOException e) { |
| 131 | + // throw new UncheckedIOException(e); |
| 132 | + // } |
| 133 | + // }).doOnComplete(() -> { |
| 134 | + // try { |
| 135 | + // if (fileChannelRef.get() != null) { |
| 136 | + // fileChannelRef.get().close(); |
| 137 | + // } |
| 138 | + // } catch (IOException e) { |
| 139 | + // throw new UncheckedIOException(e); |
| 140 | + // } |
| 141 | + // }).doOnError(error -> { |
| 142 | + // try { |
| 143 | + // if (fileChannelRef.get() != null) { |
| 144 | + // fileChannelRef.get().close(); |
| 145 | + // } |
| 146 | + // } catch (IOException e) { |
| 147 | + // error.addSuppressed(e); |
| 148 | + // } |
| 149 | + // }); |
116 | 150 |
|
117 | 151 | // Set the encoded body |
118 | 152 | context.getHttpRequest().setBody(encodedBody); |
|
0 commit comments