|
23 | 23 | import java.math.MathContext; |
24 | 24 | import org.apache.beam.sdk.io.range.OffsetRange; |
25 | 25 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; |
| 26 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedLong; |
26 | 27 |
|
27 | 28 | /** |
28 | 29 | * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is |
@@ -68,6 +69,7 @@ public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimato |
68 | 69 | this.rangeEndEstimator = checkNotNull(rangeEndEstimator); |
69 | 70 | } |
70 | 71 |
|
| 72 | + // TODO(sjvanrossum): Use UnsignedLong instead of BigDecimal for splitting ranges |
71 | 73 | @Override |
72 | 74 | public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) { |
73 | 75 | // If current tracking range is no longer growable, split it as a normal range. |
@@ -115,30 +117,12 @@ public Progress getProgress() { |
115 | 117 | return super.getProgress(); |
116 | 118 | } |
117 | 119 |
|
118 | | - // Convert to BigDecimal in computation to prevent overflow, which may result in lost of |
119 | | - // precision. |
120 | | - BigDecimal estimateRangeEnd = BigDecimal.valueOf(rangeEndEstimator.estimate()); |
121 | | - |
122 | | - if (lastAttemptedOffset == null) { |
123 | | - return Progress.from( |
124 | | - 0, |
125 | | - estimateRangeEnd |
126 | | - .subtract(BigDecimal.valueOf(range.getFrom()), MathContext.DECIMAL128) |
127 | | - .max(BigDecimal.ZERO) |
128 | | - .doubleValue()); |
129 | | - } |
| 120 | + final long completedEnd = lastAttemptedOffset == null ? range.getFrom() : lastAttemptedOffset; |
| 121 | + final long remainingEnd = Math.max(completedEnd, rangeEndEstimator.estimate()); |
130 | 122 |
|
131 | | - BigDecimal workRemaining = |
132 | | - estimateRangeEnd |
133 | | - .subtract(BigDecimal.valueOf(lastAttemptedOffset), MathContext.DECIMAL128) |
134 | | - .max(BigDecimal.ZERO); |
135 | | - BigDecimal totalWork = |
136 | | - estimateRangeEnd |
137 | | - .max(BigDecimal.valueOf(lastAttemptedOffset)) |
138 | | - .subtract(BigDecimal.valueOf(range.getFrom()), MathContext.DECIMAL128); |
139 | 123 | return Progress.from( |
140 | | - totalWork.subtract(workRemaining, MathContext.DECIMAL128).doubleValue(), |
141 | | - workRemaining.doubleValue()); |
| 124 | + UnsignedLong.fromLongBits(completedEnd - range.getFrom()).doubleValue(), |
| 125 | + UnsignedLong.fromLongBits(remainingEnd - completedEnd).doubleValue()); |
142 | 126 | } |
143 | 127 |
|
144 | 128 | @Override |
|
0 commit comments