Added group/ou filtering to print|show policies
Some checks are pending
Build and test GAM / build (Win64, build, 8, VC-WIN64A, windows-2022) (push) Waiting to run
Build and test GAM / build (aarch64, build, 3, linux-aarch64, [self-hosted linux arm64]) (push) Waiting to run
Build and test GAM / build (aarch64, build, 5, linux-aarch64, [self-hosted linux arm64], yes) (push) Waiting to run
Build and test GAM / build (aarch64, build, 7, darwin64-arm64, macos-14) (push) Waiting to run
Build and test GAM / build (x86_64, build, 1, linux-x86_64, ubuntu-22.04) (push) Waiting to run
Build and test GAM / build (x86_64, build, 2, linux-x86_64, ubuntu-24.04) (push) Waiting to run
Build and test GAM / build (x86_64, build, 4, linux-x86_64, ubuntu-22.04, yes) (push) Waiting to run
Build and test GAM / build (x86_64, build, 6, darwin64-x86_64, macos-13) (push) Waiting to run
Build and test GAM / build (x86_64, test, 10, ubuntu-24.04, 3.10) (push) Waiting to run
Build and test GAM / build (x86_64, test, 11, ubuntu-24.04, 3.11) (push) Waiting to run
Build and test GAM / build (x86_64, test, 12, ubuntu-24.04, 3.12) (push) Waiting to run
Build and test GAM / build (x86_64, test, 9, ubuntu-24.04, 3.9) (push) Waiting to run
Build and test GAM / merge (push) Blocked by required conditions
Build and test GAM / publish (push) Blocked by required conditions
CodeQL / Analyze (python) (push) Waiting to run
Check for Google Root CA Updates / check-apis (push) Waiting to run

Updated `gam get|update|delete contactphotos` to use the People API
This commit is contained in:
Ross Scroggs
2024-11-10 08:18:43 -08:00
parent f24629602c
commit 95b3a97925
8 changed files with 156 additions and 172 deletions

View File

