Lookup service account emails by ID

This commit is contained in:
Jay Lee
2024-06-25 16:25:10 +00:00
parent 793f230c30
commit 53f40eb9eb
4 changed files with 208 additions and 11 deletions

View File

@@ -5485,7 +5485,8 @@ def buildGAPIObject(api, credentials=None):
API_Scopes = set(API.VAULT_SCOPES) if api == API.VAULT else set()
GM.Globals[GM.CURRENT_CLIENT_API] = api
GM.Globals[GM.CURRENT_CLIENT_API_SCOPES] = API_Scopes.intersection(GM.Globals[GM.CREDENTIALS_SCOPES])
if api not in {API.OAUTH2, API.CHROMEVERSIONHISTORY} and not GM.Globals[GM.CURRENT_CLIENT_API_SCOPES]:
scopeless_apis = {API.OAUTH2, API.CHROMEVERSIONHISTORY, API.SERVICEACCOUNTLOOKUP}
if api not in scopeless_apis and not GM.Globals[GM.CURRENT_CLIENT_API_SCOPES]:
systemErrorExit(NO_SCOPES_FOR_API_RC, Msg.NO_SCOPES_FOR_API.format(API.getAPIName(api)))
if not GC.Values[GC.DOMAIN]:
GC.Values[GC.DOMAIN] = GM.Globals[GM.DECODED_ID_TOKEN].get('hd', 'UNKNOWN').lower()
@@ -5624,7 +5625,7 @@ def getGroupEmailFromID(uid, cd):
return None
# Convert UID to email address and type
def convertUIDtoEmailAddressWithType(emailAddressOrUID, cd=None, emailTypes=None,
def convertUIDtoEmailAddressWithType(emailAddressOrUID, cd=None, sal=None, emailTypes=None,
checkForCustomerId=False, ciGroupsAPI=False, aliasAllowed=True):
if emailTypes is None:
emailTypes = ['user']
@@ -5675,8 +5676,36 @@ def convertUIDtoEmailAddressWithType(emailAddressOrUID, cd=None, emailTypes=None
return (result['resourceEmail'].lower(), 'resource')
except (GAPI.badRequest, GAPI.resourceNotFound, GAPI.forbidden):
pass
if 'serviceaccount' in emailTypes:
if sal is None:
sal = buildGAPIObject(API.SERVICEACCOUNTLOOKUP)
uid = getServiceAccountEmailFromID(normalizedEmailAddressOrUID, sal)
if uid:
return (uid, 'serviceaccount')
return (normalizedEmailAddressOrUID, 'unknown')
def getServiceAccountEmailFromID(account_id, sal=None):
if sal is None:
sal = buildGAPIObject('serviceaccountlookup')
throwReasons = [GAPI.BAD_REQUEST,
GAPI.RESOURCE_NOT_FOUND,
GAPI.INVALID_ARGUMENT]
try:
certs = callGAPI(sal.serviceaccounts(),
'lookup',
account=account_id,
throwReasons=throwReasons)
except (GAPI.badRequest, GAPI.resourceNotFound, GAPI.invalidArgument):
return
sa_cn_rx = r'CN=.*\.gserviceaccount\.com$'
sa_emails = []
for kid, raw_cert in certs.items():
cert = x509.load_pem_x509_certificate(raw_cert.encode(), default_backend())
subject = cert.issuer.rfc4514_string()
if re.match(sa_cn_rx, subject):
sa_emails.append(subject[3:])
return ' or '.join(sa_emails)
# Convert UID to email address
def convertUIDtoEmailAddress(emailAddressOrUID, cd=None, emailTypes=None,
checkForCustomerId=False, ciGroupsAPI=False, aliasAllowed=True):
@@ -16177,6 +16206,9 @@ def doCreateUpdateAdminRoles():
body['rolePrivileges'].append({'privilegeName': p, 'serviceId': ouPrivileges[p]})
elif p in childPrivileges:
body['rolePrivileges'].append({'privilegeName': p, 'serviceId': childPrivileges[p]})
elif ':' in p:
priv, serv = p.split(':')
body['rolePrivileges'].append({'privilegeName': priv, 'serviceId': serv.lower()})
else:
invalidChoiceExit(p, list(allPrivileges.keys())+list(ouPrivileges.keys())+list(childPrivileges.keys()), True)
elif myarg == 'description':
@@ -16255,7 +16287,10 @@ def doInfoAdminRole():
fields = ','.join(set(fieldsList))
try:
role = callGAPI(cd.roles(), 'get',
throwReasons=[GAPI.BAD_REQUEST, GAPI.CUSTOMER_NOT_FOUND, GAPI.FORBIDDEN]+[GAPI.NOT_FOUND, GAPI.FAILED_PRECONDITION],
throwReasons=[GAPI.BAD_REQUEST,
GAPI.CUSTOMER_NOT_FOUND,
GAPI.FORBIDDEN]+[GAPI.NOT_FOUND,
GAPI.FAILED_PRECONDITION],
customer=GC.Values[GC.CUSTOMER_ID], roleId=roleId, fields=fields)
role.setdefault('isSuperAdminRole', False)
role.setdefault('isSystemRole', False)
@@ -16414,8 +16449,13 @@ def doPrintShowAdmins():
if roleId not in rolePrivileges:
try:
rolePrivileges[roleId] = callGAPI(cd.roles(), 'get',
throwReasons=[GAPI.BAD_REQUEST, GAPI.CUSTOMER_NOT_FOUND, GAPI.FORBIDDEN]+[GAPI.NOT_FOUND, GAPI.FAILED_PRECONDITION],
customer=GC.Values[GC.CUSTOMER_ID], roleId=roleId, fields='rolePrivileges')
throwReasons=[GAPI.BAD_REQUEST,
GAPI.CUSTOMER_NOT_FOUND,
GAPI.FORBIDDEN]+[GAPI.NOT_FOUND,
GAPI.FAILED_PRECONDITION],
customer=GC.Values[GC.CUSTOMER_ID],
roleId=roleId,
fields='rolePrivileges')
except (GAPI.notFound, GAPI.forbidden, GAPI.failedPrecondition) as e:
entityActionFailedExit([Ent.USER, userKey, Ent.ADMIN_ROLE, admin['roleId']], str(e))
rolePrivileges[roleId] = None
@@ -16432,14 +16472,22 @@ def doPrintShowAdmins():
assignedToField = 'assignedToUser'
elif assigneeType == 'group':
assignedToField = 'assignedToGroup'
elif assigneeType == 'serviceaccount':
assignedToField = 'assignedToServiceAccount'
else:
assignedToField = None
assigneeEmail, assigneeType = convertUIDtoEmailAddressWithType(f'uid:{assignedTo}', cd, emailTypes=['user', 'group'])
if not assignedToField and assigneeType in ['user', 'group']:
emailTypes = ['user', 'group', 'serviceaccount']
assigneeEmail, assigneeType = convertUIDtoEmailAddressWithType(f'uid:{assignedTo}',
cd,
sal,
emailTypes=emailTypes)
if not assignedToField and assigneeType in ['user', 'group', 'serviceaccount']:
if assigneeType == 'user':
assignedToField = 'assignedToUser'
else:
elif assigneeType == 'group':
assignedToField = 'assignedToGroup'
elif assigneeType == 'serviceaccount':
assignedToField = 'assignedToServiceAccount'
assignedToIdEmailMap[assignedTo] = {'assignedToField': assignedToField, 'assigneeEmail': assigneeEmail}
assignedToField = assignedToIdEmailMap[assignedTo]['assignedToField']
if assignedToField:
@@ -16455,6 +16503,7 @@ def doPrintShowAdmins():
admin['condition'] = 'nonsecuritygroup'
cd = buildGAPIObject(API.DIRECTORY)
sal = buildGAPIObject(API.SERVICEACCOUNTLOOKUP)
csvPF = CSVPrintFile(PRINT_ADMIN_TITLES) if Act.csvFormat() else None
roleId = None
userKey = None
@@ -25673,10 +25722,10 @@ def _getChatMemberEmail(cd, member):
if 'member' in member:
if member['member']['type'] == 'HUMAN':
_, memberUid = member['member']['name'].split('/')
member['member']['email'], _ = convertUIDtoEmailAddressWithType(f'uid:{memberUid}', cd, emailTypes=['user'])
member['member']['email'], _ = convertUIDtoEmailAddressWithType(f'uid:{memberUid}', cd, None, emailTypes=['user'])
elif 'groupMember' in member:
_, memberUid = member['groupMember']['name'].split('/')
member['groupMember']['email'], _ = convertUIDtoEmailAddressWithType(f'uid:{memberUid}', cd, emailTypes=['group'])
member['groupMember']['email'], _ = convertUIDtoEmailAddressWithType(f'uid:{memberUid}', cd, None, emailTypes=['group'])
# gam <UserTypeEntity> create chatmember <ChatSpace>
# [type human|bot] [role member|manager]
@@ -26300,7 +26349,7 @@ def doPrintShowChatMembers():
def _getChatSenderEmail(cd, sender):
if sender['type'] == 'HUMAN':
_, senderUid = sender['name'].split('/')
sender['email'], _ = convertUIDtoEmailAddressWithType(f'uid:{senderUid}', cd, emailTypes=['user'])
sender['email'], _ = convertUIDtoEmailAddressWithType(f'uid:{senderUid}', cd, None, emailTypes=['user'])
def trimChatMessageIfRequired(body):
msgLen = len(body['text'])