diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 697640df..00000000 Binary files a/.DS_Store and /dev/null differ diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index 5ee64771..00000000 --- a/pytest.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pytest] -testpaths = tests diff --git a/src/gam/util/csv_pf.py b/src/gam/util/csv_pf.py index 3cb6e30e..efdc753a 100644 --- a/src/gam/util/csv_pf.py +++ b/src/gam/util/csv_pf.py @@ -65,6 +65,28 @@ def CheckInputRowFilterHeaders(titlesList, rowFilter, rowDropFilter): if not status: sys.exit(USAGE_ERROR_RC) +def _stripTimeFromDateTime(rowDate): + if YYYYMMDD_PATTERN.match(rowDate): + try: + rowTime = arrow.Arrow.strptime(rowDate, YYYYMMDD_FORMAT) + except ValueError: + return None + else: + try: + rowTime = arrow.get(rowDate) + except (arrow.parser.ParserError, OverflowError): + return None + return ISOformatTimeStamp(arrow.Arrow(rowTime.year, rowTime.month, rowTime.day, tzinfo='UTC')) + +def _getHourMinuteFromDateTime(rowDate): + if YYYYMMDD_PATTERN.match(rowDate): + return None + try: + rowTime = arrow.get(rowDate) + except (arrow.parser.ParserError, OverflowError): + return None + return f'{rowTime.hour:02d}:{rowTime.minute:02d}' + def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, rowDropFilterModeAll): def rowRegexFilterMatch(filterPattern): if anyMatch: @@ -89,18 +111,6 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, return False - def stripTimeFromDateTime(rowDate): - if YYYYMMDD_PATTERN.match(rowDate): - try: - rowTime = arrow.Arrow.strptime(rowDate, YYYYMMDD_FORMAT) - except ValueError: - return None - else: - try: - rowTime = arrow.get(rowDate) - except (arrow.parser.ParserError, OverflowError): - return None - return ISOformatTimeStamp(arrow.Arrow(rowTime.year, rowTime.month, rowTime.day, tzinfo='UTC')) def rowDateTimeFilterMatch(dateMode, op, filterDate): def checkMatch(rowDate): @@ -109,7 +119,7 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, if rowDate == GC.Values[GC.NEVER_TIME]: rowDate = NEVER_TIME if dateMode: - rowDate = stripTimeFromDateTime(rowDate) + rowDate = _stripTimeFromDateTime(rowDate) if not rowDate: return False if op == '<': @@ -141,7 +151,7 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, if rowDate == GC.Values[GC.NEVER_TIME]: rowDate = NEVER_TIME if dateMode: - rowDate = stripTimeFromDateTime(rowDate) + rowDate = _stripTimeFromDateTime(rowDate) if not rowDate: return False if op == '!=': @@ -158,20 +168,12 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, return False return True - def getHourMinuteFromDateTime(rowDate): - if YYYYMMDD_PATTERN.match(rowDate): - return None - try: - rowTime = arrow.get(rowDate) - except (arrow.parser.ParserError, OverflowError): - return None - return f'{rowTime.hour:02d}:{rowTime.minute:02d}' def rowTimeOfDayRangeFilterMatch(op, startHourMinute, endHourMinute): def checkMatch(rowDate): if not rowDate or not isinstance(rowDate, str) or rowDate == GC.Values[GC.NEVER_TIME]: return False - rowHourMinute = getHourMinuteFromDateTime(rowDate) + rowHourMinute = _getHourMinuteFromDateTime(rowDate) if not rowHourMinute: return False if op == '!=': @@ -378,49 +380,37 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, return True def filterMatch(filterVal): - if filterVal[2] == 'regex': - if rowRegexFilterMatch(filterVal[3]): - return True - elif filterVal[2] == 'notregex': - if rowNotRegexFilterMatch(filterVal[3]): - return True - elif filterVal[2] in {'date', 'time'}: - if rowDateTimeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4]): - return True - elif filterVal[2] in {'daterange', 'timerange'}: - if rowDateTimeRangeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4], filterVal[5]): - return True - elif filterVal[2] == 'timeofdayrange': - if rowTimeOfDayRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): - return True - elif filterVal[2] in {'count', 'number'}: - if rowCountFilterMatch(filterVal[3], filterVal[4]): - return True - elif filterVal[2] in {'countrange', 'numberrange'}: - if rowCountRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): - return True - elif filterVal[2] == 'length': - if rowLengthFilterMatch(filterVal[3], filterVal[4]): - return True - elif filterVal[2] == 'lengthrange': - if rowLengthRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): - return True - elif filterVal[2] == 'boolean': - if rowBooleanFilterMatch(filterVal[3]): - return True - elif filterVal[2] == 'data': - if rowDataFilterMatch(filterVal[3]): - return True - elif filterVal[2] == 'notdata': - if rowNotDataFilterMatch(filterVal[3]): - return True - elif filterVal[2] == 'text': - if rowTextFilterMatch(filterVal[3], filterVal[4]): - return True - elif filterVal[2] == 'textrange': - if rowTextRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): - return True - return False + match filterVal[2]: + case 'regex': + return rowRegexFilterMatch(filterVal[3]) + case 'notregex': + return rowNotRegexFilterMatch(filterVal[3]) + case 'date' | 'time': + return rowDateTimeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4]) + case 'daterange' | 'timerange': + return rowDateTimeRangeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4], filterVal[5]) + case 'timeofdayrange': + return rowTimeOfDayRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]) + case 'count' | 'number': + return rowCountFilterMatch(filterVal[3], filterVal[4]) + case 'countrange' | 'numberrange': + return rowCountRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]) + case 'length': + return rowLengthFilterMatch(filterVal[3], filterVal[4]) + case 'lengthrange': + return rowLengthRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]) + case 'boolean': + return rowBooleanFilterMatch(filterVal[3]) + case 'data': + return rowDataFilterMatch(filterVal[3]) + case 'notdata': + return rowNotDataFilterMatch(filterVal[3]) + case 'text': + return rowTextFilterMatch(filterVal[3], filterVal[4]) + case 'textrange': + return rowTextRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]) + case _: + return False if rowFilter: anyMatches = False @@ -1272,403 +1262,383 @@ class CSVPrintFile(): if not self.JSONtitlesSet: systemErrorExit(USAGE_ERROR_RC, Msg.NO_COLUMNS_SELECTED_WITH_CSV_OUTPUT_HEADER_FILTER.format(GC.CSV_OUTPUT_HEADER_FILTER, GC.CSV_OUTPUT_HEADER_DROP_FILTER)) - def writeCSVfile(self, list_type, clearRowFilters=False): + @staticmethod + def _todriveCSVErrorExit(entityValueList, errMsg): + systemErrorExit(ACTION_FAILED_RC, formatKeyValueList(Ind.Spaces(), + Ent.FormatEntityValueList(entityValueList)+[Act.NotPerformed(), errMsg], + currentCountNL(0, 0))) - def todriveCSVErrorExit(entityValueList, errMsg): - systemErrorExit(ACTION_FAILED_RC, formatKeyValueList(Ind.Spaces(), - Ent.FormatEntityValueList(entityValueList)+[Act.NotPerformed(), errMsg], - currentCountNL(0, 0))) + @staticmethod + def _itemgetter(*items): + if len(items) == 1: + item = items[0] + def g(obj): + return obj.get(item, '') + else: + def g(obj): + return tuple(obj.get(item, '') for item in items) + return g - @staticmethod - def itemgetter(*items): - if len(items) == 1: - item = items[0] - def g(obj): - return obj.get(item, '') - else: - def g(obj): - return tuple(obj.get(item, '') for item in items) - return g - - def writeCSVData(writer): - try: - if not self.outputTranspose: - if GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER]: - writer.writerow(dict((item, item) for item in writer.fieldnames)) - if not self.sortHeaders: - writer.writerows(self.rows) - else: - for row in sorted(self.rows, key=itemgetter(*self.sortHeaders)): - writer.writerow(row) - else: + def _writeCSVData(self, writer, titlesList, extrasaction): + try: + if not self.outputTranspose: + if GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER]: + writer.writerow(dict((item, item) for item in writer.fieldnames)) + if not self.sortHeaders: writer.writerows(self.rows) - return True + else: + for row in sorted(self.rows, key=self._itemgetter(*self.sortHeaders)): + writer.writerow(row) + else: + writer.writerows(self.rows) + return True + except IOError as e: + stderrErrorMsg(e) + return False + + def _setDialect(self, lineterminator, noEscapeChar): + writerDialect = { + 'delimiter': self.columnDelimiter, + 'doublequote': True, + 'escapechar': '\\' if not noEscapeChar else None, + 'lineterminator': lineterminator, + 'quotechar': self.quoteChar, + 'quoting': csv.QUOTE_MINIMAL, + 'skipinitialspace': False, + 'strict': False} + return writerDialect + + def _normalizeSortHeaders(self, titlesList): + if self.sortHeaders: + writerKeyMap = {} + for k in titlesList: + writerKeyMap[k.lower()] = k + self.sortHeaders = [writerKeyMap[k.lower()] for k in self.sortHeaders if k.lower() in writerKeyMap] + + def _writeCSVToStdout(self, titlesList, extrasaction): + csvFile = StringIOobject() + writerDialect = self._setDialect('\n', self.noEscapeChar) + writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) + if self._writeCSVData(writer, titlesList, extrasaction): + try: + GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD].write(csvFile.getvalue()) except IOError as e: - stderrErrorMsg(e) - return False + stderrErrorMsg(fdErrorMessage(GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD], 'stdout', e)) + setSysExitRC(FILE_ERROR_RC) + closeFile(csvFile) - def setDialect(lineterminator, noEscapeChar): - writerDialect = { - 'delimiter': self.columnDelimiter, - 'doublequote': True, - 'escapechar': '\\' if not noEscapeChar else None, - 'lineterminator': lineterminator, - 'quotechar': self.quoteChar, - 'quoting': csv.QUOTE_MINIMAL, - 'skipinitialspace': False, - 'strict': False} - return writerDialect - - def normalizeSortHeaders(): - if self.sortHeaders: - writerKeyMap = {} - for k in titlesList: - writerKeyMap[k.lower()] = k - self.sortHeaders = [writerKeyMap[k.lower()] for k in self.sortHeaders if k.lower() in writerKeyMap] - - def writeCSVToStdout(): - csvFile = StringIOobject() - writerDialect = setDialect('\n', self.noEscapeChar) + def _writeCSVToFile(self, titlesList, extrasaction): + csvFile = GM.Globals[GM.CSVFILE].get(GM.REDIRECT_FD, None) + if not csvFile: + csvFile = openFile(GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME], GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE], newline='', + encoding=GM.Globals[GM.CSVFILE][GM.REDIRECT_ENCODING], errors='backslashreplace', + continueOnError=True) + if csvFile: + writerDialect = self._setDialect(str(GC.Values[GC.CSV_OUTPUT_LINE_TERMINATOR]), self.noEscapeChar) writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) - if writeCSVData(writer): - try: - GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD].write(csvFile.getvalue()) - except IOError as e: - stderrErrorMsg(fdErrorMessage(GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD], 'stdout', e)) - setSysExitRC(FILE_ERROR_RC) + self._writeCSVData(writer, titlesList, extrasaction) closeFile(csvFile) - def writeCSVToFile(): - csvFile = GM.Globals[GM.CSVFILE].get(GM.REDIRECT_FD, None) - if not csvFile: - csvFile = openFile(GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME], GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE], newline='', - encoding=GM.Globals[GM.CSVFILE][GM.REDIRECT_ENCODING], errors='backslashreplace', - continueOnError=True) - if csvFile: - writerDialect = setDialect(str(GC.Values[GC.CSV_OUTPUT_LINE_TERMINATOR]), self.noEscapeChar) - writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) - writeCSVData(writer) - closeFile(csvFile) - - def writeCSVToDrive(): - numRows = len(self.rows) - numColumns = len(titlesList) - if numRows == 0 and not self.todrive['uploadnodata']: - printKeyValueList([Msg.NO_CSV_DATA_TO_UPLOAD]) - setSysExitRC(NO_CSV_DATA_TO_UPLOAD_RC) - return - if self.todrive['addsheet'] or self.todrive['updatesheet']: - csvFile = TemporaryFile(mode='w+', encoding=UTF8) + def _writeCSVToDrive(self, list_type, titlesList, extrasaction): + numRows = len(self.rows) + numColumns = len(titlesList) + if numRows == 0 and not self.todrive['uploadnodata']: + printKeyValueList([Msg.NO_CSV_DATA_TO_UPLOAD]) + setSysExitRC(NO_CSV_DATA_TO_UPLOAD_RC) + return + if self.todrive['addsheet'] or self.todrive['updatesheet']: + csvFile = TemporaryFile(mode='w+', encoding=UTF8) + else: + csvFile = StringIOobject() + writerDialect = self._setDialect('\n', self.todrive['noescapechar']) + writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) + if self._writeCSVData(writer, titlesList, extrasaction): + if ((self.todrive['title'] is None) or + (not self.todrive['title'] and not self.todrive['timestamp'])): + title = f'{GC.Values[GC.DOMAIN]} - {list_type}' else: - csvFile = StringIOobject() - writerDialect = setDialect('\n', self.todrive['noescapechar']) - writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) - if writeCSVData(writer): - if ((self.todrive['title'] is None) or - (not self.todrive['title'] and not self.todrive['timestamp'])): - title = f'{GC.Values[GC.DOMAIN]} - {list_type}' + title = self.todrive['title'] + if ((self.todrive['sheettitle'] is None) or + (not self.todrive['sheettitle'] and not self.todrive['sheettimestamp'])): + if ((self.todrive['sheetEntity'] is None) or + (not self.todrive['sheetEntity']['sheetTitle'])): + sheetTitle = title else: - title = self.todrive['title'] - if ((self.todrive['sheettitle'] is None) or - (not self.todrive['sheettitle'] and not self.todrive['sheettimestamp'])): - if ((self.todrive['sheetEntity'] is None) or - (not self.todrive['sheetEntity']['sheetTitle'])): - sheetTitle = title - else: - sheetTitle = self.todrive['sheetEntity']['sheetTitle'] + sheetTitle = self.todrive['sheetEntity']['sheetTitle'] + else: + sheetTitle = self.todrive['sheettitle'] + tdbasetime = tdtime = arrow.now(GC.Values[GC.TIMEZONE]) + if self.todrive['daysoffset'] is not None or self.todrive['hoursoffset'] is not None: + tdtime = tdbasetime.shift(days=-self.todrive['daysoffset'] if self.todrive['daysoffset'] is not None else 0, + hours=-self.todrive['hoursoffset'] if self.todrive['hoursoffset'] is not None else 0) + if self.todrive['timestamp']: + if title: + title += ' - ' + if not self.todrive['timeformat']: + title += ISOformatTimeStamp(tdtime) else: - sheetTitle = self.todrive['sheettitle'] - tdbasetime = tdtime = arrow.now(GC.Values[GC.TIMEZONE]) - if self.todrive['daysoffset'] is not None or self.todrive['hoursoffset'] is not None: - tdtime = tdbasetime.shift(days=-self.todrive['daysoffset'] if self.todrive['daysoffset'] is not None else 0, - hours=-self.todrive['hoursoffset'] if self.todrive['hoursoffset'] is not None else 0) - if self.todrive['timestamp']: - if title: - title += ' - ' - if not self.todrive['timeformat']: - title += ISOformatTimeStamp(tdtime) - else: - title += tdtime.strftime(self.todrive['timeformat']) - if self.todrive['sheettimestamp']: - if self.todrive['sheetdaysoffset'] is not None or self.todrive['sheethoursoffset'] is not None: - tdtime = tdbasetime.shift(days=-self.todrive['sheetdaysoffset'] if self.todrive['sheetdaysoffset'] is not None else 0, - hours=-self.todrive['sheethoursoffset'] if self.todrive['sheethoursoffset'] is not None else 0) - if sheetTitle: - sheetTitle += ' - ' - if not self.todrive['sheettimeformat']: - sheetTitle += ISOformatTimeStamp(tdtime) - else: - sheetTitle += tdtime.strftime(self.todrive['sheettimeformat']) - action = Act.Get() - if not GC.Values[GC.TODRIVE_CLIENTACCESS]: - user, drive = buildGAPIServiceObject(chooseSaAPI(API.DRIVETD, API.DRIVE3), self.todrive['user']) - if not drive: - closeFile(csvFile) - return + title += tdtime.strftime(self.todrive['timeformat']) + if self.todrive['sheettimestamp']: + if self.todrive['sheetdaysoffset'] is not None or self.todrive['sheethoursoffset'] is not None: + tdtime = tdbasetime.shift(days=-self.todrive['sheetdaysoffset'] if self.todrive['sheetdaysoffset'] is not None else 0, + hours=-self.todrive['sheethoursoffset'] if self.todrive['sheethoursoffset'] is not None else 0) + if sheetTitle: + sheetTitle += ' - ' + if not self.todrive['sheettimeformat']: + sheetTitle += ISOformatTimeStamp(tdtime) else: - user = self.todrive['user'] - drive = buildGAPIObject(API.DRIVE3) - importSize = csvFile.tell() + sheetTitle += tdtime.strftime(self.todrive['sheettimeformat']) + action = Act.Get() + if not GC.Values[GC.TODRIVE_CLIENTACCESS]: + user, drive = buildGAPIServiceObject(chooseSaAPI(API.DRIVETD, API.DRIVE3), self.todrive['user']) + if not drive: + closeFile(csvFile) + return + else: + user = self.todrive['user'] + drive = buildGAPIObject(API.DRIVE3) + importSize = csvFile.tell() # Add/Update sheet - try: - if self.todrive['addsheet'] or self.todrive['updatesheet']: - Act.Set(Act.CREATE if self.todrive['addsheet'] else Act.UPDATE) + try: + if self.todrive['addsheet'] or self.todrive['updatesheet']: + Act.Set(Act.CREATE if self.todrive['addsheet'] else Act.UPDATE) + result = callGAPI(drive.about(), 'get', + throwReasons=GAPI.DRIVE_USER_THROW_REASONS, + fields='maxImportSizes') + if numRows*numColumns > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): + self._todriveCSVErrorExit([Ent.USER, user], Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET) + fields = ','.join(['id', 'mimeType', 'webViewLink', 'name', 'capabilities(canEdit)']) + body = {'description': self.todrive['description']} + if body['description'] is None: + body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) + if not self.todrive['retaintitle']: + body['name'] = title + result = callGAPI(drive.files(), 'update', + throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, + GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR], + fileId=self.todrive['fileId'], body=body, fields=fields, supportsAllDrives=True) + entityValueList = [Ent.USER, user, Ent.DRIVE_FILE_ID, self.todrive['fileId']] + if not result['capabilities']['canEdit']: + self._todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) + if result['mimeType'] != MIMETYPE_GA_SPREADSHEET: + self._todriveCSVErrorExit(entityValueList, f'{Msg.NOT_A} {Ent.Singular(Ent.SPREADSHEET)}') + if not GC.Values[GC.TODRIVE_CLIENTACCESS]: + _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) + if sheet is None: + return + else: + sheet = buildGAPIObject(API.SHEETS) + csvFile.seek(0) + spreadsheet = None + if self.todrive['updatesheet']: + for sheetEntity in self.TDSHEET_ENTITY_MAP.values(): + if self.todrive[sheetEntity]: + entityValueList = [Ent.USER, user, Ent.SPREADSHEET, title, self.todrive[sheetEntity]['sheetType'], self.todrive[sheetEntity]['sheetValue']] + if spreadsheet is None: + spreadsheet = callGAPI(sheet.spreadsheets(), 'get', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=self.todrive['fileId'], + fields='spreadsheetUrl,sheets(properties(sheetId,title),protectedRanges(range(sheetId),requestingUserCanEdit))') + sheetId = getSheetIdFromSheetEntity(spreadsheet, self.todrive[sheetEntity]) + if sheetId is None: + if ((sheetEntity != 'sheetEntity') or (self.todrive[sheetEntity]['sheetType'] == Ent.SHEET_ID)): + self._todriveCSVErrorExit(entityValueList, Msg.NOT_FOUND) + self.todrive['addsheet'] = True + else: + if protectedSheetId(spreadsheet, sheetId): + self._todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) + self.todrive[sheetEntity]['sheetId'] = sheetId + if self.todrive['addsheet']: + body = {'requests': [{'addSheet': {'properties': {'title': sheetTitle, 'sheetType': 'GRID'}}}]} + try: + addresult = callGAPI(sheet.spreadsheets(), 'batchUpdate', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=self.todrive['fileId'], body=body) + self.todrive['sheetEntity'] = {'sheetId': addresult['replies'][0]['addSheet']['properties']['sheetId']} + except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, + GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.badRequest, + GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: + self._todriveCSVErrorExit(entityValueList, str(e)) + body = {'requests': []} + if not self.todrive['addsheet']: + if self.todrive['backupSheetEntity']: + body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'destination': {'sheetId': self.todrive['backupSheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) + if self.todrive['clearfilter']: + body['requests'].append({'clearBasicFilter': {'sheetId': self.todrive['sheetEntity']['sheetId']}}) + if self.todrive['sheettitle']: + body['requests'].append({'updateSheetProperties': + {'properties': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'title': sheetTitle}, 'fields': 'title'}}) + body['requests'].append({'updateCells': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, 'fields': '*'}}) + if self.todrive['cellwrap']: + body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'fields': 'userEnteredFormat.wrapStrategy', + 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) + if self.todrive['cellnumberformat']: + body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'fields': 'userEnteredFormat.numberFormat', + 'cell': {'userEnteredFormat': {'numberFormat': {'type': self.todrive['cellnumberformat']}}}}}) + body['requests'].append({'pasteData': {'coordinate': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'rowIndex': '0', 'columnIndex': '0'}, + 'data': csvFile.read(), 'type': 'PASTE_NORMAL', 'delimiter': self.columnDelimiter}}) + if self.todrive['copySheetEntity']: + body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, + 'destination': {'sheetId': self.todrive['copySheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) + try: + callGAPI(sheet.spreadsheets(), 'batchUpdate', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=self.todrive['fileId'], body=body) + except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, + GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, + GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: + self._todriveCSVErrorExit(entityValueList, str(e)) + closeFile(csvFile) +# Create/update file + else: + if GC.Values[GC.TODRIVE_CONVERSION]: result = callGAPI(drive.about(), 'get', throwReasons=GAPI.DRIVE_USER_THROW_REASONS, fields='maxImportSizes') - if numRows*numColumns > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): - todriveCSVErrorExit([Ent.USER, user], Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET) - fields = ','.join(['id', 'mimeType', 'webViewLink', 'name', 'capabilities(canEdit)']) - body = {'description': self.todrive['description']} - if body['description'] is None: - body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) - if not self.todrive['retaintitle']: - body['name'] = title - result = callGAPI(drive.files(), 'update', - throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, - GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR], - fileId=self.todrive['fileId'], body=body, fields=fields, supportsAllDrives=True) - entityValueList = [Ent.USER, user, Ent.DRIVE_FILE_ID, self.todrive['fileId']] - if not result['capabilities']['canEdit']: - todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) - if result['mimeType'] != MIMETYPE_GA_SPREADSHEET: - todriveCSVErrorExit(entityValueList, f'{Msg.NOT_A} {Ent.Singular(Ent.SPREADSHEET)}') + if numRows*len(titlesList) > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): + printKeyValueList([WARNING, Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET]) + mimeType = 'text/csv' + else: + mimeType = MIMETYPE_GA_SPREADSHEET + else: + mimeType = 'text/csv' + fields = ','.join(['id', 'mimeType', 'webViewLink']) + body = {'description': self.todrive['description'], 'mimeType': mimeType} + if body['description'] is None: + body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) + if not self.todrive['fileId'] or not self.todrive['retaintitle']: + body['name'] = title + try: + if not self.todrive['fileId']: + Act.Set(Act.CREATE) + body['parents'] = [self.todrive['parentId']] + result = callGAPI(drive.files(), 'create', + bailOnInternalError=True, + throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.FORBIDDEN, GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, + GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR, GAPI.STORAGE_QUOTA_EXCEEDED, + GAPI.TEAMDRIVE_FILE_LIMIT_EXCEEDED, GAPI.TEAMDRIVE_HIERARCHY_TOO_DEEP], + body=body, + media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), + fields=fields, supportsAllDrives=True) + else: + Act.Set(Act.UPDATE) + result = callGAPI(drive.files(), 'update', + bailOnInternalError=True, + throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, + GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR], + fileId=self.todrive['fileId'], + body=body, + media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), + fields=fields, supportsAllDrives=True) + spreadsheetId = result['id'] + except GAPI.internalError as e: + entityActionFailedWarning([Ent.DRIVE_FILE, body['name']], Msg.UPLOAD_CSV_FILE_INTERNAL_ERROR.format(str(e), str(numRows))) + closeFile(csvFile) + return + closeFile(csvFile) + if not self.todrive['fileId'] and self.todrive['share']: + Act.Set(Act.SHARE) + for share in self.todrive['share']: + if share['emailAddress'] != user: + try: + callGAPI(drive.permissions(), 'create', + bailOnInternalError=True, + throwReasons=GAPI.DRIVE_ACCESS_THROW_REASONS+GAPI.DRIVE3_CREATE_ACL_THROW_REASONS, + fileId=spreadsheetId, sendNotificationEmail=False, body=share, fields='', supportsAllDrives=True) + entityActionPerformed([Ent.USER, user, Ent.SPREADSHEET, title, + Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']]) + except (GAPI.badRequest, GAPI.invalid, GAPI.fileNotFound, GAPI.forbidden, GAPI.internalError, + GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.unknownError, GAPI.ownershipChangeAcrossDomainNotPermitted, + GAPI.teamDriveDomainUsersOnlyRestriction, GAPI.teamDriveTeamMembersOnlyRestriction, + GAPI.targetUserRoleLimitedByLicenseRestriction, GAPI.insufficientAdministratorPrivileges, GAPI.sharingRateLimitExceeded, + GAPI.publishOutNotPermitted, GAPI.shareInNotPermitted, GAPI.shareOutNotPermitted, GAPI.shareOutNotPermittedToUser, + GAPI.cannotShareTeamDriveTopFolderWithAnyoneOrDomains, GAPI.cannotShareTeamDriveWithNonGoogleAccounts, + GAPI.ownerOnTeamDriveItemNotSupported, + GAPI.organizerOnNonTeamDriveNotSupported, GAPI.organizerOnNonTeamDriveItemNotSupported, + GAPI.fileOrganizerNotYetEnabledForThisTeamDrive, + GAPI.fileOrganizerOnFoldersInSharedDriveOnly, + GAPI.fileOrganizerOnNonTeamDriveNotSupported, + GAPI.cannotModifyInheritedPermission, + GAPI.teamDrivesFolderSharingNotSupported, GAPI.invalidLinkVisibility, + GAPI.invalidSharingRequest, GAPI.fileNeverWritable, GAPI.abusiveContentRestriction) as e: + entityActionFailedWarning([Ent.USER, user, Ent.SPREADSHEET, title, + Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']], + str(e)) + if ((result['mimeType'] == MIMETYPE_GA_SPREADSHEET) and + (self.todrive['sheetEntity'] or self.todrive['locale'] or self.todrive['timeZone'] or + self.todrive['sheettitle'] or self.todrive['cellwrap'] or self.todrive['cellnumberformat'])): if not GC.Values[GC.TODRIVE_CLIENTACCESS]: _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) if sheet is None: return else: sheet = buildGAPIObject(API.SHEETS) - csvFile.seek(0) - spreadsheet = None - if self.todrive['updatesheet']: - for sheetEntity in self.TDSHEET_ENTITY_MAP.values(): - if self.todrive[sheetEntity]: - entityValueList = [Ent.USER, user, Ent.SPREADSHEET, title, self.todrive[sheetEntity]['sheetType'], self.todrive[sheetEntity]['sheetValue']] - if spreadsheet is None: - spreadsheet = callGAPI(sheet.spreadsheets(), 'get', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=self.todrive['fileId'], - fields='spreadsheetUrl,sheets(properties(sheetId,title),protectedRanges(range(sheetId),requestingUserCanEdit))') - sheetId = getSheetIdFromSheetEntity(spreadsheet, self.todrive[sheetEntity]) - if sheetId is None: - if ((sheetEntity != 'sheetEntity') or (self.todrive[sheetEntity]['sheetType'] == Ent.SHEET_ID)): - todriveCSVErrorExit(entityValueList, Msg.NOT_FOUND) - self.todrive['addsheet'] = True - else: - if protectedSheetId(spreadsheet, sheetId): - todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) - self.todrive[sheetEntity]['sheetId'] = sheetId - if self.todrive['addsheet']: - body = {'requests': [{'addSheet': {'properties': {'title': sheetTitle, 'sheetType': 'GRID'}}}]} - try: - addresult = callGAPI(sheet.spreadsheets(), 'batchUpdate', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=self.todrive['fileId'], body=body) - self.todrive['sheetEntity'] = {'sheetId': addresult['replies'][0]['addSheet']['properties']['sheetId']} - except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, - GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.badRequest, - GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: - todriveCSVErrorExit(entityValueList, str(e)) - body = {'requests': []} - if not self.todrive['addsheet']: - if self.todrive['backupSheetEntity']: - body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'destination': {'sheetId': self.todrive['backupSheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) - if self.todrive['clearfilter']: - body['requests'].append({'clearBasicFilter': {'sheetId': self.todrive['sheetEntity']['sheetId']}}) - if self.todrive['sheettitle']: - body['requests'].append({'updateSheetProperties': - {'properties': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'title': sheetTitle}, 'fields': 'title'}}) - body['requests'].append({'updateCells': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, 'fields': '*'}}) - if self.todrive['cellwrap']: - body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'fields': 'userEnteredFormat.wrapStrategy', - 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) - if self.todrive['cellnumberformat']: - body['requests'].append({'repeatCell': {'range': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'fields': 'userEnteredFormat.numberFormat', - 'cell': {'userEnteredFormat': {'numberFormat': {'type': self.todrive['cellnumberformat']}}}}}) - body['requests'].append({'pasteData': {'coordinate': {'sheetId': self.todrive['sheetEntity']['sheetId'], 'rowIndex': '0', 'columnIndex': '0'}, - 'data': csvFile.read(), 'type': 'PASTE_NORMAL', 'delimiter': self.columnDelimiter}}) - if self.todrive['copySheetEntity']: - body['requests'].append({'copyPaste': {'source': {'sheetId': self.todrive['sheetEntity']['sheetId']}, - 'destination': {'sheetId': self.todrive['copySheetEntity']['sheetId']}, 'pasteType': 'PASTE_NORMAL'}}) try: - callGAPI(sheet.spreadsheets(), 'batchUpdate', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=self.todrive['fileId'], body=body) + body = {'requests': []} + if self.todrive['sheetEntity'] or self.todrive['sheettitle'] or self.todrive['cellwrap']: + spreadsheet = callGAPI(sheet.spreadsheets(), 'get', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=spreadsheetId, fields='sheets/properties') + spreadsheet['sheets'][0]['properties']['title'] = sheetTitle + body['requests'].append({'updateSheetProperties': + {'properties': spreadsheet['sheets'][0]['properties'], 'fields': 'title'}}) + if self.todrive['cellwrap']: + body['requests'].append({'repeatCell': {'range': {'sheetId': spreadsheet['sheets'][0]['properties']['sheetId']}, + 'fields': 'userEnteredFormat.wrapStrategy', + 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) + if self.todrive['locale']: + body['requests'].append({'updateSpreadsheetProperties': + {'properties': {'locale': self.todrive['locale']}, 'fields': 'locale'}}) + if self.todrive['timeZone']: + body['requests'].append({'updateSpreadsheetProperties': + {'properties': {'timeZone': self.todrive['timeZone']}, 'fields': 'timeZone'}}) + if body['requests']: + callGAPI(sheet.spreadsheets(), 'batchUpdate', + throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, + spreadsheetId=spreadsheetId, body=body) except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, - GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: - todriveCSVErrorExit(entityValueList, str(e)) - closeFile(csvFile) -# Create/update file + GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition, + GAPI.teamDriveFileLimitExceeded, GAPI.teamDriveHierarchyTooDeep) as e: + self._todriveCSVErrorExit([Ent.USER, user, Ent.SPREADSHEET, title], str(e)) + Act.Set(action) + file_url = result['webViewLink'] + msg_txt = f'{Msg.DATA_UPLOADED_TO_DRIVE_FILE}:\n{file_url}' + if not self.todrive['returnidonly']: + printKeyValueList([msg_txt]) + else: + if self.todrive['fileId']: + writeStdout(f'{self.todrive["fileId"]}\n') else: - if GC.Values[GC.TODRIVE_CONVERSION]: - result = callGAPI(drive.about(), 'get', - throwReasons=GAPI.DRIVE_USER_THROW_REASONS, - fields='maxImportSizes') - if numRows*len(titlesList) > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): - printKeyValueList([WARNING, Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET]) - mimeType = 'text/csv' - else: - mimeType = MIMETYPE_GA_SPREADSHEET - else: - mimeType = 'text/csv' - fields = ','.join(['id', 'mimeType', 'webViewLink']) - body = {'description': self.todrive['description'], 'mimeType': mimeType} - if body['description'] is None: - body['description'] = Cmd.QuotedArgumentList(Cmd.AllArguments()) - if not self.todrive['fileId'] or not self.todrive['retaintitle']: - body['name'] = title - try: - if not self.todrive['fileId']: - Act.Set(Act.CREATE) - body['parents'] = [self.todrive['parentId']] - result = callGAPI(drive.files(), 'create', - bailOnInternalError=True, - throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.FORBIDDEN, GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, - GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR, GAPI.STORAGE_QUOTA_EXCEEDED, - GAPI.TEAMDRIVE_FILE_LIMIT_EXCEEDED, GAPI.TEAMDRIVE_HIERARCHY_TOO_DEEP], - body=body, - media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), - fields=fields, supportsAllDrives=True) - else: - Act.Set(Act.UPDATE) - result = callGAPI(drive.files(), 'update', - bailOnInternalError=True, - throwReasons=GAPI.DRIVE_USER_THROW_REASONS+[GAPI.INSUFFICIENT_PERMISSIONS, GAPI.INSUFFICIENT_PARENT_PERMISSIONS, - GAPI.FILE_NOT_FOUND, GAPI.UNKNOWN_ERROR, GAPI.INTERNAL_ERROR], - fileId=self.todrive['fileId'], - body=body, - media_body=googleapiclient.http.MediaIoBaseUpload(io.BytesIO(csvFile.getvalue().encode()), mimetype='text/csv', resumable=True), - fields=fields, supportsAllDrives=True) - spreadsheetId = result['id'] - except GAPI.internalError as e: - entityActionFailedWarning([Ent.DRIVE_FILE, body['name']], Msg.UPLOAD_CSV_FILE_INTERNAL_ERROR.format(str(e), str(numRows))) - closeFile(csvFile) - return - closeFile(csvFile) - if not self.todrive['fileId'] and self.todrive['share']: - Act.Set(Act.SHARE) - for share in self.todrive['share']: - if share['emailAddress'] != user: - try: - callGAPI(drive.permissions(), 'create', - bailOnInternalError=True, - throwReasons=GAPI.DRIVE_ACCESS_THROW_REASONS+GAPI.DRIVE3_CREATE_ACL_THROW_REASONS, - fileId=spreadsheetId, sendNotificationEmail=False, body=share, fields='', supportsAllDrives=True) - entityActionPerformed([Ent.USER, user, Ent.SPREADSHEET, title, - Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']]) - except (GAPI.badRequest, GAPI.invalid, GAPI.fileNotFound, GAPI.forbidden, GAPI.internalError, - GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.unknownError, GAPI.ownershipChangeAcrossDomainNotPermitted, - GAPI.teamDriveDomainUsersOnlyRestriction, GAPI.teamDriveTeamMembersOnlyRestriction, - GAPI.targetUserRoleLimitedByLicenseRestriction, GAPI.insufficientAdministratorPrivileges, GAPI.sharingRateLimitExceeded, - GAPI.publishOutNotPermitted, GAPI.shareInNotPermitted, GAPI.shareOutNotPermitted, GAPI.shareOutNotPermittedToUser, - GAPI.cannotShareTeamDriveTopFolderWithAnyoneOrDomains, GAPI.cannotShareTeamDriveWithNonGoogleAccounts, - GAPI.ownerOnTeamDriveItemNotSupported, - GAPI.organizerOnNonTeamDriveNotSupported, GAPI.organizerOnNonTeamDriveItemNotSupported, - GAPI.fileOrganizerNotYetEnabledForThisTeamDrive, - GAPI.fileOrganizerOnFoldersInSharedDriveOnly, - GAPI.fileOrganizerOnNonTeamDriveNotSupported, - GAPI.cannotModifyInheritedPermission, - GAPI.teamDrivesFolderSharingNotSupported, GAPI.invalidLinkVisibility, - GAPI.invalidSharingRequest, GAPI.fileNeverWritable, GAPI.abusiveContentRestriction) as e: - entityActionFailedWarning([Ent.USER, user, Ent.SPREADSHEET, title, - Ent.TARGET_USER, share['emailAddress'], Ent.ROLE, share['role']], - str(e)) - if ((result['mimeType'] == MIMETYPE_GA_SPREADSHEET) and - (self.todrive['sheetEntity'] or self.todrive['locale'] or self.todrive['timeZone'] or - self.todrive['sheettitle'] or self.todrive['cellwrap'] or self.todrive['cellnumberformat'])): - if not GC.Values[GC.TODRIVE_CLIENTACCESS]: - _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) - if sheet is None: - return - else: - sheet = buildGAPIObject(API.SHEETS) - try: - body = {'requests': []} - if self.todrive['sheetEntity'] or self.todrive['sheettitle'] or self.todrive['cellwrap']: - spreadsheet = callGAPI(sheet.spreadsheets(), 'get', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=spreadsheetId, fields='sheets/properties') - spreadsheet['sheets'][0]['properties']['title'] = sheetTitle - body['requests'].append({'updateSheetProperties': - {'properties': spreadsheet['sheets'][0]['properties'], 'fields': 'title'}}) - if self.todrive['cellwrap']: - body['requests'].append({'repeatCell': {'range': {'sheetId': spreadsheet['sheets'][0]['properties']['sheetId']}, - 'fields': 'userEnteredFormat.wrapStrategy', - 'cell': {'userEnteredFormat': {'wrapStrategy': self.todrive['cellwrap']}}}}) - if self.todrive['locale']: - body['requests'].append({'updateSpreadsheetProperties': - {'properties': {'locale': self.todrive['locale']}, 'fields': 'locale'}}) - if self.todrive['timeZone']: - body['requests'].append({'updateSpreadsheetProperties': - {'properties': {'timeZone': self.todrive['timeZone']}, 'fields': 'timeZone'}}) - if body['requests']: - callGAPI(sheet.spreadsheets(), 'batchUpdate', - throwReasons=GAPI.SHEETS_ACCESS_THROW_REASONS, - spreadsheetId=spreadsheetId, body=body) - except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, - GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, - GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition, - GAPI.teamDriveFileLimitExceeded, GAPI.teamDriveHierarchyTooDeep) as e: - todriveCSVErrorExit([Ent.USER, user, Ent.SPREADSHEET, title], str(e)) - Act.Set(action) - file_url = result['webViewLink'] - msg_txt = f'{Msg.DATA_UPLOADED_TO_DRIVE_FILE}:\n{file_url}' - if not self.todrive['returnidonly']: - printKeyValueList([msg_txt]) - else: - if self.todrive['fileId']: - writeStdout(f'{self.todrive["fileId"]}\n') - else: - writeStdout(f'{spreadsheetId}\n') - if not self.todrive['subject']: - subject = title - else: - subject = self.todrive['subject'].replace('#file#', title).replace('#sheet#', sheetTitle) - if not self.todrive['noemail']: - send_email(subject, msg_txt, user, clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) - if self.todrive['notify']: - for recipient in self.todrive['share']+self.todrive['alert']: - if recipient['emailAddress'] != user: - send_email(subject, msg_txt, recipient['emailAddress'], clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) - if not self.todrive['nobrowser']: - webbrowser.open(file_url) - except (GAPI.forbidden, GAPI.insufficientPermissions): - printWarningMessage(INSUFFICIENT_PERMISSIONS_RC, Msg.INSUFFICIENT_PERMISSIONS_TO_PERFORM_TASK) - except (GAPI.fileNotFound, GAPI.unknownError, GAPI.internalError, GAPI.storageQuotaExceeded) as e: - if not self.todrive['fileId']: - entityActionFailedWarning([Ent.DRIVE_FOLDER, self.todrive['parentId']], str(e)) - else: - entityActionFailedWarning([Ent.DRIVE_FILE, self.todrive['fileId']], str(e)) - except (GAPI.serviceNotAvailable, GAPI.authError, GAPI.domainPolicy) as e: - userDriveServiceNotEnabledWarning(user, str(e), 0, 0) - else: - closeFile(csvFile) + writeStdout(f'{spreadsheetId}\n') + if not self.todrive['subject']: + subject = title + else: + subject = self.todrive['subject'].replace('#file#', title).replace('#sheet#', sheetTitle) + if not self.todrive['noemail']: + send_email(subject, msg_txt, user, clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) + if self.todrive['notify']: + for recipient in self.todrive['share']+self.todrive['alert']: + if recipient['emailAddress'] != user: + send_email(subject, msg_txt, recipient['emailAddress'], clientAccess=GC.Values[GC.TODRIVE_CLIENTACCESS], msgFrom=self.todrive['from']) + if not self.todrive['nobrowser']: + webbrowser.open(file_url) + except (GAPI.forbidden, GAPI.insufficientPermissions): + printWarningMessage(INSUFFICIENT_PERMISSIONS_RC, Msg.INSUFFICIENT_PERMISSIONS_TO_PERFORM_TASK) + except (GAPI.fileNotFound, GAPI.unknownError, GAPI.internalError, GAPI.storageQuotaExceeded) as e: + if not self.todrive['fileId']: + entityActionFailedWarning([Ent.DRIVE_FOLDER, self.todrive['parentId']], str(e)) + else: + entityActionFailedWarning([Ent.DRIVE_FILE, self.todrive['fileId']], str(e)) + except (GAPI.serviceNotAvailable, GAPI.authError, GAPI.domainPolicy) as e: + userDriveServiceNotEnabledWarning(user, str(e), 0, 0) + else: + closeFile(csvFile) - if GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE] is not None: - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_NAME, list_type)) - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_TODRIVE, self.todrive)) - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CSVPF, - (self.titlesList, self.sortTitlesList, self.indexedTitles, - self.formatJSON, self.JSONtitlesList, - self.columnDelimiter, self.noEscapeChar, self.quoteChar, - self.sortHeaders, self.timestampColumn, - self.fixPaths, - self.mapNodataFields, - self.nodataFields, - self.driveListFields, - self.driveSubfieldsChoiceMap, - self.oneItemPerRow, - self.showPermissionsLast, - self.zeroBlankMimeTypeCounts))) - if clearRowFilters: - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CLEAR_ROW_FILTERS, clearRowFilters)) - GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_DATA, self.rows)) - return + def _prepareHeaders(self, clearRowFilters): if self.zeroBlankMimeTypeCounts: self.ZeroBlankMimeTypeCounts() if not clearRowFilters and (self.rowFilter or self.rowDropFilter): @@ -1730,7 +1700,32 @@ class CSVPrintFile(): if self.headerOrder: self.JSONtitlesList = self.orderHeaders(self.JSONtitlesList) titlesList = self.JSONtitlesList - normalizeSortHeaders() + self._normalizeSortHeaders(titlesList) + return titlesList, extrasaction + + def writeCSVfile(self, list_type, clearRowFilters=False): + + if GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE] is not None: + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_NAME, list_type)) + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_TODRIVE, self.todrive)) + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CSVPF, + (self.titlesList, self.sortTitlesList, self.indexedTitles, + self.formatJSON, self.JSONtitlesList, + self.columnDelimiter, self.noEscapeChar, self.quoteChar, + self.sortHeaders, self.timestampColumn, + self.fixPaths, + self.mapNodataFields, + self.nodataFields, + self.driveListFields, + self.driveSubfieldsChoiceMap, + self.oneItemPerRow, + self.showPermissionsLast, + self.zeroBlankMimeTypeCounts))) + if clearRowFilters: + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CLEAR_ROW_FILTERS, clearRowFilters)) + GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_DATA, self.rows)) + return + titlesList, extrasaction = self._prepareHeaders(clearRowFilters) if self.outputTranspose: newRows = [] newTitlesList = list(range(len(self.rows) + 1)) @@ -1746,14 +1741,14 @@ class CSVPrintFile(): if (not self.todrive) or self.todrive['localcopy']: if GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] == '-': if GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD]: - writeCSVToStdout() + self._writeCSVToStdout(titlesList, extrasaction) else: GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] = GM.Globals[GM.STDOUT][GM.REDIRECT_NAME] - writeCSVToFile() + self._writeCSVToFile(titlesList, extrasaction) else: - writeCSVToFile() + self._writeCSVToFile(titlesList, extrasaction) if self.todrive: - writeCSVToDrive() + self._writeCSVToDrive(list_type, titlesList, extrasaction) if GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE] == DEFAULT_FILE_APPEND_MODE: GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER] = False diff --git a/tests/conftest.py b/tests/conftest.py index e73559f9..8760614f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,16 @@ gam package directory on sys.path. We add it in a fixture (not at module level) to avoid shadowing Python's stdlib 'cmd' module during pytest configuration. """ +# Make pytest discover this directory as the test root without needing +# a pytest.ini at the repo root. Running `pytest` or `python -m pytest` +# from anywhere in the repo will find tests here. +collect_ignore_glob = [] # nothing to ignore; marker for rootdir detection + import os import sys import pytest +import arrow @pytest.fixture(autouse=True) @@ -46,6 +52,25 @@ def _gam_path_and_globals(): GC.Values.setdefault(GC.CUSTOMER_ID, 'C00000000') GC.Values.setdefault(GC.TIMEZONE, 'UTC') GC.Values.setdefault(GC.SHOW_COUNTS_MIN, 0) + # CSV-related defaults needed by CSVPrintFile + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_FORCE, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_REQUIRED, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_ORDER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_DROP_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_FILTER_MODE, 'allmatch') + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_DROP_FILTER, []) + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_DROP_FILTER_MODE, 'anymatch') + GC.Values.setdefault(GC.CSV_OUTPUT_ROW_LIMIT, 0) + GC.Values.setdefault(GC.CSV_OUTPUT_COLUMN_DELIMITER, ',') + GC.Values.setdefault(GC.CSV_OUTPUT_QUOTE_CHAR, '"') + GC.Values.setdefault(GC.CSV_OUTPUT_NO_ESCAPE_CHAR, False) + GC.Values.setdefault(GC.CSV_OUTPUT_SORT_HEADERS, []) + GC.Values.setdefault(GC.CSV_OUTPUT_TIMESTAMP_COLUMN, '') + GC.Values.setdefault(GC.CSV_OUTPUT_LINE_TERMINATOR, '\n') + GC.Values.setdefault(GC.NEVER_TIME, 'Never') + GC.Values.setdefault(GC.OUTPUT_TIMEFORMAT, '') # Ensure Globals dict exists if not GM.Globals: @@ -53,6 +78,16 @@ def _gam_path_and_globals(): GM.Globals.setdefault(GM.STDOUT, None) GM.Globals.setdefault(GM.STDERR, None) GM.Globals.setdefault(GM.SYSEXITRC, 0) + # CSV-related globals needed by CSVPrintFile + GM.Globals.setdefault(GM.CSV_OUTPUT_TRANSPOSE, False) + GM.Globals.setdefault(GM.CSV_TODRIVE, {}) + GM.Globals.setdefault(GM.CSV_OUTPUT_COLUMN_DELIMITER, None) + GM.Globals.setdefault(GM.CSV_OUTPUT_QUOTE_CHAR, None) + GM.Globals.setdefault(GM.CSV_OUTPUT_NO_ESCAPE_CHAR, None) + GM.Globals.setdefault(GM.CSV_OUTPUT_SORT_HEADERS, []) + GM.Globals.setdefault(GM.CSV_OUTPUT_TIMESTAMP_COLUMN, '') + if not GM.Globals.get(GM.DATETIME_NOW): + GM.Globals[GM.DATETIME_NOW] = arrow.now('UTC') yield diff --git a/tests/test_csv_pf.py b/tests/test_csv_pf.py new file mode 100644 index 00000000..d681ee3a --- /dev/null +++ b/tests/test_csv_pf.py @@ -0,0 +1,611 @@ +"""Unit tests for gam.util.csv_pf — RowFilterMatch and CSV output functions. + +Tests cover all filter types: regex, boolean, count, date, time, length, +text, data, ranges, and their negations/combinations. Also covers +CSVPrintFile header processing, title management, and output formatting. +""" + +import re + +import pytest +import arrow + + +# --------------------------------------------------------------------------- +# RowFilterMatch tests +# --------------------------------------------------------------------------- + +class TestRowFilterMatchRegex: + """Test regex and notregex filter types.""" + + def test_regex_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John Smith', 'email': 'john@example.com'} + titlesList = ['name', 'email'] + # filterVal: (columnPat, anyMatch, filterType, ...) + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('John', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_regex_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'Jane Doe'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('^John$', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_notregex_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'Jane Doe'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notregex', re.compile('^John$', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_notregex_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John Smith'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notregex', re.compile('John', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_regex_case_sensitive(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'john smith'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + # Case-sensitive regex should NOT match lowercase + rowFilter = [(columnPat, True, 'regex', re.compile('^John', 0))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_regex_wildcard_column(self): + """Regex column pattern matching multiple columns.""" + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'no', 'email': 'yes@match.com'} + titlesList = ['name', 'email'] + # anyMatch=True: at least one column must match + columnPat = re.compile('.*', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('match', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchBoolean: + """Test boolean filter type.""" + + def test_boolean_true_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': 'True'} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', True)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_boolean_true_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': 'False'} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', True)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_boolean_false_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': 'False'} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', False)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_boolean_native_bool(self): + from gam.util.csv_pf import RowFilterMatch + row = {'active': True} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', True)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_boolean_blank_is_false(self): + """Blank string should be treated as False.""" + from gam.util.csv_pf import RowFilterMatch + row = {'active': ''} + titlesList = ['active'] + columnPat = re.compile('^active$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'boolean', False)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchCount: + """Test count/number filter types.""" + + def test_count_equals(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_greater(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '10'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '>', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_less(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '3'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '<', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_not_equal(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '3'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '!=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_gte(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '>=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_lte(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '<=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_blank_is_zero(self): + """Blank string count should be treated as 0.""" + from gam.util.csv_pf import RowFilterMatch + row = {'count': ''} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 0)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_count_non_digit_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': 'abc'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 0)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_count_native_int(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': 5} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'count', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_number_alias(self): + """'number' should work the same as 'count'.""" + from gam.util.csv_pf import RowFilterMatch + row = {'val': '42'} + titlesList = ['val'] + columnPat = re.compile('^val$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'number', '=', 42)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchCountRange: + """Test countrange/numberrange filter types.""" + + def test_countrange_in_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '5'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_countrange_out_of_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'count': '15'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_countrange_not_equal(self): + """countrange with != should return True when value IS outside range.""" + from gam.util.csv_pf import RowFilterMatch + row = {'count': '15'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '!=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_countrange_boundary(self): + """Boundary values should be inclusive.""" + from gam.util.csv_pf import RowFilterMatch + row = {'count': '10'} + titlesList = ['count'] + columnPat = re.compile('^count$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + +class TestRowFilterMatchLength: + """Test length and lengthrange filter types.""" + + def test_length_equals(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'hello'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'length', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_length_greater(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'hello world'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'length', '>', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_lengthrange_in_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'hello'} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'lengthrange', '=', 3, 8)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_length_non_string_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 12345} + titlesList = ['name'] + columnPat = re.compile('^name$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'length', '=', 5)] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchText: + """Test text and textrange filter types.""" + + def test_text_equals(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'active'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'text', '=', 'active')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_text_not_equal(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'inactive'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'text', '!=', 'active')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_text_greater(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'b'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'text', '>', 'a')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_textrange_in_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'banana'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'textrange', '=', 'apple', 'cherry')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_textrange_out_of_range(self): + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'zebra'} + titlesList = ['status'] + columnPat = re.compile('^status$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'textrange', '=', 'apple', 'cherry')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchData: + """Test data and notdata filter types.""" + + def test_data_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'admin'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'data', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_data_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'viewer'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'data', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_notdata_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'viewer'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notdata', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_notdata_no_match(self): + from gam.util.csv_pf import RowFilterMatch + row = {'role': 'admin'} + titlesList = ['role'] + columnPat = re.compile('^role$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'notdata', {'admin', 'owner', 'editor'})] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchDate: + """Test date and time filter types.""" + + def test_date_greater(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': '2025-06-15T10:30:00Z'} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_date_less(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': '2024-06-15T10:30:00Z'} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '<', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_time_greater(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'modified': '2025-06-15T10:30:00.000Z'} + titlesList = ['modified'] + columnPat = re.compile('^modified$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'time', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_date_empty_no_match(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': ''} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_date_non_string_no_match(self): + from gam.util.csv_pf import RowFilterMatch + from gamlib import settings as GC + GC.Values[GC.NEVER_TIME] = 'Never' + row = {'created': None} + titlesList = ['created'] + columnPat = re.compile('^created$', re.IGNORECASE) + rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +class TestRowFilterMatchModes: + """Test rowFilter mode combinations (Any vs All) and drop filters.""" + + def test_no_filters_returns_true(self): + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'anything'} + assert RowFilterMatch(row, ['name'], [], True, [], True) is True + + def test_row_filter_mode_any(self): + """Any mode: at least one filter must match to select.""" + from gam.util.csv_pf import RowFilterMatch + row = {'a': 'yes', 'b': 'no'} + titlesList = ['a', 'b'] + f1 = (re.compile('^a$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + f2 = (re.compile('^b$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + # f1 matches, f2 doesn't — Any mode should select + assert RowFilterMatch(row, titlesList, [f1, f2], False, [], True) is True + + def test_row_filter_mode_all(self): + """All mode: all filters must match to select.""" + from gam.util.csv_pf import RowFilterMatch + row = {'a': 'yes', 'b': 'no'} + titlesList = ['a', 'b'] + f1 = (re.compile('^a$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + f2 = (re.compile('^b$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE)) + # f1 matches, f2 doesn't — All mode should NOT select + assert RowFilterMatch(row, titlesList, [f1, f2], True, [], True) is False + + def test_drop_filter_any(self): + """Drop filter in Any mode: any match drops the row.""" + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'deleted'} + titlesList = ['status'] + dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, [], True, dropFilter, False) is False + + def test_drop_filter_no_match_keeps(self): + """Drop filter that doesn't match should keep the row.""" + from gam.util.csv_pf import RowFilterMatch + row = {'status': 'active'} + titlesList = ['status'] + dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, [], True, dropFilter, False) is True + + def test_row_and_drop_filter_combined(self): + """Row selected by rowFilter but dropped by dropFilter.""" + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John', 'status': 'deleted'} + titlesList = ['name', 'status'] + rowFilter = [(re.compile('^name$', re.IGNORECASE), True, 'regex', re.compile('John', re.IGNORECASE))] + dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, dropFilter, False) is False + + def test_column_not_in_titles(self): + """Filter column not in titles should use [None] as columns.""" + from gam.util.csv_pf import RowFilterMatch + row = {'name': 'John'} + titlesList = ['name'] + # Filter on 'missing_col' which isn't in titlesList + rowFilter = [(re.compile('^missing_col$', re.IGNORECASE), True, 'regex', re.compile('John', re.IGNORECASE))] + # No columns match, so row.get(None, '') = '', regex doesn't match + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + def test_any_match_across_columns(self): + """anyMatch=True across multiple columns: any column matching suffices.""" + from gam.util.csv_pf import RowFilterMatch + row = {'col1': 'no', 'col2': 'yes'} + titlesList = ['col1', 'col2'] + # Match any column starting with 'col' + columnPat = re.compile('^col', re.IGNORECASE) + rowFilter = [(columnPat, True, 'regex', re.compile('yes', re.IGNORECASE))] + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True + + def test_all_match_across_columns(self): + """anyMatch=False (all mode) across columns: ALL columns must match.""" + from gam.util.csv_pf import RowFilterMatch + row = {'col1': 'yes', 'col2': 'no'} + titlesList = ['col1', 'col2'] + columnPat = re.compile('^col', re.IGNORECASE) + rowFilter = [(columnPat, False, 'regex', re.compile('yes', re.IGNORECASE))] + # col2 doesn't match — all mode fails + assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False + + +# --------------------------------------------------------------------------- +# CSVPrintFile title management tests +# --------------------------------------------------------------------------- + +class TestCSVPrintFileTitles: + """Test CSVPrintFile title management methods.""" + + def test_add_title(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitle('email') + assert 'email' in pf.titlesList + assert 'email' in pf.titlesSet + + def test_add_titles_no_duplicate(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['email']) + pf.AddTitles(['email']) + assert pf.titlesList.count('email') == 1 + + def test_add_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['email', 'name', 'role']) + assert pf.titlesList == ['email', 'name', 'role'] + + def test_remove_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['email', 'name', 'role']) + pf.RemoveTitles(['name']) + assert 'name' not in pf.titlesList + assert 'name' not in pf.titlesSet + + def test_set_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['a', 'b', 'c']) + pf.SetTitles(['x', 'y']) + assert pf.titlesList == ['x', 'y'] + assert pf.titlesSet == {'x', 'y'} + + def test_insert_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['b', 'c']) + pf.InsertTitles(0, ['a']) + assert pf.titlesList[0] == 'a' + + def test_move_titles_to_end(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['a', 'b', 'c']) + pf.MoveTitlesToEnd(['a']) + assert pf.titlesList == ['b', 'c', 'a'] + + def test_add_sort_title(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddSortTitle('email') + assert 'email' in pf.sortTitlesList + + def test_add_sort_titles(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddSortTitles(['email', 'name']) + assert pf.sortTitlesList == ['email', 'name'] + + +class TestCSVPrintFileRows: + """Test CSVPrintFile row management.""" + + def test_write_row_no_filter(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['name', 'email']) + pf.WriteRowNoFilter({'name': 'John', 'email': 'john@example.com'}) + assert len(pf.rows) == 1 + assert pf.rows[0]['name'] == 'John' + + def test_append_row(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['name']) + pf.AppendRow({'name': 'test'}) + assert len(pf.rows) == 1 + + def test_header_filter_match_no_filter(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + # No filter = match all + assert pf.headerFilter == [] + assert pf.headerDropFilter == [] + + def test_header_filter_set(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pattern = [(re.compile('^email$', re.IGNORECASE), True)] + pf.SetHeaderFilter(pattern) + assert pf.headerFilter == pattern + + +class TestCSVPrintFileMapTitles: + """Test MapTitles method.""" + + def test_map_titles_basic(self): + from gam.util.csv_pf import CSVPrintFile + pf = CSVPrintFile() + pf.AddTitles(['old_name', 'old_email']) + pf.MapTitles('old_name', 'name') + pf.MapTitles('old_email', 'email') + assert pf.titlesList == ['name', 'email'] + assert 'name' in pf.titlesSet + assert 'old_name' not in pf.titlesSet