@@ -25,7 +25,7 @@ https://github.com/GAM-team/GAM/wiki
"""
__author__ = 'GAM Team <google-apps-manager@googlegroups.com>'
__version__ = '7.00.36'
__version__ = '7.00.37'
__license__ = 'Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0)'
#pylint: disable=wrong-import-position
@@ -2693,6 +2693,11 @@ def performActionNumItems(itemCount, itemType, i=0, count=0):
[f'{Act.ToPerform()} {itemCount} {Ent.Choose(itemType, itemCount)}'],
currentCountNL(i, count)))
def performActionModifierNumItems(modifier, itemCount, itemType, i=0, count=0):
writeStdout(formatKeyValueList(Ind.Spaces(),
[f'{Act.ToPerform()} {modifier} {itemCount} {Ent.Choose(itemType, itemCount)}'],
currentCountNL(i, count)))
def actionPerformedNumItems(itemCount, itemType, i=0, count=0):
writeStderr(formatKeyValueList(Ind.Spaces(),
[f'{itemCount} {Ent.Choose(itemType, itemCount)} {Act.Performed()} '],
@@ -20287,137 +20292,6 @@ def doPrintShowDomainContacts():
def doPrintShowGAL():
_printShowContacts(False)
def _processContactPhotos(function):
def _makeFilenameFromPattern():
filename = filenamePattern[:]
if subForContactId:
filename = filename.replace('#contactid#', contactId)
if subForEmail:
for email in fields.get('Emails', []):
if email.get('primary', 'false') == 'true':
filename = filename.replace('#email#', email['value'])
break
else:
filename = filename.replace('#email#', contactId)
return filename
entityType = Ent.DOMAIN
contactsManager = ContactsManager()
entityList, contactQuery, queriedContacts = _getContactEntityList(1, False)
if function in {'ChangePhoto', 'GetPhoto'}:
targetFolder = os.getcwd()
filenamePattern = '#contactid#.jpg'
while Cmd.ArgumentsRemaining():
myarg = getArgument()
if myarg == 'drivedir':
targetFolder = GC.Values[GC.DRIVE_DIR]
elif myarg in {'sourcefolder', 'targetfolder'}:
targetFolder = os.path.expanduser(getString(Cmd.OB_FILE_PATH))
if function == 'GetPhoto' and not os.path.isdir(targetFolder):
os.makedirs(targetFolder)
elif myarg == 'filename':
filenamePattern = getString(Cmd.OB_PHOTO_FILENAME_PATTERN)
else:
unknownArgumentExit()
subForContactId = filenamePattern.find('#contactid#') != -1
subForEmail = filenamePattern.find('#email#') != -1
if not subForContactId and not subForEmail:
filename = filenamePattern
else: #elif function == 'DeletePhoto':
checkForExtraneousArguments()
user, contactsObject = getContactsObject(True)
if queriedContacts:
entityList = queryContacts(contactsObject, contactQuery)
if entityList is None:
return
j = 0
jcount = len(entityList)
entityPerformActionModifierNumItems([entityType, user], Msg.MAXIMUM_OF, jcount, Ent.PHOTO)
if jcount == 0:
setSysExitRC(NO_ENTITIES_FOUND_RC)
return
Ind.Increment()
for contact in entityList:
j += 1
try:
if not queriedContacts:
contactId = normalizeContactId(contact)
contact = callGData(contactsObject, 'GetContact',
throwErrors=[GDATA.NOT_FOUND, GDATA.BAD_REQUEST, GDATA.SERVICE_NOT_APPLICABLE, GDATA.FORBIDDEN, GDATA.NOT_IMPLEMENTED],
retryErrors=[GDATA.INTERNAL_SERVER_ERROR],
uri=contactsObject.GetContactFeedUri(contact_list=user, contactId=contactId))
fields = contactsManager.ContactToFields(contact)
else:
contactId = contactsManager.GetContactShortId(contact)
fields = contactsManager.ContactToFields(contact)
if not localContactSelects(contactsManager, contactQuery, fields):
continue
except (GDATA.notFound, GDATA.badRequest) as e:
entityActionFailedWarning([entityType, user, Ent.CONTACT, contactId], str(e), j, jcount)
break
except (GDATA.forbidden, GDATA.notImplemented):
entityServiceNotApplicableWarning(entityType, user)
break
except GDATA.serviceNotApplicable:
entityUnknownWarning(entityType, user)
break
try:
if function == 'ChangePhoto':
if subForContactId or subForEmail:
filename = _makeFilenameFromPattern()
filename = os.path.join(targetFolder, filename)
callGData(contactsObject, function,
throwErrors=[GDATA.NOT_FOUND, GDATA.BAD_REQUEST, GDATA.SERVICE_NOT_APPLICABLE, GDATA.FORBIDDEN, GDATA.NOT_IMPLEMENTED],
retryErrors=[GDATA.INTERNAL_SERVER_ERROR],
media=filename, contact_entry_or_url=contact,
content_type='image/*', content_length=os.path.getsize(filename), extra_headers={'If-Match': '*'})
entityActionPerformed([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, filename])
elif function == 'GetPhoto':
if subForContactId or subForEmail:
filename = _makeFilenameFromPattern()
filename = os.path.join(targetFolder, filename)
photo_data = callGData(contactsObject, function,
throwErrors=[GDATA.NOT_FOUND, GDATA.BAD_REQUEST, GDATA.SERVICE_NOT_APPLICABLE, GDATA.FORBIDDEN, GDATA.NOT_IMPLEMENTED],
retryErrors=[GDATA.INTERNAL_SERVER_ERROR],
contact_entry_or_url=contact)
if photo_data:
status, e = writeFileReturnError(filename, eval(photo_data), mode='wb') #pylint: disable=eval-used
if status:
entityActionPerformed([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, filename])
else:
entityActionFailedWarning([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, filename], str(e))
else:
entityDoesNotHaveItemWarning([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, ''])
else: #elif function == 'DeletePhoto':
filename = ''
callGData(contactsObject, function,
throwErrors=[GDATA.NOT_FOUND, GDATA.BAD_REQUEST, GDATA.SERVICE_NOT_APPLICABLE, GDATA.FORBIDDEN, GDATA.NOT_IMPLEMENTED],
retryErrors=[GDATA.INTERNAL_SERVER_ERROR],
contact_entry_or_url=contact, extra_headers={'If-Match': '*'})
entityActionPerformed([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, filename])
except GDATA.notFound as e:
entityActionFailedWarning([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, filename], str(e), j, jcount)
except (GDATA.badRequest, OSError, IOError) as e:
entityActionFailedWarning([entityType, user, Ent.CONTACT, contactId, Ent.PHOTO, filename], str(e), j, jcount)
except (GDATA.forbidden, GDATA.notImplemented):
entityServiceNotApplicableWarning(entityType, user)
break
Ind.Decrement()
# gam update contactphotos <ContactEntity>|<ContactSelection>
# [drivedir|(sourcefolder <FilePath>)] [filename <FileNamePattern>]
def doUpdateDomainContactPhoto():
_processContactPhotos('ChangePhoto')
# gam get contactphotos <ContactEntity>|<ContactSelection>
# [drivedir|(targetfolder <FilePath>)] [filename <FileNamePattern>]
def doGetDomainContactPhoto():
_processContactPhotos('GetPhoto')
# gam delete contactphotos <ContactEntity>|<ContactSelection>
def doDeleteDomainContactPhoto():
_processContactPhotos('DeletePhoto')
# Prople commands utilities
#
def normalizePeopleResourceName(resourceName):
@@ -22634,9 +22508,16 @@ def _processPeopleContactPhotos(users, function):
filename = filename.replace('#email#', resourceName.split('/')[1])
return filename
entityType = Ent.USER
peopleEntityType = Ent.PEOPLE_CONTACT
sources = [PEOPLE_READ_SOURCES_CHOICE_MAP['contact']]
if users is not None:
entityType = Ent.USER
peopleEntityType = Ent.PEOPLE_CONTACT
sources = [PEOPLE_READ_SOURCES_CHOICE_MAP['contact']]
else:
users = [None]
people = buildGAPIObject(API.PEOPLE_DIRECTORY)
entityType = Ent.DOMAIN
peopleEntityType = Ent.PEOPLE_CONTACT
sources = [PEOPLE_READ_SOURCES_CHOICE_MAP['domaincontact']]
entityList, resourceNameLists, contactQuery, queriedContacts = _getPeopleContactEntityList(entityType, 1)
if function in {'updateContactPhoto', 'getContactPhoto'}:
targetFolder = os.getcwd()
@@ -22665,9 +22546,12 @@ def _processPeopleContactPhotos(users, function):
i += 1
if resourceNameLists:
entityList = resourceNameLists[user]
user, people = buildGAPIServiceObject(API.PEOPLE, user, i, count)
if not people:
continue
if user is not None:
user, people = buildGAPIServiceObject(API.PEOPLE, user, i, count)
if not people:
continue
else:
user = Ent.Singular(entityType)
if contactQuery['contactGroupSelect']:
groupId, _, contactGroupNames = validatePeopleContactGroup(people, contactQuery['contactGroupSelect'],
None, None, entityType, user, i, count)
@@ -22774,6 +22658,20 @@ def getUserPeopleContactPhoto(users):
def deleteUserPeopleContactPhoto(users):
_processPeopleContactPhotos(users, 'deleteContactPhoto')
# gam update contactphotos <PeopleResourceNameEntity>|<PeopleUserContactSelection>
# [drivedir|(sourcefolder <FilePath>)] [filename <FileNamePattern>]
def doUpdateDomainContactPhoto():
_processPeopleContactPhotos(None, 'updateContactPhoto')
# gam get contactphotos <PeopleResourceNameEntity>|<PeopleUserContactSelection>
# [drivedir|(targetfolder <FilePath>)] [filename <FileNamePattern>]
def doGetDomainContactPhoto():
_processPeopleContactPhotos(None, 'getContactPhoto')
# gam delete contactphoto <PeopleResourceNameEntity>|<PeopleUserContactSelection>
def doDeleteDomainContactPhoto():
_processPeopleContactPhotos(None, 'deleteContactPhoto')
# gam <UserTypeEntity> create contactgroup <ContactGroupAttribute>+
# [(csv [todrive <ToDriveAttribute>*] (addcsvdata <FieldName> <String>)*))| returnidonly]
def createUserPeopleContactGroup(users):
@@ -32648,6 +32546,8 @@ def infoGroups(entityList):
getCloudIdentity = True
cifields = getFieldsFromFieldsList(groupFieldsLists['ci'])
ci = buildGAPIObject(API.CLOUDIDENTITY_GROUPS)
else:
cifields = None
i = 0
count = len(entityList)
for group in entityList:
@@ -35131,7 +35031,9 @@ def _getPolicyAppNameFromId(httpObj, app):
if a:
app['applicationName'] = a.group(1)
def _cleanPolicy(policy, add_warnings, no_appnames, cd, groups_ci):
def _cleanPolicy(policy, add_warnings, no_appnames,
groupEmailPattern, orgUnitPathPattern,
cd, groups_ci):
# convert any wordlists into spaced strings to reduce output complexity
if policy['setting']['type'] == 'settings/detector.word_list':
policy['setting']['value']['wordList'] = ' '.join(policy['setting']['value']['wordList']['words'])
@@ -35145,10 +35047,15 @@ def _cleanPolicy(policy, add_warnings, no_appnames, cd, groups_ci):
policy['warning'] = CIPOLICY_ADDITIONAL_WARNINGS[policy['setting']['type']]
if groupId := policy['policyQuery'].get('group'):
_, _, policy['policyQuery']['groupEmail'] = convertGroupCloudIDToEmail(groups_ci, groupId)
if groupEmailPattern is not None and not groupEmailPattern.match(policy['policyQuery']['groupEmail']):
return False
# all groups are in the root OU so the orgUnit attribute is useless
policy['policyQuery'].pop('orgUnit', None)
elif orgId := policy['policyQuery'].get('orgUnit'):
policy['policyQuery']['orgUnitPath'] = convertOrgUnitIDtoPath(cd, orgId)
if orgUnitPathPattern is not None and not orgUnitPathPattern.match(policy['policyQuery']['orgUnitPath']):
return False
return True
def _showPolicy(policy, FJQC, i=0, count=0):
if FJQC is not None and FJQC.formatJSON:
@@ -35163,15 +35070,22 @@ def _showPolicy(policy, FJQC, i=0, count=0):
printBlankLine()
Ind.Decrement()
def _showPolicies(policies, FJQC, add_warnings, no_appnames, cd, groups_ci):
def _showPolicies(policies, FJQC, add_warnings, no_appnames,
groupEmailPattern, orgUnitPathPattern,
cd, groups_ci):
count = len(policies)
performActionNumItems(count, Ent.POLICY)
if groupEmailPattern is None and orgUnitPathPattern is None:
performActionNumItems(count, Ent.POLICY)
else:
performActionModifierNumItems(Msg.MAXIMUM_OF, count, Ent.POLICY)
Ind.Increment()
i = 0
for policy in policies:
i += 1
_cleanPolicy(policy, add_warnings, no_appnames, cd, groups_ci)
_showPolicy(policy, FJQC, i, count)
if _cleanPolicy(policy, add_warnings, no_appnames,
groupEmailPattern, orgUnitPathPattern,
cd, groups_ci):
_showPolicy(policy, FJQC, i, count)
Ind.Decrement()
# gam info policies <CIPolicyNameEntity>
@@ -35213,13 +35127,16 @@ def doInfoCIPolicies():
ifilter = f"setting.type.matches('{pname}')"
printGettingAllAccountEntities(Ent.POLICY, ifilter)
policies = _filterPolicies(ci, getPageMessage(), ifilter)
_showPolicies(policies, FJQC, add_warnings, no_appnames, cd, groups_ci)
_showPolicies(policies, FJQC, add_warnings, no_appnames,
None, None, cd, groups_ci)
# gam print policies [todrive <ToDriveAttribute>*]
# [filter <String>] [nowarnings] [noappnames]
# [group <RegularExpression>] [ou|org|orgunit <RegularExpression>]
# [formatjson [quotechar <Character>]]
# gam show policies
# [filter <String>] [nowarnings] [noappnames]
# [group <RegularExpression>] [ou|org|orgunit <RegularExpression>]
# [formatjson]
def doPrintShowCIPolicies():
@@ -35241,6 +35158,7 @@ def doPrintShowCIPolicies():
ifilter = None
add_warnings = True
no_appnames = False
groupEmailPattern = orgUnitPathPattern = None
while Cmd.ArgumentsRemaining():
myarg = getArgument()
if csvPF and myarg == 'todrive':
@@ -35250,17 +35168,25 @@ def doPrintShowCIPolicies():
elif myarg == 'nowarnings':
add_warnings = False
elif myarg == 'noappnames':
no_appnames=True
no_appnames = True
elif myarg == 'group':
groupEmailPattern = getREPattern(re.IGNORECASE)
elif myarg in {'ou', 'org', 'orgunit'}:
orgUnitPathPattern = getREPattern(re.IGNORECASE)
else:
FJQC.GetFormatJSONQuoteChar(myarg, True)
printGettingAllAccountEntities(Ent.POLICY, ifilter)
policies = _filterPolicies(ci, getPageMessage(), ifilter)
if not csvPF:
_showPolicies(policies, FJQC, add_warnings, no_appnames, cd, groups_ci)
_showPolicies(policies, FJQC, add_warnings, no_appnames,
groupEmailPattern, orgUnitPathPattern,
cd, groups_ci)
else:
for policy in policies:
_cleanPolicy(policy, add_warnings, no_appnames, cd, groups_ci)
_printPolicy(policy)
if _cleanPolicy(policy, add_warnings, no_appnames,
groupEmailPattern, orgUnitPathPattern,
cd, groups_ci):
_printPolicy(policy)
if csvPF:
csvPF.writeCSVfile('Policies')