[BEAM-12164] Feat: Add new restriction tracker to be able to track partition state along with timestamp for change streams connector. by nancyxu123 · Pull Request #17222 · apache/beam

/**
* Returns the progress made within the restriction so far. If lastAttemptedPosition is null, the
* start of the range is used as the completed work; otherwise, lastAttemptedPosition will be
* used. The time gap between lastAttemptedPosition and now is used as the remaining work. In this
* way, when the time gap becomes large, we will have more backlog to process and we should add
* more resources.
*
* @return work completed and work remaining in seconds.
*/
@Override
public Progress getProgress() {
BigDecimal end;
if (range.getTo().compareTo(Timestamp.MAX_VALUE) == 0) {
// When the given end timestamp equals to Timestamp.MAX_VALUE, this means that
// the end timestamp is not specified which should be a streaming job. So we
// use now() as the end timestamp.
end = BigDecimal.valueOf(timeSupplier.get().getSeconds());
} else {
end = BigDecimal.valueOf(range.getTo().getSeconds());
}
BigDecimal current;
if (lastClaimedPosition == null) {
current = BigDecimal.valueOf(range.getFrom().getSeconds());
} else {
current = BigDecimal.valueOf(lastClaimedPosition.getSeconds());
}
// The remaining work must be greater than 0. Otherwise, it will cause an issue
// that the watermark does not advance.
final BigDecimal workRemaining = end.subtract(current).max(BigDecimal.ONE);
LOG.debug(
"Reported progress - current:"
+ current.doubleValue()
+ " end:"
+ end.doubleValue()
+ " workRemaining:"
+ workRemaining.doubleValue());
return Progress.from(current.doubleValue(), workRemaining.doubleValue());
}