Phase 5b - SetGlobalVariables modernization and single func size reduction

This commit is contained in:
Jay Lee
2026-07-04 14:06:19 -04:00
parent 7d6dffc138
commit 0e1505f210
2 changed files with 802 additions and 761 deletions

View File

@@ -217,6 +217,23 @@ def _getIAMSigner(service_account_info):
return google.auth.iam.Signer(request, credentials, return google.auth.iam.Signer(request, credentials,
service_account_info['client_email']) service_account_info['client_email'])
def _getSigner(service_account_info):
'''Return a signer for the given key_type, or None for default keys.
key_type is read from service_account_info:
- "default": Returns None (caller should use from_service_account_info)
- "yubikey": Returns a YubiKey hardware signer
- "signjwt": Returns an IAM signBlob signer via ADC
'''
key_type = service_account_info.get('key_type', 'default')
if key_type == 'default':
return None
if key_type == 'yubikey':
return yubikey.YubiKey(service_account_info)
if key_type == 'signjwt':
return _getIAMSigner(service_account_info)
return None
def handleOAuthTokenError(e, softErrors, displayError=False, i=0, count=0): def handleOAuthTokenError(e, softErrors, displayError=False, i=0, count=0):
errMsg = str(e).replace('.', '') errMsg = str(e).replace('.', '')
if ((errMsg in API.OAUTH2_TOKEN_ERRORS) or if ((errMsg in API.OAUTH2_TOKEN_ERRORS) or
@@ -258,15 +275,10 @@ def getOauth2TxtCredentials(exitOnError=True, api=None, noDASA=False, refreshOnl
jsonDict = json.loads(jsonData) jsonDict = json.loads(jsonData)
api, _, _ = API.getVersion(api) api, _, _ = API.getVersion(api)
audience = f'https://{api}.googleapis.com/' audience = f'https://{api}.googleapis.com/'
key_type = jsonDict.get('key_type', 'default') signer = _getSigner(jsonDict)
if key_type == 'default': if signer is None:
return (True, JWTCredentials.from_service_account_info(jsonDict, audience=audience)) return (True, JWTCredentials.from_service_account_info(jsonDict, audience=audience))
if key_type == 'yubikey': return (True, JWTCredentials._from_signer_and_info(signer, jsonDict, audience=audience))
yksigner = yubikey.YubiKey(jsonDict)
return (True, JWTCredentials._from_signer_and_info(yksigner, jsonDict, audience=audience))
if key_type == 'signjwt':
sjsigner = _getIAMSigner(jsonDict)
return (True, JWTCredentials._from_signer_and_info(sjsigner, jsonDict, audience=audience))
except (IndexError, KeyError, SyntaxError, TypeError, ValueError) as e: except (IndexError, KeyError, SyntaxError, TypeError, ValueError) as e:
invalidOauth2serviceJsonExit(str(e)) invalidOauth2serviceJsonExit(str(e))
invalidOauth2serviceJsonExit(Msg.NO_DATA) invalidOauth2serviceJsonExit(Msg.NO_DATA)
@@ -615,19 +627,14 @@ def getSvcAcctCredentials(scopesOrAPI, userEmail, softErrors=False, forceOauth=F
else: else:
GM.Globals[GM.CURRENT_SVCACCT_API] = '' GM.Globals[GM.CURRENT_SVCACCT_API] = ''
GM.Globals[GM.CURRENT_SVCACCT_API_SCOPES] = scopesOrAPI GM.Globals[GM.CURRENT_SVCACCT_API_SCOPES] = scopesOrAPI
key_type = GM.Globals[GM.OAUTH2SERVICE_JSON_DATA].get('key_type', 'default') svcacct_info = GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]
signer = _getSigner(svcacct_info)
if not GM.Globals[GM.CURRENT_SVCACCT_API] or scopesOrAPI not in API.JWT_APIS or forceOauth: if not GM.Globals[GM.CURRENT_SVCACCT_API] or scopesOrAPI not in API.JWT_APIS or forceOauth:
try: try:
if key_type == 'default': if signer is None:
credentials = google.oauth2.service_account.Credentials.from_service_account_info(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]) credentials = google.oauth2.service_account.Credentials.from_service_account_info(svcacct_info)
elif key_type == 'yubikey': else:
yksigner = yubikey.YubiKey(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]) credentials = google.oauth2.service_account.Credentials._from_signer_and_info(signer, svcacct_info)
credentials = google.oauth2.service_account.Credentials._from_signer_and_info(yksigner,
GM.Globals[GM.OAUTH2SERVICE_JSON_DATA])
elif key_type == 'signjwt':
sjsigner = _getIAMSigner(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA])
credentials = google.oauth2.service_account.Credentials._from_signer_and_info(sjsigner,
GM.Globals[GM.OAUTH2SERVICE_JSON_DATA])
except (ValueError, IndexError, KeyError) as e: except (ValueError, IndexError, KeyError) as e:
if softErrors: if softErrors:
return None return None
@@ -636,19 +643,10 @@ def getSvcAcctCredentials(scopesOrAPI, userEmail, softErrors=False, forceOauth=F
else: else:
audience = f'https://{scopesOrAPI}.googleapis.com/' audience = f'https://{scopesOrAPI}.googleapis.com/'
try: try:
if key_type == 'default': if signer is None:
credentials = JWTCredentials.from_service_account_info(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA], credentials = JWTCredentials.from_service_account_info(svcacct_info, audience=audience)
audience=audience) else:
elif key_type == 'yubikey': credentials = JWTCredentials._from_signer_and_info(signer, svcacct_info, audience=audience)
yksigner = yubikey.YubiKey(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA])
credentials = JWTCredentials._from_signer_and_info(yksigner,
GM.Globals[GM.OAUTH2SERVICE_JSON_DATA],
audience=audience)
elif key_type == 'signjwt':
sjsigner = _getIAMSigner(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA])
credentials = JWTCredentials._from_signer_and_info(sjsigner,
GM.Globals[GM.OAUTH2SERVICE_JSON_DATA],
audience=audience)
credentials.project_id = GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]['project_id'] credentials.project_id = GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]['project_id']
except (ValueError, IndexError, KeyError) as e: except (ValueError, IndexError, KeyError) as e:
if softErrors: if softErrors:

File diff suppressed because it is too large Load Diff