google-auth library 1.5.0

This commit is contained in:
Jay Lee
2018-07-04 20:00:39 -04:00
parent 518e506df4
commit 19018e4854
16 changed files with 707 additions and 64 deletions

View File

@@ -34,6 +34,9 @@ _CLOUD_SDK_POSIX_COMMAND = 'gcloud'
_CLOUD_SDK_WINDOWS_COMMAND = 'gcloud.cmd'
# The command to get the Cloud SDK configuration
_CLOUD_SDK_CONFIG_COMMAND = ('config', 'config-helper', '--format', 'json')
# Cloud SDK's application-default client ID
CLOUD_SDK_CLIENT_ID = (
'764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com')
def get_config_path():

View File

@@ -21,6 +21,7 @@ import io
import json
import logging
import os
import warnings
import six
@@ -36,13 +37,34 @@ _SERVICE_ACCOUNT_TYPE = 'service_account'
_VALID_TYPES = (_AUTHORIZED_USER_TYPE, _SERVICE_ACCOUNT_TYPE)
# Help message when no credentials can be found.
_HELP_MESSAGE = """
Could not automatically determine credentials. Please set {env} or
explicitly create credential and re-run the application. For more
information, please see
_HELP_MESSAGE = """\
Could not automatically determine credentials. Please set {env} or \
explicitly create credentials and re-run the application. For more \
information, please see \
https://developers.google.com/accounts/docs/application-default-credentials.
""".format(env=environment_vars.CREDENTIALS).strip()
# Warning when using Cloud SDK user credentials
_CLOUD_SDK_CREDENTIALS_WARNING = """\
Your application has authenticated using end user credentials from Google \
Cloud SDK. We recommend that most server applications use service accounts \
instead. If your application continues to use end user credentials from Cloud \
SDK, you might receive a "quota exceeded" or "API not enabled" error. For \
more information about service accounts, see \
https://cloud.google.com/docs/authentication/."""
def _warn_about_problematic_credentials(credentials):
"""Determines if the credentials are problematic.
Credentials from the Cloud SDK that are associated with Cloud SDK's project
are problematic because they may not have APIs enabled and have limited
quota. If this is the case, warn about it.
"""
from google.auth import _cloud_sdk
if credentials.client_id == _cloud_sdk.CLOUD_SDK_CLIENT_ID:
warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
def _load_credentials_from_file(filename):
"""Loads credentials from a file.
@@ -90,6 +112,7 @@ def _load_credentials_from_file(filename):
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
six.raise_from(new_exc, caught_exc)
# Authorized user credentials do not contain the project ID.
_warn_about_problematic_credentials(credentials)
return credentials, None
elif credential_type == _SERVICE_ACCOUNT_TYPE:

View File

@@ -136,7 +136,7 @@ class Credentials(credentials.Scoped, credentials.Signing,
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes):
return Credentials(
return self.__class__(
scopes=scopes, service_account_id=self._service_account_id)
@_helpers.copy_docstring(credentials.Signing)

View File

@@ -15,8 +15,10 @@
"""Google Compute Engine authentication."""
from google.auth.compute_engine.credentials import Credentials
from google.auth.compute_engine.credentials import IDTokenCredentials
__all__ = [
'Credentials'
'Credentials',
'IDTokenCredentials',
]

View File

@@ -19,11 +19,17 @@ Engine using the Compute Engine metadata server.
"""
import datetime
import six
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
from google.auth import iam
from google.auth import jwt
from google.auth.compute_engine import _metadata
from google.oauth2 import _client
class Credentials(credentials.ReadOnlyScoped, credentials.Credentials):
@@ -108,3 +114,126 @@ class Credentials(credentials.ReadOnlyScoped, credentials.Credentials):
def requires_scopes(self):
"""False: Compute Engine credentials can not be scoped."""
return False
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
class IDTokenCredentials(credentials.Credentials, credentials.Signing):
"""Open ID Connect ID Token-based service account credentials.
These credentials relies on the default service account of a GCE instance.
In order for this to work, the GCE instance must have been started with
a service account that has access to the IAM Cloud API.
"""
def __init__(self, request, target_audience,
token_uri=_DEFAULT_TOKEN_URI,
additional_claims=None,
service_account_email=None):
"""
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token. The ID Token's ``aud`` claim
will be set to this string.
token_uri (str): The OAuth 2.0 Token URI.
additional_claims (Mapping[str, str]): Any additional claims for
the JWT assertion used in the authorization grant.
service_account_email (str): Optional explicit service account to
use to sign JWT tokens.
By default, this is the default GCE service account.
"""
super(IDTokenCredentials, self).__init__()
if service_account_email is None:
sa_info = _metadata.get_service_account_info(request)
service_account_email = sa_info['email']
self._service_account_email = service_account_email
self._signer = iam.Signer(
request=request,
credentials=Credentials(),
service_account_email=service_account_email)
self._token_uri = token_uri
self._target_audience = target_audience
if additional_claims is not None:
self._additional_claims = additional_claims
else:
self._additional_claims = {}
def with_target_audience(self, target_audience):
"""Create a copy of these credentials with the specified target
audience.
Args:
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token.
Returns:
google.auth.service_account.IDTokenCredentials: A new credentials
instance.
"""
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy())
def _make_authorization_grant_assertion(self):
"""Create the OAuth 2.0 assertion.
This assertion is used during the OAuth 2.0 grant to acquire an
ID token.
Returns:
bytes: The authorization grant assertion.
"""
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
'iat': _helpers.datetime_to_secs(now),
'exp': _helpers.datetime_to_secs(expiry),
# The issuer must be the service account email.
'iss': self.service_account_email,
# The audience must be the auth token endpoint's URI
'aud': self._token_uri,
# The target audience specifies which service the ID token is
# intended for.
'target_audience': self._target_audience
}
payload.update(self._additional_claims)
token = jwt.encode(self._signer, payload)
return token
@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
assertion = self._make_authorization_grant_assertion()
access_token, expiry, _ = _client.id_token_jwt_grant(
request, self._token_uri, assertion)
self.token = access_token
self.expiry = expiry
@property
@_helpers.copy_docstring(credentials.Signing)
def signer(self):
return self._signer
@_helpers.copy_docstring(credentials.Signing)
def sign_bytes(self, message):
return self._signer.sign(message)
@property
def service_account_email(self):
"""The service account email."""
return self._service_account_email
@property
def signer_email(self):
return self._service_account_email

