signjwt key_type for key-less service account auth

This commit is contained in:
Jay Lee
2023-02-17 15:17:01 +00:00
parent 75c19104ae
commit 9036d114ed
3 changed files with 103 additions and 11 deletions

View File

@@ -51,6 +51,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID from cryptography.x509.oid import NameOID
import gam.auth.oauth import gam.auth.oauth
from gam.auth import signjwt
from gam import auth from gam import auth
from gam import controlflow from gam import controlflow
from gam import display from gam import display
@@ -925,14 +926,12 @@ def _getSvcAcctData():
controlflow.system_error_exit(6, None) controlflow.system_error_exit(6, None)
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA] = json.loads(json_string) GM_Globals[GM_OAUTH2SERVICE_JSON_DATA] = json.loads(json_string)
jwt_apis = ['chat',
'cloudresourcemanager',
'accesscontextmanager'] # APIs which can handle OAuthless JWT tokens
def getSvcAcctCredentials(scopes, act_as, api=None): def getSvcAcctCredentials(scopes, act_as, api=None):
try: try:
_getSvcAcctData() _getSvcAcctData()
sign_method = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default') sign_method = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default')
if act_as or api not in jwt_apis: if act_as:
# DwD means we need to go about things differently...
if sign_method == 'default': if sign_method == 'default':
credentials = google.oauth2.service_account.Credentials.from_service_account_info( credentials = google.oauth2.service_account.Credentials.from_service_account_info(
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
@@ -940,6 +939,10 @@ def getSvcAcctCredentials(scopes, act_as, api=None):
yksigner = yubikey.YubiKey(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) yksigner = yubikey.YubiKey(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
credentials = google.oauth2.service_account.Credentials._from_signer_and_info(yksigner, credentials = google.oauth2.service_account.Credentials._from_signer_and_info(yksigner,
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]) GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
elif sign_method == 'signjwt':
sjsigner = signjwt.SignJwt(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
credentials = signjwt.Credentials._from_signer_and_info(sjsigner.sign,
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
credentials = credentials.with_scopes(scopes) credentials = credentials.with_scopes(scopes)
if act_as: if act_as:
credentials = credentials.with_subject(act_as) credentials = credentials.with_subject(act_as)
@@ -953,6 +956,11 @@ def getSvcAcctCredentials(scopes, act_as, api=None):
credentials = JWTCredentials._from_signer_and_info(yksigner, credentials = JWTCredentials._from_signer_and_info(yksigner,
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA], GM_Globals[GM_OAUTH2SERVICE_JSON_DATA],
audience=audience) audience=audience)
elif sign_method == 'signjwt':
sjsigner = signjwt.SignJwt(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
credentials = signjwt.JWTCredentials._from_signer_and_info(sjsigner,
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA],
audience=audience)
credentials.project_id = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['project_id'] credentials.project_id = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['project_id']
GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID] = GM_Globals[ GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID] = GM_Globals[
GM_OAUTH2SERVICE_JSON_DATA]['client_id'] GM_OAUTH2SERVICE_JSON_DATA]['client_id']
@@ -1315,9 +1323,7 @@ def doCheckServiceAccount(users):
'Invalid private key in oauth2service.json. Please delete the file and then\nrecreate with "gam create project" or "gam use project"' 'Invalid private key in oauth2service.json. Please delete the file and then\nrecreate with "gam create project" or "gam use project"'
) )
key_type = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default') key_type = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default')
if key_type == 'yubikey': if key_type == 'default':
printPassFail('Skipping age check. YubiKey rotation not necessary.', test_pass)
else:
print( print(
'Checking key age. Google recommends rotating keys on a routine basis...' 'Checking key age. Google recommends rotating keys on a routine basis...'
) )
@@ -1346,6 +1352,8 @@ def doCheckServiceAccount(users):
key_days = 'UNKNOWN' key_days = 'UNKNOWN'
print('Unable to check key age, please run "gam update project"') print('Unable to check key age, please run "gam update project"')
printPassFail(f'Key is {key_days} days old', key_age_result) printPassFail(f'Key is {key_days} days old', key_age_result)
else:
printPassFail(f'Skipping age check. {key_type} rotation not necessary.', test_pass)
if not check_scopes: if not check_scopes:
for _, scopes in list(API_SCOPE_MAPPING.items()): for _, scopes in list(API_SCOPE_MAPPING.items()):
for scope in scopes: for scope in scopes:
@@ -7824,8 +7832,7 @@ def doShowServiceAccountKeys():
else: else:
controlflow.invalid_argument_exit(myarg, 'gam show sakeys') controlflow.invalid_argument_exit(myarg, 'gam show sakeys')
name = f'projects/-/serviceAccounts/{GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID]}' name = f'projects/-/serviceAccounts/{GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID]}'
currentPrivateKeyId = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA][ currentPrivateKeyId = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('private_key_id')
'private_key_id']
keys = gapi.get_items(iam.projects().serviceAccounts().keys(), keys = gapi.get_items(iam.projects().serviceAccounts().keys(),
'list', 'list',
'keys', 'keys',

View File

@@ -9,6 +9,7 @@ import gam
from gam import utils from gam import utils
from gam.auth import oauth from gam.auth import oauth
from gam.auth import signjwt
from gam.var import _FN_OAUTH2_TXT from gam.var import _FN_OAUTH2_TXT
from gam.var import _FN_OAUTH2SERVICE_JSON from gam.var import _FN_OAUTH2SERVICE_JSON
from gam.var import GC_OAUTH2_TXT from gam.var import GC_OAUTH2_TXT
@@ -40,7 +41,7 @@ def get_admin_credentials(api=None):
with open(credential_file) as f: with open(credential_file) as f:
creds_data = json.load(f) creds_data = json.load(f)
# Validate that enable DASA matches content of authorization file # Validate that enable DASA matches content of authorization file
if GC_Values[GC_ENABLE_DASA] and 'private_key_id' in creds_data: if GC_Values[GC_ENABLE_DASA] and 'key_type' in creds_data:
audience = f'https://{api}.googleapis.com/' audience = f'https://{api}.googleapis.com/'
key_type = creds_data.get('key_type', 'default') key_type = creds_data.get('key_type', 'default')
if key_type == 'default': if key_type == 'default':
@@ -51,6 +52,11 @@ def get_admin_credentials(api=None):
return JWTCredentials._from_signer_and_info(yksigner, return JWTCredentials._from_signer_and_info(yksigner,
creds_data, creds_data,
audience=audience) audience=audience)
elif key_type == 'signjwt':
sjsigner = signjwt.SignJwt(creds_data)
return signjwt.JWTCredentials._from_signer_and_info(sjsigner,
creds_data,
audience=audience)
elif not GC_Values[GC_ENABLE_DASA] and 'token' in creds_data: elif not GC_Values[GC_ENABLE_DASA] and 'token' in creds_data:
return oauth.Credentials.from_credentials_file(credential_file) return oauth.Credentials.from_credentials_file(credential_file)
else: else:

79
src/gam/auth/signjwt.py Normal file
View File

@@ -0,0 +1,79 @@
''' Use Google Application Default Credentials '''
import datetime
import json
from google.auth import _helpers, default
import google.oauth2.service_account
from googleapiclient.discovery import build
from gam import gapi
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
class JWTCredentials(google.auth.jwt.Credentials):
''' Class used for DASA '''
def _make_jwt(self):
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=self._token_lifetime)
expiry = now + lifetime
payload = {
"iss": self._issuer,
"sub": self._subject,
"iat": _helpers.datetime_to_secs(now),
"exp": _helpers.datetime_to_secs(expiry),
}
if self._audience:
payload["aud"] = self._audience
payload.update(self._additional_claims)
jwt = self._signer.sign(payload)
return jwt, expiry
class Credentials(google.oauth2.service_account.Credentials):
''' Class used for DwD '''
def _make_authorization_grant_assertion(self):
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
"iat": _helpers.datetime_to_secs(now),
"exp": _helpers.datetime_to_secs(expiry),
"iss": self._service_account_email,
"aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
"scope": _helpers.scopes_to_string(self._scopes or ()),
}
payload.update(self._additional_claims)
# The subject can be a user email for domain-wide delegation.
if self._subject:
payload.setdefault("sub", self._subject)
token = self._signer(payload)
return token
class SignJwt(google.auth.crypt.Signer):
''' Signer class for SignJWT '''
def __init__(self, service_account_info):
self.service_account_email = service_account_info['client_email']
self.name = f'projects/-/serviceAccounts/{self.service_account_email}'
self._key_id = None
@property # type: ignore
def key_id(self):
return self._key_id
def sign(self, message):
''' Call IAM Credentials SignJWT API to get our signed JWT '''
credentials, _ = default()
iamc = build('iamcredentials', 'v1', credentials=credentials)
response = gapi.call(iamc.projects().serviceAccounts(),
'signJwt',
name=self.name,
body={'payload': json.dumps(message)})
signed_jwt = response.get('signedJwt')
return signed_jwt