"""GAM usage reports and activity reports."""

import arrow
import sys
import datetime
import re

from gamlib import glaction
from gamlib import glapi as API
from gamlib import glcfg as GC
from gamlib import glclargs
from gamlib import glentity
from gamlib import glgapi as GAPI
from gamlib import glglobals as GM
from gamlib import glindent
from gamlib import glmsgs as Msg

Act = glaction.GamAction()
Ent = glentity.GamEntity()
Ind = glindent.GamIndent()
Cmd = glclargs.GamCLArgs()


def _getMain():
    """Return the ``gam`` module object registered in ``sys.modules``."""
    return sys.modules['gam']


def __getattr__(name):
    """Fall back to the gam module for attributes this module does not define.

    This is the module-level attribute hook: it is consulted only for
    attribute access on the module object (``module.name`` or
    ``from module import name``).  Bare global names used inside this file
    are NOT resolved through it, so code here must call gam helpers
    explicitly as ``_getMain().helper(...)``.

    Raises:
        AttributeError: if the gam module has no such attribute either.
    """
    main = _getMain()
    try:
        return getattr(main, name)
    except AttributeError:
        # Suppress the inner lookup error; only this module's name is relevant.
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}") from None


def doWhatIs():
    """Identify what kind of entity an email address refers to.

    Reads an email address and the optional arguments ``noinfo`` and
    ``noinvitablecheck`` from the command line, then probes in order:
    Directory API users, Directory API groups and, unless disabled, the
    invitable-user check.  The matching entity is shown (full info unless
    ``noinfo`` was given) and the process exit code is set to the
    corresponding ``ENTITY_IS_*_RC`` value.
    """
    def _showPrimaryType(entityType, email):
        _getMain().printEntity([entityType, email])

    def _showAliasType(entityType, email, primaryEntityType, primaryEmail):
        _getMain().printEntity([entityType, email, primaryEntityType, primaryEmail])

    cd = _getMain().buildGAPIObject(API.DIRECTORY)
    email = _getMain().getEmailAddress()
    showInfo = invitableCheck = True
    while Cmd.ArgumentsRemaining():
        myarg = _getMain().getArgument()
        if myarg == 'noinfo':
            showInfo = False
        elif myarg == 'noinvitablecheck':
            invitableCheck = False
        else:
            _getMain().unknownArgumentExit()
    # First try: is it a user (primary address/id) or a user alias?
    try:
        result = _getMain().callGAPI(cd.users(), 'get',
                                     throwReasons=GAPI.USER_GET_THROW_REASONS,
                                     userKey=email, fields='id,primaryEmail')
        if (result['primaryEmail'].lower() == email) or (result['id'] == email):
            if showInfo:
                _getMain().infoUsers(entityList=[email])
            else:
                _showPrimaryType(Ent.USER, email)
            _getMain().setSysExitRC(_getMain().ENTITY_IS_A_USER_RC)
        else:
            if showInfo:
                _getMain().infoAliases(entityList=[email])
            else:
                _showAliasType(Ent.USER_ALIAS, email, Ent.USER, result['primaryEmail'])
            _getMain().setSysExitRC(_getMain().ENTITY_IS_A_USER_ALIAS_RC)
        return
    except (GAPI.userNotFound, GAPI.badRequest):
        # Not a user; fall through to the group lookup.
        pass
    except (GAPI.domainNotFound, GAPI.domainCannotUseApis,
            GAPI.forbidden, GAPI.backendError, GAPI.systemError):
        _getMain().entityUnknownWarning(Ent.EMAIL, email)
        _getMain().setSysExitRC(_getMain().ENTITY_IS_UKNOWN_RC)
        return
    # Second try: is it a group (primary address/id) or a group alias?
    try:
        result = _getMain().callGAPI(cd.groups(), 'get',
                                     throwReasons=GAPI.GROUP_GET_THROW_REASONS,
                                     groupKey=email, fields='id,email')
        if (result['email'].lower() == email) or (result['id'] == email):
            if showInfo:
                _getMain().infoGroups([email])
            else:
                _showPrimaryType(Ent.GROUP, email)
            _getMain().setSysExitRC(_getMain().ENTITY_IS_A_GROUP_RC)
        else:
            if showInfo:
                _getMain().infoAliases(entityList=[email])
            else:
                _showAliasType(Ent.GROUP_ALIAS, email, Ent.GROUP, result['email'])
            _getMain().setSysExitRC(_getMain().ENTITY_IS_A_GROUP_ALIAS_RC)
        return
    except (GAPI.groupNotFound, GAPI.forbidden):
        # Not a group; fall through to the invitable-user check.
        pass
    except (GAPI.domainNotFound, GAPI.domainCannotUseApis, GAPI.badRequest):
        _getMain().entityUnknownWarning(Ent.EMAIL, email)
        _getMain().setSysExitRC(_getMain().ENTITY_IS_UKNOWN_RC)
        return
    # Last try: an invitable user, unless the caller disabled that check.
    if not invitableCheck:
        isInvitableUser = False
        ci = None
    else:
        isInvitableUser, ci = _getMain()._getIsInvitableUser(None, email)
    if isInvitableUser:
        if showInfo:
            name, user, ci = _getMain()._getCIUserInvitationsEntity(ci, email)
            _getMain().infoCIUserInvitations(name, user, ci, None)
        else:
            _showPrimaryType(Ent.USER_INVITATION, email)
        _getMain().setSysExitRC(_getMain().ENTITY_IS_AN_UNMANAGED_ACCOUNT_RC)
    else:
        _getMain().entityUnknownWarning(Ent.EMAIL, email)
        _getMain().setSysExitRC(_getMain().ENTITY_IS_UKNOWN_RC)


