Phase 5c - Decompose csv_pf.py + 66 new tests

This commit is contained in:
Jay Lee
2026-07-04 14:49:41 -04:00
parent 0948e239fc
commit 213270fe12
5 changed files with 1082 additions and 443 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@@ -1,2 +0,0 @@
[pytest]
testpaths = tests

View File

@@ -65,6 +65,28 @@ def CheckInputRowFilterHeaders(titlesList, rowFilter, rowDropFilter):
if not status: if not status:
sys.exit(USAGE_ERROR_RC) sys.exit(USAGE_ERROR_RC)
def _stripTimeFromDateTime(rowDate):
if YYYYMMDD_PATTERN.match(rowDate):
try:
rowTime = arrow.Arrow.strptime(rowDate, YYYYMMDD_FORMAT)
except ValueError:
return None
else:
try:
rowTime = arrow.get(rowDate)
except (arrow.parser.ParserError, OverflowError):
return None
return ISOformatTimeStamp(arrow.Arrow(rowTime.year, rowTime.month, rowTime.day, tzinfo='UTC'))
def _getHourMinuteFromDateTime(rowDate):
if YYYYMMDD_PATTERN.match(rowDate):
return None
try:
rowTime = arrow.get(rowDate)
except (arrow.parser.ParserError, OverflowError):
return None
return f'{rowTime.hour:02d}:{rowTime.minute:02d}'
def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, rowDropFilterModeAll): def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter, rowDropFilterModeAll):
def rowRegexFilterMatch(filterPattern): def rowRegexFilterMatch(filterPattern):
if anyMatch: if anyMatch:
@@ -89,18 +111,6 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter,
return False return False
def stripTimeFromDateTime(rowDate):
if YYYYMMDD_PATTERN.match(rowDate):
try:
rowTime = arrow.Arrow.strptime(rowDate, YYYYMMDD_FORMAT)
except ValueError:
return None
else:
try:
rowTime = arrow.get(rowDate)
except (arrow.parser.ParserError, OverflowError):
return None
return ISOformatTimeStamp(arrow.Arrow(rowTime.year, rowTime.month, rowTime.day, tzinfo='UTC'))
def rowDateTimeFilterMatch(dateMode, op, filterDate): def rowDateTimeFilterMatch(dateMode, op, filterDate):
def checkMatch(rowDate): def checkMatch(rowDate):
@@ -109,7 +119,7 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter,
if rowDate == GC.Values[GC.NEVER_TIME]: if rowDate == GC.Values[GC.NEVER_TIME]:
rowDate = NEVER_TIME rowDate = NEVER_TIME
if dateMode: if dateMode:
rowDate = stripTimeFromDateTime(rowDate) rowDate = _stripTimeFromDateTime(rowDate)
if not rowDate: if not rowDate:
return False return False
if op == '<': if op == '<':
@@ -141,7 +151,7 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter,
if rowDate == GC.Values[GC.NEVER_TIME]: if rowDate == GC.Values[GC.NEVER_TIME]:
rowDate = NEVER_TIME rowDate = NEVER_TIME
if dateMode: if dateMode:
rowDate = stripTimeFromDateTime(rowDate) rowDate = _stripTimeFromDateTime(rowDate)
if not rowDate: if not rowDate:
return False return False
if op == '!=': if op == '!=':
@@ -158,20 +168,12 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter,
return False return False
return True return True
def getHourMinuteFromDateTime(rowDate):
if YYYYMMDD_PATTERN.match(rowDate):
return None
try:
rowTime = arrow.get(rowDate)
except (arrow.parser.ParserError, OverflowError):
return None
return f'{rowTime.hour:02d}:{rowTime.minute:02d}'
def rowTimeOfDayRangeFilterMatch(op, startHourMinute, endHourMinute): def rowTimeOfDayRangeFilterMatch(op, startHourMinute, endHourMinute):
def checkMatch(rowDate): def checkMatch(rowDate):
if not rowDate or not isinstance(rowDate, str) or rowDate == GC.Values[GC.NEVER_TIME]: if not rowDate or not isinstance(rowDate, str) or rowDate == GC.Values[GC.NEVER_TIME]:
return False return False
rowHourMinute = getHourMinuteFromDateTime(rowDate) rowHourMinute = _getHourMinuteFromDateTime(rowDate)
if not rowHourMinute: if not rowHourMinute:
return False return False
if op == '!=': if op == '!=':
@@ -378,48 +380,36 @@ def RowFilterMatch(row, titlesList, rowFilter, rowFilterModeAll, rowDropFilter,
return True return True
def filterMatch(filterVal): def filterMatch(filterVal):
if filterVal[2] == 'regex': match filterVal[2]:
if rowRegexFilterMatch(filterVal[3]): case 'regex':
return True return rowRegexFilterMatch(filterVal[3])
elif filterVal[2] == 'notregex': case 'notregex':
if rowNotRegexFilterMatch(filterVal[3]): return rowNotRegexFilterMatch(filterVal[3])
return True case 'date' | 'time':
elif filterVal[2] in {'date', 'time'}: return rowDateTimeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4])
if rowDateTimeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4]): case 'daterange' | 'timerange':
return True return rowDateTimeRangeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4], filterVal[5])
elif filterVal[2] in {'daterange', 'timerange'}: case 'timeofdayrange':
if rowDateTimeRangeFilterMatch(filterVal[2] == 'date', filterVal[3], filterVal[4], filterVal[5]): return rowTimeOfDayRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5])
return True case 'count' | 'number':
elif filterVal[2] == 'timeofdayrange': return rowCountFilterMatch(filterVal[3], filterVal[4])
if rowTimeOfDayRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): case 'countrange' | 'numberrange':
return True return rowCountRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5])
elif filterVal[2] in {'count', 'number'}: case 'length':
if rowCountFilterMatch(filterVal[3], filterVal[4]): return rowLengthFilterMatch(filterVal[3], filterVal[4])
return True case 'lengthrange':
elif filterVal[2] in {'countrange', 'numberrange'}: return rowLengthRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5])
if rowCountRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): case 'boolean':
return True return rowBooleanFilterMatch(filterVal[3])
elif filterVal[2] == 'length': case 'data':
if rowLengthFilterMatch(filterVal[3], filterVal[4]): return rowDataFilterMatch(filterVal[3])
return True case 'notdata':
elif filterVal[2] == 'lengthrange': return rowNotDataFilterMatch(filterVal[3])
if rowLengthRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]): case 'text':
return True return rowTextFilterMatch(filterVal[3], filterVal[4])
elif filterVal[2] == 'boolean': case 'textrange':
if rowBooleanFilterMatch(filterVal[3]): return rowTextRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5])
return True case _:
elif filterVal[2] == 'data':
if rowDataFilterMatch(filterVal[3]):
return True
elif filterVal[2] == 'notdata':
if rowNotDataFilterMatch(filterVal[3]):
return True
elif filterVal[2] == 'text':
if rowTextFilterMatch(filterVal[3], filterVal[4]):
return True
elif filterVal[2] == 'textrange':
if rowTextRangeFilterMatch(filterVal[3], filterVal[4], filterVal[5]):
return True
return False return False
if rowFilter: if rowFilter:
@@ -1272,15 +1262,14 @@ class CSVPrintFile():
if not self.JSONtitlesSet: if not self.JSONtitlesSet:
systemErrorExit(USAGE_ERROR_RC, Msg.NO_COLUMNS_SELECTED_WITH_CSV_OUTPUT_HEADER_FILTER.format(GC.CSV_OUTPUT_HEADER_FILTER, GC.CSV_OUTPUT_HEADER_DROP_FILTER)) systemErrorExit(USAGE_ERROR_RC, Msg.NO_COLUMNS_SELECTED_WITH_CSV_OUTPUT_HEADER_FILTER.format(GC.CSV_OUTPUT_HEADER_FILTER, GC.CSV_OUTPUT_HEADER_DROP_FILTER))
def writeCSVfile(self, list_type, clearRowFilters=False): @staticmethod
def _todriveCSVErrorExit(entityValueList, errMsg):
def todriveCSVErrorExit(entityValueList, errMsg):
systemErrorExit(ACTION_FAILED_RC, formatKeyValueList(Ind.Spaces(), systemErrorExit(ACTION_FAILED_RC, formatKeyValueList(Ind.Spaces(),
Ent.FormatEntityValueList(entityValueList)+[Act.NotPerformed(), errMsg], Ent.FormatEntityValueList(entityValueList)+[Act.NotPerformed(), errMsg],
currentCountNL(0, 0))) currentCountNL(0, 0)))
@staticmethod @staticmethod
def itemgetter(*items): def _itemgetter(*items):
if len(items) == 1: if len(items) == 1:
item = items[0] item = items[0]
def g(obj): def g(obj):
@@ -1290,7 +1279,7 @@ class CSVPrintFile():
return tuple(obj.get(item, '') for item in items) return tuple(obj.get(item, '') for item in items)
return g return g
def writeCSVData(writer): def _writeCSVData(self, writer, titlesList, extrasaction):
try: try:
if not self.outputTranspose: if not self.outputTranspose:
if GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER]: if GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER]:
@@ -1298,7 +1287,7 @@ class CSVPrintFile():
if not self.sortHeaders: if not self.sortHeaders:
writer.writerows(self.rows) writer.writerows(self.rows)
else: else:
for row in sorted(self.rows, key=itemgetter(*self.sortHeaders)): for row in sorted(self.rows, key=self._itemgetter(*self.sortHeaders)):
writer.writerow(row) writer.writerow(row)
else: else:
writer.writerows(self.rows) writer.writerows(self.rows)
@@ -1307,7 +1296,7 @@ class CSVPrintFile():
stderrErrorMsg(e) stderrErrorMsg(e)
return False return False
def setDialect(lineterminator, noEscapeChar): def _setDialect(self, lineterminator, noEscapeChar):
writerDialect = { writerDialect = {
'delimiter': self.columnDelimiter, 'delimiter': self.columnDelimiter,
'doublequote': True, 'doublequote': True,
@@ -1319,18 +1308,18 @@ class CSVPrintFile():
'strict': False} 'strict': False}
return writerDialect return writerDialect
def normalizeSortHeaders(): def _normalizeSortHeaders(self, titlesList):
if self.sortHeaders: if self.sortHeaders:
writerKeyMap = {} writerKeyMap = {}
for k in titlesList: for k in titlesList:
writerKeyMap[k.lower()] = k writerKeyMap[k.lower()] = k
self.sortHeaders = [writerKeyMap[k.lower()] for k in self.sortHeaders if k.lower() in writerKeyMap] self.sortHeaders = [writerKeyMap[k.lower()] for k in self.sortHeaders if k.lower() in writerKeyMap]
def writeCSVToStdout(): def _writeCSVToStdout(self, titlesList, extrasaction):
csvFile = StringIOobject() csvFile = StringIOobject()
writerDialect = setDialect('\n', self.noEscapeChar) writerDialect = self._setDialect('\n', self.noEscapeChar)
writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect)
if writeCSVData(writer): if self._writeCSVData(writer, titlesList, extrasaction):
try: try:
GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD].write(csvFile.getvalue()) GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD].write(csvFile.getvalue())
except IOError as e: except IOError as e:
@@ -1338,19 +1327,19 @@ class CSVPrintFile():
setSysExitRC(FILE_ERROR_RC) setSysExitRC(FILE_ERROR_RC)
closeFile(csvFile) closeFile(csvFile)
def writeCSVToFile(): def _writeCSVToFile(self, titlesList, extrasaction):
csvFile = GM.Globals[GM.CSVFILE].get(GM.REDIRECT_FD, None) csvFile = GM.Globals[GM.CSVFILE].get(GM.REDIRECT_FD, None)
if not csvFile: if not csvFile:
csvFile = openFile(GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME], GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE], newline='', csvFile = openFile(GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME], GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE], newline='',
encoding=GM.Globals[GM.CSVFILE][GM.REDIRECT_ENCODING], errors='backslashreplace', encoding=GM.Globals[GM.CSVFILE][GM.REDIRECT_ENCODING], errors='backslashreplace',
continueOnError=True) continueOnError=True)
if csvFile: if csvFile:
writerDialect = setDialect(str(GC.Values[GC.CSV_OUTPUT_LINE_TERMINATOR]), self.noEscapeChar) writerDialect = self._setDialect(str(GC.Values[GC.CSV_OUTPUT_LINE_TERMINATOR]), self.noEscapeChar)
writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect)
writeCSVData(writer) self._writeCSVData(writer, titlesList, extrasaction)
closeFile(csvFile) closeFile(csvFile)
def writeCSVToDrive(): def _writeCSVToDrive(self, list_type, titlesList, extrasaction):
numRows = len(self.rows) numRows = len(self.rows)
numColumns = len(titlesList) numColumns = len(titlesList)
if numRows == 0 and not self.todrive['uploadnodata']: if numRows == 0 and not self.todrive['uploadnodata']:
@@ -1361,9 +1350,9 @@ class CSVPrintFile():
csvFile = TemporaryFile(mode='w+', encoding=UTF8) csvFile = TemporaryFile(mode='w+', encoding=UTF8)
else: else:
csvFile = StringIOobject() csvFile = StringIOobject()
writerDialect = setDialect('\n', self.todrive['noescapechar']) writerDialect = self._setDialect('\n', self.todrive['noescapechar'])
writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect) writer = csv.DictWriter(csvFile, titlesList, extrasaction=extrasaction, **writerDialect)
if writeCSVData(writer): if self._writeCSVData(writer, titlesList, extrasaction):
if ((self.todrive['title'] is None) or if ((self.todrive['title'] is None) or
(not self.todrive['title'] and not self.todrive['timestamp'])): (not self.todrive['title'] and not self.todrive['timestamp'])):
title = f'{GC.Values[GC.DOMAIN]} - {list_type}' title = f'{GC.Values[GC.DOMAIN]} - {list_type}'
@@ -1417,7 +1406,7 @@ class CSVPrintFile():
throwReasons=GAPI.DRIVE_USER_THROW_REASONS, throwReasons=GAPI.DRIVE_USER_THROW_REASONS,
fields='maxImportSizes') fields='maxImportSizes')
if numRows*numColumns > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]): if numRows*numColumns > MAX_GOOGLE_SHEET_CELLS or importSize > int(result['maxImportSizes'][MIMETYPE_GA_SPREADSHEET]):
todriveCSVErrorExit([Ent.USER, user], Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET) self._todriveCSVErrorExit([Ent.USER, user], Msg.RESULTS_TOO_LARGE_FOR_GOOGLE_SPREADSHEET)
fields = ','.join(['id', 'mimeType', 'webViewLink', 'name', 'capabilities(canEdit)']) fields = ','.join(['id', 'mimeType', 'webViewLink', 'name', 'capabilities(canEdit)'])
body = {'description': self.todrive['description']} body = {'description': self.todrive['description']}
if body['description'] is None: if body['description'] is None:
@@ -1430,9 +1419,9 @@ class CSVPrintFile():
fileId=self.todrive['fileId'], body=body, fields=fields, supportsAllDrives=True) fileId=self.todrive['fileId'], body=body, fields=fields, supportsAllDrives=True)
entityValueList = [Ent.USER, user, Ent.DRIVE_FILE_ID, self.todrive['fileId']] entityValueList = [Ent.USER, user, Ent.DRIVE_FILE_ID, self.todrive['fileId']]
if not result['capabilities']['canEdit']: if not result['capabilities']['canEdit']:
todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) self._todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE)
if result['mimeType'] != MIMETYPE_GA_SPREADSHEET: if result['mimeType'] != MIMETYPE_GA_SPREADSHEET:
todriveCSVErrorExit(entityValueList, f'{Msg.NOT_A} {Ent.Singular(Ent.SPREADSHEET)}') self._todriveCSVErrorExit(entityValueList, f'{Msg.NOT_A} {Ent.Singular(Ent.SPREADSHEET)}')
if not GC.Values[GC.TODRIVE_CLIENTACCESS]: if not GC.Values[GC.TODRIVE_CLIENTACCESS]:
_, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user) _, sheet = buildGAPIServiceObject(chooseSaAPI(API.SHEETSTD, API.SHEETS), user)
if sheet is None: if sheet is None:
@@ -1453,11 +1442,11 @@ class CSVPrintFile():
sheetId = getSheetIdFromSheetEntity(spreadsheet, self.todrive[sheetEntity]) sheetId = getSheetIdFromSheetEntity(spreadsheet, self.todrive[sheetEntity])
if sheetId is None: if sheetId is None:
if ((sheetEntity != 'sheetEntity') or (self.todrive[sheetEntity]['sheetType'] == Ent.SHEET_ID)): if ((sheetEntity != 'sheetEntity') or (self.todrive[sheetEntity]['sheetType'] == Ent.SHEET_ID)):
todriveCSVErrorExit(entityValueList, Msg.NOT_FOUND) self._todriveCSVErrorExit(entityValueList, Msg.NOT_FOUND)
self.todrive['addsheet'] = True self.todrive['addsheet'] = True
else: else:
if protectedSheetId(spreadsheet, sheetId): if protectedSheetId(spreadsheet, sheetId):
todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE) self._todriveCSVErrorExit(entityValueList, Msg.NOT_WRITABLE)
self.todrive[sheetEntity]['sheetId'] = sheetId self.todrive[sheetEntity]['sheetId'] = sheetId
if self.todrive['addsheet']: if self.todrive['addsheet']:
body = {'requests': [{'addSheet': {'properties': {'title': sheetTitle, 'sheetType': 'GRID'}}}]} body = {'requests': [{'addSheet': {'properties': {'title': sheetTitle, 'sheetType': 'GRID'}}}]}
@@ -1469,7 +1458,7 @@ class CSVPrintFile():
except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied,
GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.badRequest, GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.insufficientParentPermissions, GAPI.badRequest,
GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e:
todriveCSVErrorExit(entityValueList, str(e)) self._todriveCSVErrorExit(entityValueList, str(e))
body = {'requests': []} body = {'requests': []}
if not self.todrive['addsheet']: if not self.todrive['addsheet']:
if self.todrive['backupSheetEntity']: if self.todrive['backupSheetEntity']:
@@ -1501,7 +1490,7 @@ class CSVPrintFile():
except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied, except (GAPI.notFound, GAPI.forbidden, GAPI.permissionDenied,
GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest,
GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e: GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition) as e:
todriveCSVErrorExit(entityValueList, str(e)) self._todriveCSVErrorExit(entityValueList, str(e))
closeFile(csvFile) closeFile(csvFile)
# Create/update file # Create/update file
else: else:
@@ -1614,7 +1603,7 @@ class CSVPrintFile():
GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest, GAPI.internalError, GAPI.insufficientFilePermissions, GAPI.badRequest,
GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition, GAPI.invalid, GAPI.invalidArgument, GAPI.failedPrecondition,
GAPI.teamDriveFileLimitExceeded, GAPI.teamDriveHierarchyTooDeep) as e: GAPI.teamDriveFileLimitExceeded, GAPI.teamDriveHierarchyTooDeep) as e:
todriveCSVErrorExit([Ent.USER, user, Ent.SPREADSHEET, title], str(e)) self._todriveCSVErrorExit([Ent.USER, user, Ent.SPREADSHEET, title], str(e))
Act.Set(action) Act.Set(action)
file_url = result['webViewLink'] file_url = result['webViewLink']
msg_txt = f'{Msg.DATA_UPLOADED_TO_DRIVE_FILE}:\n{file_url}' msg_txt = f'{Msg.DATA_UPLOADED_TO_DRIVE_FILE}:\n{file_url}'
@@ -1649,26 +1638,7 @@ class CSVPrintFile():
else: else:
closeFile(csvFile) closeFile(csvFile)
if GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE] is not None: def _prepareHeaders(self, clearRowFilters):
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_NAME, list_type))
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_TODRIVE, self.todrive))
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CSVPF,
(self.titlesList, self.sortTitlesList, self.indexedTitles,
self.formatJSON, self.JSONtitlesList,
self.columnDelimiter, self.noEscapeChar, self.quoteChar,
self.sortHeaders, self.timestampColumn,
self.fixPaths,
self.mapNodataFields,
self.nodataFields,
self.driveListFields,
self.driveSubfieldsChoiceMap,
self.oneItemPerRow,
self.showPermissionsLast,
self.zeroBlankMimeTypeCounts)))
if clearRowFilters:
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CLEAR_ROW_FILTERS, clearRowFilters))
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_DATA, self.rows))
return
if self.zeroBlankMimeTypeCounts: if self.zeroBlankMimeTypeCounts:
self.ZeroBlankMimeTypeCounts() self.ZeroBlankMimeTypeCounts()
if not clearRowFilters and (self.rowFilter or self.rowDropFilter): if not clearRowFilters and (self.rowFilter or self.rowDropFilter):
@@ -1730,7 +1700,32 @@ class CSVPrintFile():
if self.headerOrder: if self.headerOrder:
self.JSONtitlesList = self.orderHeaders(self.JSONtitlesList) self.JSONtitlesList = self.orderHeaders(self.JSONtitlesList)
titlesList = self.JSONtitlesList titlesList = self.JSONtitlesList
normalizeSortHeaders() self._normalizeSortHeaders(titlesList)
return titlesList, extrasaction
def writeCSVfile(self, list_type, clearRowFilters=False):
if GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE] is not None:
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_NAME, list_type))
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_TODRIVE, self.todrive))
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CSVPF,
(self.titlesList, self.sortTitlesList, self.indexedTitles,
self.formatJSON, self.JSONtitlesList,
self.columnDelimiter, self.noEscapeChar, self.quoteChar,
self.sortHeaders, self.timestampColumn,
self.fixPaths,
self.mapNodataFields,
self.nodataFields,
self.driveListFields,
self.driveSubfieldsChoiceMap,
self.oneItemPerRow,
self.showPermissionsLast,
self.zeroBlankMimeTypeCounts)))
if clearRowFilters:
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_CLEAR_ROW_FILTERS, clearRowFilters))
GM.Globals[GM.CSVFILE][GM.REDIRECT_QUEUE].put((GM.REDIRECT_QUEUE_DATA, self.rows))
return
titlesList, extrasaction = self._prepareHeaders(clearRowFilters)
if self.outputTranspose: if self.outputTranspose:
newRows = [] newRows = []
newTitlesList = list(range(len(self.rows) + 1)) newTitlesList = list(range(len(self.rows) + 1))
@@ -1746,14 +1741,14 @@ class CSVPrintFile():
if (not self.todrive) or self.todrive['localcopy']: if (not self.todrive) or self.todrive['localcopy']:
if GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] == '-': if GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] == '-':
if GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD]: if GM.Globals[GM.STDOUT][GM.REDIRECT_MULTI_FD]:
writeCSVToStdout() self._writeCSVToStdout(titlesList, extrasaction)
else: else:
GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] = GM.Globals[GM.STDOUT][GM.REDIRECT_NAME] GM.Globals[GM.CSVFILE][GM.REDIRECT_NAME] = GM.Globals[GM.STDOUT][GM.REDIRECT_NAME]
writeCSVToFile() self._writeCSVToFile(titlesList, extrasaction)
else: else:
writeCSVToFile() self._writeCSVToFile(titlesList, extrasaction)
if self.todrive: if self.todrive:
writeCSVToDrive() self._writeCSVToDrive(list_type, titlesList, extrasaction)
if GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE] == DEFAULT_FILE_APPEND_MODE: if GM.Globals[GM.CSVFILE][GM.REDIRECT_MODE] == DEFAULT_FILE_APPEND_MODE:
GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER] = False GM.Globals[GM.CSVFILE][GM.REDIRECT_WRITE_HEADER] = False

