2222import com .google .api .core .SettableApiFuture ;
2323import com .google .api .gax .retrying .BasicResultRetryAlgorithm ;
2424import com .google .api .gax .rpc .DataLossException ;
25+ import com .google .api .gax .rpc .PermissionDeniedException ;
2526import com .google .cloud .storage .Retrying .RetryingDependencies ;
2627import com .google .cloud .storage .WriteCtx .WriteObjectRequestBuilderFactory ;
2728import com .google .common .collect .ImmutableList ;
2829import com .google .common .collect .ImmutableMap ;
30+ import com .google .common .collect .ImmutableSet ;
2931import com .google .protobuf .ByteString ;
3032import com .google .storage .v2 .Object ;
3133import com .google .storage .v2 .StartResumableWriteRequest ;
4042import io .grpc .stub .StreamObserver ;
4143import java .io .IOException ;
4244import java .nio .ByteBuffer ;
45+ import java .util .ArrayList ;
4346import java .util .List ;
47+ import java .util .Set ;
4448import java .util .concurrent .ExecutionException ;
4549import java .util .concurrent .atomic .AtomicBoolean ;
4650import java .util .concurrent .atomic .AtomicInteger ;
4751import java .util .function .BiConsumer ;
52+ import java .util .logging .Logger ;
53+ import java .util .stream .Collector ;
54+ import java .util .stream .Collectors ;
4855import org .junit .Test ;
4956
5057public final class GapicUnbufferedWritableByteChannelTest {
58+ private static final Logger LOGGER =
59+ Logger .getLogger (GapicUnbufferedWritableByteChannelTest .class .getName ());
5160
5261 private static final ChunkSegmenter segmenter =
5362 new ChunkSegmenter (Hasher .noop (), ByteStringStrategy .copy (), 10 , 5 );
@@ -174,18 +183,31 @@ public void resumableUpload() throws IOException, InterruptedException, Executio
174183 try (FakeServer fake = FakeServer .of (service );
175184 StorageClient sc = StorageClient .create (fake .storageSettings ())) {
176185 SettableApiFuture <WriteObjectResponse > result = SettableApiFuture .create ();
177- try ( GapicUnbufferedWritableByteChannel <?> c =
186+ GapicUnbufferedWritableByteChannel <?> c =
178187 new GapicUnbufferedWritableByteChannel <>(
179188 result ,
180189 segmenter ,
181190 reqFactory ,
182191 WriteFlushStrategy .fsyncEveryFlush (
183192 sc .writeObjectCallable (),
184193 RetryingDependencies .attemptOnce (),
185- Retrying .neverRetry ()))) {
194+ Retrying .neverRetry ()));
195+ ArrayList <String > debugMessages = new ArrayList <>();
196+ try {
186197 ImmutableList <ByteBuffer > buffers = TestUtils .subDivide (bytes , 10 );
187198 for (ByteBuffer buf : buffers ) {
188- c .write (buf );
199+ debugMessages .add (String .format ("Writing buffer. buf = %s" , buf ));
200+ int written = c .write (buf );
201+ debugMessages .add (String .format ("Wrote bytes. written = %2d" , written ));
202+ }
203+ // explicitly only close on success so we can trap the original error that maybe have
204+ // happened before we reach here.
205+ // Realistically, calling close here isn't strictly necessary because once we leave the
206+ // try block for FakeServer the server will shut down.
207+ c .close ();
208+ } catch (PermissionDeniedException ignore ) {
209+ for (String debugMessage : debugMessages ) {
210+ LOGGER .warning (debugMessage );
189211 }
190212 }
191213 assertThat (result .get ()).isEqualTo (resp5 );
@@ -228,6 +250,14 @@ public void resumableUpload_chunkAutomaticRetry()
228250 obs .onNext (resp5 );
229251 obs .onCompleted ();
230252 } else {
253+ DirectWriteService .logUnexpectedRequest (
254+ ImmutableSet .of (
255+ ImmutableList .of (req1 ),
256+ ImmutableList .of (req2 ),
257+ ImmutableList .of (req3 ),
258+ ImmutableList .of (req4 ),
259+ ImmutableList .of (req5 )),
260+ requests );
231261 obs .onError (
232262 TestUtils .apiException (Code .PERMISSION_DENIED , "Unexpected request chain." ));
233263 }
@@ -276,6 +306,7 @@ public boolean shouldRetry(Throwable t, Object ignore) {
276306 }
277307
278308 static class DirectWriteService extends StorageImplBase {
309+ private static final Logger LOGGER = Logger .getLogger (DirectWriteService .class .getName ());
279310 private final BiConsumer <StreamObserver <WriteObjectResponse >, List <WriteObjectRequest >> c ;
280311
281312 private ImmutableList .Builder <WriteObjectRequest > requests ;
@@ -293,12 +324,27 @@ static class DirectWriteService extends StorageImplBase {
293324 obs .onNext (writes .get (build ));
294325 obs .onCompleted ();
295326 } else {
327+ logUnexpectedRequest (writes .keySet (), build );
296328 obs .onError (
297329 TestUtils .apiException (Code .PERMISSION_DENIED , "Unexpected request chain." ));
298330 }
299331 });
300332 }
301333
334+ private static void logUnexpectedRequest (
335+ Set <List <WriteObjectRequest >> writes , List <WriteObjectRequest > build ) {
336+ Collector <CharSequence , ?, String > joining = Collectors .joining (",\n \t " , "[\n \t " , "\n ]" );
337+ Collector <CharSequence , ?, String > oneLine = Collectors .joining ("," , "[" , "]" );
338+ String msg =
339+ String .format (
340+ "Unexpected Request Chain.%nexpected one of: %s%n but was: %s" ,
341+ writes .stream ()
342+ .map (l -> l .stream ().map (StorageV2ProtoUtils ::fmtProto ).collect (oneLine ))
343+ .collect (joining ),
344+ build .stream ().map (StorageV2ProtoUtils ::fmtProto ).collect (oneLine ));
345+ LOGGER .warning (msg );
346+ }
347+
302348 @ Override
303349 public StreamObserver <WriteObjectRequest > writeObject (StreamObserver <WriteObjectResponse > obs ) {
304350 return new Adapter () {
0 commit comments