def _dayBefore(dateStr):
    """Return the date one day before dateStr, both in YYYYMMDD_FORMAT."""
    dateFormat = _getMain().YYYYMMDD_FORMAT
    return arrow.Arrow.strptime(dateStr, dateFormat).shift(days=-1).strftime(dateFormat)


def _adjustTryDate(errMsg, numDateChanges, limitDateChanges, prevTryDate):
    """Derive the next date to try from a Reports API "invalid" error message.

    Args:
        errMsg: error text returned by the API.
        numDateChanges: number of date adjustments already made.
        limitDateChanges: maximum adjustments allowed; a negative value
            disables the limit.
        prevTryDate: the date string that produced the error.

    Returns:
        The date string to retry with, or None (after printing a warning)
        when the message is not recognized or the limit has been exceeded.
    """
    match_date = re.match('Data for dates later than (.*) is not yet available. Please check back later', errMsg)
    if match_date:
        tryDate = match_date.group(1)
    else:
        match_date = re.match('Start date can not be later than (.*)', errMsg)
        if match_date:
            tryDate = match_date.group(1)
        else:
            match_date = re.match('End date greater than LastReportedDate.', errMsg)
            if match_date:
                # This message carries no date, so step back one day.
                tryDate = _dayBefore(prevTryDate)
    if (not match_date) or (numDateChanges > limitDateChanges >= 0):
        _getMain().printWarningMessage(_getMain().DATA_NOT_AVALIABLE_RC, errMsg)
        return None
    return tryDate


