feat: Adding Agent Identity bound token support and handling certific… · googleapis/google-auth-library-python@b32c934

1+

# Copyright 2025 Google LLC

2+

#

3+

# Licensed under the Apache License, Version 2.0 (the "License");

4+

# you may not use this file except in compliance with the License.

5+

# You may obtain a copy of the License at

6+

#

7+

# http://www.apache.org/licenses/LICENSE-2.0

8+

#

9+

# Unless required by applicable law or agreed to in writing, software

10+

# distributed under the License is distributed on an "AS IS" BASIS,

11+

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

12+

# See the License for the specific language governing permissions and

13+

# limitations under the License.

14+15+

"""Helpers for Agent Identity credentials."""

16+17+

import base64

18+

import hashlib

19+

import logging

20+

import os

21+

import re

22+

import time

23+

from urllib.parse import quote, urlparse

24+25+

from google.auth import environment_vars

26+

from google.auth import exceptions

27+

from google.auth.transport import _mtls_helper

28+29+30+

_LOGGER = logging.getLogger(__name__)

31+32+

CRYPTOGRAPHY_NOT_FOUND_ERROR = (

33+

"The cryptography library is required for certificate-based authentication."

34+

"Please install it with `pip install google-auth[cryptography]`."

35+

)

36+37+

# SPIFFE trust domain patterns for Agent Identities.

38+

_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [

39+

r"^agents\.global\.org-\d+\.system\.id\.goog$",

40+

r"^agents\.global\.proj-\d+\.system\.id\.goog$",

41+

]

42+43+

_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"

44+45+

# Constants for polling the certificate file.

46+

_FAST_POLL_CYCLES = 50

47+

_FAST_POLL_INTERVAL = 0.1 # 100ms

48+

_SLOW_POLL_INTERVAL = 0.5 # 500ms

49+

_TOTAL_TIMEOUT = 30 # seconds

50+51+

# Calculate the number of slow poll cycles based on the total timeout.

52+

_SLOW_POLL_CYCLES = int(

53+

(_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL

54+

)

55+56+

_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (

57+

[_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES

58+

)

59+60+61+

def _is_certificate_file_ready(path):

62+

"""Checks if a file exists and is not empty."""

63+

return path and os.path.exists(path) and os.path.getsize(path) > 0

64+65+66+

def get_agent_identity_certificate_path():

67+

"""Gets the certificate path from the certificate config file.

68+69+

The path to the certificate config file is read from the

70+

GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function

71+

implements a retry mechanism to handle cases where the environment

72+

variable is set before the files are available on the filesystem.

73+74+

Returns:

75+

str: The path to the leaf certificate file.

76+77+

Raises:

78+

google.auth.exceptions.RefreshError: If the certificate config file

79+

or the certificate file cannot be found after retries.

80+

"""

81+

import json

82+83+

cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)

84+

if not cert_config_path:

85+

return None

86+87+

has_logged_warning = False

88+89+

for interval in _POLLING_INTERVALS:

90+

try:

91+

with open(cert_config_path, "r") as f:

92+

cert_config = json.load(f)

93+

cert_path = (

94+

cert_config.get("cert_configs", {})

95+

.get("workload", {})

96+

.get("cert_path")

97+

)

98+

if _is_certificate_file_ready(cert_path):

99+

return cert_path

100+

except (IOError, ValueError, KeyError):

101+

if not has_logged_warning:

102+

_LOGGER.warning(

103+

"Certificate config file not found at %s (from %s environment "

104+

"variable). Retrying for up to %s seconds.",

105+

cert_config_path,

106+

environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,

107+

_TOTAL_TIMEOUT,

108+

)

109+

has_logged_warning = True

110+

pass

111+112+

# As a fallback, check the well-known certificate path.

113+

if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):

114+

return _WELL_KNOWN_CERT_PATH

115+116+

# A sleep is required in two cases:

117+

# 1. The config file is not found (the except block).

118+

# 2. The config file is found, but the certificate is not yet available.

119+

# In both cases, we need to poll, so we sleep on every iteration

120+

# that doesn't return a certificate.

121+

time.sleep(interval)

122+123+

raise exceptions.RefreshError(

124+

"Certificate config or certificate file not found after multiple retries. "

125+

f"Token binding protection is failing. You can turn off this protection by setting "

126+

f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "

127+

"to fall back to unbound tokens."

128+

)

129+130+131+

def get_and_parse_agent_identity_certificate():

132+

"""Gets and parses the agent identity certificate if not opted out.

133+134+

Checks if the user has opted out of certificate-bound tokens. If not,

135+

it gets the certificate path, reads the file, and parses it.

136+137+

Returns:

138+

The parsed certificate object if found and not opted out, otherwise None.

139+

"""