View File

@@ -5,10 +5,16 @@ gam package directory on sys.path. We add it in a fixture (not at module level)
to avoid shadowing Python's stdlib 'cmd' module during pytest configuration. to avoid shadowing Python's stdlib 'cmd' module during pytest configuration.
""" """
# Make pytest discover this directory as the test root without needing
# a pytest.ini at the repo root. Running `pytest` or `python -m pytest`
# from anywhere in the repo will find tests here.
collect_ignore_glob = [] # nothing to ignore; marker for rootdir detection
import os import os
import sys import sys
import pytest import pytest
import arrow
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -46,6 +52,25 @@ def _gam_path_and_globals():
GC.Values.setdefault(GC.CUSTOMER_ID, 'C00000000') GC.Values.setdefault(GC.CUSTOMER_ID, 'C00000000')
GC.Values.setdefault(GC.TIMEZONE, 'UTC') GC.Values.setdefault(GC.TIMEZONE, 'UTC')
GC.Values.setdefault(GC.SHOW_COUNTS_MIN, 0) GC.Values.setdefault(GC.SHOW_COUNTS_MIN, 0)
# CSV-related defaults needed by CSVPrintFile
GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_FORCE, [])
GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_REQUIRED, [])
GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_ORDER, [])
GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_FILTER, [])
GC.Values.setdefault(GC.CSV_OUTPUT_HEADER_DROP_FILTER, [])
GC.Values.setdefault(GC.CSV_OUTPUT_ROW_FILTER, [])
GC.Values.setdefault(GC.CSV_OUTPUT_ROW_FILTER_MODE, 'allmatch')
GC.Values.setdefault(GC.CSV_OUTPUT_ROW_DROP_FILTER, [])
GC.Values.setdefault(GC.CSV_OUTPUT_ROW_DROP_FILTER_MODE, 'anymatch')
GC.Values.setdefault(GC.CSV_OUTPUT_ROW_LIMIT, 0)
GC.Values.setdefault(GC.CSV_OUTPUT_COLUMN_DELIMITER, ',')
GC.Values.setdefault(GC.CSV_OUTPUT_QUOTE_CHAR, '"')
GC.Values.setdefault(GC.CSV_OUTPUT_NO_ESCAPE_CHAR, False)
GC.Values.setdefault(GC.CSV_OUTPUT_SORT_HEADERS, [])
GC.Values.setdefault(GC.CSV_OUTPUT_TIMESTAMP_COLUMN, '')
GC.Values.setdefault(GC.CSV_OUTPUT_LINE_TERMINATOR, '\n')
GC.Values.setdefault(GC.NEVER_TIME, 'Never')
GC.Values.setdefault(GC.OUTPUT_TIMEFORMAT, '')
# Ensure Globals dict exists # Ensure Globals dict exists
if not GM.Globals: if not GM.Globals:
@@ -53,6 +78,16 @@ def _gam_path_and_globals():
GM.Globals.setdefault(GM.STDOUT, None) GM.Globals.setdefault(GM.STDOUT, None)
GM.Globals.setdefault(GM.STDERR, None) GM.Globals.setdefault(GM.STDERR, None)
GM.Globals.setdefault(GM.SYSEXITRC, 0) GM.Globals.setdefault(GM.SYSEXITRC, 0)
# CSV-related globals needed by CSVPrintFile
GM.Globals.setdefault(GM.CSV_OUTPUT_TRANSPOSE, False)
GM.Globals.setdefault(GM.CSV_TODRIVE, {})
GM.Globals.setdefault(GM.CSV_OUTPUT_COLUMN_DELIMITER, None)
GM.Globals.setdefault(GM.CSV_OUTPUT_QUOTE_CHAR, None)
GM.Globals.setdefault(GM.CSV_OUTPUT_NO_ESCAPE_CHAR, None)
GM.Globals.setdefault(GM.CSV_OUTPUT_SORT_HEADERS, [])
GM.Globals.setdefault(GM.CSV_OUTPUT_TIMESTAMP_COLUMN, '')
if not GM.Globals.get(GM.DATETIME_NOW):
GM.Globals[GM.DATETIME_NOW] = arrow.now('UTC')
yield yield