def _checkDataRequiredServices(result, tryDate, dataRequiredServices, parameterServices=None, checkUserEmail=False):
    """Decide whether a usage report result holds the data the caller needs.

    Args:
        result: API response dict; its 'warnings' and 'usageReports' keys are read.
        tryDate: the date string that was requested.
        dataRequiredServices: application names whose data must be present.
        parameterServices: optional collection of service names that must each
            appear as the prefix of at least one returned parameter name.
        checkUserEmail: when true, the first report must carry entity.userEmail.

    Returns:
        A tuple (status, date, usageReports) where status is
        -1 (data not available), 0 (retry with the returned earlier date)
        or 1 (data available; usageReports is the report list).
    """
    # -1: Data not available:
    # 0: Backup to earlier date
    # 1: Data available
    dataWarnings = result.get('warnings', [])
    usageReports = result.get('usageReports', [])
    # move to day before if we don't have at least one usageReport with parameters
    if not usageReports or not usageReports[0].get('parameters', []):
        return (0, _dayBefore(tryDate), None)
    for warning in dataWarnings:
        if warning['code'] == 'PARTIAL_DATA_AVAILABLE':
            for app in warning['data']:
                if app['key'] == 'application' and app['value'] != 'docs' and app['value'] in dataRequiredServices:
                    return (0, _dayBefore(tryDate), None)
        elif warning['code'] == 'DATA_NOT_AVAILABLE':
            for app in warning['data']:
                if app['key'] == 'application' and app['value'] != 'docs' and app['value'] in dataRequiredServices:
                    return (-1, tryDate, None)
    if parameterServices:
        requiredServices = parameterServices.copy()
        for item in usageReports[0].get('parameters', []):
            if 'name' not in item:
                continue
            # Parameter names look like 'service:parameter'; partition tolerates
            # a name without a colon instead of raising on unpacking.
            service = item['name'].partition(':')[0]
            if service in requiredServices:
                requiredServices.remove(service)
                if not requiredServices:
                    break
        else:
            # Loop finished without seeing every required service: back up a day.
            return (0, _dayBefore(tryDate), None)
    if checkUserEmail:
        if 'entity' not in usageReports[0] or 'userEmail' not in usageReports[0]['entity']:
            return (0, _dayBefore(tryDate), None)
    return (1, tryDate, usageReports)


CUSTOMER_REPORT_SERVICES = {
    'accounts',
    'app_maker',
    'apps_scripts',
    'calendar',
    'chat',
    'classroom',
    'cros',
    'device_management',
    'docs',
    'drive',
    'gmail',
    'gplus',
    'meet',
    'sites',
}

USER_REPORT_SERVICES = {
    'accounts',
    'chat',
    'classroom',
    'docs',
    'drive',
    'gmail',
    'gplus',
}

CUSTOMER_USER_CHOICES = {'customer', 'user'}


# gam report usageparameters customer|user [todrive *]
def doReportUsageParameters():
    """Write a CSV listing the parameter names found in usage reports.

    Starting from today's date, requests the customer or user usage report
    and walks back in time (as directed by _checkDataRequiredServices and
    _adjustTryDate) until full data is found, collecting every parameter
    name seen along the way.
    """
    report = _getMain().getChoice(CUSTOMER_USER_CHOICES)
    csvPF = _getMain().CSVPrintFile(['parameter'], 'sortall')
    _getMain().getTodriveOnly(csvPF)
    rep = _getMain().buildGAPIObject(API.REPORTS)
    if report == 'customer':
        service = rep.customerUsageReports()
        dataRequiredServices = CUSTOMER_REPORT_SERVICES
        kwargs = {}
    else:  # 'user'
        service = rep.userUsageReport()
        dataRequiredServices = USER_REPORT_SERVICES
        kwargs = {'userKey': _getMain()._getAdminEmail()}
    customerId = GC.Values[GC.CUSTOMER_ID]
    if customerId == GC.MY_CUSTOMER:
        customerId = None
    tryDate = _getMain().todaysDate().strftime(_getMain().YYYYMMDD_FORMAT)
    allParameters = set()
    while True:
        try:
            result = _getMain().callGAPI(service, 'get',
                                         throwReasons=[GAPI.INVALID, GAPI.BAD_REQUEST],
                                         date=tryDate, customerId=customerId,
                                         fields='warnings,usageReports(parameters(name))',
                                         **kwargs)
            fullData, tryDate, usageReports = _checkDataRequiredServices(result, tryDate, dataRequiredServices)
            if fullData < 0:
                _getMain().printWarningMessage(_getMain().DATA_NOT_AVALIABLE_RC, Msg.NO_USAGE_PARAMETERS_DATA_AVAILABLE)
                return
            if usageReports:
                for parameter in usageReports[0]['parameters']:
                    name = parameter.get('name')
                    if name:
                        allParameters.add(name)
            if fullData == 1:
                break
        except GAPI.badRequest:
            _getMain().printErrorMessage(_getMain().BAD_REQUEST_RC, Msg.BAD_REQUEST)
            return
        except GAPI.invalid as e:
            # No limit on date changes here (limitDateChanges=-1).
            tryDate = _adjustTryDate(str(e), 0, -1, tryDate)
            if not tryDate:
                break
    for parameter in sorted(allParameters):
        csvPF.WriteRow({'parameter': parameter})
    csvPF.writeCSVfile(f'{report.capitalize()} Report Usage Parameters')


