[BEAM-12164] Feat: Add new restriction tracker to be able to track partition state along with timestamp for change streams connector. by nancyxu123 · Pull Request #17222 · apache/beam
| /** | |
| * Returns the progress made within the restriction so far. If lastAttemptedPosition is null, the | |
| * start of the range is used as the completed work; otherwise, lastAttemptedPosition will be | |
| * used. The time gap between lastAttemptedPosition and now is used as the remaining work. In this | |
| * way, when the time gap becomes large, we will have more backlog to process and we should add | |
| * more resources. | |
| * | |
| * @return work completed and work remaining in seconds. | |
| */ | |
| @Override | |
| public Progress getProgress() { | |
| BigDecimal end; | |
| if (range.getTo().compareTo(Timestamp.MAX_VALUE) == 0) { | |
| // When the given end timestamp equals to Timestamp.MAX_VALUE, this means that | |
| // the end timestamp is not specified which should be a streaming job. So we | |
| // use now() as the end timestamp. | |
| end = BigDecimal.valueOf(timeSupplier.get().getSeconds()); | |
| } else { | |
| end = BigDecimal.valueOf(range.getTo().getSeconds()); | |
| } | |
| BigDecimal current; | |
| if (lastClaimedPosition == null) { | |
| current = BigDecimal.valueOf(range.getFrom().getSeconds()); | |
| } else { | |
| current = BigDecimal.valueOf(lastClaimedPosition.getSeconds()); | |
| } | |
| // The remaining work must be greater than 0. Otherwise, it will cause an issue | |
| // that the watermark does not advance. | |
| final BigDecimal workRemaining = end.subtract(current).max(BigDecimal.ONE); | |
| LOG.debug( | |
| "Reported progress - current:" | |
| + current.doubleValue() | |
| + " end:" | |
| + end.doubleValue() | |
| + " workRemaining:" | |
| + workRemaining.doubleValue()); | |
| return Progress.from(current.doubleValue(), workRemaining.doubleValue()); | |
| } |