feat: implement Resumable Upload Protocol in python by viacheslav-rostovtsev · Pull Request #16557 · googleapis/google-cloud-python

@viacheslav-rostovtsev

Test User added 4 commits

March 26, 2026 18:26

@google-cla

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

gemini-code-assist[bot]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a resumable media upload library for Google APIs, featuring a Sans-I/O state machine and a synchronous implementation using the requests library. The implementation includes support for chunked uploads, automatic retries for transient errors, and a recovery mechanism to resynchronize state with the server. Feedback includes a suggestion to prevent a potential infinite loop by checking the machine's invalid state during the upload loop and a simplification of the chunk size alignment logic.

Comment on lines +469 to +470

while not upload.finished:
final_response = upload.transmit_next_chunk(transport=transport)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The loop should check the invalid state of the machine. If the state machine enters an unrecoverable state (e.g., the server returns a CANCELLED status), the loop might continue indefinitely if finished never becomes true.

while not upload.finished:
final_response = upload.transmit_next_chunk(transport=transport)
while not upload.finished and not upload._machine.invalid:
final_response = upload.transmit_next_chunk(transport=transport)

Comment on lines +74 to +82

def chunk_size(self) -> int:
"""int: The block-aligned chunk size informed by server granularity."""
actual_chunk_size = self._chunk_size
if self._chunk_granularity:
if actual_chunk_size % self._chunk_granularity != 0:
actual_chunk_size = (
(actual_chunk_size // self._chunk_granularity) + 1
) * self._chunk_granularity
return actual_chunk_size

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for rounding up the chunk size to the nearest multiple of granularity can be simplified using the standard formula for alignment.

def chunk_size(self) -> int:
"""int: The block-aligned chunk size informed by server granularity."""
actual_chunk_size = self._chunk_size
if self._chunk_granularity:
if actual_chunk_size % self._chunk_granularity != 0:
actual_chunk_size = (
(actual_chunk_size // self._chunk_granularity) + 1
) * self._chunk_granularity
return actual_chunk_size
@property
def chunk_size(self) -> int:
"""int: The block-aligned chunk size informed by server granularity."""
actual_chunk_size = self._chunk_size
if self._chunk_granularity:
actual_chunk_size = (
(actual_chunk_size + self._chunk_granularity - 1)
// self._chunk_granularity
) * self._chunk_granularity
return actual_chunk_size

gemini-code-assist[bot]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a resumable media upload library for Google APIs, featuring a Sans-I/O state machine and a synchronous implementation using the requests library. The new modules handle upload initiation, chunked data transmission, and state recovery through server queries. Feedback focuses on enhancing the robustness of the implementation, specifically by correctly decoding byte-based header values, ensuring timezone-aware datetime comparisons for deadlines, and expanding the retry logic to include ChunkedEncodingError for more resilient stream handling.

# Merge user metadata first
if stream_metadata:
for k, v in stream_metadata.items():
headers[k] = str(v)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Stringifying header values using str(v) can lead to incorrect header values if v is of type bytes. In Python 3, str(b'value') results in the string "b'value'", which is likely not the intended header content. It is safer to decode bytes or handle them explicitly.

headers[k] = v.decode("utf-8") if isinstance(v, bytes) else str(v)

Comment on lines +117 to +118

now = datetime.datetime.now(datetime.timezone.utc)
timeout = (self.deadline - now).total_seconds()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calculating the timeout by subtracting now (which is timezone-aware UTC) from self.deadline will raise a TypeError if self.deadline is a naive datetime object. To ensure robustness, you should handle naive datetimes by assuming they are in UTC or converting them accordingly.

now = datetime.datetime.now(datetime.timezone.utc)
deadline = self.deadline
if deadline.tzinfo is None:
    deadline = deadline.replace(tzinfo=datetime.timezone.utc)
timeout = (deadline - now).total_seconds()

Comment on lines +126 to +138

def _get_retry_predicate(self):
def should_retry(exc):
if isinstance(exc, requests.exceptions.RequestException):
if isinstance(
exc,
(requests.exceptions.ConnectionError, requests.exceptions.Timeout),
):
return True
if isinstance(exc, exceptions.GoogleAPICallError):
return exc.code in _common.RETRYABLE_STATUS_CODES
return False

return should_retry

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The retry predicate for requests exceptions is missing requests.exceptions.ChunkedEncodingError. This error is common during large stream uploads and is typically transient, making it a good candidate for retries.

def _get_retry_predicate(self):
    def should_retry(exc):
        if isinstance(exc, requests.exceptions.RequestException):
            if isinstance(
                exc,
                (
                    requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout,
                    requests.exceptions.ChunkedEncodingError,
                ),
            ):
                return True
        if isinstance(exc, exceptions.GoogleAPICallError):
            return exc.code in _common.RETRYABLE_STATUS_CODES
        return False

    return should_retry