def getUserOrgUnits(cd, orgUnit, orgUnitId):
    """Map each user's primary email to its orgUnitPath for one org unit.

    Args:
        cd: Directory API service object.
        orgUnit: org unit path, or the org unit id when equal to orgUnitId,
            in which case the path is looked up first.
        orgUnitId: org unit id.

    Returns:
        dict of primaryEmail -> orgUnitPath for the users returned by the
        org unit query.  On an API error the gam module's
        checkEntityDNEorAccessErrorExit handler is invoked instead.
    """
    try:
        if orgUnit == orgUnitId:
            orgUnit = _getMain().callGAPI(cd.orgunits(), 'get',
                                          throwReasons=GAPI.ORGUNIT_GET_THROW_REASONS,
                                          customerId=GC.Values[GC.CUSTOMER_ID],
                                          orgUnitPath=orgUnit, fields='orgUnitPath')['orgUnitPath']
        _getMain().printGettingAllEntityItemsForWhom(Ent.USER, orgUnit,
                                                     qualifier=Msg.IN_THE.format(Ent.Singular(Ent.ORGANIZATIONAL_UNIT)),
                                                     entityType=Ent.ORGANIZATIONAL_UNIT)
        result = _getMain().callGAPIpages(cd.users(), 'list', 'users',
                                          pageMessage=_getMain().getPageMessageForWhom(),
                                          throwReasons=[GAPI.INVALID_ORGUNIT, GAPI.ORGUNIT_NOT_FOUND,
                                                        GAPI.INVALID_INPUT, GAPI.BAD_REQUEST,
                                                        GAPI.RESOURCE_NOT_FOUND, GAPI.FORBIDDEN],
                                          retryReasons=GAPI.SERVICE_NOT_AVAILABLE_RETRY_REASONS,
                                          customer=GC.Values[GC.CUSTOMER_ID],
                                          query=_getMain().orgUnitPathQuery(orgUnit, None, None),
                                          orderBy='email',
                                          fields='nextPageToken,users(primaryEmail,orgUnitPath)',
                                          maxResults=GC.Values[GC.USER_MAX_RESULTS])
        return {user['primaryEmail']: user['orgUnitPath'] for user in result}
    except (GAPI.badRequest, GAPI.invalidInput, GAPI.invalidOrgunit, GAPI.orgunitNotFound,
            GAPI.backendError, GAPI.invalidCustomerId, GAPI.loginRequired,
            GAPI.resourceNotFound, GAPI.forbidden):
        # Must go through _getMain(): a bare global name is not resolved by the
        # module-level __getattr__ fallback and would raise NameError here.
        _getMain().checkEntityDNEorAccessErrorExit(cd, Ent.ORGANIZATIONAL_UNIT, orgUnit)


# Convert report mb item to gb
def convertReportMBtoGB(name, item):
    """Rewrite a '*_in_mb' report item as gigabytes.

    When item is not None its 'intValue' is replaced in place by the value
    divided by 1024, formatted with two decimals.  Returns name with
    '_in_mb' replaced by '_in_gb'.
    """
    if item is not None:
        item['intValue'] = f"{int(item['intValue'])/1024:.2f}"
    return name.replace('_in_mb', '_in_gb')