View File

@@ -53,8 +53,9 @@ class Credentials(object):
def expired(self):
"""Checks if the credentials are expired.
Note that credentials can be invalid but not expired becaue Credentials
with :attr:`expiry` set to None is considered to never expire.
Note that credentials can be invalid but not expired because
Credentials with :attr:`expiry` set to None is considered to never
expire.
"""
if not self.expiry:
return False

View File

@@ -0,0 +1,149 @@
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RSA verifier and signer that use the ``cryptography`` library.
This is a much faster implementation than the default (in
``google.auth.crypt._python_rsa``), which depends on the pure-Python
``rsa`` library.
"""
import cryptography.exceptions
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
import cryptography.x509
import pkg_resources
from google.auth import _helpers
from google.auth.crypt import base
_IMPORT_ERROR_MSG = (
'cryptography>=1.4.0 is required to use cryptography-based RSA '
'implementation.')
try: # pragma: NO COVER
release = pkg_resources.get_distribution('cryptography').parsed_version
if release < pkg_resources.parse_version('1.4.0'):
raise ImportError(_IMPORT_ERROR_MSG)
except pkg_resources.DistributionNotFound: # pragma: NO COVER
raise ImportError(_IMPORT_ERROR_MSG)
_CERTIFICATE_MARKER = b'-----BEGIN CERTIFICATE-----'
_BACKEND = backends.default_backend()
_PADDING = padding.PKCS1v15()
_SHA256 = hashes.SHA256()
class RSAVerifier(base.Verifier):
"""Verifies RSA cryptographic signatures using public keys.
Args:
public_key (
cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey):
The public key used to verify signatures.
"""
def __init__(self, public_key):
self._pubkey = public_key
@_helpers.copy_docstring(base.Verifier)
def verify(self, message, signature):
message = _helpers.to_bytes(message)
try:
self._pubkey.verify(signature, message, _PADDING, _SHA256)
return True
except (ValueError, cryptography.exceptions.InvalidSignature):
return False
@classmethod
def from_string(cls, public_key):
"""Construct an Verifier instance from a public key or public
certificate string.
Args:
public_key (Union[str, bytes]): The public key in PEM format or the
x509 public key certificate.
Returns:
Verifier: The constructed verifier.
Raises:
ValueError: If the public key can't be parsed.
"""
public_key_data = _helpers.to_bytes(public_key)
if _CERTIFICATE_MARKER in public_key_data:
cert = cryptography.x509.load_pem_x509_certificate(
public_key_data, _BACKEND)
pubkey = cert.public_key()
else:
pubkey = serialization.load_pem_public_key(
public_key_data, _BACKEND)
return cls(pubkey)
class RSASigner(base.Signer, base.FromServiceAccountMixin):
"""Signs messages with an RSA private key.
Args:
private_key (
cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
The private key to sign with.
key_id (str): Optional key ID used to identify this private key. This
can be useful to associate the private key with its associated
public key or certificate.
"""
def __init__(self, private_key, key_id=None):
self._key = private_key
self._key_id = key_id
@property
@_helpers.copy_docstring(base.Signer)
def key_id(self):
return self._key_id
@_helpers.copy_docstring(base.Signer)
def sign(self, message):
message = _helpers.to_bytes(message)
return self._key.sign(
message, _PADDING, _SHA256)
@classmethod
def from_string(cls, key, key_id=None):
"""Construct a RSASigner from a private key in PEM format.
Args:
key (Union[bytes, str]): Private key in PEM format.
key_id (str): An optional key id used to identify the private key.
Returns:
google.auth.crypt._cryptography_rsa.RSASigner: The
constructed signer.
Raises:
ValueError: If ``key`` is not ``bytes`` or ``str`` (unicode).
UnicodeDecodeError: If ``key`` is ``bytes`` but cannot be decoded
into a UTF-8 ``str``.
ValueError: If ``cryptography`` "Could not deserialize key data."
"""
key = _helpers.to_bytes(key)
private_key = serialization.load_pem_private_key(
key, password=None, backend=_BACKEND)
return cls(private_key, key_id=key_id)

View File

View File

@@ -21,9 +21,6 @@ certificates. There is no support for p12 files.
from __future__ import absolute_import
import io
import json
from pyasn1.codec.der import decoder
from pyasn1_modules import pem
from pyasn1_modules.rfc2459 import Certificate
@@ -41,8 +38,6 @@ _PKCS1_MARKER = ('-----BEGIN RSA PRIVATE KEY-----',
_PKCS8_MARKER = ('-----BEGIN PRIVATE KEY-----',
'-----END PRIVATE KEY-----')
_PKCS8_SPEC = PrivateKeyInfo()
_JSON_FILE_PRIVATE_KEY = 'private_key'
_JSON_FILE_PRIVATE_KEY_ID = 'private_key_id'
def _bit_list_to_bytes(bit_list):
@@ -119,7 +114,7 @@ class RSAVerifier(base.Verifier):
return cls(pubkey)
class RSASigner(base.Signer):
class RSASigner(base.Signer, base.FromServiceAccountMixin):
"""Signs messages with an RSA private key.
Args:
@@ -179,43 +174,3 @@ class RSASigner(base.Signer):
raise ValueError('No key could be detected.')
return cls(private_key, key_id=key_id)
@classmethod
def from_service_account_info(cls, info):
"""Creates a Signer instance instance from a dictionary containing
service account info in Google format.
Args:
info (Mapping[str, str]): The service account info in Google
format.
Returns:
google.auth.crypt.Signer: The constructed signer.
Raises:
ValueError: If the info is not in the expected format.
"""
if _JSON_FILE_PRIVATE_KEY not in info:
raise ValueError(
'The private_key field was not found in the service account '
'info.')
return cls.from_string(
info[_JSON_FILE_PRIVATE_KEY],
info.get(_JSON_FILE_PRIVATE_KEY_ID))
@classmethod
def from_service_account_file(cls, filename):
"""Creates a Signer instance from a service account .json file
in Google format.
Args:
filename (str): The path to the service account .json file.
Returns:
google.auth.crypt.Signer: The constructed signer.
"""
with io.open(filename, 'r', encoding='utf-8') as json_file:
data = json.load(json_file)
return cls.from_service_account_info(data)

View File

@@ -15,10 +15,16 @@
"""Base classes for cryptographic signers and verifiers."""
import abc
import io
import json
import six
_JSON_FILE_PRIVATE_KEY = 'private_key'
_JSON_FILE_PRIVATE_KEY_ID = 'private_key_id'
@six.add_metaclass(abc.ABCMeta)
class Verifier(object):
"""Abstract base class for crytographic signature verifiers."""
@@ -62,3 +68,64 @@ class Signer(object):
# pylint: disable=missing-raises-doc,redundant-returns-doc
# (pylint doesn't recognize that this is abstract)
raise NotImplementedError('Sign must be implemented')
@six.add_metaclass(abc.ABCMeta)
class FromServiceAccountMixin(object):
"""Mix-in to enable factory constructors for a Signer."""
@abc.abstractmethod
def from_string(cls, key, key_id=None):
"""Construct an Signer instance from a private key string.
Args:
key (str): Private key as a string.
key_id (str): An optional key id used to identify the private key.
Returns:
google.auth.crypt.Signer: The constructed signer.
Raises:
ValueError: If the key cannot be parsed.
"""
raise NotImplementedError('from_string must be implemented')
@classmethod
def from_service_account_info(cls, info):
"""Creates a Signer instance instance from a dictionary containing
service account info in Google format.
Args:
info (Mapping[str, str]): The service account info in Google
format.
Returns:
google.auth.crypt.Signer: The constructed signer.
Raises:
ValueError: If the info is not in the expected format.
"""
if _JSON_FILE_PRIVATE_KEY not in info:
raise ValueError(
'The private_key field was not found in the service account '
'info.')
return cls.from_string(
info[_JSON_FILE_PRIVATE_KEY],
info.get(_JSON_FILE_PRIVATE_KEY_ID))
@classmethod
def from_service_account_file(cls, filename):
"""Creates a Signer instance from a service account .json file
in Google format.
Args:
filename (str): The path to the service account .json file.
Returns:
google.auth.crypt.Signer: The constructed signer.
"""
with io.open(filename, 'r', encoding='utf-8') as json_file:
data = json.load(json_file)
return cls.from_service_account_info(data)

View File

@@ -14,7 +14,17 @@
"""RSA cryptography signer and verifier."""
from google.auth.crypt import _python_rsa
RSASigner = _python_rsa.RSASigner
RSAVerifier = _python_rsa.RSAVerifier
try:
# Prefer cryptograph-based RSA implementation.
from google.auth.crypt import _cryptography_rsa
RSASigner = _cryptography_rsa.RSASigner
RSAVerifier = _cryptography_rsa.RSAVerifier
except ImportError: # pragma: NO COVER
# Fallback to pure-python RSA implementation if cryptography is
# unavailable.
from google.auth.crypt import _python_rsa
RSASigner = _python_rsa.RSASigner
RSAVerifier = _python_rsa.RSAVerifier

View File

@@ -21,7 +21,7 @@ See `rfc7519`_ for more details on JWTs.
To encode a JWT use :func:`encode`::
from google.auth import crypto
from google.auth import crypt
from google.auth import jwt
signer = crypt.Signer(private_key)
@@ -438,7 +438,7 @@ class Credentials(google.auth.credentials.Signing,
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
return Credentials(
return self.__class__(
self._signer,
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
@@ -643,7 +643,7 @@ class OnDemandCredentials(
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
return OnDemandCredentials(
return self.__class__(
self._signer,
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,

View File

@@ -32,6 +32,7 @@ from six.moves import urllib
from google.auth import _helpers
from google.auth import exceptions
from google.auth import jwt
_URLENCODED_CONTENT_TYPE = 'application/x-www-form-urlencoded'
_JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
@@ -155,6 +156,51 @@ def jwt_grant(request, token_uri, assertion):
return access_token, expiry, response_data
def id_token_jwt_grant(request, token_uri, assertion):
"""Implements the JWT Profile for OAuth 2.0 Authorization Grants, but
requests an OpenID Connect ID Token instead of an access token.
This is a variant on the standard JWT Profile that is currently unique
to Google. This was added for the benefit of authenticating to services
that require ID Tokens instead of access tokens or JWT bearer tokens.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
token_uri (str): The OAuth 2.0 authorization server's token endpoint
URI.
assertion (str): JWT token signed by a service account. The token's
payload must include a ``target_audience`` claim.
Returns:
Tuple[str, Optional[datetime], Mapping[str, str]]:
The (encoded) Open ID Connect ID Token, expiration, and additional
data returned by the endpoint.
Raises:
google.auth.exceptions.RefreshError: If the token endpoint returned
an error.
"""
body = {
'assertion': assertion,
'grant_type': _JWT_GRANT_TYPE,
}
response_data = _token_endpoint_request(request, token_uri, body)
try:
id_token = response_data['id_token']
except KeyError as caught_exc:
new_exc = exceptions.RefreshError(
'No ID token in response.', response_data)
six.raise_from(new_exc, caught_exc)
payload = jwt.decode(id_token, verify=False)
expiry = datetime.datetime.utcfromtimestamp(payload['exp'])
return id_token, expiry, response_data
def refresh_grant(request, token_uri, refresh_token, client_id, client_secret):
"""Implements the OAuth 2.0 refresh token grant.

View File

@@ -38,6 +38,7 @@ import six
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
from google.oauth2 import _client
@@ -120,6 +121,15 @@ class Credentials(credentials.ReadOnlyScoped, credentials.Credentials):
@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
if (self._refresh_token is None or
self._token_uri is None or
self._client_id is None or
self._client_secret is None):
raise exceptions.RefreshError(
'The credentials do not contain the necessary fields need to '
'refresh the access token. You must specify refresh_token, '
'token_uri, client_id, and client_secret.')
access_token, refresh_token, expiry, grant_response = (
_client.refresh_grant(
request, self._token_uri, self._refresh_token, self._client_id,

View File

@@ -12,7 +12,51 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google ID Token helpers."""
"""Google ID Token helpers.
Provides support for verifying `OpenID Connect ID Tokens`_, especially ones
generated by Google infrastructure.
To parse and verify an ID Token issued by Google's OAuth 2.0 authorization
server use :func:`verify_oauth2_token`. To verify an ID Token issued by
Firebase, use :func:`verify_firebase_token`.
A general purpose ID Token verifier is available as :func:`verify_token`.
Example::
from google.oauth2 import id_token
from google.auth.transport import requests
request = requests.Request()
id_info = id_token.verify_oauth2_token(
token, request, 'my-client-id.example.com')
if id_info['iss'] != 'https://accounts.google.com':
raise ValueError('Wrong issuer.')
userid = id_info['sub']
By default, this will re-fetch certificates for each verification. Because
Google's public keys are only changed infrequently (on the order of once per
day), you may wish to take advantage of caching to reduce latency and the
potential for network errors. This can be accomplished using an external
library like `CacheControl`_ to create a cache-aware
:class:`google.auth.transport.Request`::
import cachecontrol
import google.auth.transport.requests
import requests
session = requests.session()
cached_session = cachecontrol.CacheControl(session)
request = google.auth.transport.requests.Request(session=cached_session)
.. _OpenID Connect ID Token:
http://openid.net/specs/openid-connect-core-1_0.html#IDToken
.. _CacheControl: https://cachecontrol.readthedocs.io
"""
import json

View File

@@ -230,7 +230,7 @@ class Credentials(credentials.Signing,
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes):
return Credentials(
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=scopes,
@@ -249,7 +249,7 @@ class Credentials(credentials.Signing,
google.auth.service_account.Credentials: A new credentials
instance.
"""
return Credentials(
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
@@ -273,7 +273,7 @@ class Credentials(credentials.Signing,
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
return Credentials(
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
@@ -336,3 +336,207 @@ class Credentials(credentials.Signing,
@_helpers.copy_docstring(credentials.Signing)
def signer_email(self):
return self._service_account_email
class IDTokenCredentials(credentials.Signing, credentials.Credentials):
"""Open ID Connect ID Token-based service account credentials.
These credentials are largely similar to :class:`.Credentials`, but instead
of using an OAuth 2.0 Access Token as the bearer token, they use an Open
ID Connect ID Token as the bearer token. These credentials are useful when
communicating to services that require ID Tokens and can not accept access
tokens.
Usually, you'll create these credentials with one of the helper
constructors. To create credentials using a Google service account
private key JSON file::
credentials = (
service_account.IDTokenCredentials.from_service_account_file(
'service-account.json'))
Or if you already have the service account file loaded::
service_account_info = json.load(open('service_account.json'))
credentials = (
service_account.IDTokenCredentials.from_service_account_info(
service_account_info))
Both helper methods pass on arguments to the constructor, so you can
specify additional scopes and a subject if necessary::
credentials = (
service_account.IDTokenCredentials.from_service_account_file(
'service-account.json',
scopes=['email'],
subject='user@example.com'))
`
The credentials are considered immutable. If you want to modify the scopes
or the subject used for delegation, use :meth:`with_scopes` or
:meth:`with_subject`::
scoped_credentials = credentials.with_scopes(['email'])
delegated_credentials = credentials.with_subject(subject)
"""
def __init__(self, signer, service_account_email, token_uri,
target_audience, additional_claims=None):
"""
Args:
signer (google.auth.crypt.Signer): The signer used to sign JWTs.
service_account_email (str): The service account's email.
token_uri (str): The OAuth 2.0 Token URI.
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token. The ID Token's ``aud`` claim
will be set to this string.
additional_claims (Mapping[str, str]): Any additional claims for
the JWT assertion used in the authorization grant.
.. note:: Typically one of the helper constructors
:meth:`from_service_account_file` or
:meth:`from_service_account_info` are used instead of calling the
constructor directly.
"""
super(IDTokenCredentials, self).__init__()
self._signer = signer
self._service_account_email = service_account_email
self._token_uri = token_uri
self._target_audience = target_audience
if additional_claims is not None:
self._additional_claims = additional_claims
else:
self._additional_claims = {}
@classmethod
def _from_signer_and_info(cls, signer, info, **kwargs):
"""Creates a credentials instance from a signer and service account
info.
Args:
signer (google.auth.crypt.Signer): The signer used to sign JWTs.
info (Mapping[str, str]): The service account info.
kwargs: Additional arguments to pass to the constructor.
Returns:
google.auth.jwt.IDTokenCredentials: The constructed credentials.
Raises:
ValueError: If the info is not in the expected format.
"""
kwargs.setdefault('service_account_email', info['client_email'])
kwargs.setdefault('token_uri', info['token_uri'])
return cls(signer, **kwargs)
@classmethod
def from_service_account_info(cls, info, **kwargs):
"""Creates a credentials instance from parsed service account info.
Args:
info (Mapping[str, str]): The service account info in Google
format.
kwargs: Additional arguments to pass to the constructor.
Returns:
google.auth.service_account.IDTokenCredentials: The constructed
credentials.
Raises:
ValueError: If the info is not in the expected format.
"""
signer = _service_account_info.from_dict(
info, require=['client_email', 'token_uri'])
return cls._from_signer_and_info(signer, info, **kwargs)
@classmethod
def from_service_account_file(cls, filename, **kwargs):
"""Creates a credentials instance from a service account json file.
Args:
filename (str): The path to the service account json file.
kwargs: Additional arguments to pass to the constructor.
Returns:
google.auth.service_account.IDTokenCredentials: The constructed
credentials.
"""
info, signer = _service_account_info.from_filename(
filename, require=['client_email', 'token_uri'])
return cls._from_signer_and_info(signer, info, **kwargs)
def with_target_audience(self, target_audience):
"""Create a copy of these credentials with the specified target
audience.
Args:
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token.
Returns:
google.auth.service_account.IDTokenCredentials: A new credentials
instance.
"""
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy())
def _make_authorization_grant_assertion(self):
"""Create the OAuth 2.0 assertion.
This assertion is used during the OAuth 2.0 grant to acquire an
ID token.
Returns:
bytes: The authorization grant assertion.
"""
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
'iat': _helpers.datetime_to_secs(now),
'exp': _helpers.datetime_to_secs(expiry),
# The issuer must be the service account email.
'iss': self.service_account_email,
# The audience must be the auth token endpoint's URI
'aud': self._token_uri,
# The target audience specifies which service the ID token is
# intended for.
'target_audience': self._target_audience
}
payload.update(self._additional_claims)
token = jwt.encode(self._signer, payload)
return token
@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
assertion = self._make_authorization_grant_assertion()
access_token, expiry, _ = _client.id_token_jwt_grant(
request, self._token_uri, assertion)
self.token = access_token
self.expiry = expiry
@property
def service_account_email(self):
"""The service account email."""
return self._service_account_email
@_helpers.copy_docstring(credentials.Signing)
def sign_bytes(self, message):
return self._signer.sign(message)
@property
@_helpers.copy_docstring(credentials.Signing)
def signer(self):
return self._signer
@property
@_helpers.copy_docstring(credentials.Signing)
def signer_email(self):
return self._service_account_email