176176import java .util .Objects ;
177177import java .util .concurrent .ExecutionException ;
178178import java .util .concurrent .Executor ;
179+ import java .util .concurrent .ExecutorService ;
179180import java .util .concurrent .Executors ;
181+ import java .util .concurrent .Future ;
180182import java .util .concurrent .TimeUnit ;
181183import java .util .logging .Level ;
182184import java .util .logging .Logger ;
@@ -768,7 +770,7 @@ public synchronized void closeBatchTxn() throws SpannerException {
768770 new ThreadFactoryBuilder ().setNameFormat ("action-pool-%d" ).build ());
769771
770772 // Thread pool to verify end to end traces.
771- private static final Executor endToEndTracesThreadPool =
773+ private static final ExecutorService endToEndTracesThreadPool =
772774 Executors .newCachedThreadPool (
773775 new ThreadFactoryBuilder ().setNameFormat ("end-to-end-traces-pool-%d" ).build ());
774776
@@ -891,23 +893,27 @@ private synchronized TraceServiceClient getTraceServiceClient() throws IOExcepti
891893 return traceServiceClient ;
892894 }
893895
894- /* Handles verification of end to end traces */
895- public Status startVerificationOfEndToEndTrace (
896- String traceId , ExecutionFlowContext executionContext ) {
897- endToEndTracesThreadPool .execute (
896+ public Future <Boolean > getEndToEndTraceVerificationTask (String traceId ) {
897+ return endToEndTracesThreadPool .submit (
898898 () -> {
899- boolean isValidTrace = isExportedEndToEndTraceValid (traceId );
900- if (!isValidTrace ) {
901- LOGGER .log (Level .WARNING , String .format ("traceId:%s failed to be verified." , traceId ));
902- executionContext .onError (
903- Status .INTERNAL
904- .withDescription (
905- String .format (
906- "failed to verify end to end trace for trace_id: %s" , traceId ))
907- .getCause ());
899+ try {
900+ // Wait for 10 seconds before verifying to ensure traces are exported.
901+ long sleepDuration = TimeUnit .SECONDS .toMillis (10 );
902+ LOGGER .log (
903+ Level .INFO ,
904+ String .format (
905+ "Sleeping for %d milliseconds before verifying end to end trace" ,
906+ sleepDuration ));
907+ Thread .sleep (sleepDuration );
908+ } catch (InterruptedException e ) {
909+ Thread .currentThread ().interrupt (); // Handle interruption
910+ LOGGER .log (
911+ Level .INFO ,
912+ String .format ("Thread interrupted." ));
913+ return false ; // Return false if interrupted
908914 }
915+ return isExportedEndToEndTraceValid (traceId );
909916 });
910- return Status .OK ;
911917 }
912918
913919 private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction" ;
@@ -924,7 +930,8 @@ public boolean isExportedEndToEndTraceValid(String traceId) {
924930 Trace trace = getTraceServiceClient ().getTrace (getTraceRequest );
925931 boolean readWriteOrReadOnlyTxnPresent = false , spannerServerSideSpanPresent = false ;
926932 for (TraceSpan span : trace .getSpansList ()) {
927- if (span .getName () == READ_ONLY_TRANSACTION || span .getName () == READ_WRITE_TRANSACTION ) {
933+ if (span .getName ().contains (READ_ONLY_TRANSACTION )
934+ || span .getName ().contains (READ_WRITE_TRANSACTION )) {
928935 readWriteOrReadOnlyTxnPresent = true ;
929936 }
930937 if (span .getName ().startsWith ("Spanner." )) {
@@ -934,8 +941,8 @@ public boolean isExportedEndToEndTraceValid(String traceId) {
934941 if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent ) {
935942 return false ;
936943 }
937- } catch (IOException e ) {
938- LOGGER .log (Level .WARNING , "failed to verify end to end traces ." , e );
944+ } catch (Exception e ) {
945+ LOGGER .log (Level .WARNING , "Failed to verify end to end trace ." , e );
939946 return false ;
940947 }
941948 return true ;
0 commit comments