REPORTS_PARAMETERS_SIMPLE_TYPES = ['intValue', 'boolValue', 'datetimeValue', 'stringValue']


# gam report usage user [todrive *]
# [(user all|)|(orgunit|org|ou [showorgunit])|(select )]
# [([start|startdate ] [end|enddate ])|(range )|
# thismonth|(previousmonths )]
# [skipdates [:](,[:])*] [skipdaysofweek (,)*]
# [fields|parameters )]
# [convertmbtogb]
# (addcsvdata )*
# gam report usage customer [todrive *]
# [([start|startdate ] [end|enddate ])|(range )|
# thismonth|(previousmonths )]
# [skipdates [:](,[:])*] [skipdaysofweek (,)*]
# [fields|parameters )]
# [convertmbtogb]
# (addcsvdata )*
def doReportUsage():
    """Write a CSV of customer or user usage report rows, one request per day.

    Parses the command-line options listed in the usage comment above,
    defaults the date range to the 30 days ending today, and for every
    non-skipped date in the range requests the usage report (once per
    selected user for user reports) and writes one CSV row per returned
    entity.
    """
    def usageEntitySelectors():
        selectorChoices = Cmd.USER_ENTITY_SELECTORS+Cmd.USER_CSVDATA_ENTITY_SELECTORS
        if GC.Values[GC.USER_SERVICE_ACCOUNT_ACCESS_ONLY]:
            selectorChoices += Cmd.SERVICE_ACCOUNT_ONLY_ENTITY_SELECTORS[:]+[Cmd.ENTITY_USER, Cmd.ENTITY_USERS]
        else:
            selectorChoices += Cmd.BASE_ENTITY_SELECTORS[:]+Cmd.USER_ENTITIES[:]
        return selectorChoices

    def validateYYYYMMDD(argstr):
        if argstr in _getMain().TODAY_NOW or argstr[0] in _getMain().PLUS_MINUS:
            if argstr == 'NOW':
                argstr = 'TODAY'
            deltaDate = _getMain().getDelta(argstr, _getMain().DELTA_DATE_PATTERN)
            if deltaDate is None:
                Cmd.Backup()
                _getMain().invalidArgumentExit(_getMain().DELTA_DATE_FORMAT_REQUIRED)
            return deltaDate
        try:
            argDate = arrow.Arrow.strptime(argstr, _getMain().YYYYMMDD_FORMAT)
            return arrow.Arrow(argDate.year, argDate.month, argDate.day, tzinfo=GC.Values[GC.TIMEZONE])
        except ValueError:
            Cmd.Backup()
            _getMain().invalidArgumentExit(_getMain().YYYYMMDD_FORMAT_REQUIRED)

    report = _getMain().getChoice(CUSTOMER_USER_CHOICES)
    rep = _getMain().buildGAPIObject(API.REPORTS)
    titles = ['date']
    if report == 'customer':
        fullDataServices = CUSTOMER_REPORT_SERVICES
        userReports = False
        service = rep.customerUsageReports()
        kwargs = [{}]
    else:  # 'user'
        fullDataServices = USER_REPORT_SERVICES
        userReports = True
        service = rep.userUsageReport()
        kwargs = [{'userKey': 'all'}]
        titles.append('user')
    csvPF = _getMain().CSVPrintFile()
    customerId = GC.Values[GC.CUSTOMER_ID]
    if customerId == GC.MY_CUSTOMER:
        customerId = None
    parameters = set()
    convertMbToGb = select = showOrgUnit = False
    userKey = 'all'
    cd = orgUnit = orgUnitId = None
    userOrgUnits = {}
    startEndTime = _getMain().StartEndTime('startdate', 'enddate', 'date')
    skipDayNumbers = []
    skipDates = set()
    addCSVData = {}
    while Cmd.ArgumentsRemaining():
        myarg = _getMain().getArgument()
        if csvPF and myarg == 'todrive':
            csvPF.GetTodriveParameters()
        elif myarg in {'start', 'startdate', 'end', 'enddate', 'range', 'thismonth', 'previousmonths'}:
            startEndTime.Get(myarg)
        elif userReports and myarg in {'ou', 'org', 'orgunit'}:
            if cd is None:
                cd = _getMain().buildGAPIObject(API.DIRECTORY)
            orgUnit, orgUnitId = _getMain().getOrgUnitId(cd)
            select = False
        elif userReports and myarg == 'showorgunit':
            showOrgUnit = True
        elif myarg in {'fields', 'parameters'}:
            for field in _getMain().getString(Cmd.OB_STRING).replace(',', ' ').split():
                if ':' in field:
                    repsvc, _ = field.split(':', 1)
                    if repsvc in fullDataServices:
                        parameters.add(field)
                    else:
                        _getMain().invalidChoiceExit(repsvc, fullDataServices, True)
                else:
                    Cmd.Backup()
                    _getMain().invalidArgumentExit('service:parameter')
        elif myarg == 'skipdates':
            for skip in _getMain().getString(Cmd.OB_STRING).upper().split(','):
                if skip.find(':') == -1:
                    skipDates.add(validateYYYYMMDD(skip))
                else:
                    # start:end range; every date in it is added individually.
                    skipStart, skipEnd = skip.split(':', 1)
                    skipStartDate = validateYYYYMMDD(skipStart)
                    skipEndDate = validateYYYYMMDD(skipEnd)
                    if skipEndDate < skipStartDate:
                        Cmd.Backup()
                        _getMain().usageErrorExit(Msg.INVALID_DATE_TIME_RANGE.format(myarg, skipEnd, myarg, skipStart))
                    while skipStartDate <= skipEndDate:
                        skipDates.add(skipStartDate)
                        skipStartDate = skipStartDate.shift(days=1)
        elif myarg == 'skipdaysofweek':
            skipdaynames = _getMain().getString(Cmd.OB_STRING).lower().split(',')
            dow = [d.lower() for d in _getMain().DAYS_OF_WEEK]
            # Names not found in DAYS_OF_WEEK are silently ignored.
            skipDayNumbers = [dow.index(d) for d in skipdaynames if d in dow]
        elif userReports and myarg == 'user':
            userKey = _getMain().getString(Cmd.OB_EMAIL_ADDRESS)
            orgUnit = orgUnitId = None
            select = False
        elif userReports and (myarg == 'select' or myarg in usageEntitySelectors()):
            if myarg != 'select':
                # The selector keyword itself must be re-read by getEntityToModify.
                Cmd.Backup()
            _, users = _getMain().getEntityToModify(defaultEntityType=Cmd.ENTITY_USERS)
            orgUnit = orgUnitId = None
            select = True
        elif myarg == 'convertmbtogb':
            convertMbToGb = True
        elif myarg == 'addcsvdata':
            _getMain().getAddCSVData(addCSVData)
        else:
            _getMain().unknownArgumentExit()
    if startEndTime.endDateTime is None:
        startEndTime.endDateTime = _getMain().todaysDate()
    if startEndTime.startDateTime is None:
        startEndTime.startDateTime = startEndTime.endDateTime.shift(days=-30)
    startDateTime = startEndTime.startDateTime
    startDate = startDateTime.strftime(_getMain().YYYYMMDD_FORMAT)
    endDateTime = startEndTime.endDateTime
    endDate = endDateTime.strftime(_getMain().YYYYMMDD_FORMAT)
    startUseDate = endUseDate = None
    if not orgUnitId:
        # The orgUnitPath column is only filled in for org unit queries.
        showOrgUnit = False
    if userReports:
        if select:
            Ent.SetGetting(Ent.REPORT)
            kwargs = [{'userKey': _getMain().normalizeEmailAddressOrUID(user)} for user in users]
        elif userKey == 'all':
            if orgUnitId:
                kwargs[0]['orgUnitID'] = orgUnitId
                userOrgUnits = getUserOrgUnits(cd, orgUnit, orgUnitId)
                forWhom = f'users in orgUnit {orgUnit}'
            else:
                forWhom = 'all users'
            _getMain().printGettingEntityItemForWhom(Ent.REPORT, forWhom)
        else:
            Ent.SetGetting(Ent.REPORT)
            kwargs = [{'userKey': _getMain().normalizeEmailAddressOrUID(userKey)}]
            _getMain().printGettingEntityItemForWhom(Ent.REPORT, kwargs[0]['userKey'])
        if showOrgUnit:
            titles.append('orgUnitPath')
    else:
        pageMessage = None
    if addCSVData:
        titles.extend(sorted(addCSVData.keys()))
    csvPF.SetTitles(titles)
    csvPF.SetSortAllTitles()
    parameters = ','.join(parameters) if parameters else None
    while startDateTime <= endDateTime:
        if startDateTime.weekday() in skipDayNumbers or startDateTime in skipDates:
            startDateTime = startDateTime.shift(days=1)
            continue
        useDate = startDateTime.strftime(_getMain().YYYYMMDD_FORMAT)
        startDateTime = startDateTime.shift(days=1)
        try:
            for kwarg in kwargs:
                if userReports:
                    if not select and userKey == 'all':
                        pageMessage = _getMain().getPageMessageForWhom(forWhom, showDate=useDate)
                    else:
                        pageMessage = _getMain().getPageMessageForWhom(kwarg['userKey'], showDate=useDate)
                try:
                    usage = _getMain().callGAPIpages(service, 'get', 'usageReports',
                                                     pageMessage=pageMessage,
                                                     throwReasons=[GAPI.INVALID, GAPI.INVALID_INPUT,
                                                                   GAPI.BAD_REQUEST, GAPI.FORBIDDEN],
                                                     retryReasons=GAPI.SERVICE_NOT_AVAILABLE_RETRY_REASONS,
                                                     customerId=customerId, date=useDate,
                                                     parameters=parameters, **kwarg)
                except GAPI.badRequest:
                    # Skip this user/date and carry on with the next request.
                    continue
                for entity in usage:
                    row = {'date': useDate}
                    if userReports:
                        if 'userEmail' in entity['entity']:
                            row['user'] = entity['entity']['userEmail']
                            if showOrgUnit:
                                row['orgUnitPath'] = userOrgUnits.get(row['user'], _getMain().UNKNOWN)
                        else:
                            row['user'] = _getMain().UNKNOWN
                    if addCSVData:
                        row.update(addCSVData)
                    for item in entity.get('parameters', []):
                        if 'name' not in item:
                            continue
                        name = item['name']
                        if name == 'cros:device_version_distribution':
                            # Expand the version distribution into one column per version.
                            versions = {}
                            for version in item['msgValue']:
                                versions[version['version_number']] = version['num_devices']
                            for k, v in sorted(versions.items(), reverse=True):
                                title = f'cros:num_devices_chrome_{k}'
                                row[title] = v
                        else:
                            for ptype in REPORTS_PARAMETERS_SIMPLE_TYPES:
                                if ptype in item:
                                    if ptype != 'datetimeValue':
                                        if convertMbToGb and name.endswith('_in_mb'):
                                            name = convertReportMBtoGB(name, item)
                                        row[name] = item[ptype]
                                    else:
                                        row[name] = _getMain().formatLocalTime(item[ptype])
                                    break
                            else:
                                # None of the simple value types is present.
                                row[name] = ''
                    if not startUseDate:
                        startUseDate = useDate
                    endUseDate = useDate
                    csvPF.WriteRowTitles(row)
        except GAPI.invalid as e:
            # Warn and stop iterating over further dates.
            _getMain().stderrWarningMsg(str(e))
            break
        except GAPI.invalidInput as e:
            _getMain().systemErrorExit(_getMain().GOOGLE_API_ERROR_RC, str(e))
        except GAPI.forbidden as e:
            _getMain().accessErrorExit(None, str(e))
    if startUseDate:
        reportName = f'{report.capitalize()} Usage Report - {startUseDate}:{endUseDate}'
    else:
        reportName = f'{report.capitalize()} Usage Report - {startDate}:{endDate} - No Data'
    csvPF.writeCSVfile(reportName)


