Allow conditions for admin role assignments

This commit is contained in:
Jay Lee
2022-02-03 18:45:35 +00:00
parent 40f71cc703
commit 341d61444c
6 changed files with 7963 additions and 132 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -12,6 +12,7 @@ extra_files = [(os.path.join(proot, 'cacerts.txt'), 'httplib2')]
extra_files += copy_metadata('google-api-python-client')
extra_files += [('cbcm-v1.1beta1.json', '.')]
extra_files += [('contactdelegation-v1.json', '.')]
extra_files += [('admin-directory_v1.1beta1.json', '.')]
hidden_imports = [
'gam.auth.yubikey',

View File

@@ -78,6 +78,7 @@ from gam.gapi.directory import printers as gapi_directory_printers
from gam.gapi.directory import privileges as gapi_directory_privileges
from gam.gapi.directory import resource as gapi_directory_resource
from gam.gapi.directory import roles as gapi_directory_roles
from gam.gapi.directory import roleassignments as gapi_directory_roleassignments
from gam.gapi.directory import users as gapi_directory_users
from gam.gapi import licensing as gapi_licensing
from gam.gapi import siteverification as gapi_siteverification
@@ -1033,6 +1034,7 @@ def buildGAPIObject(api):
_get_admin_email())
if not GC_Values[GC_CUSTOMER_ID]:
GC_Values[GC_CUSTOMER_ID] = MY_CUSTOMER
print(GC_Values[GC_CUSTOMER_ID])
return service
@@ -1724,134 +1726,6 @@ def doUpdateCourse():
print(f'Updated Course {result["id"]}')
def doDelAdmin():
cd = buildGAPIObject('directory')
roleAssignmentId = sys.argv[3]
print(f'Deleting Admin Role Assignment {roleAssignmentId}')
gapi.call(cd.roleAssignments(),
'delete',
customer=GC_Values[GC_CUSTOMER_ID],
roleAssignmentId=roleAssignmentId)
def doCreateAdmin():
cd = buildGAPIObject('directory')
user = normalizeEmailAddressOrUID(sys.argv[3])
body = {'assignedTo': convertEmailAddressToUID(user, cd)}
role = sys.argv[4]
body['roleId'] = getRoleId(role)
body['scopeType'] = sys.argv[5].upper()
if body['scopeType'] not in ['CUSTOMER', 'ORG_UNIT']:
controlflow.expected_argument_exit('scope type',
', '.join(['customer', 'org_unit']),
body['scopeType'])
if body['scopeType'] == 'ORG_UNIT':
orgUnit, orgUnitId = gapi_directory_orgunits.getOrgUnitId(
sys.argv[6], cd)
body['orgUnitId'] = orgUnitId[3:]
scope = f'ORG_UNIT {orgUnit}'
else:
scope = 'CUSTOMER'
print(f'Giving {user} admin role {role} for {scope}')
gapi.call(cd.roleAssignments(),
'insert',
customer=GC_Values[GC_CUSTOMER_ID],
body=body)
def doPrintAdmins():
cd = buildGAPIObject('directory')
roleId = None
todrive = False
kwargs = {}
fields = 'nextPageToken,items(roleAssignmentId,roleId,assignedTo,scopeType,orgUnitId)'
titles = [
'roleAssignmentId', 'roleId', 'role', 'assignedTo', 'assignedToUser',
'scopeType', 'orgUnitId', 'orgUnit'
]
csvRows = []
i = 3
while i < len(sys.argv):
myarg = sys.argv[i].lower()
if myarg == 'user':
kwargs['userKey'] = normalizeEmailAddressOrUID(sys.argv[i + 1])
i += 2
elif myarg == 'role':
roleId = getRoleId(sys.argv[i + 1])
i += 2
elif myarg == 'todrive':
todrive = True
i += 1
else:
controlflow.invalid_argument_exit(sys.argv[i], 'gam print admins')
if roleId and not kwargs:
kwargs['roleId'] = roleId
roleId = None
admins = gapi.get_all_pages(cd.roleAssignments(),
'list',
'items',
customer=GC_Values[GC_CUSTOMER_ID],
fields=fields,
**kwargs)
for admin in admins:
if roleId and roleId != admin['roleId']:
continue
admin_attrib = {}
for key, value in list(admin.items()):
if key == 'assignedTo':
admin_attrib['assignedToUser'] = user_from_userid(value)
elif key == 'roleId':
admin_attrib['role'] = role_from_roleid(value)
elif key == 'orgUnitId':
value = f'id:{value}'
admin_attrib[
'orgUnit'] = gapi_directory_orgunits.orgunit_from_orgunitid(
value, cd)
admin_attrib[key] = value
csvRows.append(admin_attrib)
display.write_csv_file(csvRows, titles, 'Admins', todrive)
def buildRoleIdToNameToIdMap():
cd = buildGAPIObject('directory')
result = gapi.get_all_pages(cd.roles(),
'list',
'items',
customer=GC_Values[GC_CUSTOMER_ID],
fields='nextPageToken,items(roleId,roleName)')
GM_Globals[GM_MAP_ROLE_ID_TO_NAME] = {}
GM_Globals[GM_MAP_ROLE_NAME_TO_ID] = {}
for role in result:
GM_Globals[GM_MAP_ROLE_ID_TO_NAME][role['roleId']] = role['roleName']
GM_Globals[GM_MAP_ROLE_NAME_TO_ID][role['roleName']] = role['roleId']
def role_from_roleid(roleid):
if not GM_Globals[GM_MAP_ROLE_ID_TO_NAME]:
buildRoleIdToNameToIdMap()
return GM_Globals[GM_MAP_ROLE_ID_TO_NAME].get(roleid, roleid)
def roleid_from_role(role):
if not GM_Globals[GM_MAP_ROLE_NAME_TO_ID]:
buildRoleIdToNameToIdMap()
return GM_Globals[GM_MAP_ROLE_NAME_TO_ID].get(role, None)
def getRoleId(role):
cg = UID_PATTERN.match(role)
if cg:
roleId = cg.group(1)
else:
roleId = roleid_from_role(role)
if not roleId:
controlflow.system_error_exit(
4,
f'{role} is not a valid role. Please ensure role name is exactly as shown in admin console.'
)
return roleId
def buildUserIdToNameMap():
cd = buildGAPIObject('directory')
result = gapi.get_all_pages(cd.users(),
@@ -11440,7 +11314,7 @@ def ProcessGAMCommand(args):
elif argument in ['domainalias', 'aliasdomain']:
gapi_directory_domainaliases.create()
elif argument == 'admin':
doCreateAdmin()
gapi_directory_roleassignments.create()
elif argument in ['guardianinvite', 'inviteguardian', 'guardian']:
doInviteGuardian()
elif argument in ['project', 'apiproject']:
@@ -11644,7 +11518,7 @@ def ProcessGAMCommand(args):
elif argument in ['domainalias', 'aliasdomain']:
gapi_directory_domainaliases.delete()
elif argument == 'admin':
doDelAdmin()
gapi_directory_roleassignments.delete()
elif argument in ['guardian', 'guardians']:
doDeleteGuardian()
elif argument in ['project', 'projects']:
@@ -11751,7 +11625,7 @@ def ProcessGAMCommand(args):
elif argument in ['domainaliases', 'aliasdomains']:
gapi_directory_domainaliases.print_()
elif argument == 'admins':
doPrintAdmins()
gapi_directory_roleassignments.print_()
elif argument in ['roles', 'adminroles']:
gapi_directory_roles.print_()
elif argument in ['guardian', 'guardians']:

View File

@@ -0,0 +1,119 @@
import sys
from gam.var import GC_Values, GC_CUSTOMER_ID
import gam
from gam import controlflow
from gam import display
from gam import gapi
from gam.gapi import directory as gapi_directory
from gam.gapi.directory import orgunits as gapi_directory_orgunits
from gam.gapi.directory import roles as gapi_directory_roles
def create():
cd = gapi_directory.build()
user = gam.normalizeEmailAddressOrUID(sys.argv[3])
body = {'assignedTo': gam.convertEmailAddressToUID(user, cd)}
role = sys.argv[4]
body['roleId'] = gapi_directory_roles.getRoleId(role)
body['scopeType'] = sys.argv[5].upper()
i = 6
while i < len(sys.argv):
myarg = sys.argv[i].lower()
if myarg == 'condition':
cd = gapi_directory.build_beta()
body['condition'] = sys.argv[i+1]
if body['condition'] == 'securitygroup':
body['condition'] = "api.getAttribute('cloudidentity.googleapis.com/groups.labels', []).hasAny(['groups.security']) && resource.type == 'cloudidentity.googleapis.com/Group'"
elif body['condition'] == 'nonsecuritygroup':
body['condition'] = "!api.getAttribute('cloudidentity.googleapis.com/groups.labels', []).hasAny(['groups.security']) && resource.type == 'cloudidentity.googleapis.com/Group'"
i += 2
else:
controlflow.invalid_argument_exit(sys.argv[i], 'gam create admin')
if body['scopeType'] not in ['CUSTOMER', 'ORG_UNIT']:
controlflow.expected_argument_exit('scope type',
', '.join(['customer', 'org_unit']),
body['scopeType'])
if body['scopeType'] == 'ORG_UNIT':
orgUnit, orgUnitId = gapi_directory_orgunits.getOrgUnitId(
sys.argv[6], cd)
body['orgUnitId'] = orgUnitId[3:]
scope = f'ORG_UNIT {orgUnit}'
else:
scope = 'CUSTOMER'
print(f'Giving {user} admin role {role} for {scope}')
gapi.call(cd.roleAssignments(),
'insert',
customer=GC_Values[GC_CUSTOMER_ID],
body=body)
def delete():
cd = gapi_directory.build()
roleAssignmentId = sys.argv[3]
print(f'Deleting Admin Role Assignment {roleAssignmentId}')
gapi.call(cd.roleAssignments(),
'delete',
customer=GC_Values[GC_CUSTOMER_ID],
roleAssignmentId=roleAssignmentId)
def print_():
cd = gapi_directory.build()
roleId = None
todrive = False
kwargs = {}
item_fields = ['roleAssignmentId', 'roleId', 'assignedTo', 'scopeType', 'orgUnitId']
titles = [
'roleAssignmentId', 'roleId', 'role', 'assignedTo', 'assignedToUser',
'scopeType', 'orgUnitId', 'orgUnit'
]
csvRows = []
i = 3
while i < len(sys.argv):
myarg = sys.argv[i].lower()
if myarg == 'user':
kwargs['userKey'] = gam.normalizeEmailAddressOrUID(sys.argv[i + 1])
i += 2
elif myarg == 'role':
roleId = gapi_directory_roles.getRoleId(sys.argv[i + 1])
i += 2
elif myarg == 'condition':
cd = gapi_directory.build_beta()
item_fields.append('condition')
i += 1
elif myarg == 'todrive':
todrive = True
i += 1
else:
controlflow.invalid_argument_exit(sys.argv[i], 'gam print admins')
fields = f'nextPageToken,items({",".join(item_fields)})'
if roleId and not kwargs:
kwargs['roleId'] = roleId
roleId = None
admins = gapi.get_all_pages(cd.roleAssignments(),
'list',
'items',
customer=GC_Values[GC_CUSTOMER_ID],
fields=fields,
**kwargs)
for admin in admins:
if roleId and roleId != admin['roleId']:
continue
admin_attrib = {}
for key, value in list(admin.items()):
if key == 'assignedTo':
admin_attrib['assignedToUser'] = gam.user_from_userid(value)
elif key == 'roleId':
admin_attrib['role'] = gapi_directory_roles.role_from_roleid(value)
elif key == 'orgUnitId':
value = f'id:{value}'
admin_attrib[
'orgUnit'] = gapi_directory_orgunits.orgunit_from_orgunitid(
value, cd)
if key not in titles:
titles.append(key)
admin_attrib[key] = value
csvRows.append(admin_attrib)
display.write_csv_file(csvRows, titles, 'Admins', todrive)

View File

@@ -1,6 +1,13 @@
import sys
from gam.var import GC_Values, GC_CUSTOMER_ID
from gam.var import (
GC_Values,
GC_CUSTOMER_ID,
GM_Globals,
GM_MAP_ROLE_ID_TO_NAME,
GM_MAP_ROLE_NAME_TO_ID,
UID_PATTERN
)
import gam
from gam import controlflow
from gam import display
@@ -9,6 +16,47 @@ from gam.gapi import directory as gapi_directory
from gam.gapi.directory import privileges as gapi_directory_privileges
def buildRoleIdToNameToIdMap(cd=None):
if not cd:
cd = gapi_directory.build()
result = gapi.get_all_pages(cd.roles(),
'list',
'items',
customer=GC_Values[GC_CUSTOMER_ID],
fields='nextPageToken,items(roleId,roleName)')
GM_Globals[GM_MAP_ROLE_ID_TO_NAME] = {}
GM_Globals[GM_MAP_ROLE_NAME_TO_ID] = {}
for role in result:
GM_Globals[GM_MAP_ROLE_ID_TO_NAME][role['roleId']] = role['roleName']
GM_Globals[GM_MAP_ROLE_NAME_TO_ID][role['roleName']] = role['roleId']
def role_from_roleid(roleid):
if not GM_Globals[GM_MAP_ROLE_ID_TO_NAME]:
buildRoleIdToNameToIdMap()
return GM_Globals[GM_MAP_ROLE_ID_TO_NAME].get(roleid, roleid)
def roleid_from_role(role):
if not GM_Globals[GM_MAP_ROLE_NAME_TO_ID]:
buildRoleIdToNameToIdMap()
return GM_Globals[GM_MAP_ROLE_NAME_TO_ID].get(role, None)
def getRoleId(role):
cg = UID_PATTERN.match(role)
if cg:
roleId = cg.group(1)
else:
roleId = roleid_from_role(role)
if not roleId:
controlflow.system_error_exit(
4,
f'{role} is not a valid role. Please ensure role name is exactly as shown in admin console.'
)
return roleId
def getPrivileges(body, privs, action):
all_privileges = gapi_directory_privileges.print_(return_only=True)
if privs == 'ALL':
@@ -30,6 +78,7 @@ def getPrivileges(body, privs, action):
controlflow.invalid_argument_exit(priv,
f'gam {action} adminrole privileges')
def create():
cd = gapi_directory.build()
body = {'roleName': sys.argv[3]}
@@ -55,6 +104,7 @@ def create():
customer=GC_Values[GC_CUSTOMER_ID],
body=body)
def update():
cd = gapi_directory.build()
body = {}
@@ -122,3 +172,4 @@ def print_():
role_attrib[key] = value
csvRows.append(role_attrib)
display.write_csv_file(csvRows, titles, 'Admin Roles', todrive)

View File

@@ -292,6 +292,7 @@ V1_DISCOVERY_APIS = {
API_NAME_MAPPING = {
'directory': 'admin',
'directory_beta': 'admin',
'reports': 'admin',
'datatransfer': 'admin',
'drive3': 'drive',
@@ -313,6 +314,7 @@ API_VER_MAPPING = {
'contactdelegation': 'v1',
'datatransfer': 'datatransfer_v1',
'directory': 'directory_v1',
'directory_beta': 'directory_v1.1beta1',
'drive': 'v2',
'drive3': 'v3',
'gmail': 'v1',