|
18 | 18 | import com.google.bigtable.v2.ReadRowsResponse.CellChunk; |
19 | 19 | import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; |
20 | 20 | import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder; |
| 21 | +import com.google.common.base.Joiner; |
21 | 22 | import com.google.common.base.Preconditions; |
| 23 | +import com.google.common.collect.EvictingQueue; |
22 | 24 | import com.google.protobuf.ByteString; |
23 | 25 | import java.util.List; |
24 | 26 |
|
@@ -77,6 +79,14 @@ final class StateMachine<RowT> { |
77 | 79 | private State currentState; |
78 | 80 | private ByteString lastCompleteRowKey; |
79 | 81 |
|
| 82 | + // debug stats |
| 83 | + private int numScannedNotifications = 0; |
| 84 | + private int numRowsCommitted = 0; |
| 85 | + private int numChunksProcessed = 0; |
| 86 | + private int numCellsInRow = 0; |
| 87 | + private int numCellsInLastRow = 0; |
| 88 | + private EvictingQueue<ByteString> lastSeenKeys = EvictingQueue.create(5); |
| 89 | + |
80 | 90 | // Track current cell attributes: protocol omits them when they are repeated |
81 | 91 | private ByteString rowKey; |
82 | 92 | private String familyName; |
@@ -120,6 +130,7 @@ final class StateMachine<RowT> { |
120 | 130 | */ |
121 | 131 | void handleLastScannedRow(ByteString key) { |
122 | 132 | try { |
| 133 | + numScannedNotifications++; |
123 | 134 | currentState = currentState.handleLastScannedRow(key); |
124 | 135 | } catch (RuntimeException e) { |
125 | 136 | currentState = null; |
@@ -148,6 +159,7 @@ void handleLastScannedRow(ByteString key) { |
148 | 159 | */ |
149 | 160 | void handleChunk(CellChunk chunk) { |
150 | 161 | try { |
| 162 | + numChunksProcessed++; |
151 | 163 | currentState = currentState.handleChunk(chunk); |
152 | 164 | } catch (RuntimeException e) { |
153 | 165 | currentState = null; |
@@ -191,6 +203,7 @@ private void reset() { |
191 | 203 | expectedCellSize = 0; |
192 | 204 | remainingCellBytes = 0; |
193 | 205 | completeRow = null; |
| 206 | + numCellsInRow = 0; |
194 | 207 |
|
195 | 208 | adapter.reset(); |
196 | 209 | } |
@@ -326,6 +339,7 @@ State handleChunk(CellChunk chunk) { |
326 | 339 | return AWAITING_CELL_VALUE; |
327 | 340 | } |
328 | 341 | adapter.finishCell(); |
| 342 | + numCellsInRow++; |
329 | 343 |
|
330 | 344 | if (!chunk.getCommitRow()) { |
331 | 345 | return AWAITING_NEW_CELL; |
@@ -374,6 +388,7 @@ State handleChunk(CellChunk chunk) { |
374 | 388 | return AWAITING_CELL_VALUE; |
375 | 389 | } |
376 | 390 | adapter.finishCell(); |
| 391 | + numCellsInRow++; |
377 | 392 |
|
378 | 393 | if (!chunk.getCommitRow()) { |
379 | 394 | return AWAITING_NEW_CELL; |
@@ -416,12 +431,31 @@ private State handleCommit() { |
416 | 431 | validate(remainingCellBytes == 0, "Can't commit with remaining bytes"); |
417 | 432 | completeRow = adapter.finishRow(); |
418 | 433 | lastCompleteRowKey = rowKey; |
| 434 | + |
| 435 | + lastSeenKeys.add(rowKey); |
| 436 | + numRowsCommitted++; |
| 437 | + numCellsInLastRow = numCellsInRow; |
419 | 438 | return AWAITING_ROW_CONSUME; |
420 | 439 | } |
421 | 440 |
|
422 | | - private static void validate(boolean condition, String message) { |
| 441 | + private void validate(boolean condition, String message) { |
423 | 442 | if (!condition) { |
424 | | - throw new InvalidInputException(message); |
| 443 | + throw new InvalidInputException( |
| 444 | + message |
| 445 | + + ". numScannedNotifications: " |
| 446 | + + numScannedNotifications |
| 447 | + + ", numRowsCommitted: " |
| 448 | + + numRowsCommitted |
| 449 | + + ", numChunksProcessed: " |
| 450 | + + numChunksProcessed |
| 451 | + + ", numCellsInRow: " |
| 452 | + + numCellsInRow |
| 453 | + + ", numCellsInLastRow: " |
| 454 | + + numCellsInLastRow |
| 455 | + + ", rowKey: " |
| 456 | + + rowKey |
| 457 | + + ", last5Keys: " |
| 458 | + + Joiner.on(",").join(lastSeenKeys)); |
425 | 459 | } |
426 | 460 | } |
427 | 461 |
|
|
0 commit comments