611
tests/test_csv_pf.py Normal file
View File

@@ -0,0 +1,611 @@
"""Unit tests for gam.util.csv_pf — RowFilterMatch and CSV output functions.
Tests cover all filter types: regex, boolean, count, date, time, length,
text, data, ranges, and their negations/combinations. Also covers
CSVPrintFile header processing, title management, and output formatting.
"""
import re
import pytest
import arrow
# ---------------------------------------------------------------------------
# RowFilterMatch tests
# ---------------------------------------------------------------------------
class TestRowFilterMatchRegex:
"""Test regex and notregex filter types."""
def test_regex_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'John Smith', 'email': 'john@example.com'}
titlesList = ['name', 'email']
# filterVal: (columnPat, anyMatch, filterType, ...)
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'regex', re.compile('John', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_regex_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'Jane Doe'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'regex', re.compile('^John$', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_notregex_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'Jane Doe'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'notregex', re.compile('^John$', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_notregex_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'John Smith'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'notregex', re.compile('John', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_regex_case_sensitive(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'john smith'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
# Case-sensitive regex should NOT match lowercase
rowFilter = [(columnPat, True, 'regex', re.compile('^John', 0))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_regex_wildcard_column(self):
"""Regex column pattern matching multiple columns."""
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'no', 'email': 'yes@match.com'}
titlesList = ['name', 'email']
# anyMatch=True: at least one column must match
columnPat = re.compile('.*', re.IGNORECASE)
rowFilter = [(columnPat, True, 'regex', re.compile('match', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
class TestRowFilterMatchBoolean:
"""Test boolean filter type."""
def test_boolean_true_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'active': 'True'}
titlesList = ['active']
columnPat = re.compile('^active$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'boolean', True)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_boolean_true_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'active': 'False'}
titlesList = ['active']
columnPat = re.compile('^active$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'boolean', True)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_boolean_false_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'active': 'False'}
titlesList = ['active']
columnPat = re.compile('^active$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'boolean', False)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_boolean_native_bool(self):
from gam.util.csv_pf import RowFilterMatch
row = {'active': True}
titlesList = ['active']
columnPat = re.compile('^active$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'boolean', True)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_boolean_blank_is_false(self):
"""Blank string should be treated as False."""
from gam.util.csv_pf import RowFilterMatch
row = {'active': ''}
titlesList = ['active']
columnPat = re.compile('^active$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'boolean', False)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
class TestRowFilterMatchCount:
"""Test count/number filter types."""
def test_count_equals(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '5'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_greater(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '10'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '>', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_less(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '3'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '<', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_not_equal(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '3'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '!=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_gte(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '5'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '>=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_lte(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '5'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '<=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_blank_is_zero(self):
"""Blank string count should be treated as 0."""
from gam.util.csv_pf import RowFilterMatch
row = {'count': ''}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '=', 0)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_count_non_digit_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': 'abc'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '=', 0)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_count_native_int(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': 5}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'count', '=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_number_alias(self):
"""'number' should work the same as 'count'."""
from gam.util.csv_pf import RowFilterMatch
row = {'val': '42'}
titlesList = ['val']
columnPat = re.compile('^val$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'number', '=', 42)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
class TestRowFilterMatchCountRange:
"""Test countrange/numberrange filter types."""
def test_countrange_in_range(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '5'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_countrange_out_of_range(self):
from gam.util.csv_pf import RowFilterMatch
row = {'count': '15'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_countrange_not_equal(self):
"""countrange with != should return True when value IS outside range."""
from gam.util.csv_pf import RowFilterMatch
row = {'count': '15'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'countrange', '!=', 1, 10)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_countrange_boundary(self):
"""Boundary values should be inclusive."""
from gam.util.csv_pf import RowFilterMatch
row = {'count': '10'}
titlesList = ['count']
columnPat = re.compile('^count$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'countrange', '=', 1, 10)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
class TestRowFilterMatchLength:
"""Test length and lengthrange filter types."""
def test_length_equals(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'hello'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'length', '=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_length_greater(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'hello world'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'length', '>', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_lengthrange_in_range(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'hello'}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'lengthrange', '=', 3, 8)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_length_non_string_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 12345}
titlesList = ['name']
columnPat = re.compile('^name$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'length', '=', 5)]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
class TestRowFilterMatchText:
"""Test text and textrange filter types."""
def test_text_equals(self):
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'active'}
titlesList = ['status']
columnPat = re.compile('^status$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'text', '=', 'active')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_text_not_equal(self):
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'inactive'}
titlesList = ['status']
columnPat = re.compile('^status$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'text', '!=', 'active')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_text_greater(self):
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'b'}
titlesList = ['status']
columnPat = re.compile('^status$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'text', '>', 'a')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_textrange_in_range(self):
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'banana'}
titlesList = ['status']
columnPat = re.compile('^status$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'textrange', '=', 'apple', 'cherry')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_textrange_out_of_range(self):
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'zebra'}
titlesList = ['status']
columnPat = re.compile('^status$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'textrange', '=', 'apple', 'cherry')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
class TestRowFilterMatchData:
"""Test data and notdata filter types."""
def test_data_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'role': 'admin'}
titlesList = ['role']
columnPat = re.compile('^role$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'data', {'admin', 'owner', 'editor'})]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_data_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'role': 'viewer'}
titlesList = ['role']
columnPat = re.compile('^role$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'data', {'admin', 'owner', 'editor'})]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_notdata_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'role': 'viewer'}
titlesList = ['role']
columnPat = re.compile('^role$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'notdata', {'admin', 'owner', 'editor'})]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_notdata_no_match(self):
from gam.util.csv_pf import RowFilterMatch
row = {'role': 'admin'}
titlesList = ['role']
columnPat = re.compile('^role$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'notdata', {'admin', 'owner', 'editor'})]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
class TestRowFilterMatchDate:
"""Test date and time filter types."""
def test_date_greater(self):
from gam.util.csv_pf import RowFilterMatch
from gamlib import settings as GC
GC.Values[GC.NEVER_TIME] = 'Never'
row = {'created': '2025-06-15T10:30:00Z'}
titlesList = ['created']
columnPat = re.compile('^created$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_date_less(self):
from gam.util.csv_pf import RowFilterMatch
from gamlib import settings as GC
GC.Values[GC.NEVER_TIME] = 'Never'
row = {'created': '2024-06-15T10:30:00Z'}
titlesList = ['created']
columnPat = re.compile('^created$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'date', '<', '2025-01-01T00:00:00.000Z')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_time_greater(self):
from gam.util.csv_pf import RowFilterMatch
from gamlib import settings as GC
GC.Values[GC.NEVER_TIME] = 'Never'
row = {'modified': '2025-06-15T10:30:00.000Z'}
titlesList = ['modified']
columnPat = re.compile('^modified$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'time', '>', '2025-01-01T00:00:00.000Z')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_date_empty_no_match(self):
from gam.util.csv_pf import RowFilterMatch
from gamlib import settings as GC
GC.Values[GC.NEVER_TIME] = 'Never'
row = {'created': ''}
titlesList = ['created']
columnPat = re.compile('^created$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_date_non_string_no_match(self):
from gam.util.csv_pf import RowFilterMatch
from gamlib import settings as GC
GC.Values[GC.NEVER_TIME] = 'Never'
row = {'created': None}
titlesList = ['created']
columnPat = re.compile('^created$', re.IGNORECASE)
rowFilter = [(columnPat, True, 'date', '>', '2025-01-01T00:00:00.000Z')]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
class TestRowFilterMatchModes:
"""Test rowFilter mode combinations (Any vs All) and drop filters."""
def test_no_filters_returns_true(self):
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'anything'}
assert RowFilterMatch(row, ['name'], [], True, [], True) is True
def test_row_filter_mode_any(self):
"""Any mode: at least one filter must match to select."""
from gam.util.csv_pf import RowFilterMatch
row = {'a': 'yes', 'b': 'no'}
titlesList = ['a', 'b']
f1 = (re.compile('^a$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE))
f2 = (re.compile('^b$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE))
# f1 matches, f2 doesn't — Any mode should select
assert RowFilterMatch(row, titlesList, [f1, f2], False, [], True) is True
def test_row_filter_mode_all(self):
"""All mode: all filters must match to select."""
from gam.util.csv_pf import RowFilterMatch
row = {'a': 'yes', 'b': 'no'}
titlesList = ['a', 'b']
f1 = (re.compile('^a$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE))
f2 = (re.compile('^b$', re.IGNORECASE), True, 'regex', re.compile('yes', re.IGNORECASE))
# f1 matches, f2 doesn't — All mode should NOT select
assert RowFilterMatch(row, titlesList, [f1, f2], True, [], True) is False
def test_drop_filter_any(self):
"""Drop filter in Any mode: any match drops the row."""
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'deleted'}
titlesList = ['status']
dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, [], True, dropFilter, False) is False
def test_drop_filter_no_match_keeps(self):
"""Drop filter that doesn't match should keep the row."""
from gam.util.csv_pf import RowFilterMatch
row = {'status': 'active'}
titlesList = ['status']
dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, [], True, dropFilter, False) is True
def test_row_and_drop_filter_combined(self):
"""Row selected by rowFilter but dropped by dropFilter."""
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'John', 'status': 'deleted'}
titlesList = ['name', 'status']
rowFilter = [(re.compile('^name$', re.IGNORECASE), True, 'regex', re.compile('John', re.IGNORECASE))]
dropFilter = [(re.compile('^status$', re.IGNORECASE), True, 'regex', re.compile('deleted', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, dropFilter, False) is False
def test_column_not_in_titles(self):
"""Filter column not in titles should use [None] as columns."""
from gam.util.csv_pf import RowFilterMatch
row = {'name': 'John'}
titlesList = ['name']
# Filter on 'missing_col' which isn't in titlesList
rowFilter = [(re.compile('^missing_col$', re.IGNORECASE), True, 'regex', re.compile('John', re.IGNORECASE))]
# No columns match, so row.get(None, '') = '', regex doesn't match
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
def test_any_match_across_columns(self):
"""anyMatch=True across multiple columns: any column matching suffices."""
from gam.util.csv_pf import RowFilterMatch
row = {'col1': 'no', 'col2': 'yes'}
titlesList = ['col1', 'col2']
# Match any column starting with 'col'
columnPat = re.compile('^col', re.IGNORECASE)
rowFilter = [(columnPat, True, 'regex', re.compile('yes', re.IGNORECASE))]
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is True
def test_all_match_across_columns(self):
"""anyMatch=False (all mode) across columns: ALL columns must match."""
from gam.util.csv_pf import RowFilterMatch
row = {'col1': 'yes', 'col2': 'no'}
titlesList = ['col1', 'col2']
columnPat = re.compile('^col', re.IGNORECASE)
rowFilter = [(columnPat, False, 'regex', re.compile('yes', re.IGNORECASE))]
# col2 doesn't match — all mode fails
assert RowFilterMatch(row, titlesList, rowFilter, True, [], True) is False
# ---------------------------------------------------------------------------
# CSVPrintFile title management tests
# ---------------------------------------------------------------------------
class TestCSVPrintFileTitles:
"""Test CSVPrintFile title management methods."""
def test_add_title(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitle('email')
assert 'email' in pf.titlesList
assert 'email' in pf.titlesSet
def test_add_titles_no_duplicate(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['email'])
pf.AddTitles(['email'])
assert pf.titlesList.count('email') == 1
def test_add_titles(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['email', 'name', 'role'])
assert pf.titlesList == ['email', 'name', 'role']
def test_remove_titles(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['email', 'name', 'role'])
pf.RemoveTitles(['name'])
assert 'name' not in pf.titlesList
assert 'name' not in pf.titlesSet
def test_set_titles(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['a', 'b', 'c'])
pf.SetTitles(['x', 'y'])
assert pf.titlesList == ['x', 'y']
assert pf.titlesSet == {'x', 'y'}
def test_insert_titles(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['b', 'c'])
pf.InsertTitles(0, ['a'])
assert pf.titlesList[0] == 'a'
def test_move_titles_to_end(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['a', 'b', 'c'])
pf.MoveTitlesToEnd(['a'])
assert pf.titlesList == ['b', 'c', 'a']
def test_add_sort_title(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddSortTitle('email')
assert 'email' in pf.sortTitlesList
def test_add_sort_titles(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddSortTitles(['email', 'name'])
assert pf.sortTitlesList == ['email', 'name']
class TestCSVPrintFileRows:
"""Test CSVPrintFile row management."""
def test_write_row_no_filter(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['name', 'email'])
pf.WriteRowNoFilter({'name': 'John', 'email': 'john@example.com'})
assert len(pf.rows) == 1
assert pf.rows[0]['name'] == 'John'
def test_append_row(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['name'])
pf.AppendRow({'name': 'test'})
assert len(pf.rows) == 1
def test_header_filter_match_no_filter(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
# No filter = match all
assert pf.headerFilter == []
assert pf.headerDropFilter == []
def test_header_filter_set(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pattern = [(re.compile('^email$', re.IGNORECASE), True)]
pf.SetHeaderFilter(pattern)
assert pf.headerFilter == pattern
class TestCSVPrintFileMapTitles:
"""Test MapTitles method."""
def test_map_titles_basic(self):
from gam.util.csv_pf import CSVPrintFile
pf = CSVPrintFile()
pf.AddTitles(['old_name', 'old_email'])
pf.MapTitles('old_name', 'name')
pf.MapTitles('old_email', 'email')
assert pf.titlesList == ['name', 'email']
assert 'name' in pf.titlesSet
assert 'old_name' not in pf.titlesSet