From 213270fe12de98bc4977aa8ac24b83c08ba48a9f Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Sat, 4 Jul 2026 14:49:41 -0400 Subject: [PATCH] Phase 5c - Decompose csv_pf.py + 66 new tests --- .DS_Store | Bin 8196 -> 0 bytes pytest.ini | 2 - src/gam/util/csv_pf.py | 877 ++++++++++++++++++++--------------------- tests/conftest.py | 35 ++ tests/test_csv_pf.py | 611 ++++++++++++++++++++++++++++ 5 files changed, 1082 insertions(+), 443 deletions(-) delete mode 100644 .DS_Store delete mode 100644 pytest.ini create mode 100644 tests/test_csv_pf.py diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 697640df45ae9aadd4db6915ee12a185fdd697f9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8196 zcmeHMU2GIp6u#fIzziMe07Vw;goQ#7vOp=7w)~jwpHlt`Y)iN0XPMm@>A-ZR?96Tp zrLi&b1q9=h#($nP8hJ3F#1~CeL?0DRFvbU>Mt#9VUsN7EGk2Dsmc|!j6z3*$&pqd! zbMKrp-??}1UB(z%3i=wxVvI479-k@|RNbZc{r)qd2%)5$D9E0I6*rS zL=lK05Je!0Koo%}0{?~x(4OrVagu$X_eOOTfhYq1%Lw@EL!2I;CPO;LX|Qxq6&wMG z`bU7EP+RdEMB^b%hIEWm>Y)N|N|c)-JYs;GlRO^kB||#KDK}>b4<86;MtDL&FguMO z57ZfwoJMsNfhYnKBjBe`8FN{l<+4M!pWiLh^=fOsgiuyKW$H9>nph$3PxU)PDNpkX zZi|-R>vLU>V`WQiExX4whB8W3t7ChbX&d>#z|;*=+1F#*x--<`6l~oMY*V~Lltd|` z#70Ky)-@&Kb@g?liTFrUU2P&>zpj3CR1)XbtlqFYbHp03os--Fgo^;1G0ve=x4g03 zVk^ImC`)Arxi?>xypg6?$TQTQbZ_5YIit)j(rb4PdZuH$dyKqGG^13yA0ppOmEsVhCRP;LRY%ab@Eil%F3Eu+iU28>xqNYa(c*!%_eSF7q|s_8=3JX$cl zV@i!E26Mz0sq#LZ-f6p|`?KiE1(C*~*_SR-)xCaCYu=z*4-J;DP~|R{oL96^an)*B z?m9vMP+b2N$aHPJtae*k*02;^;`1~%$!brL$6whR3O7~BYC7i(43LX%C~Iy})dT*t z+cekf95iftrOV;m4^v;C+ppz~&LIL|DBVKR;FFTvH)_{S&j~sy8av2~UVpe|T@QR% z5Sw4Arqpgx`Ap0xifI>30zG|Jj(xGF^l566D|oZ!{p`V@6$H8_SMhZ&fpomh?nsS&f;~vh4Z+8xA7h>;}d*}&v6ak;yZkg>-Zgi z;D#_om?y-9`NCo$A*>Wu2@OJ{uwK|8><~JIw4e!x1xFa>4hW@0+t2w@EgYj(xNK0O zh1Y(FwYT5PwIW1c(n1 zF%GR)cn-=7OZhoI8M`S}jA7BwSY;a5UMw#WDOHpyoLwd_7b!`UshmwzuM#OO6(L~d zHPve+N>B%l{mgC< z#%E#7egdO-3}bj4rwHX|@D!fLb9f#v z;3d3@*9hrv;7!8$JGh8<@c}->C43wr{%gYfPxxgVhPRi{%PD@nh2dfH00a%J4Bn1B9npZxy+p7cF(DvCf90UiOAx24)zs6E-*=byFX^c MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): + self._todriveCSVErrorExit([Ent.USER, user], Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET) + fields = ','.join(['id', 'mimeType', 'webViewLink', 'name', 'capabilities(canEdit)']) + body = {'description': self.todrive['description']} + if body['description'] is None: + body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) + if not self.todrive['retaintitle']: + body['name'] = title + result = callGAPI(drive.files(), 'update', + throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, + GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR], + fileId=self.todrive['fileId'], body=body, fields=fields, supportsAllDrives=True) + entityValueList = [Ent.USER, user, Ent.DRIVE_FILE_ID, self.todrive['fileId']] + if not result['capabilities']['canEdit']: + self._todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) + if result['mimeType'] != MIMETYPE_GA_SPREADSHEET: + self._todriveCSVErrorExit(entityValueList, f'{Msg.NOT_A} {Ent.Singular(Ent.SPREADSHEET)}') + if not GC.Values[GC.TODRIVE_CLIENTACCESS]: + _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) + if sheet is None: + return + else: + sheet = buildGAPIObject(API.SHEETS) + csvFile.seek(0) + spreadsheet = None + if self.todrive['updatesheet']: + for sheetEntity in self.TDSHEET_ENTITY_MAP.values(): + if self.todrive[sheetEntity]: + entityValueList = [Ent.USER, user, Ent.SPREADSHEET, title, self.todrive[sheetEntity]['sheetType'], self.todrive[sheetEntity]['sheetValue']] + if spreadsheet is None: + spreadsheet = callGAPI(sheet.spreadsheets(), 'get', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=self.todrive['fileId'], + fields='spreadsheetUrl,sheets(properties(sheetId,title),protectedRanges(range(sheetId),requestingUserCanEdit))') + sheetId = getSheetIdFromSheetEntity(spreadsheet, self.todrive[sheetEntity]) + if sheetId is None: + if ((sheetEntity != 'sheetEntity') or (self.todrive[sheetEntity]['sheetType'] == Ent.SHEET_ID)): + self._todriveCSVErrorExit(entityValueList, Msg.NOT_FOUND) + self.todrive['addsheet'] = True + else: + if protectedSheetId(spreadsheet, sheetId): + self._todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) + self.todrive[sheetEntity]['sheetId'] = sheetId + if self.todrive['addsheet']: + body = {'requests': [{'addSheet': {'properties': {'title': sheetTitle, 'sheetType': 'GRID'}}}]} + try: + addresult = callGAPI(sheet.spreadsheets(), 'batchUpdate', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=self.todrive['fileId'], body=body) + self.todrive['sheetEntity'] = {'sheetId': addresult['replies'][0]['addSheet']['properties']['sheetId']} + except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, + GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.badRequest, + GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: + self._todriveCSVErrorExit(entityValueList, str(e)) + body = {'requests': []} + if not self.todrive['addsheet']: + if self.todrive['backupSheetEntity']: + body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'destination': {'sheetId': self.todrive['backupSheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) + if self.todrive['clearfilter']: + body['requests'].append({'clearBasicFilter': {'sheetId': self.todrive['sheetEntity']['sheetId']}}) + if self.todrive['sheettitle']: + body['requests'].append({'updateSheetProperties': + {'properties': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'title': sheetTitle}, 'fields': 'title'}}) + body['requests'].append({'updateCells': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, 'fields': '*'}}) + if self.todrive['cellwrap']: + body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'fields': 'userEnteredFormat.wrapStrategy', + 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) + if self.todrive['cellnumberformat']: + body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'fields': 'userEnteredFormat.numberFormat', + 'cell': {'userEnteredFormat': {'numberFormat': {'type': self.todrive['cellnumberformat']}}}}}) + body['requests'].append({'pasteData': {'coordinate': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'rowIndex': '0', 'columnIndex': '0'}, + 'data': csvFile.read(), 'type': 'PASTE_NORMAL', 'delimiter': self.columnDelimiter}}) + if self.todrive['copySheetEntity']: + body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'destination': {'sheetId': self.todrive['copySheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) + try: + callGAPI(sheet.spreadsheets(), 'batchUpdate', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=self.todrive['fileId'], body=body) + except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, + GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, + GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: + self._todriveCSVErrorExit(entityValueList, str(e)) + closeFile(csvFile) +# Create/update file + else: + if GC.Values[GC.TODRIVE_CONVERSION]: result = callGAPI(drive.about(), 'get', throwReasons=GAPI.DRIVE_USER_THROW_REASONS, fields='maxImportSizes') - if numRows*numColumns > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): - todriveCSVErrorExit([Ent.USER, user], Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET) - fields = ','.join(['id', 'mimeType', 'webViewLink', 'name', 'capabilities(canEdit)']) - body = {'description': self.todrive['description']} - if body['description'] is None: - body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) - if not self.todrive['retaintitle']: - body['name'] = title - result = callGAPI(drive.files(), 'update', - throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, - GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR], - fileId=self.todrive['fileId'], body=body, fields=fields, supportsAllDrives=True) - entityValueList = [Ent.USER, user, Ent.DRIVE_FILE_ID, self.todrive['fileId']] - if not result['capabilities']['canEdit']: - todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) - if result['mimeType'] != MIMETYPE_GA_SPREADSHEET: - todriveCSVErrorExit(entityValueList, f'{Msg.NOT_A} {Ent.Singular(Ent.SPREADSHEET)}') + if numRows*len(titlesList) > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): + printKeyValueList([WARNING, Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET]) + mimeType = 'text/csv' + else: + mimeType = MIMETYPE_GA_SPREADSHEET + else: + mimeType = 'text/csv' + fields = ','.join(['id', 'mimeType', 'webViewLink']) + body = {'description': self.todrive['description'], 'mimeType': mimeType} + if body['description'] is None: + body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) + if not self.todrive['fileId'] or not self.todrive['retaintitle']: + body['name'] = title + try: + if not self.todrive['fileId']: + Act.Set(Act.CREATE) + body['parents'] = [self.todrive['parentId']] + result = callGAPI(drive.files(), 'create', + bailOnInternalError=True, + throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.FORBIDDEN, GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, + GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR, GAPI.STORAGE_QUOTA_EXCEEDED, + GAPI.TEAMDRIVE_FILE_LIMIT_EXCEEDED, GAPI.TEAMDRIVE_HIERARCHY_TOO_DEEP], + body=body, + media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), + fields=fields, supportsAllDrives=True) + else: + Act.Set(Act.UPDATE) + result = callGAPI(drive.files(), 'update', + bailOnInternalError=True, + throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, + GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR], + fileId=self.todrive['fileId'], + body=body, + media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), + fields=fields, supportsAllDrives=True) + spreadsheetId = result['id'] + except GAPI.internalError as e: + entityActionFailedWarning([Ent.DRIVE_FILE, body['name']], Msg.UPLOAD_CSV_FILE_INTERNAL_ERROR.format(str(e), str(numRows))) + closeFile(csvFile) + return + closeFile(csvFile) + if not self.todrive['fileId'] and self.todrive['share']: + Act.Set(Act.SHARE) + for share in self.todrive['share']: + if share['emailAddress'] != user: + try: + callGAPI(drive.permissions(), 'create', + bailOnInternalError=True, + throwReasons=GAPI.DRIVE_ACCESS_THROW_REASONS+GAPI.DRIVE3_CREATE_ACL_THROW_REASONS, + fileId=spreadsheetId, sendNotificationEmail=False, body=share, fields='', supportsAllDrives=True) + entityActionPerformed([Ent.USER, user, Ent.SPREADSHEET, title, + Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']]) + except (GAPI.badRequest, GAPI.invalid, GAPI.fileNotFound, GAPI.forbidden, GAPI.internalError, + GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.unknownError, GAPI.ownershipChangeAcrossDomainNotPermitted, + GAPI.teamDriveDomainUsersOnlyRestriction, GAPI.teamDriveTeamMembersOnlyRestriction, + GAPI.targetUserRoleLimitedByLicenseRestriction, GAPI.insufficientAdministratorPrivileges, GAPI.sharingRateLimitExceeded, + GAPI.publishOutNotPermitted, GAPI.shareInNotPermitted, GAPI.shareOutNotPermitted, GAPI.shareOutNotPermittedToUser, + GAPI.cannotShareTeamDriveTopFolderWithAnyoneOrDomains, GAPI.cannotShareTeamDriveWithNonGoogleAccounts, + GAPI.ownerOnTeamDriveItemNotSupported, + GAPI.organizerOnNonTeamDriveNotSupported, GAPI.organizerOnNonTeamDriveItemNotSupported, + GAPI.fileOrganizerNotYetEnabledForThisTeamDrive, + GAPI.fileOrganizerOnFoldersInSharedDriveOnly, + GAPI.fileOrganizerOnNonTeamDriveNotSupported, + GAPI.cannotModifyInheritedPermission, + GAPI.teamDrivesFolderSharingNotSupported, GAPI.invalidLinkVisibility, + GAPI.invalidSharingRequest, GAPI.fileNeverWritable, GAPI.abusiveContentRestriction) as e: + entityActionFailedWarning([Ent.USER, user, Ent.SPREADSHEET, title, + Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']], + str(e)) + if ((result['mimeType'] == MIMETYPE_GA_SPREADSHEET) and + (self.todrive['sheetEntity'] or self.todrive['locale'] or self.todrive['timeZone'] or + self.todrive['sheettitle'] or self.todrive['cellwrap'] or self.todrive['cellnumberformat'])): if not GC.Values[GC.TODRIVE_CLIENTACCESS]: _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) if sheet is None: return else: sheet = buildGAPIObject(API.SHEETS) - csvFile.seek(0) - spreadsheet = None - if self.todrive['updatesheet']: - for sheetEntity in self.TDSHEET_ENTITY_MAP.values(): - if self.todrive[sheetEntity]: - entityValueList = [Ent.USER, user, Ent.SPREADSHEET, title, self.todrive[sheetEntity]['sheetType'], self.todrive[sheetEntity]['sheetValue']] - if spreadsheet is None: - spreadsheet = callGAPI(sheet.spreadsheets(), 'get', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=self.todrive['fileId'], - fields='spreadsheetUrl,sheets(properties(sheetId,title),protectedRanges(range(sheetId),requestingUserCanEdit))') - sheetId = getSheetIdFromSheetEntity(spreadsheet, self.todrive[sheetEntity]) - if sheetId is None: - if ((sheetEntity != 'sheetEntity') or (self.todrive[sheetEntity]['sheetType'] == Ent.SHEET_ID)): - todriveCSVErrorExit(entityValueList, Msg.NOT_FOUND) - self.todrive['addsheet'] = True - else: - if protectedSheetId(spreadsheet, sheetId): - todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) - self.todrive[sheetEntity]['sheetId'] = sheetId - if self.todrive['addsheet']: - body = {'requests': [{'addSheet': {'properties': {'title': sheetTitle, 'sheetType': 'GRID'}}}]} - try: - addresult = callGAPI(sheet.spreadsheets(), 'batchUpdate', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=self.todrive['fileId'], body=body) - self.todrive['sheetEntity'] = {'sheetId': addresult['replies'][0]['addSheet']['properties']['sheetId']} - except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, - GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.badRequest, - GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: - todriveCSVErrorExit(entityValueList, str(e)) - body = {'requests': []} - if not self.todrive['addsheet']: - if self.todrive['backupSheetEntity']: - body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'destination': {'sheetId': self.todrive['backupSheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) - if self.todrive['clearfilter']: - body['requests'].append({'clearBasicFilter': {'sheetId': self.todrive['sheetEntity']['sheetId']}}) - if self.todrive['sheettitle']: - body['requests'].append({'updateSheetProperties': - {'properties': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'title': sheetTitle}, 'fields': 'title'}}) - body['requests'].append({'updateCells': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, 'fields': '*'}}) - if self.todrive['cellwrap']: - body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'fields': 'userEnteredFormat.wrapStrategy', - 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) - if self.todrive['cellnumberformat']: - body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'fields': 'userEnteredFormat.numberFormat', - 'cell': {'userEnteredFormat': {'numberFormat': {'type': self.todrive['cellnumberformat']}}}}}) - body['requests'].append({'pasteData': {'coordinate': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'rowIndex': '0', 'columnIndex': '0'}, - 'data': csvFile.read(), 'type': 'PASTE_NORMAL', 'delimiter': self.columnDelimiter}}) - if self.todrive['copySheetEntity']: - body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'destination': {'sheetId': self.todrive['copySheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) try: - callGAPI(sheet.spreadsheets(), 'batchUpdate', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=self.todrive['fileId'], body=body) + body = {'requests': []} + if self.todrive['sheetEntity'] or self.todrive['sheettitle'] or self.todrive['cellwrap']: + spreadsheet = callGAPI(sheet.spreadsheets(), 'get', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=spreadsheetId, fields='sheets/properties') + spreadsheet['sheets'][0]['properties']['title'] = sheetTitle + body['requests'].append({'updateSheetProperties': + {'properties': spreadsheet['sheets'][0]['properties'], 'fields': 'title'}}) + if self.todrive['cellwrap']: + body['requests'].append({'repeatCell': {'range': {'sheetId': spreadsheet['sheets'][0]['properties']['sheetId']}, + 'fields': 'userEnteredFormat.wrapStrategy', + 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) + if self.todrive['locale']: + body['requests'].append({'updateSpreadsheetProperties': + {'properties': {'locale': self.todrive['locale']}, 'fields': 'locale'}}) + if self.todrive['timeZone']: + body['requests'].append({'updateSpreadsheetProperties': + {'properties': {'timeZone': self.todrive['timeZone']}, 'fields': 'timeZone'}}) + if body['requests']: + callGAPI(sheet.spreadsheets(), 'batchUpdate', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=spreadsheetId, body=body) except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, - GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: - todriveCSVErrorExit(entityValueList, str(e)) - closeFile(csvFile) -# Create/update file + GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition, + GAPI.teamDriveFileLimitExceeded, GAPI.teamDriveHierarchyTooDeep) as e: + self._todriveCSVErrorExit([Ent.USER, user, Ent.SPREADSHEET, title], str(e)) + Act.Set(action) + file_url = result['webViewLink'] + msg_txt = f'{Msg.DATA_UPLOADED_TO_DRIVE_FILE}:\n{file_url}' + if not self.todrive['returnidonly']: + printKeyValueList([msg_txt]) + else: + if self.todrive['fileId']: + writeStdout(f'{self.todrive["fileId"]}\n') else: - if GC.Values[GC.TODRIVE_CONVERSION]: - result = callGAPI(drive.about(), 'get', - throwReasons=GAPI.DRIVE_USER_THROW_REASONS, - fields='maxImportSizes') - if numRows*len(titlesList) > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): - printKeyValueList([WARNING, Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET]) - mimeType = 'text/csv' - else: - mimeType = MIMETYPE_GA_SPREADSHEET - else: - mimeType = 'text/csv' - fields = ','.join(['id', 'mimeType', 'webViewLink']) - body = {'description': self.todrive['description'], 'mimeType': mimeType} - if body['description'] is None: - body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) - if not self.todrive['fileId'] or not self.todrive['retaintitle']: - body['name'] = title - try: - if not self.todrive['fileId']: - Act.Set(Act.CREATE) - body['parents'] = [self.todrive['parentId']] - result = callGAPI(drive.files(), 'create', - bailOnInternalError=True, - throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.FORBIDDEN, GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, - GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR, GAPI.STORAGE_QUOTA_EXCEEDED, - GAPI.TEAMDRIVE_FILE_LIMIT_EXCEEDED, GAPI.TEAMDRIVE_HIERARCHY_TOO_DEEP], - body=body, - media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), - fields=fields, supportsAllDrives=True) - else: - Act.Set(Act.UPDATE) - result = callGAPI(drive.files(), 'update', - bailOnInternalError=True, - throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, - GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR], - fileId=self.todrive['fileId'], - body=body, - media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), - fields=fields, supportsAllDrives=True) - spreadsheetId = result['id'] - except GAPI.internalError as e: - entityActionFailedWarning([Ent.DRIVE_FILE, body['name']], Msg.UPLOAD_CSV_FILE_INTERNAL_ERROR.format(str(e), str(numRows))) - closeFile(csvFile) - return - closeFile(csvFile) - if not self.todrive['fileId'] and self.todrive['share']: - Act.Set(Act.SHARE) - for share in self.todrive['share']: - if share['emailAddress'] != user: - try: - callGAPI(drive.permissions(), 'create', - bailOnInternalError=True, - throwReasons=GAPI.DRIVE_ACCESS_THROW_REASONS+GAPI.DRIVE3_CREATE_ACL_THROW_REASONS, - fileId=spreadsheetId, sendNotificationEmail=False, body=share, fields='', supportsAllDrives=True) - entityActionPerformed([Ent.USER, user, Ent.SPREADSHEET, title, - Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']]) - except (GAPI.badRequest, GAPI.invalid, GAPI.fileNotFound, GAPI.forbidden, GAPI.internalError, - GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.unknownError, GAPI.ownershipChangeAcrossDomainNotPermitted, - GAPI.teamDriveDomainUsersOnlyRestriction, GAPI.teamDriveTeamMembersOnlyRestriction, - GAPI.targetUserRoleLimitedByLicenseRestriction, GAPI.insufficientAdministratorPrivileges, GAPI.sharingRateLimitExceeded, - GAPI.publishOutNotPermitted, GAPI.shareInNotPermitted, GAPI.shareOutNotPermitted, GAPI.shareOutNotPermittedToUser, - GAPI.cannotShareTeamDriveTopFolderWithAnyoneOrDomains, GAPI.cannotShareTeamDriveWithNonGoogleAccounts, - GAPI.ownerOnTeamDriveItemNotSupported, - GAPI.organizerOnNonTeamDriveNotSupported, GAPI.organizerOnNonTeamDriveItemNotSupported, - GAPI.fileOrganizerNotYetEnabledForThisTeamDrive, - GAPI.fileOrganizerOnFoldersInSharedDriveOnly, - GAPI.fileOrganizerOnNonTeamDriveNotSupported, - GAPI.cannotModifyInheritedPermission, - GAPI.teamDrivesFolderSharingNotSupported, GAPI.invalidLinkVisibility, - GAPI.invalidSharingRequest, GAPI.fileNeverWritable, GAPI.abusiveContentRestriction) as e: - entityActionFailedWarning([Ent.USER, user, Ent.SPREADSHEET, title, - Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']], - str(e)) - if ((result['mimeType'] == MIMETYPE_GA_SPREADSHEET) and - (self.todrive['sheetEntity'] or self.todrive['locale'] or self.todrive['timeZone'] or - self.todrive['sheettitle'] or self.todrive['cellwrap'] or self.todrive['cellnumberformat'])): - if not GC.Values[GC.TODRIVE_CLIENTACCESS]: - _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) - if sheet is None: - return - else: - sheet = buildGAPIObject(API.SHEETS) - try: - body = {'requests': []} - if self.todrive['sheetEntity'] or self.todrive['sheettitle'] or self.todrive['cellwrap']: - spreadsheet = callGAPI(sheet.spreadsheets(), 'get', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=spreadsheetId, fields='sheets/properties') - spreadsheet['sheets'][0]['properties']['title'] = sheetTitle - body['requests'].append({'updateSheetProperties': - {'properties': spreadsheet['sheets'][0]['properties'], 'fields': 'title'}}) - if self.todrive['cellwrap']: - body['requests'].append({'repeatCell': {'range': {'sheetId': spreadsheet['sheets'][0]['properties']['sheetId']}, - 'fields': 'userEnteredFormat.wrapStrategy', - 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) - if self.todrive['locale']: - body['requests'].append({'updateSpreadsheetProperties': - {'properties': {'locale': self.todrive['locale']}, 'fields': 'locale'}}) - if self.todrive['timeZone']: - body['requests'].append({'updateSpreadsheetProperties': - {'properties': {'timeZone': self.todrive['timeZone']}, 'fields': 'timeZone'}}) - if body['requests']: - callGAPI(sheet.spreadsheets(), 'batchUpdate', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=spreadsheetId, body=body) - except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, - GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, - GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition, - GAPI.teamDriveFileLimitExceeded, GAPI.teamDriveHierarchyTooDeep) as e: - todriveCSVErrorExit([Ent.USER, user, Ent.SPREADSHEET, title], str(e)) - Act.Set(action) - file_url = result['webViewLink'] - msg_txt = f'{Msg.DATA_UPLOADED_TO_DRIVE_FILE}:\n{file_url}' - if not self.todrive['returnidonly']: - printKeyValueList([msg_txt]) - else: - if self.todrive['fileId']: - writeStdout(f'{self.todrive["fileId"]}\n') - else: - writeStdout(f'{spreadsheetId}\n') - if not self.todrive['subject']: - subject = title - else: - subject = self.todrive['subject'].replace('#file#', title).replace('#sheet#', sheetTitle) - if not self.todrive['noemail']: - send_email(subject, msg_txt, user, clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) - if self.todrive['notify']: - for recipient in self.todrive['share']+self.todrive['alert']: - if recipient['emailAddress'] != user: - send_email(subject, msg_txt, recipient['emailAddress'], clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) - if not self.todrive['nobrowser']: - webbrowser.open(file_url) - except (GAPI.forbidden, GAPI.insufficientPermissions): - printWarningMessage(INSUFFICIENT_PERMISSIONS_RC, Msg.INSUFFICIENT_PERMISSIONS_TO_PERFORM_TASK) - except (GAPI.fileNotFound, GAPI.unknownError, GAPI.internalError, GAPI.storageQuotaExceeded) as e: - if not self.todrive['fileId']: - entityActionFailedWarning([Ent.DRIVE_FOLDER, self.todrive['parentId']], str(e)) - else: - entityActionFailedWarning([Ent.DRIVE_FILE, self.todrive['fileId']], str(e)) - except (GAPI.serviceNotAvailable, GAPI.authError, GAPI.domainPolicy) as e: - userDriveServiceNotEnabledWarning(user, str(e), 0, 0) - else: - closeFile(csvFile) + writeStdout(f'{spreadsheetId}\n') + if not self.todrive['subject']: + subject = title + else: + subject = self.todrive['subject'].replace('#file#', title).replace('#sheet#', sheetTitle) + if not self.todrive['noemail']: + send_email(subject, msg_txt, user, clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) + if self.todrive['notify']: + for recipient in self.todrive['share']+self.todrive['alert']: + if recipient['emailAddress'] != user: + send_email(subject, msg_txt, recipient['emailAddress'], clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) + if not self.todrive['nobrowser']: + webbrowser.open(file_url) + except (GAPI.forbidden, GAPI.insufficientPermissions): + printWarningMessage(INSUFFICIENT_PERMISSIONS_RC, Msg.INSUFFICIENT_PERMISSIONS_TO_PERFORM_TASK) + except (GAPI.fileNotFound, GAPI.unknownError, GAPI.internalError, GAPI.storageQuotaExceeded) as e: + if not self.todrive['fileId']: + entityActionFailedWarning([Ent.DRIVE_FOLDER, self.todrive['parentId']], str(e)) + else: + entityActionFailedWarning([Ent.DRIVE_FILE, self.todrive['fileId']], str(e)) + except (GAPI.serviceNotAvailable, GAPI.authError, GAPI.domainPolicy) as e: + userDriveServiceNotEnabledWarning(user, str(e), 0, 0) + else: + closeFile(csvFile) - if GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE] is not None: - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_NAME, list_type)) - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_TODRIVE, self.todrive)) - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CSVPF, - (self.titlesList, self.sortTitlesList, self.indexedTitles, - self.formatJSON, self.JSONtitlesList, - self.columnDelimiter, self.noEscapeChar, self.quoteChar, - self.sortHeaders, self.timestampColumn, - self.fixPaths, - self.mapNodataFields, - self.nodataFields, - self.driveListFields, - self.driveSubfieldsChoiceMap, - self.oneItemPerRow, - self.showPermissionsLast, - self.zeroBlankMimeTypeCounts))) - if clearRowFilters: - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CLEAR_ROW_FILTERS, clearRowFilters)) - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_DATA, self.rows)) - return + def _prepareHeaders(self, clearRowFilters): if self.zeroBlankMimeTypeCounts: self.ZeroBlankMimeTypeCounts() if not clearRowFilters and (self.rowFilter or self.rowDropFilter): @@ -1730,7 +1700,32 @@ class CSVPrintFile(): if self.headerOrder: self.JSONtitlesList = self.orderHeaders(self.JSONtitlesList) titlesList = self.JSONtitlesList - normalizeSortHeaders() + self._normalizeSortHeaders(titlesList) + return titlesList, extrasaction + + def writeCSVfile(self, list_type, clearRowFilters=False): + + if GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE] is not None: + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_NAME, list_type)) + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_TODRIVE, self.todrive)) + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CSVPF, + (self.titlesList, self.sortTitlesList, self.indexedTitles, + self.formatJSON, self.JSONtitlesList, + self.columnDelimiter, self.noEscapeChar, self.quoteChar, + self.sortHeaders, self.timestampColumn, + self.fixPaths, + self.mapNodataFields, + self.nodataFields, + self.driveListFields, + self.driveSubfieldsChoiceMap, + self.oneItemPerRow, + self.showPermissionsLast, + self.zeroBlankMimeTypeCounts))) + if clearRowFilters: + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CLEAR_ROW_FILTERS, clearRowFilters)) + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_DATA, self.rows)) + return + titlesList, extrasaction = self._prepareHeaders(clearRowFilters) if self.outputTranspose: newRows = [] newTitlesList = list(range(len(self.rows) + 1)) @@ -1746,14 +1741,14 @@ class CSVPrintFile(): if (not self.todrive) or self.todrive['localcopy']: if GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] == '-': if GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD]: - writeCSVToStdout() + self._writeCSVToStdout(titlesList, extrasaction) else: GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] = GM.Globals[GM.STDOUT][GM.REDIRECT_NAME] - writeCSVToFile() + self._writeCSVToFile(titlesList, extrasaction) else: - writeCSVToFile() + self._writeCSVToFile(titlesList, extrasaction) if self.todrive: - writeCSVToDrive() + self._writeCSVToDrive(list_type, titlesList, extrasaction) if GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE] == DEFAULT_FILE_APPEND_MODE: GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER] = False diff --git a/tests/conftest.py b/tests/conftest.py index e73559f9..8760614f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,16 @@ gam package directory on sys.path. We add it in a fixture (not at module level) to avoid shadowing Python's stdlib 'cmd' module during pytest configuration. """ +# Make pytest discover this directory as the test root without needing +# a pytest.ini at the repo root. Running `pytest` or `python -m pytest` +# from anywhere in the repo will find tests here. +collect_ignore_glob = [] # nothing to ignore; marker for rootdir detection + import os import sys import pytest +import arrow @pytest.fixture(autouse=True) @@ -46,6 +52,25 @@ def _gam_path_and_globals(): GC.Values.setdefault(GC.CUSTOMER_ID, 'C00000000') GC.Values.setdefault(GC.TIMEZONE, 'UTC') GC.Values.setdefault(GC.SHOW_COUNTS_MIN, 0) + # CSV-related defaults needed by CSVPrintFile + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_FORCE, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_REQUIRED, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_ORDER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_DROP_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_FILTER_MODE, 'allmatch') + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_DROP_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_DROP_FILTER_MODE, 'anymatch') + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_LIMIT, 0) + GC.Values.setdefault(GC.CSV_OUTPUT_COLUMN_DELIMITER, ',') + GC.Values.setdefault(GC.CSV_OUTPUT_QUOTE_CHAR, '"') + GC.Values.setdefault(GC.CSV_OUTPUT_NO_ESCAPE_CHAR, False) + GC.Values.setdefault(GC.CSV_OUTPUT_SORT_HEADERS, []) + GC.Values.setdefault(GC.CSV_OUTPUT_TIMESTAMP_COLUMN, '') + GC.Values.setdefault(GC.CSV_OUTPUT_LINE_TERMINATOR, '\n') + GC.Values.setdefault(GC.NEVER_TIME, 'Never') + GC.Values.setdefault(GC.OUTPUT_TIMEFORMAT, '') # Ensure Globals dict exists if not GM.Globals: @@ -53,6 +78,16 @@ def _gam_path_and_globals(): GM.Globals.setdefault(GM.STDOUT, None) GM.Globals.setdefault(GM.STDERR, None) GM.Globals.setdefault(GM.SYSEXITRC, 0) + # CSV-related globals needed by CSVPrintFile + GM.Globals.setdefault(GM.CSV_OUTPUT_TRANSPOSE, False) + GM.Globals.setdefault(GM.CSV_TODRIVE, {}) + GM.Globals.setdefault(GM.CSV_OUTPUT_COLUMN_DELIMITER, None) + GM.Globals.setdefault(GM.CSV_OUTPUT_QUOTE_CHAR, None) + GM.Globals.setdefault(GM.CSV_OUTPUT_NO_ESCAPE_CHAR, None) + GM.Globals.setdefault(GM.CSV_OUTPUT_SORT_HEADERS, []) + GM.Globals.setdefault(GM.CSV_OUTPUT_TIMESTAMP_COLUMN, '') + if not GM.Globals.get(GM.DATETIME_NOW): + GM.Globals[GM.DATETIME_NOW] = arrow.now('UTC') yield diff --git a/tests/test_csv_pf.py b/tests/test_csv_pf.py new file mode 100644 index 00000000..d681ee3a --- /dev/null +++ b/tests/test_csv_pf.py @@ -0,0 +1,611 @@ +"""Unit tests for gam.util.csv_pf — RowFilterMatch and CSV output functions. + +Tests cover all filter types: regex, boolean, count, date, time, length, +text, data, ranges, and their negations/combinations. Also covers +CSVPrintFile header processing, title management, and output formatting. +""" + +import re + +import pytest +import arrow + + +# --------------------------------------------------------------------------- +# RowFilterMatch tests +# --------------------------------------------------------------------------- + +class TestRowFilterMatchRegex: + """Test regex and notregex filter types.""" + + def test_regex_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John Smith', 'email': 'john@example.com'} + titlesList = ['name', 'email'] + # filterVal: (columnPat, anyMatch, filterType, ...) + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('John', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_regex_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'Jane Doe'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('^John$', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_notregex_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'Jane Doe'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notregex', re.compile('^John$', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_notregex_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John Smith'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notregex', re.compile('John', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_regex_case_sensitive(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'john smith'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + # Case-sensitive regex should NOT match lowercase + rowFilter = [(columnPat, True, 'regex', re.compile('^John', 0))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_regex_wildcard_column(self): + """Regex column pattern matching multiple columns.""" + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'no', 'email': 'yes@match.com'} + titlesList = ['name', 'email'] + # anyMatch=True: at least one column must match + columnPat = re.compile('.*', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('match', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchBoolean: + """Test boolean filter type.""" + + def test_boolean_true_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': 'True'} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', True)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_boolean_true_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': 'False'} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', True)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_boolean_false_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': 'False'} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', False)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_boolean_native_bool(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': True} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', True)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_boolean_blank_is_false(self): + """Blank string should be treated as False.""" + from gam.util.csv_pf import RowFilterMatch + row = {'active': ''} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', False)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchCount: + """Test count/number filter types.""" + + def test_count_equals(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_greater(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '10'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '>', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_less(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '3'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '<', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_not_equal(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '3'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '!=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_gte(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '>=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_lte(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '<=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_blank_is_zero(self): + """Blank string count should be treated as 0.""" + from gam.util.csv_pf import RowFilterMatch + row = {'count': ''} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 0)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_non_digit_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': 'abc'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 0)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_count_native_int(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': 5} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_number_alias(self): + """'number' should work the same as 'count'.""" + from gam.util.csv_pf import RowFilterMatch + row = {'val': '42'} + titlesList = ['val'] + columnPat = re.compile('^val$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'number', '=', 42)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchCountRange: + """Test countrange/numberrange filter types.""" + + def test_countrange_in_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_countrange_out_of_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '15'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_countrange_not_equal(self): + """countrange with != should return True when value IS outside range.""" + from gam.util.csv_pf import RowFilterMatch + row = {'count': '15'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '!=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_countrange_boundary(self): + """Boundary values should be inclusive.""" + from gam.util.csv_pf import RowFilterMatch + row = {'count': '10'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchLength: + """Test length and lengthrange filter types.""" + + def test_length_equals(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'hello'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'length', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_length_greater(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'hello world'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'length', '>', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_lengthrange_in_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'hello'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'lengthrange', '=', 3, 8)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_length_non_string_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 12345} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'length', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchText: + """Test text and textrange filter types.""" + + def test_text_equals(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'active'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'text', '=', 'active')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_text_not_equal(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'inactive'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'text', '!=', 'active')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_text_greater(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'b'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'text', '>', 'a')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_textrange_in_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'banana'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'textrange', '=', 'apple', 'cherry')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_textrange_out_of_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'zebra'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'textrange', '=', 'apple', 'cherry')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchData: + """Test data and notdata filter types.""" + + def test_data_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'admin'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'data', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_data_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'viewer'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'data', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_notdata_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'viewer'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notdata', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_notdata_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'admin'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notdata', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchDate: + """Test date and time filter types.""" + + def test_date_greater(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': '2025-06-15T10:30:00Z'} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_date_less(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': '2024-06-15T10:30:00Z'} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '<', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_time_greater(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'modified': '2025-06-15T10:30:00.000Z'} + titlesList = ['modified'] + columnPat = re.compile('^modified$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'time', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_date_empty_no_match(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': ''} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_date_non_string_no_match(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': None} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchModes: + """Test rowFilter mode combinations (Any vs All) and drop filters.""" + + def test_no_filters_returns_true(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'anything'} + assert RowFilterMatch(row, ['name'], [], True, [], True) is True + + def test_row_filter_mode_any(self): + """Any mode: at least one filter must match to select.""" + from gam.util.csv_pf import RowFilterMatch + row = {'a': 'yes', 'b': 'no'} + titlesList = ['a', 'b'] + f1 = (re.compile('^a$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + f2 = (re.compile('^b$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + # f1 matches, f2 doesn't — Any mode should select + assert RowFilterMatch(row, titlesList, [f1, f2], False, [], True) is True + + def test_row_filter_mode_all(self): + """All mode: all filters must match to select.""" + from gam.util.csv_pf import RowFilterMatch + row = {'a': 'yes', 'b': 'no'} + titlesList = ['a', 'b'] + f1 = (re.compile('^a$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + f2 = (re.compile('^b$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + # f1 matches, f2 doesn't — All mode should NOT select + assert RowFilterMatch(row, titlesList, [f1, f2], True, [], True) is False + + def test_drop_filter_any(self): + """Drop filter in Any mode: any match drops the row.""" + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'deleted'} + titlesList = ['status'] + dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, [], True, dropFilter, False) is False + + def test_drop_filter_no_match_keeps(self): + """Drop filter that doesn't match should keep the row.""" + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'active'} + titlesList = ['status'] + dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, [], True, dropFilter, False) is True + + def test_row_and_drop_filter_combined(self): + """Row selected by rowFilter but dropped by dropFilter.""" + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John', 'status': 'deleted'} + titlesList = ['name', 'status'] + rowFilter = [(re.compile('^name$', re.IGNORECASE), True, 'regex', re.compile('John', re.IGNORECASE))] + dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, dropFilter, False) is False + + def test_column_not_in_titles(self): + """Filter column not in titles should use [None] as columns.""" + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John'} + titlesList = ['name'] + # Filter on 'missing_col' which isn't in titlesList + rowFilter = [(re.compile('^missing_col$', re.IGNORECASE), True, 'regex', re.compile('John', re.IGNORECASE))] + # No columns match, so row.get(None, '') = '', regex doesn't match + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_any_match_across_columns(self): + """anyMatch=True across multiple columns: any column matching suffices.""" + from gam.util.csv_pf import RowFilterMatch + row = {'col1': 'no', 'col2': 'yes'} + titlesList = ['col1', 'col2'] + # Match any column starting with 'col' + columnPat = re.compile('^col', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('yes', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_all_match_across_columns(self): + """anyMatch=False (all mode) across columns: ALL columns must match.""" + from gam.util.csv_pf import RowFilterMatch + row = {'col1': 'yes', 'col2': 'no'} + titlesList = ['col1', 'col2'] + columnPat = re.compile('^col', re.IGNORECASE) + rowFilter = [(columnPat, False, 'regex', re.compile('yes', re.IGNORECASE))] + # col2 doesn't match — all mode fails + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +# --------------------------------------------------------------------------- +# CSVPrintFile title management tests +# --------------------------------------------------------------------------- + +class TestCSVPrintFileTitles: + """Test CSVPrintFile title management methods.""" + + def test_add_title(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitle('email') + assert 'email' in pf.titlesList + assert 'email' in pf.titlesSet + + def test_add_titles_no_duplicate(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['email']) + pf.AddTitles(['email']) + assert pf.titlesList.count('email') == 1 + + def test_add_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['email', 'name', 'role']) + assert pf.titlesList == ['email', 'name', 'role'] + + def test_remove_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['email', 'name', 'role']) + pf.RemoveTitles(['name']) + assert 'name' not in pf.titlesList + assert 'name' not in pf.titlesSet + + def test_set_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['a', 'b', 'c']) + pf.SetTitles(['x', 'y']) + assert pf.titlesList == ['x', 'y'] + assert pf.titlesSet == {'x', 'y'} + + def test_insert_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['b', 'c']) + pf.InsertTitles(0, ['a']) + assert pf.titlesList[0] == 'a' + + def test_move_titles_to_end(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['a', 'b', 'c']) + pf.MoveTitlesToEnd(['a']) + assert pf.titlesList == ['b', 'c', 'a'] + + def test_add_sort_title(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddSortTitle('email') + assert 'email' in pf.sortTitlesList + + def test_add_sort_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddSortTitles(['email', 'name']) + assert pf.sortTitlesList == ['email', 'name'] + + +class TestCSVPrintFileRows: + """Test CSVPrintFile row management.""" + + def test_write_row_no_filter(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['name', 'email']) + pf.WriteRowNoFilter({'name': 'John', 'email': 'john@example.com'}) + assert len(pf.rows) == 1 + assert pf.rows[0]['name'] == 'John' + + def test_append_row(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['name']) + pf.AppendRow({'name': 'test'}) + assert len(pf.rows) == 1 + + def test_header_filter_match_no_filter(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + # No filter = match all + assert pf.headerFilter == [] + assert pf.headerDropFilter == [] + + def test_header_filter_set(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pattern = [(re.compile('^email$', re.IGNORECASE), True)] + pf.SetHeaderFilter(pattern) + assert pf.headerFilter == pattern + + +class TestCSVPrintFileMapTitles: + """Test MapTitles method.""" + + def test_map_titles_basic(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['old_name', 'old_email']) + pf.MapTitles('old_name', 'name') + pf.MapTitles('old_email', 'email') + assert pf.titlesList == ['name', 'email'] + assert 'name' in pf.titlesSet + assert 'old_name' not in pf.titlesSet