From 9036d114ed7b08267d99718581854b82b690e8ba Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Fri, 17 Feb 2023 15:17:01 +0000 Subject: [PATCH] signjwt key_type for key-less service account auth --- src/gam/__init__.py | 27 +++++++++----- src/gam/auth/__init__.py | 8 +++- src/gam/auth/signjwt.py | 79 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 11 deletions(-) create mode 100644 src/gam/auth/signjwt.py diff --git a/src/gam/__init__.py b/src/gam/__init__.py index cc2e220a..5dd5289d 100755 --- a/src/gam/__init__.py +++ b/src/gam/__init__.py @@ -51,6 +51,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID import gam.auth.oauth +from gam.auth import signjwt from gam import auth from gam import controlflow from gam import display @@ -925,14 +926,12 @@ def _getSvcAcctData(): controlflow.system_error_exit(6, None) GM_Globals[GM_OAUTH2SERVICE_JSON_DATA] = json.loads(json_string) -jwt_apis = ['chat', - 'cloudresourcemanager', - 'accesscontextmanager'] # APIs which can handle OAuthless JWT tokens def getSvcAcctCredentials(scopes, act_as, api=None): try: _getSvcAcctData() sign_method = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default') - if act_as or api not in jwt_apis: + if act_as: + # DwD means we need to go about things differently... if sign_method == 'default': credentials = google.oauth2.service_account.Credentials.from_service_account_info( GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) @@ -940,6 +939,10 @@ def getSvcAcctCredentials(scopes, act_as, api=None): yksigner = yubikey.YubiKey(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) credentials = google.oauth2.service_account.Credentials._from_signer_and_info(yksigner, GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) + elif sign_method == 'signjwt': + sjsigner = signjwt.SignJwt(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) + credentials = signjwt.Credentials._from_signer_and_info(sjsigner.sign, + GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) credentials = credentials.with_scopes(scopes) if act_as: credentials = credentials.with_subject(act_as) @@ -953,6 +956,11 @@ def getSvcAcctCredentials(scopes, act_as, api=None): credentials = JWTCredentials._from_signer_and_info(yksigner, GM_Globals[GM_OAUTH2SERVICE_JSON_DATA], audience=audience) + elif sign_method == 'signjwt': + sjsigner = signjwt.SignJwt(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) + credentials = signjwt.JWTCredentials._from_signer_and_info(sjsigner, + GM_Globals[GM_OAUTH2SERVICE_JSON_DATA], + audience=audience) credentials.project_id = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['project_id'] GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID] = GM_Globals[ GM_OAUTH2SERVICE_JSON_DATA]['client_id'] @@ -1315,12 +1323,10 @@ def doCheckServiceAccount(users): 'Invalid private key in oauth2service.json. Please delete the file and then\nrecreate with "gam create project" or "gam use project"' ) key_type = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default') - if key_type == 'yubikey': - printPassFail('Skipping age check. YubiKey rotation not necessary.', test_pass) - else: + if key_type == 'default': print( 'Checking key age. Google recommends rotating keys on a routine basis...' - ) + ) try: iam = buildGAPIServiceObject('iam', None) project = GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID] @@ -1346,6 +1352,8 @@ def doCheckServiceAccount(users): key_days = 'UNKNOWN' print('Unable to check key age, please run "gam update project"') printPassFail(f'Key is {key_days} days old', key_age_result) + else: + printPassFail(f'Skipping age check. {key_type} rotation not necessary.', test_pass) if not check_scopes: for _, scopes in list(API_SCOPE_MAPPING.items()): for scope in scopes: @@ -7824,8 +7832,7 @@ def doShowServiceAccountKeys(): else: controlflow.invalid_argument_exit(myarg, 'gam show sakeys') name = f'projects/-/serviceAccounts/{GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID]}' - currentPrivateKeyId = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA][ - 'private_key_id'] + currentPrivateKeyId = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('private_key_id') keys = gapi.get_items(iam.projects().serviceAccounts().keys(), 'list', 'keys', diff --git a/src/gam/auth/__init__.py b/src/gam/auth/__init__.py index a2de8752..5d4206a8 100644 --- a/src/gam/auth/__init__.py +++ b/src/gam/auth/__init__.py @@ -9,6 +9,7 @@ import gam from gam import utils from gam.auth import oauth +from gam.auth import signjwt from gam.var import _FN_OAUTH2_TXT from gam.var import _FN_OAUTH2SERVICE_JSON from gam.var import GC_OAUTH2_TXT @@ -40,7 +41,7 @@ def get_admin_credentials(api=None): with open(credential_file) as f: creds_data = json.load(f) # Validate that enable DASA matches content of authorization file - if GC_Values[GC_ENABLE_DASA] and 'private_key_id' in creds_data: + if GC_Values[GC_ENABLE_DASA] and 'key_type' in creds_data: audience = f'https://{api}.googleapis.com/' key_type = creds_data.get('key_type', 'default') if key_type == 'default': @@ -51,6 +52,11 @@ def get_admin_credentials(api=None): return JWTCredentials._from_signer_and_info(yksigner, creds_data, audience=audience) + elif key_type == 'signjwt': + sjsigner = signjwt.SignJwt(creds_data) + return signjwt.JWTCredentials._from_signer_and_info(sjsigner, + creds_data, + audience=audience) elif not GC_Values[GC_ENABLE_DASA] and 'token' in creds_data: return oauth.Credentials.from_credentials_file(credential_file) else: diff --git a/src/gam/auth/signjwt.py b/src/gam/auth/signjwt.py new file mode 100644 index 00000000..d76a0d65 --- /dev/null +++ b/src/gam/auth/signjwt.py @@ -0,0 +1,79 @@ +''' Use Google Application Default Credentials ''' +import datetime +import json + +from google.auth import _helpers, default +import google.oauth2.service_account +from googleapiclient.discovery import build + +from gam import gapi + +_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds +_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" + + +class JWTCredentials(google.auth.jwt.Credentials): + ''' Class used for DASA ''' + def _make_jwt(self): + now = _helpers.utcnow() + lifetime = datetime.timedelta(seconds=self._token_lifetime) + expiry = now + lifetime + payload = { + "iss": self._issuer, + "sub": self._subject, + "iat": _helpers.datetime_to_secs(now), + "exp": _helpers.datetime_to_secs(expiry), + } + if self._audience: + payload["aud"] = self._audience + payload.update(self._additional_claims) + jwt = self._signer.sign(payload) + return jwt, expiry + + +class Credentials(google.oauth2.service_account.Credentials): + ''' Class used for DwD ''' + + def _make_authorization_grant_assertion(self): + now = _helpers.utcnow() + lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) + expiry = now + lifetime + payload = { + "iat": _helpers.datetime_to_secs(now), + "exp": _helpers.datetime_to_secs(expiry), + "iss": self._service_account_email, + "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, + "scope": _helpers.scopes_to_string(self._scopes or ()), + } + + payload.update(self._additional_claims) + + # The subject can be a user email for domain-wide delegation. + if self._subject: + payload.setdefault("sub", self._subject) + token = self._signer(payload) + return token + + +class SignJwt(google.auth.crypt.Signer): + ''' Signer class for SignJWT ''' + def __init__(self, service_account_info): + self.service_account_email = service_account_info['client_email'] + self.name = f'projects/-/serviceAccounts/{self.service_account_email}' + self._key_id = None + + @property # type: ignore + def key_id(self): + return self._key_id + + + def sign(self, message): + ''' Call IAM Credentials SignJWT API to get our signed JWT ''' + credentials, _ = default() + iamc = build('iamcredentials', 'v1', credentials=credentials) + response = gapi.call(iamc.projects().serviceAccounts(), + 'signJwt', + name=self.name, + body={'payload': json.dumps(message)}) + signed_jwt = response.get('signedJwt') + return signed_jwt