mirror of
https://github.com/GAM-team/GAM.git
synced 2026-06-29 18:31:38 +00:00
update googleapiclient, httplib2, oauth2client and passlib to latest versions
This commit is contained in:
@@ -15,415 +15,229 @@
|
||||
# limitations under the License.
|
||||
"""Crypto-related routines for oauth2client."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
import six
|
||||
from oauth2client._helpers import _from_bytes
|
||||
from oauth2client._helpers import _json_encode
|
||||
from oauth2client._helpers import _to_bytes
|
||||
from oauth2client._helpers import _urlsafe_b64decode
|
||||
from oauth2client._helpers import _urlsafe_b64encode
|
||||
|
||||
|
||||
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
|
||||
AUTH_TOKEN_LIFETIME_SECS = 300 # 5 minutes in seconds
|
||||
MAX_TOKEN_LIFETIME_SECS = 86400 # 1 day in seconds
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AppIdentityError(Exception):
|
||||
pass
|
||||
"""Error to indicate crypto failure."""
|
||||
|
||||
|
||||
try:
|
||||
from OpenSSL import crypto
|
||||
|
||||
class OpenSSLVerifier(object):
|
||||
"""Verifies the signature on a message."""
|
||||
|
||||
def __init__(self, pubkey):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
|
||||
"""
|
||||
self._pubkey = pubkey
|
||||
|
||||
def verify(self, message, signature):
|
||||
"""Verifies a message against a signature.
|
||||
|
||||
Args:
|
||||
message: string, The message to verify.
|
||||
signature: string, The signature on the message.
|
||||
|
||||
Returns:
|
||||
True if message was signed by the private key associated with the public
|
||||
key that this object was constructed with.
|
||||
"""
|
||||
try:
|
||||
if isinstance(message, six.text_type):
|
||||
message = message.encode('utf-8')
|
||||
crypto.verify(self._pubkey, signature, message, 'sha256')
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def from_string(key_pem, is_x509_cert):
|
||||
"""Construct a Verified instance from a string.
|
||||
|
||||
Args:
|
||||
key_pem: string, public key in PEM format.
|
||||
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
|
||||
expected to be an RSA key in PEM format.
|
||||
|
||||
Returns:
|
||||
Verifier instance.
|
||||
|
||||
Raises:
|
||||
OpenSSL.crypto.Error if the key_pem can't be parsed.
|
||||
"""
|
||||
if is_x509_cert:
|
||||
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
|
||||
else:
|
||||
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
|
||||
return OpenSSLVerifier(pubkey)
|
||||
|
||||
|
||||
class OpenSSLSigner(object):
|
||||
"""Signs messages with a private key."""
|
||||
|
||||
def __init__(self, pkey):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
|
||||
"""
|
||||
self._key = pkey
|
||||
|
||||
def sign(self, message):
|
||||
"""Signs a message.
|
||||
|
||||
Args:
|
||||
message: bytes, Message to be signed.
|
||||
|
||||
Returns:
|
||||
string, The signature of the message for the given key.
|
||||
"""
|
||||
if isinstance(message, six.text_type):
|
||||
message = message.encode('utf-8')
|
||||
return crypto.sign(self._key, message, 'sha256')
|
||||
|
||||
@staticmethod
|
||||
def from_string(key, password=b'notasecret'):
|
||||
"""Construct a Signer instance from a string.
|
||||
|
||||
Args:
|
||||
key: string, private key in PKCS12 or PEM format.
|
||||
password: string, password for the private key file.
|
||||
|
||||
Returns:
|
||||
Signer instance.
|
||||
|
||||
Raises:
|
||||
OpenSSL.crypto.Error if the key can't be parsed.
|
||||
"""
|
||||
parsed_pem_key = _parse_pem_key(key)
|
||||
if parsed_pem_key:
|
||||
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
|
||||
else:
|
||||
if isinstance(password, six.text_type):
|
||||
password = password.encode('utf-8')
|
||||
pkey = crypto.load_pkcs12(key, password).get_privatekey()
|
||||
return OpenSSLSigner(pkey)
|
||||
|
||||
|
||||
def pkcs12_key_as_pem(private_key_text, private_key_password):
|
||||
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
|
||||
|
||||
Args:
|
||||
private_key_text: String. Private key.
|
||||
private_key_password: String. Password for PKCS12.
|
||||
|
||||
Returns:
|
||||
String. PEM contents of ``private_key_text``.
|
||||
"""
|
||||
decoded_body = base64.b64decode(private_key_text)
|
||||
if isinstance(private_key_password, six.string_types):
|
||||
private_key_password = private_key_password.encode('ascii')
|
||||
|
||||
pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
|
||||
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
|
||||
pkcs12.get_privatekey())
|
||||
except ImportError:
|
||||
OpenSSLVerifier = None
|
||||
OpenSSLSigner = None
|
||||
def pkcs12_key_as_pem(*args, **kwargs):
|
||||
def _bad_pkcs12_key_as_pem(*args, **kwargs):
|
||||
raise NotImplementedError('pkcs12_key_as_pem requires OpenSSL.')
|
||||
|
||||
|
||||
try:
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.Signature import PKCS1_v1_5
|
||||
from Crypto.Util.asn1 import DerSequence
|
||||
from oauth2client._openssl_crypt import OpenSSLVerifier
|
||||
from oauth2client._openssl_crypt import OpenSSLSigner
|
||||
from oauth2client._openssl_crypt import pkcs12_key_as_pem
|
||||
except ImportError: # pragma: NO COVER
|
||||
OpenSSLVerifier = None
|
||||
OpenSSLSigner = None
|
||||
pkcs12_key_as_pem = _bad_pkcs12_key_as_pem
|
||||
|
||||
|
||||
class PyCryptoVerifier(object):
|
||||
"""Verifies the signature on a message."""
|
||||
|
||||
def __init__(self, pubkey):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with.
|
||||
"""
|
||||
self._pubkey = pubkey
|
||||
|
||||
def verify(self, message, signature):
|
||||
"""Verifies a message against a signature.
|
||||
|
||||
Args:
|
||||
message: string, The message to verify.
|
||||
signature: string, The signature on the message.
|
||||
|
||||
Returns:
|
||||
True if message was signed by the private key associated with the public
|
||||
key that this object was constructed with.
|
||||
"""
|
||||
try:
|
||||
return PKCS1_v1_5.new(self._pubkey).verify(
|
||||
SHA256.new(message), signature)
|
||||
except:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def from_string(key_pem, is_x509_cert):
|
||||
"""Construct a Verified instance from a string.
|
||||
|
||||
Args:
|
||||
key_pem: string, public key in PEM format.
|
||||
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
|
||||
expected to be an RSA key in PEM format.
|
||||
|
||||
Returns:
|
||||
Verifier instance.
|
||||
"""
|
||||
if is_x509_cert:
|
||||
if isinstance(key_pem, six.text_type):
|
||||
key_pem = key_pem.encode('ascii')
|
||||
pemLines = key_pem.replace(b' ', b'').split()
|
||||
certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1]))
|
||||
certSeq = DerSequence()
|
||||
certSeq.decode(certDer)
|
||||
tbsSeq = DerSequence()
|
||||
tbsSeq.decode(certSeq[0])
|
||||
pubkey = RSA.importKey(tbsSeq[6])
|
||||
else:
|
||||
pubkey = RSA.importKey(key_pem)
|
||||
return PyCryptoVerifier(pubkey)
|
||||
|
||||
|
||||
class PyCryptoSigner(object):
|
||||
"""Signs messages with a private key."""
|
||||
|
||||
def __init__(self, pkey):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
|
||||
"""
|
||||
self._key = pkey
|
||||
|
||||
def sign(self, message):
|
||||
"""Signs a message.
|
||||
|
||||
Args:
|
||||
message: string, Message to be signed.
|
||||
|
||||
Returns:
|
||||
string, The signature of the message for the given key.
|
||||
"""
|
||||
if isinstance(message, six.text_type):
|
||||
message = message.encode('utf-8')
|
||||
return PKCS1_v1_5.new(self._key).sign(SHA256.new(message))
|
||||
|
||||
@staticmethod
|
||||
def from_string(key, password='notasecret'):
|
||||
"""Construct a Signer instance from a string.
|
||||
|
||||
Args:
|
||||
key: string, private key in PEM format.
|
||||
password: string, password for private key file. Unused for PEM files.
|
||||
|
||||
Returns:
|
||||
Signer instance.
|
||||
|
||||
Raises:
|
||||
NotImplementedError if they key isn't in PEM format.
|
||||
"""
|
||||
parsed_pem_key = _parse_pem_key(key)
|
||||
if parsed_pem_key:
|
||||
pkey = RSA.importKey(parsed_pem_key)
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
'PKCS12 format is not supported by the PyCrypto library. '
|
||||
'Try converting to a "PEM" '
|
||||
'(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
|
||||
'or using PyOpenSSL if native code is an option.')
|
||||
return PyCryptoSigner(pkey)
|
||||
|
||||
except ImportError:
|
||||
PyCryptoVerifier = None
|
||||
PyCryptoSigner = None
|
||||
try:
|
||||
from oauth2client._pycrypto_crypt import PyCryptoVerifier
|
||||
from oauth2client._pycrypto_crypt import PyCryptoSigner
|
||||
except ImportError: # pragma: NO COVER
|
||||
PyCryptoVerifier = None
|
||||
PyCryptoSigner = None
|
||||
|
||||
|
||||
if OpenSSLSigner:
|
||||
Signer = OpenSSLSigner
|
||||
Verifier = OpenSSLVerifier
|
||||
elif PyCryptoSigner:
|
||||
Signer = PyCryptoSigner
|
||||
Verifier = PyCryptoVerifier
|
||||
else:
|
||||
raise ImportError('No encryption library found. Please install either '
|
||||
'PyOpenSSL, or PyCrypto 2.6 or later')
|
||||
|
||||
|
||||
def _parse_pem_key(raw_key_input):
|
||||
"""Identify and extract PEM keys.
|
||||
|
||||
Determines whether the given key is in the format of PEM key, and extracts
|
||||
the relevant part of the key if it is.
|
||||
|
||||
Args:
|
||||
raw_key_input: The contents of a private key file (either PEM or PKCS12).
|
||||
|
||||
Returns:
|
||||
string, The actual key if the contents are from a PEM file, or else None.
|
||||
"""
|
||||
offset = raw_key_input.find(b'-----BEGIN ')
|
||||
if offset != -1:
|
||||
return raw_key_input[offset:]
|
||||
|
||||
|
||||
def _urlsafe_b64encode(raw_bytes):
|
||||
if isinstance(raw_bytes, six.text_type):
|
||||
raw_bytes = raw_bytes.encode('utf-8')
|
||||
return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=')
|
||||
|
||||
|
||||
def _urlsafe_b64decode(b64string):
|
||||
# Guard against unicode strings, which base64 can't handle.
|
||||
if isinstance(b64string, six.text_type):
|
||||
b64string = b64string.encode('ascii')
|
||||
padded = b64string + b'=' * (4 - len(b64string) % 4)
|
||||
return base64.urlsafe_b64decode(padded)
|
||||
|
||||
|
||||
def _json_encode(data):
|
||||
return json.dumps(data, separators=(',', ':'))
|
||||
Signer = OpenSSLSigner
|
||||
Verifier = OpenSSLVerifier
|
||||
elif PyCryptoSigner: # pragma: NO COVER
|
||||
Signer = PyCryptoSigner
|
||||
Verifier = PyCryptoVerifier
|
||||
else: # pragma: NO COVER
|
||||
raise ImportError('No encryption library found. Please install either '
|
||||
'PyOpenSSL, or PyCrypto 2.6 or later')
|
||||
|
||||
|
||||
def make_signed_jwt(signer, payload):
|
||||
"""Make a signed JWT.
|
||||
"""Make a signed JWT.
|
||||
|
||||
See http://self-issued.info/docs/draft-jones-json-web-token.html.
|
||||
See http://self-issued.info/docs/draft-jones-json-web-token.html.
|
||||
|
||||
Args:
|
||||
signer: crypt.Signer, Cryptographic signer.
|
||||
payload: dict, Dictionary of data to convert to JSON and then sign.
|
||||
Args:
|
||||
signer: crypt.Signer, Cryptographic signer.
|
||||
payload: dict, Dictionary of data to convert to JSON and then sign.
|
||||
|
||||
Returns:
|
||||
string, The JWT for the payload.
|
||||
"""
|
||||
header = {'typ': 'JWT', 'alg': 'RS256'}
|
||||
Returns:
|
||||
string, The JWT for the payload.
|
||||
"""
|
||||
header = {'typ': 'JWT', 'alg': 'RS256'}
|
||||
|
||||
segments = [
|
||||
segments = [
|
||||
_urlsafe_b64encode(_json_encode(header)),
|
||||
_urlsafe_b64encode(_json_encode(payload)),
|
||||
]
|
||||
signing_input = '.'.join(segments)
|
||||
]
|
||||
signing_input = b'.'.join(segments)
|
||||
|
||||
signature = signer.sign(signing_input)
|
||||
segments.append(_urlsafe_b64encode(signature))
|
||||
signature = signer.sign(signing_input)
|
||||
segments.append(_urlsafe_b64encode(signature))
|
||||
|
||||
logger.debug(str(segments))
|
||||
logger.debug(str(segments))
|
||||
|
||||
return '.'.join(segments)
|
||||
return b'.'.join(segments)
|
||||
|
||||
|
||||
def verify_signed_jwt_with_certs(jwt, certs, audience):
|
||||
"""Verify a JWT against public certs.
|
||||
def _verify_signature(message, signature, certs):
|
||||
"""Verifies signed content using a list of certificates.
|
||||
|
||||
See http://self-issued.info/docs/draft-jones-json-web-token.html.
|
||||
Args:
|
||||
message: string or bytes, The message to verify.
|
||||
signature: string or bytes, The signature on the message.
|
||||
certs: iterable, certificates in PEM format.
|
||||
|
||||
Args:
|
||||
jwt: string, A JWT.
|
||||
certs: dict, Dictionary where values of public keys in PEM format.
|
||||
audience: string, The audience, 'aud', that this JWT should contain. If
|
||||
None then the JWT's 'aud' parameter is not verified.
|
||||
Raises:
|
||||
AppIdentityError: If none of the certificates can verify the message
|
||||
against the signature.
|
||||
"""
|
||||
for pem in certs:
|
||||
verifier = Verifier.from_string(pem, is_x509_cert=True)
|
||||
if verifier.verify(message, signature):
|
||||
return
|
||||
|
||||
Returns:
|
||||
dict, The deserialized JSON payload in the JWT.
|
||||
# If we have not returned, no certificate confirms the signature.
|
||||
raise AppIdentityError('Invalid token signature')
|
||||
|
||||
Raises:
|
||||
AppIdentityError if any checks are failed.
|
||||
"""
|
||||
segments = jwt.split('.')
|
||||
|
||||
if len(segments) != 3:
|
||||
raise AppIdentityError('Wrong number of segments in token: %s' % jwt)
|
||||
signed = '%s.%s' % (segments[0], segments[1])
|
||||
def _check_audience(payload_dict, audience):
|
||||
"""Checks audience field from a JWT payload.
|
||||
|
||||
signature = _urlsafe_b64decode(segments[2])
|
||||
Does nothing if the passed in ``audience`` is null.
|
||||
|
||||
# Parse token.
|
||||
json_body = _urlsafe_b64decode(segments[1])
|
||||
try:
|
||||
parsed = json.loads(json_body.decode('utf-8'))
|
||||
except:
|
||||
raise AppIdentityError('Can\'t parse token: %s' % json_body)
|
||||
Args:
|
||||
payload_dict: dict, A dictionary containing a JWT payload.
|
||||
audience: string or NoneType, an audience to check for in
|
||||
the JWT payload.
|
||||
|
||||
# Check signature.
|
||||
verified = False
|
||||
for pem in certs.values():
|
||||
verifier = Verifier.from_string(pem, True)
|
||||
if verifier.verify(signed, signature):
|
||||
verified = True
|
||||
break
|
||||
if not verified:
|
||||
raise AppIdentityError('Invalid token signature: %s' % jwt)
|
||||
Raises:
|
||||
AppIdentityError: If there is no ``'aud'`` field in the payload
|
||||
dictionary but there is an ``audience`` to check.
|
||||
AppIdentityError: If the ``'aud'`` field in the payload dictionary
|
||||
does not match the ``audience``.
|
||||
"""
|
||||
if audience is None:
|
||||
return
|
||||
|
||||
# Check creation timestamp.
|
||||
iat = parsed.get('iat')
|
||||
if iat is None:
|
||||
raise AppIdentityError('No iat field in token: %s' % json_body)
|
||||
earliest = iat - CLOCK_SKEW_SECS
|
||||
audience_in_payload = payload_dict.get('aud')
|
||||
if audience_in_payload is None:
|
||||
raise AppIdentityError('No aud field in token: %s' %
|
||||
(payload_dict,))
|
||||
if audience_in_payload != audience:
|
||||
raise AppIdentityError('Wrong recipient, %s != %s: %s' %
|
||||
(audience_in_payload, audience, payload_dict))
|
||||
|
||||
# Check expiration timestamp.
|
||||
now = int(time.time())
|
||||
exp = parsed.get('exp')
|
||||
if exp is None:
|
||||
raise AppIdentityError('No exp field in token: %s' % json_body)
|
||||
if exp >= now + MAX_TOKEN_LIFETIME_SECS:
|
||||
raise AppIdentityError('exp field too far in future: %s' % json_body)
|
||||
latest = exp + CLOCK_SKEW_SECS
|
||||
|
||||
if now < earliest:
|
||||
raise AppIdentityError('Token used too early, %d < %d: %s' %
|
||||
(now, earliest, json_body))
|
||||
if now > latest:
|
||||
raise AppIdentityError('Token used too late, %d > %d: %s' %
|
||||
(now, latest, json_body))
|
||||
def _verify_time_range(payload_dict):
|
||||
"""Verifies the issued at and expiration from a JWT payload.
|
||||
|
||||
# Check audience.
|
||||
if audience is not None:
|
||||
aud = parsed.get('aud')
|
||||
if aud is None:
|
||||
raise AppIdentityError('No aud field in token: %s' % json_body)
|
||||
if aud != audience:
|
||||
raise AppIdentityError('Wrong recipient, %s != %s: %s' %
|
||||
(aud, audience, json_body))
|
||||
Makes sure the current time (in UTC) falls between the issued at and
|
||||
expiration for the JWT (with some skew allowed for via
|
||||
``CLOCK_SKEW_SECS``).
|
||||
|
||||
return parsed
|
||||
Args:
|
||||
payload_dict: dict, A dictionary containing a JWT payload.
|
||||
|
||||
Raises:
|
||||
AppIdentityError: If there is no ``'iat'`` field in the payload
|
||||
dictionary.
|
||||
AppIdentityError: If there is no ``'exp'`` field in the payload
|
||||
dictionary.
|
||||
AppIdentityError: If the JWT expiration is too far in the future (i.e.
|
||||
if the expiration would imply a token lifetime
|
||||
longer than what is allowed.)
|
||||
AppIdentityError: If the token appears to have been issued in the
|
||||
future (up to clock skew).
|
||||
AppIdentityError: If the token appears to have expired in the past
|
||||
(up to clock skew).
|
||||
"""
|
||||
# Get the current time to use throughout.
|
||||
now = int(time.time())
|
||||
|
||||
# Make sure issued at and expiration are in the payload.
|
||||
issued_at = payload_dict.get('iat')
|
||||
if issued_at is None:
|
||||
raise AppIdentityError('No iat field in token: %s' % (payload_dict,))
|
||||
expiration = payload_dict.get('exp')
|
||||
if expiration is None:
|
||||
raise AppIdentityError('No exp field in token: %s' % (payload_dict,))
|
||||
|
||||
# Make sure the expiration gives an acceptable token lifetime.
|
||||
if expiration >= now + MAX_TOKEN_LIFETIME_SECS:
|
||||
raise AppIdentityError('exp field too far in future: %s' %
|
||||
(payload_dict,))
|
||||
|
||||
# Make sure (up to clock skew) that the token wasn't issued in the future.
|
||||
earliest = issued_at - CLOCK_SKEW_SECS
|
||||
if now < earliest:
|
||||
raise AppIdentityError('Token used too early, %d < %d: %s' %
|
||||
(now, earliest, payload_dict))
|
||||
# Make sure (up to clock skew) that the token isn't already expired.
|
||||
latest = expiration + CLOCK_SKEW_SECS
|
||||
if now > latest:
|
||||
raise AppIdentityError('Token used too late, %d > %d: %s' %
|
||||
(now, latest, payload_dict))
|
||||
|
||||
|
||||
def verify_signed_jwt_with_certs(jwt, certs, audience=None):
|
||||
"""Verify a JWT against public certs.
|
||||
|
||||
See http://self-issued.info/docs/draft-jones-json-web-token.html.
|
||||
|
||||
Args:
|
||||
jwt: string, A JWT.
|
||||
certs: dict, Dictionary where values of public keys in PEM format.
|
||||
audience: string, The audience, 'aud', that this JWT should contain. If
|
||||
None then the JWT's 'aud' parameter is not verified.
|
||||
|
||||
Returns:
|
||||
dict, The deserialized JSON payload in the JWT.
|
||||
|
||||
Raises:
|
||||
AppIdentityError: if any checks are failed.
|
||||
"""
|
||||
jwt = _to_bytes(jwt)
|
||||
|
||||
if jwt.count(b'.') != 2:
|
||||
raise AppIdentityError(
|
||||
'Wrong number of segments in token: %s' % (jwt,))
|
||||
|
||||
header, payload, signature = jwt.split(b'.')
|
||||
message_to_sign = header + b'.' + payload
|
||||
signature = _urlsafe_b64decode(signature)
|
||||
|
||||
# Parse token.
|
||||
payload_bytes = _urlsafe_b64decode(payload)
|
||||
try:
|
||||
payload_dict = json.loads(_from_bytes(payload_bytes))
|
||||
except:
|
||||
raise AppIdentityError('Can\'t parse token: %s' % (payload_bytes,))
|
||||
|
||||
# Verify that the signature matches the message.
|
||||
_verify_signature(message_to_sign, signature, certs.values())
|
||||
|
||||
# Verify the issued at and created times in the payload.
|
||||
_verify_time_range(payload_dict)
|
||||
|
||||
# Check audience.
|
||||
_check_audience(payload_dict, audience)
|
||||
|
||||
return payload_dict
|
||||
|
||||
Reference in New Issue
Block a user