Phase 1: Replace _getMain() with direct util/ imports

This commit is contained in:
Jay Lee
2026-07-03 13:40:00 -04:00
parent c308df4fd4
commit 2666602ca4
84 changed files with 15668 additions and 12557 deletions

View File

@@ -32,6 +32,49 @@ from gamlib import glgapi as GAPI
from gamlib import glglobals as GM
from gamlib import glindent
from gamlib import glmsgs as Msg
from gam.util.api import (
buildGAPIObject,
callGAPI,
getClientCredentials,
getHttpObj,
getOauth2TxtCredentials,
shortenURL,
writeClientCredentials,
)
from gam.util.args import (
ISOformatTimeStamp,
UTF8,
YYYYMMDDTHHMMSSZ_FORMAT,
checkForExtraneousArguments,
getArgument,
getEmailAddress,
getString,
)
from gam.util.display import (
entityActionNotPerformedWarning,
entityActionPerformed,
entityModifierNewValueActionPerformed,
printBlankLine,
printEntity,
printKeyValueList,
)
from gam.util.errors import (
CLIENT_SECRETS_JSON_REQUIRED_RC,
Cmd,
entityActionFailedExit,
invalidChoiceExit,
invalidClientSecretsJsonExit,
invalidOauth2TxtExit,
unknownArgumentExit,
)
from gam.util.fileio import DEFAULT_FILE_READ_MODE, deleteFile, readFile
from gam.util.output import (
Ind,
readStdin,
stderrErrorMsg,
systemErrorExit,
writeStdout,
)
Act = glaction.GamAction()
Ent = glentity.GamEntity()
@@ -59,9 +102,9 @@ def _getValidateLoginHint(login_hint, projectId=None):
while True:
if not login_hint:
if not projectId:
login_hint = _getMain().readStdin(Msg.ENTER_GSUITE_ADMIN_EMAIL_ADDRESS).strip()
login_hint = readStdin(Msg.ENTER_GSUITE_ADMIN_EMAIL_ADDRESS).strip()
else:
login_hint = _getMain().readStdin(Msg.ENTER_MANAGE_GCP_PROJECT_EMAIL_ADDRESS.format(projectId)).strip()
login_hint = readStdin(Msg.ENTER_MANAGE_GCP_PROJECT_EMAIL_ADDRESS.format(projectId)).strip()
if login_hint.find('@') == -1 and GC.Values[GC.DOMAIN]:
login_hint = f'{login_hint}@{GC.Values[GC.DOMAIN]}'
if VALIDEMAIL_PATTERN.match(login_hint):
@@ -70,16 +113,16 @@ def _getValidateLoginHint(login_hint, projectId=None):
login_hint = None
def getOAuthClientIDAndSecret():
cs_data = _getMain().readFile(GC.Values[GC.CLIENT_SECRETS_JSON], continueOnError=True, displayError=True)
cs_data = readFile(GC.Values[GC.CLIENT_SECRETS_JSON], continueOnError=True, displayError=True)
if not cs_data:
_getMain().invalidClientSecretsJsonExit(Msg.NO_DATA)
invalidClientSecretsJsonExit(Msg.NO_DATA)
try:
cs_json = json.loads(cs_data)
if not cs_json:
_getMain().systemErrorExit(_getMain().CLIENT_SECRETS_JSON_REQUIRED_RC, Msg.NO_CLIENT_ACCESS_CREATE_UPDATE_ALLOWED)
systemErrorExit(CLIENT_SECRETS_JSON_REQUIRED_RC, Msg.NO_CLIENT_ACCESS_CREATE_UPDATE_ALLOWED)
return (cs_json['installed']['client_id'], cs_json['installed']['client_secret'])
except (IndexError, KeyError, SyntaxError, TypeError, ValueError) as e:
_getMain().invalidClientSecretsJsonExit(str(e))
invalidClientSecretsJsonExit(str(e))
def getScopesFromUser(scopesList, clientAccess, currentScopes=None):
OAUTH2_CMDS = ['s', 'u', 'e', 'c']
@@ -106,7 +149,7 @@ Continue to authorization by entering a 'c'
if currentScopes is None and clientAccess:
lock = FileLock(GM.Globals[GM.OAUTH2_TXT_LOCK], mode=GC.Values[GC.OAUTH2_TXT_LOCK_MODE])
with lock:
_, credentials = _getMain().getOauth2TxtCredentials(exitOnError=False)
_, credentials = getOauth2TxtCredentials(exitOnError=False)
if credentials and credentials.scopes is not None:
currentScopes = sorted(credentials.scopes)
if currentScopes is not None:
@@ -168,7 +211,7 @@ Continue to authorization by entering a 'c'
os.system(['clear', 'cls'][sys.platform.startswith('win')])
sys.stdout.write(menu % tuple(selectedScopes))
while True:
choice = _getMain().readStdin(prompt)
choice = readStdin(prompt)
if choice:
selection = choice.lower()
if selection.find('r') >= 0:
@@ -223,8 +266,8 @@ Continue to authorization by entering a 'c'
if numSelectedScopes <= API.NUM_CLIENT_SCOPES_ERROR_LIMIT:
break
# If number of scopes is > 48 we'll probably get an error
_getMain().writeStdout(Msg.NUM_SELECTED_CLIENT_SCOPES.format(numSelectedScopes, API.NUM_CLIENT_SCOPES_ERROR_LIMIT))
choice = _getMain().readStdin('\nPlease enter c to continue to authorization or any other key to amend selection: ')
writeStdout(Msg.NUM_SELECTED_CLIENT_SCOPES.format(numSelectedScopes, API.NUM_CLIENT_SCOPES_ERROR_LIMIT))
choice = readStdin('\nPlease enter c to continue to authorization or any other key to amend selection: ')
if choice and choice.lower() == 'c':
break
else:
@@ -280,8 +323,8 @@ def _waitForHttpClient(d):
local_server.server_close()
def _waitForUserInput(d):
sys.stdin = open(0, _getMain().DEFAULT_FILE_READ_MODE, encoding=_getMain().UTF8)
d['code'] = _getMain().readStdin(Msg.ENTER_VERIFICATION_CODE_OR_URL)
sys.stdin = open(0, DEFAULT_FILE_READ_MODE, encoding=UTF8)
d['code'] = readStdin(Msg.ENTER_VERIFICATION_CODE_OR_URL)
class _GamOauthFlow(google_auth_oauthlib.flow.InstalledAppFlow):
def run_dual(self, **kwargs):
@@ -298,7 +341,7 @@ class _GamOauthFlow(google_auth_oauthlib.flow.InstalledAppFlow):
time.sleep(0.1)
self.redirect_uri = d['redirect_uri']
d['auth_url'], _ = super().authorization_url(**kwargs)
d['auth_url'] = _getMain().shortenURL(d['auth_url'])
d['auth_url'] = shortenURL(d['auth_url'])
print(Msg.OAUTH2_GO_TO_LINK_MESSAGE.format(url=d['auth_url']))
userInputProcess.start()
userInput = False
@@ -322,7 +365,7 @@ class _GamOauthFlow(google_auth_oauthlib.flow.InstalledAppFlow):
checkUser = False
alive -= 1
if 'code' not in d:
_getMain().systemErrorExit(_getMain().SYSTEM_ERROR_RC, Msg.AUTHENTICATION_FLOW_FAILED)
systemErrorExit(_getMain().SYSTEM_ERROR_RC, Msg.AUTHENTICATION_FLOW_FAILED)
while True:
code = d['code']
if code.startswith('http'):
@@ -337,8 +380,8 @@ class _GamOauthFlow(google_auth_oauthlib.flow.InstalledAppFlow):
break
except Exception as e:
if not userInput:
_getMain().systemErrorExit(_getMain().INVALID_TOKEN_RC, str(e))
_getMain().stderrErrorMsg(str(e))
systemErrorExit(_getMain().INVALID_TOKEN_RC, str(e))
stderrErrorMsg(str(e))
_waitForUserInput(d)
print(Msg.AUTHENTICATION_FLOW_COMPLETE)
return self.credentials
@@ -455,7 +498,7 @@ class Credentials(google.oauth2.credentials.Credentials):
expiry = info.get('token_expiry')
if expiry:
# Convert the raw expiry to datetime
expiry = arrow.Arrow.strptime(expiry, _getMain().YYYYMMDDTHHMMSSZ_FORMAT, tzinfo='UTC').naive
expiry = arrow.Arrow.strptime(expiry, YYYYMMDDTHHMMSSZ_FORMAT, tzinfo='UTC').naive
id_token_data = info.get('decoded_id_token')
# Provide backwards compatibility with field names when loading from _getMain().JSON.
@@ -480,7 +523,7 @@ class Credentials(google.oauth2.credentials.Credentials):
# Add properties which are not exported with the native to_json() output.
info['id_token'] = credentials.id_token
if credentials.expiry:
info['token_expiry'] = credentials.expiry.strftime(_getMain().YYYYMMDDTHHMMSSZ_FORMAT)
info['token_expiry'] = credentials.expiry.strftime(YYYYMMDDTHHMMSSZ_FORMAT)
info['quota_project_id'] = credentials.quota_project_id
return cls.from_authorized_user_info_gam(info, filename=filename)
@@ -549,7 +592,7 @@ class Credentials(google.oauth2.credentials.Credentials):
str: A _getMain().JSON representation of this instance, suitable to pass to
from_json().
"""
expiry = self.expiry.strftime(_getMain().YYYYMMDDTHHMMSSZ_FORMAT) if self.expiry else None
expiry = self.expiry.strftime(YYYYMMDDTHHMMSSZ_FORMAT) if self.expiry else None
prep = {
'token': self.token,
'refresh_token': self.refresh_token,
@@ -609,28 +652,28 @@ def doOAuthRequest(currentScopes, login_hint, verifyScopes=False):
open_browser=not GC.Values[GC.NO_BROWSER])
lock = FileLock(GM.Globals[GM.OAUTH2_TXT_LOCK], mode=GC.Values[GC.OAUTH2_TXT_LOCK_MODE])
with lock:
_getMain().writeClientCredentials(credentials, GC.Values[GC.OAUTH2_TXT])
_getMain().entityActionPerformed([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]])
writeClientCredentials(credentials, GC.Values[GC.OAUTH2_TXT])
entityActionPerformed([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]])
return True
# gam oauth|oauth2 create|request [<EmailAddress>]
# gam oauth|oauth2 create|request [admin <EmailAddress>] [scope|scopes <APIScopeURLList>]
def doOAuthCreate():
Cmd = _getMain().Cmd
Cmd = Cmd
if not Cmd.PeekArgumentPresent(['admin', 'scope', 'scopes']):
login_hint = _getMain().getEmailAddress(noUid=True, optional=True)
login_hint = getEmailAddress(noUid=True, optional=True)
scopes = None
_getMain().checkForExtraneousArguments()
checkForExtraneousArguments()
else:
login_hint = None
scopes = []
scopesList = API.getClientScopesList(GC.Values[GC.COMMANDDATA_CLIENTACCESS], GC.Values[GC.TODRIVE_CLIENTACCESS])
while Cmd.ArgumentsRemaining():
myarg = _getMain().getArgument()
myarg = getArgument()
if myarg == 'admin':
login_hint = _getMain().getEmailAddress(noUid=True)
login_hint = getEmailAddress(noUid=True)
elif myarg in {'scope', 'scopes'}:
for uscope in _getMain().getString(Cmd.OB_API_SCOPE_URL_LIST).lower().replace(',', ' ').split():
for uscope in getString(Cmd.OB_API_SCOPE_URL_LIST).lower().replace(',', ' ').split():
if uscope in {'openid', 'email', API.USERINFO_EMAIL_SCOPE, 'profile', API.USERINFO_PROFILE_SCOPE}:
continue
for scope in scopesList:
@@ -641,27 +684,27 @@ def doOAuthCreate():
scopes.append(uscope)
break
else:
_getMain().invalidChoiceExit(uscope,
invalidChoiceExit(uscope,
API.getClientScopesURLs(GC.Values[GC.COMMANDDATA_CLIENTACCESS], GC.Values[GC.TODRIVE_CLIENTACCESS]),
True)
else:
_getMain().unknownArgumentExit()
unknownArgumentExit()
if len(scopes) == 0:
scopes = None
doOAuthRequest(scopes, login_hint)
def exitIfNoOauth2Txt():
if not os.path.isfile(GC.Values[GC.OAUTH2_TXT]):
_getMain().entityActionNotPerformedWarning([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], Msg.DOES_NOT_EXIST)
entityActionNotPerformedWarning([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], Msg.DOES_NOT_EXIST)
sys.exit(GM.Globals[GM.SYSEXITRC])
# gam oauth|oauth2 delete|revoke
def doOAuthDelete():
_getMain().checkForExtraneousArguments()
checkForExtraneousArguments()
exitIfNoOauth2Txt()
lock = FileLock(GM.Globals[GM.OAUTH2_TXT_LOCK], mode=GC.Values[GC.OAUTH2_TXT_LOCK_MODE], timeout=10)
with lock:
_, credentials = _getMain().getOauth2TxtCredentials(noScopes=True)
_, credentials = getOauth2TxtCredentials(noScopes=True)
if not credentials:
return
entityType = Ent.OAUTH2_TXT_FILE
@@ -677,80 +720,80 @@ def doOAuthDelete():
time.sleep(1)
sys.stdout.write('boom!\n')
sys.stdout.flush()
httpObj = _getMain().getHttpObj()
httpObj = getHttpObj()
params = {'token': credentials.refresh_token}
revoke_uri = f'https://accounts.google.com/o/oauth2/revoke?{urlencode(params)}'
httpObj.request(revoke_uri, 'GET')
_getMain().deleteFile(GC.Values[GC.OAUTH2_TXT], continueOnError=True)
_getMain().entityActionPerformed([entityType, entityName])
deleteFile(GC.Values[GC.OAUTH2_TXT], continueOnError=True)
entityActionPerformed([entityType, entityName])
# gam oauth|oauth2 info|verify [showsecret] [accesstoken <AccessToken> idtoken <IDToken>] [showdetails]
def doOAuthInfo():
Cmd = _getMain().Cmd
Ind = _getMain().Ind
Cmd = Cmd
Ind = Ind
credentials = access_token = id_token = None
showDetails = showSecret = False
while Cmd.ArgumentsRemaining():
myarg = _getMain().getArgument()
myarg = getArgument()
if myarg == 'accesstoken':
access_token = _getMain().getString(Cmd.OB_ACCESS_TOKEN)
access_token = getString(Cmd.OB_ACCESS_TOKEN)
elif myarg == 'idtoken':
id_token = _getMain().getString(Cmd.OB_ID_TOKEN)
id_token = getString(Cmd.OB_ID_TOKEN)
elif myarg == 'showdetails':
showDetails = True
elif myarg == 'showsecret':
showSecret = True
else:
_getMain().unknownArgumentExit()
unknownArgumentExit()
exitIfNoOauth2Txt()
if not access_token and not id_token:
credentials = _getMain().getClientCredentials(noScopes=True)
credentials = getClientCredentials(noScopes=True)
access_token = credentials.token
_getMain().printEntity([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]])
oa2 = _getMain().buildGAPIObject(API.OAUTH2)
printEntity([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]])
oa2 = buildGAPIObject(API.OAUTH2)
try:
token_info = _getMain().callGAPI(oa2, 'tokeninfo',
token_info = callGAPI(oa2, 'tokeninfo',
throwReasons=[GAPI.INVALID],
access_token=access_token, id_token=id_token)
except GAPI.invalid as e:
_getMain().entityActionFailedExit([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], str(e))
entityActionFailedExit([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], str(e))
if 'issued_to' in token_info:
_getMain().printKeyValueList(['Client ID', token_info['issued_to']])
printKeyValueList(['Client ID', token_info['issued_to']])
if credentials is not None and showSecret:
_getMain().printKeyValueList(['Secret', credentials.client_secret])
printKeyValueList(['Secret', credentials.client_secret])
if 'scope' in token_info:
scopes = token_info['scope'].split(' ')
_getMain().printKeyValueList(['Scopes', len(scopes)])
printKeyValueList(['Scopes', len(scopes)])
Ind.Increment()
for scope in sorted(scopes):
_getMain().printKeyValueList([scope])
printKeyValueList([scope])
Ind.Decrement()
if 'email' in token_info:
_getMain().printKeyValueList(['Google Workspace Admin', f'{token_info["email"]}'])
printKeyValueList(['Google Workspace Admin', f'{token_info["email"]}'])
if 'expires_in' in token_info:
_getMain().printKeyValueList(['Expires', _getMain().ISOformatTimeStamp(arrow.now(GC.Values[GC.TIMEZONE]).shift(seconds=token_info['expires_in']))])
printKeyValueList(['Expires', ISOformatTimeStamp(arrow.now(GC.Values[GC.TIMEZONE]).shift(seconds=token_info['expires_in']))])
if showDetails:
for k, v in sorted(token_info.items()):
if k not in ['email', 'expires_in', 'issued_to', 'scope']:
_getMain().printKeyValueList([k, v])
_getMain().printBlankLine()
printKeyValueList([k, v])
printBlankLine()
# gam oauth|oauth2 update [<EmailAddress>]
# gam oauth|oauth2 update [admin <EmailAddress>]
def doOAuthUpdate():
Cmd = _getMain().Cmd
Cmd = Cmd
if Cmd.PeekArgumentPresent(['admin']):
Cmd.Advance()
login_hint = _getMain().getEmailAddress(noUid=True)
login_hint = getEmailAddress(noUid=True)
else:
login_hint = _getMain().getEmailAddress(noUid=True, optional=True)
_getMain().checkForExtraneousArguments()
login_hint = getEmailAddress(noUid=True, optional=True)
checkForExtraneousArguments()
exitIfNoOauth2Txt()
lock = FileLock(GM.Globals[GM.OAUTH2_TXT_LOCK], mode=GC.Values[GC.OAUTH2_TXT_LOCK_MODE])
with lock:
jsonData = _getMain().readFile(GC.Values[GC.OAUTH2_TXT], continueOnError=True, displayError=False)
jsonData = readFile(GC.Values[GC.OAUTH2_TXT], continueOnError=True, displayError=False)
if not jsonData:
_getMain().invalidOauth2TxtExit(Msg.NO_DATA)
invalidOauth2TxtExit(Msg.NO_DATA)
try:
jsonDict = json.loads(jsonData)
if 'client_id' in jsonDict:
@@ -761,26 +804,26 @@ def doOAuthUpdate():
else:
currentScopes = []
except (AttributeError, IndexError, KeyError, SyntaxError, TypeError, ValueError) as e:
_getMain().invalidOauth2TxtExit(str(e))
invalidOauth2TxtExit(str(e))
if not doOAuthRequest(currentScopes, login_hint, verifyScopes=True):
_getMain().entityActionNotPerformedWarning([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], Msg.USER_CANCELLED)
entityActionNotPerformedWarning([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], Msg.USER_CANCELLED)
sys.exit(GM.Globals[GM.SYSEXITRC])
# gam oauth|oauth2 refresh
def doOAuthRefresh():
_getMain().checkForExtraneousArguments()
checkForExtraneousArguments()
exitIfNoOauth2Txt()
_getMain().getClientCredentials(forceRefresh=True, forceWrite=True, filename=GC.Values[GC.OAUTH2_TXT], refreshOnly=True)
_getMain().entityActionPerformed([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]])
getClientCredentials(forceRefresh=True, forceWrite=True, filename=GC.Values[GC.OAUTH2_TXT], refreshOnly=True)
entityActionPerformed([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]])
# gam oauth|oauth2 export [<FileName>]
def doOAuthExport():
Cmd = _getMain().Cmd
Cmd = Cmd
if Cmd.ArgumentsRemaining():
filename = _getMain().getString(Cmd.OB_FILE_NAME)
_getMain().checkForExtraneousArguments()
filename = getString(Cmd.OB_FILE_NAME)
checkForExtraneousArguments()
else:
filename = GC.Values[GC.OAUTH2_TXT]
_getMain().getClientCredentials(forceRefresh=True, forceWrite=True, filename=filename, refreshOnly=True)
getClientCredentials(forceRefresh=True, forceWrite=True, filename=filename, refreshOnly=True)
if filename != '-':
_getMain().entityModifierNewValueActionPerformed([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], Act.MODIFIER_TO, filename)
entityModifierNewValueActionPerformed([Ent.OAUTH2_TXT_FILE, GC.Values[GC.OAUTH2_TXT]], Act.MODIFIER_TO, filename)