140+

# If the user has opted out of cert bound tokens, there is no need to

141+

# look up the certificate.

142+

is_opted_out = (

143+

os.environ.get(

144+

environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,

145+

"true",

146+

).lower()

147+

== "false"

148+

)

149+

if is_opted_out:

150+

return None

151+152+

cert_path = get_agent_identity_certificate_path()

153+

if not cert_path:

154+

return None

155+156+

with open(cert_path, "rb") as cert_file:

157+

cert_bytes = cert_file.read()

158+159+

return parse_certificate(cert_bytes)

160+161+162+

def parse_certificate(cert_bytes):

163+

"""Parses a PEM-encoded certificate.

164+165+

Args:

166+

cert_bytes (bytes): The PEM-encoded certificate bytes.

167+168+

Returns:

169+

cryptography.x509.Certificate: The parsed certificate object.

170+

"""

171+

try:

172+

from cryptography import x509

173+174+

return x509.load_pem_x509_certificate(cert_bytes)

175+

except ImportError as e:

176+

raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e

177+178+179+

def _is_agent_identity_certificate(cert):

180+

"""Checks if a certificate is an Agent Identity certificate.

181+182+

This is determined by checking the Subject Alternative Name (SAN) for a

183+

SPIFFE ID with a trust domain matching Agent Identity patterns.

184+185+

Args:

186+

cert (cryptography.x509.Certificate): The parsed certificate object.

187+188+

Returns:

189+

bool: True if the certificate is an Agent Identity certificate,

190+

False otherwise.

191+

"""

192+

try:

193+

from cryptography import x509

194+

from cryptography.x509.oid import ExtensionOID

195+196+

try:

197+

ext = cert.extensions.get_extension_for_oid(

198+

ExtensionOID.SUBJECT_ALTERNATIVE_NAME

199+

)

200+

except x509.ExtensionNotFound:

201+

return False

202+

uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier)

203+204+

for uri in uris:

205+

parsed_uri = urlparse(uri)

206+

if parsed_uri.scheme == "spiffe":

207+

trust_domain = parsed_uri.netloc

208+

for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS:

209+

if re.match(pattern, trust_domain):

210+

return True

211+

return False

212+

except ImportError as e:

213+

raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e

214+215+216+

def calculate_certificate_fingerprint(cert):

217+

"""Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a

218+

DER-encoded certificate.

219+220+

Args:

221+

cert (cryptography.x509.Certificate): The parsed certificate object.

222+223+

Returns:

224+

str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.

225+

"""

226+

try:

227+

from cryptography.hazmat.primitives import serialization

228+229+

der_cert = cert.public_bytes(serialization.Encoding.DER)

230+

fingerprint = hashlib.sha256(der_cert).digest()

231+

# The certificate fingerprint is generated in two steps to align with GFE's

232+

# expectations and ensure proper URL transmission:

233+

# 1. Standard base64 encoding is applied, and padding ('=') is removed.

234+

# 2. The resulting string is then URL-encoded to handle special characters

235+

# ('+', '/') that would otherwise be misinterpreted in URL parameters.

236+

base64_fingerprint = base64.b64encode(fingerprint).decode("utf-8")

237+

unpadded_base64_fingerprint = base64_fingerprint.rstrip("=")

238+

return quote(unpadded_base64_fingerprint)

239+

except ImportError as e:

240+

raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e

241+242+243+

def should_request_bound_token(cert):

244+

"""Determines if a bound token should be requested.

245+246+

This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES

247+

environment variable and whether the certificate is an agent identity cert.

248+249+

Args:

250+

cert (cryptography.x509.Certificate): The parsed certificate object.

251+252+

Returns:

253+

bool: True if a bound token should be requested, False otherwise.

254+

"""

255+

is_agent_cert = _is_agent_identity_certificate(cert)

256+

is_opted_in = (

257+

os.environ.get(

258+

environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,

259+

"true",

260+

).lower()

261+

== "true"

262+

)

263+

return is_agent_cert and is_opted_in

264+265+266+

def call_client_cert_callback():

267+

"""Calls the client cert callback and returns the certificate and key."""

268+

_, cert_bytes, key_bytes, passphrase = _mtls_helper.get_client_ssl_credentials(

269+

generate_encrypted_key=True

270+

)

271+

return cert_bytes, key_bytes

272+273+274+

def get_cached_cert_fingerprint(cached_cert):

275+

"""Returns the fingerprint of the cached certificate."""

276+

if cached_cert:

277+

cert_obj = parse_certificate(cached_cert)

278+

cached_cert_fingerprint = calculate_certificate_fingerprint(cert_obj)

279+

else:

280+

raise ValueError("mTLS connection is not configured.")

281+

return cached_cert_fingerprint