feat: Adding Agent Identity bound token support and handling certific… · googleapis/google-auth-library-python@b32c934
1+# Copyright 2025 Google LLC
2+#
3+# Licensed under the Apache License, Version 2.0 (the "License");
4+# you may not use this file except in compliance with the License.
5+# You may obtain a copy of the License at
6+#
7+# http://www.apache.org/licenses/LICENSE-2.0
8+#
9+# Unless required by applicable law or agreed to in writing, software
10+# distributed under the License is distributed on an "AS IS" BASIS,
11+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+# See the License for the specific language governing permissions and
13+# limitations under the License.
14+15+"""Helpers for Agent Identity credentials."""
16+17+import base64
18+import hashlib
19+import logging
20+import os
21+import re
22+import time
23+from urllib.parse import quote, urlparse
24+25+from google.auth import environment_vars
26+from google.auth import exceptions
27+from google.auth.transport import _mtls_helper
28+29+30+_LOGGER = logging.getLogger(__name__)
31+32+CRYPTOGRAPHY_NOT_FOUND_ERROR = (
33+"The cryptography library is required for certificate-based authentication."
34+"Please install it with `pip install google-auth[cryptography]`."
35+)
36+37+# SPIFFE trust domain patterns for Agent Identities.
38+_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
39+r"^agents\.global\.org-\d+\.system\.id\.goog$",
40+r"^agents\.global\.proj-\d+\.system\.id\.goog$",
41+]
42+43+_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
44+45+# Constants for polling the certificate file.
46+_FAST_POLL_CYCLES = 50
47+_FAST_POLL_INTERVAL = 0.1 # 100ms
48+_SLOW_POLL_INTERVAL = 0.5 # 500ms
49+_TOTAL_TIMEOUT = 30 # seconds
50+51+# Calculate the number of slow poll cycles based on the total timeout.
52+_SLOW_POLL_CYCLES = int(
53+ (_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
54+)
55+56+_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (
57+ [_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES
58+)
59+60+61+def _is_certificate_file_ready(path):
62+"""Checks if a file exists and is not empty."""
63+return path and os.path.exists(path) and os.path.getsize(path) > 0
64+65+66+def get_agent_identity_certificate_path():
67+"""Gets the certificate path from the certificate config file.
68+69+ The path to the certificate config file is read from the
70+ GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
71+ implements a retry mechanism to handle cases where the environment
72+ variable is set before the files are available on the filesystem.
73+74+ Returns:
75+ str: The path to the leaf certificate file.
76+77+ Raises:
78+ google.auth.exceptions.RefreshError: If the certificate config file
79+ or the certificate file cannot be found after retries.
80+ """
81+import json
82+83+cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
84+if not cert_config_path:
85+return None
86+87+has_logged_warning = False
88+89+for interval in _POLLING_INTERVALS:
90+try:
91+with open(cert_config_path, "r") as f:
92+cert_config = json.load(f)
93+cert_path = (
94+cert_config.get("cert_configs", {})
95+ .get("workload", {})
96+ .get("cert_path")
97+ )
98+if _is_certificate_file_ready(cert_path):
99+return cert_path
100+except (IOError, ValueError, KeyError):
101+if not has_logged_warning:
102+_LOGGER.warning(
103+"Certificate config file not found at %s (from %s environment "
104+"variable). Retrying for up to %s seconds.",
105+cert_config_path,
106+environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
107+_TOTAL_TIMEOUT,
108+ )
109+has_logged_warning = True
110+pass
111+112+# As a fallback, check the well-known certificate path.
113+if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
114+return _WELL_KNOWN_CERT_PATH
115+116+# A sleep is required in two cases:
117+# 1. The config file is not found (the except block).
118+# 2. The config file is found, but the certificate is not yet available.
119+# In both cases, we need to poll, so we sleep on every iteration
120+# that doesn't return a certificate.
121+time.sleep(interval)
122+123+raise exceptions.RefreshError(
124+"Certificate config or certificate file not found after multiple retries. "
125+f"Token binding protection is failing. You can turn off this protection by setting "
126+f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
127+"to fall back to unbound tokens."
128+ )
129+130+131+def get_and_parse_agent_identity_certificate():
132+"""Gets and parses the agent identity certificate if not opted out.
133+134+ Checks if the user has opted out of certificate-bound tokens. If not,
135+ it gets the certificate path, reads the file, and parses it.
136+137+ Returns:
138+ The parsed certificate object if found and not opted out, otherwise None.
139+ """
140+# If the user has opted out of cert bound tokens, there is no need to
141+# look up the certificate.
142+is_opted_out = (
143+os.environ.get(
144+environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
145+"true",
146+ ).lower()
147+== "false"
148+ )
149+if is_opted_out:
150+return None
151+152+cert_path = get_agent_identity_certificate_path()
153+if not cert_path:
154+return None
155+156+with open(cert_path, "rb") as cert_file:
157+cert_bytes = cert_file.read()
158+159+return parse_certificate(cert_bytes)
160+161+162+def parse_certificate(cert_bytes):
163+"""Parses a PEM-encoded certificate.
164+165+ Args:
166+ cert_bytes (bytes): The PEM-encoded certificate bytes.
167+168+ Returns:
169+ cryptography.x509.Certificate: The parsed certificate object.
170+ """
171+try:
172+from cryptography import x509
173+174+return x509.load_pem_x509_certificate(cert_bytes)
175+except ImportError as e:
176+raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
177+178+179+def _is_agent_identity_certificate(cert):
180+"""Checks if a certificate is an Agent Identity certificate.
181+182+ This is determined by checking the Subject Alternative Name (SAN) for a
183+ SPIFFE ID with a trust domain matching Agent Identity patterns.
184+185+ Args:
186+ cert (cryptography.x509.Certificate): The parsed certificate object.
187+188+ Returns:
189+ bool: True if the certificate is an Agent Identity certificate,
190+ False otherwise.
191+ """
192+try:
193+from cryptography import x509
194+from cryptography.x509.oid import ExtensionOID
195+196+try:
197+ext = cert.extensions.get_extension_for_oid(
198+ExtensionOID.SUBJECT_ALTERNATIVE_NAME
199+ )
200+except x509.ExtensionNotFound:
201+return False
202+uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier)
203+204+for uri in uris:
205+parsed_uri = urlparse(uri)
206+if parsed_uri.scheme == "spiffe":
207+trust_domain = parsed_uri.netloc
208+for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS:
209+if re.match(pattern, trust_domain):
210+return True
211+return False
212+except ImportError as e:
213+raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
214+215+216+def calculate_certificate_fingerprint(cert):
217+"""Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
218+ DER-encoded certificate.
219+220+ Args:
221+ cert (cryptography.x509.Certificate): The parsed certificate object.
222+223+ Returns:
224+ str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.
225+ """
226+try:
227+from cryptography.hazmat.primitives import serialization
228+229+der_cert = cert.public_bytes(serialization.Encoding.DER)
230+fingerprint = hashlib.sha256(der_cert).digest()
231+# The certificate fingerprint is generated in two steps to align with GFE's
232+# expectations and ensure proper URL transmission:
233+# 1. Standard base64 encoding is applied, and padding ('=') is removed.
234+# 2. The resulting string is then URL-encoded to handle special characters
235+# ('+', '/') that would otherwise be misinterpreted in URL parameters.
236+base64_fingerprint = base64.b64encode(fingerprint).decode("utf-8")
237+unpadded_base64_fingerprint = base64_fingerprint.rstrip("=")
238+return quote(unpadded_base64_fingerprint)
239+except ImportError as e:
240+raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
241+242+243+def should_request_bound_token(cert):
244+"""Determines if a bound token should be requested.
245+246+ This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
247+ environment variable and whether the certificate is an agent identity cert.
248+249+ Args:
250+ cert (cryptography.x509.Certificate): The parsed certificate object.
251+252+ Returns:
253+ bool: True if a bound token should be requested, False otherwise.
254+ """
255+is_agent_cert = _is_agent_identity_certificate(cert)
256+is_opted_in = (
257+os.environ.get(
258+environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
259+"true",
260+ ).lower()
261+== "true"
262+ )
263+return is_agent_cert and is_opted_in
264+265+266+def call_client_cert_callback():
267+"""Calls the client cert callback and returns the certificate and key."""
268+_, cert_bytes, key_bytes, passphrase = _mtls_helper.get_client_ssl_credentials(
269+generate_encrypted_key=True
270+ )
271+return cert_bytes, key_bytes
272+273+274+def get_cached_cert_fingerprint(cached_cert):
275+"""Returns the fingerprint of the cached certificate."""
276+if cached_cert:
277+cert_obj = parse_certificate(cached_cert)
278+cached_cert_fingerprint = calculate_certificate_fingerprint(cert_obj)
279+else:
280+raise ValueError("mTLS connection is not configured.")
281+return cached_cert_fingerprint