NL_SPACES_PATTERN = re.compile(r'\n +')
DISABLED_REASON_TIME_PATTERN = re.compile(r'.*(\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2})')

REPORT_ALIASES_CHOICE_MAP = {
    'access': 'accesstransparency',
    'calendars': 'calendar',
    'cloud': 'gcp',
    'currents': 'gplus',
    'customers': 'customer',
    'domain': 'customer',
    'devices': 'mobile',
    'doc': 'drive',
    'docs': 'drive',
    'enterprisegroups': 'groupsenterprise',
    'gemini': 'geminiinworkspaceapps',
    'geminiforworkspace': 'geminiinworkspaceapps',
    'group': 'groups',
    'google+': 'gplus',
    'hangoutsmeet': 'meet',
    'logins': 'login',
    'lookerstudio': 'datastudio',
    'oauthtoken': 'token',
    'tokens': 'token',
    'users': 'user',
}

REPORT_CHOICE_MAP = {
    'accessevaluation': 'access_evaluation',
    'accesstransparency': 'access_transparency',
    'admin': 'admin',
    'admindataaction': 'admin_data_action',
    'assignments': 'assignments',
    'calendar': 'calendar',
    'chat': 'chat',
    'chrome': 'chrome',
    'classroom': 'classroom',
    'cloudsearch': 'cloud_search',
    'contacts': 'contacts',
    'contextawareaccess': 'context_aware_access',
    'customer': 'customer',
    'datamigration': 'data_migration',
    'datastudio': 'data_studio',
    'directorysync': 'directory_sync',
    'drive': 'drive',
    'gcp': 'gcp',
    'geminiinworkspaceapps': 'gemini_in_workspace_apps',
    'gmail': 'gmail',
    'gplus': 'gplus',
    'graduation': 'graduation',
    'groups': 'groups',
    'groupsenterprise': 'groups_enterprise',
    'jamboard': 'jamboard',
    'keep': 'keep',
    'ldap': 'ldap',
    'login': 'login',
    'meet': 'meet',
    'meethardware': 'meet_hardware',
    'mobile': 'mobile',
    'profile': 'profile',
    'rules': 'rules',
    'saml': 'saml',
    'takeout': 'takeout',
    'tasks': 'tasks',
    'token': 'token',
    'usage': 'usage',
    'usageparameters': 'usageparameters',
    'user': 'user',
    'useraccounts': 'user_accounts',
    'vault': 'vault',
}

REPORT_ACTIVITIES_UPPERCASE_EVENTS = {
    'access_transparency',
    'admin',
    'chrome',
    'cloud_search',
    'context_aware_access',
    'data_migration',
    'data_studio',
    'directory_sync',
    'gcp',
    'jamboard',
    'meet_hardware',
    'mobile',
    'profile',
    'takeout',
}

REPORT_ACTIVITIES_FILTER_MAP = {
    'applicationinfofilter': 'applicationInfoFilter',
    'groupidfilter': 'groupIdFilter',
    'networkinfofilter': 'networkInfoFilter',
    'resourcedetailsfilter': 'resourceDetailsFilter',
    'statusfilter': 'statusFilter',
}

REPORT_ACTIVITIES_TIME_OBJECTS = {'time'}

# gam report [todrive *]
# [(user all|)|(orgunit|org|ou [showorgunit])|(select )]
# [userisactor]
# [([start