diff --git a/src/fileutils.py b/src/fileutils.py new file mode 100644 index 00000000..0274540d --- /dev/null +++ b/src/fileutils.py @@ -0,0 +1,174 @@ +"""Common file operations.""" + +import io +import os +import sys + +import controlflow +import display +from var import GM_Globals +from var import GM_SYS_ENCODING +from var import UTF8_SIG + + +def _open_file(filename, mode, encoding=None, newline=None): + """Opens a file with no error handling.""" + # Determine which encoding to use + if 'b' in mode: + encoding = None + elif not encoding: + encoding = GM_Globals[GM_SYS_ENCODING] + elif 'r' in mode and encoding.lower().replace('-', '') == 'utf8': + encoding = UTF8_SIG + + return open( + os.path.expanduser(filename), mode, newline=newline, encoding=encoding) + + +def open_file(filename, + mode='r', + encoding=None, + newline=None, + strip_utf_bom=False): + """Opens a file. + + Args: + filename: String, the name of the file to open, or '-' to use stdin/stdout, + to read/write, depending on the mode param, respectively. + mode: String, the common file mode to open the file with. Default is read. + encoding: String, the name of the encoding used to decode or encode the + file. This should only be used in text mode. + newline: See param description in + https://docs.python.org/3.7/library/functions.html#open + strip_utf_bom: Boolean, True if the file being opened should seek past the + UTF Byte Order Mark before being returned. + See more: https://en.wikipedia.org/wiki/UTF-8#Byte_order_mark + + Returns: + The opened file. + """ + try: + if filename == '-': + # Read from stdin, rather than a file + if 'r' in mode: + return io.StringIO(str(sys.stdin.read())) + return sys.stdout + + # Open a file on disk + f = _open_file(filename, mode, newline=newline, encoding=encoding) + if strip_utf_bom: + utf_bom = u'\ufeff' + has_bom = False + + if 'b' in mode: + has_bom = f.read(3).decode('UTF-8') == utf_bom + elif f.encoding and not f.encoding.lower().startswith('utf'): + # Convert UTF BOM into ISO-8859-1 via Bytes + utf8_bom_bytes = utf_bom.encode('UTF-8') + iso_8859_1_bom = utf8_bom_bytes.decode('iso-8859-1').encode( + 'iso-8859-1') + has_bom = f.read(3).encode('iso-8859-1', 'replace') == iso_8859_1_bom + else: + has_bom = f.read(1) == utf_bom + + if not has_bom: + f.seek(0) + + return f + + except IOError as e: + controlflow.system_error_exit(6, e) + + +def close_file(f): + """Closes a file. + + Args: + f: The file to close + + Returns: + Boolean, True if the file was successfully closed. False if an error + was encountered while closing. + """ + try: + f.close() + return True + except IOError as e: + display.print_error(e) + return False + + +def read_file(filename, + mode='r', + encoding=None, + newline=None, + continue_on_error=False, + display_errors=True): + """Reads a file from disk. + + Args: + filename: String, the path of the file to open from disk, or "-" to read + from stdin. + mode: String, the mode in which to open the file. + encoding: String, the name of the encoding used to decode or encode the + file. This should only be used in text mode. + newline: See param description in + https://docs.python.org/3.7/library/functions.html#open + continue_on_error: Boolean, If True, suppresses any IO errors and returns to + the caller without any externalities. + display_errors: Boolean, If True, prints error messages when errors are + encountered and continue_on_error is True. + + Returns: + The contents of the file, or stdin if filename == "-". Returns None if + an error is encountered and continue_on_errors is True. + """ + try: + if filename == '-': + # Read from stdin, rather than a file. + return str(sys.stdin.read()) + + with _open_file(filename, mode, newline=newline, encoding=encoding) as f: + return f.read() + + except IOError as e: + if continue_on_error: + if display_errors: + display.print_warning(e) + return None + controlflow.system_error_exit(6, e) + except (LookupError, UnicodeDecodeError, UnicodeError) as e: + controlflow.system_error_exit(2, str(e)) + + +def write_file(filename, + data, + mode='w', + continue_on_error=False, + display_errors=True): + """Writes data to a file. + + Args: + filename: String, the path of the file to write to disk. + data: Serializable data to write to the file. + mode: String, the mode in which to open the file and write to it. + continue_on_error: Boolean, If True, suppresses any IO errors and returns to + the caller without any externalities. + display_errors: Boolean, If True, prints error messages when errors are + encountered and continue_on_error is True. + + Returns: + Boolean, True if the write operation succeeded, or False if not. + """ + try: + with _open_file(filename, mode) as f: + f.write(data) + return True + + except IOError as e: + if continue_on_error: + if display_errors: + display.print_error(e) + return False + else: + controlflow.system_error_exit(6, e) diff --git a/src/fileutils_test.py b/src/fileutils_test.py new file mode 100644 index 00000000..5493071b --- /dev/null +++ b/src/fileutils_test.py @@ -0,0 +1,234 @@ +"""Tests for fileutils.""" + +import io +import os +import unittest +from unittest.mock import MagicMock +from unittest.mock import patch + +import fileutils + + +class FileutilsTest(unittest.TestCase): + + def setUp(self): + self.fake_path = '/some/path/to/file' + super(FileutilsTest, self).setUp() + + @patch.object(fileutils.sys, 'stdin') + def test_open_file_stdin(self, mock_stdin): + mock_stdin.read.return_value = 'some stdin content' + f = fileutils.open_file('-', mode='r') + self.assertIsInstance(f, fileutils.io.StringIO) + self.assertEqual(f.getvalue(), mock_stdin.read.return_value) + + def test_open_file_stdout(self): + f = fileutils.open_file('-', mode='w') + self.assertEqual(fileutils.sys.stdout, f) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_opens_correct_path(self, mock_open): + f = fileutils.open_file(self.fake_path) + self.assertEqual(self.fake_path, mock_open.call_args[0][0]) + self.assertEqual(mock_open.return_value, f) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_expands_user_file_path(self, mock_open): + file_path = '~/some/path/containing/tilde/shortcut/to/home' + fileutils.open_file(file_path) + opened_path = mock_open.call_args[0][0] + home_path = os.environ.get('HOME') + self.assertIsNotNone(home_path) + self.assertIn(home_path, opened_path) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_opens_correct_mode(self, mock_open): + fileutils.open_file(self.fake_path) + self.assertEqual('r', mock_open.call_args[0][1]) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_encoding_for_binary(self, mock_open): + fileutils.open_file(self.fake_path, mode='b') + self.assertIsNone(mock_open.call_args[1]['encoding']) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_default_system_encoding(self, mock_open): + fileutils.open_file(self.fake_path) + self.assertEqual(fileutils.GM_Globals[fileutils.GM_SYS_ENCODING], + mock_open.call_args[1]['encoding']) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_utf8_encoding_specified(self, mock_open): + fileutils.open_file(self.fake_path, encoding='UTF-8') + self.assertEqual(fileutils.UTF8_SIG, mock_open.call_args[1]['encoding']) + + def test_open_file_strips_utf_bom_in_utf(self): + bom_prefixed_data = u'\ufefffoobar' + fake_file = io.StringIO(bom_prefixed_data) + mock_open = MagicMock(spec=open, return_value=fake_file) + with patch.object(fileutils, 'open', mock_open): + f = fileutils.open_file(self.fake_path, strip_utf_bom=True) + self.assertEqual('foobar', f.read()) + + def test_open_file_strips_utf_bom_in_non_utf(self): + bom_prefixed_data = b'\xef\xbb\xbffoobar'.decode('iso-8859-1') + + # We need to trick the method under test into believing that a StringIO + # instance is a file with an encoding. Since StringIO does not usually have, + # an encoding, we'll mock it and add our own encoding, but send the other + # methods in use (read and seek) back to the real StringIO object. + real_stringio = io.StringIO(bom_prefixed_data) + mock_file = MagicMock(spec=io.StringIO) + mock_file.read.side_effect = real_stringio.read + mock_file.seek.side_effect = real_stringio.seek + mock_file.encoding = 'iso-8859-1' + + mock_open = MagicMock(spec=open, return_value=mock_file) + with patch.object(fileutils, 'open', mock_open): + f = fileutils.open_file(self.fake_path, strip_utf_bom=True) + self.assertEqual('foobar', f.read()) + + def test_open_file_strips_utf_bom_in_binary(self): + bom_prefixed_data = u'\ufefffoobar'.encode('UTF-8') + fake_file = io.BytesIO(bom_prefixed_data) + mock_open = MagicMock(spec=open, return_value=fake_file) + with patch.object(fileutils, 'open', mock_open): + f = fileutils.open_file(self.fake_path, mode='rb', strip_utf_bom=True) + self.assertEqual(b'foobar', f.read()) + + def test_open_file_strip_utf_bom_when_no_bom_in_data(self): + no_bom_data = 'This data has no BOM' + fake_file = io.StringIO(no_bom_data) + mock_open = MagicMock(spec=open, return_value=fake_file) + + with patch.object(fileutils, 'open', mock_open): + f = fileutils.open_file(self.fake_path, strip_utf_bom=True) + # Since there was no opening BOM, we should be back at the beginning of + # the file. + self.assertEqual(fake_file.tell(), 0) + self.assertEqual(f.read(), no_bom_data) + + @patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open) + def test_open_file_exits_on_io_error(self, mock_open): + mock_open.side_effect = IOError('Fake IOError') + with self.assertRaises(SystemExit) as context: + fileutils.open_file(self.fake_path) + self.assertEqual(context.exception.code, 6) + + def test_close_file_closes_file_successfully(self): + mock_file = MagicMock() + self.assertTrue(fileutils.close_file(mock_file)) + self.assertEqual(mock_file.close.call_count, 1) + + def test_close_file_with_error(self): + mock_file = MagicMock() + mock_file.close.side_effect = IOError() + self.assertFalse(fileutils.close_file(mock_file)) + self.assertEqual(mock_file.close.call_count, 1) + + @patch.object(fileutils.sys, 'stdin') + def test_read_file_from_stdin(self, mock_stdin): + mock_stdin.read.return_value = 'some stdin content' + self.assertEqual(fileutils.read_file('-'), mock_stdin.read.return_value) + + @patch.object(fileutils, '_open_file') + def test_read_file_default_params(self, mock_open_file): + fake_content = 'some fake content' + mock_open_file.return_value.__enter__().read.return_value = fake_content + self.assertEqual(fileutils.read_file(self.fake_path), fake_content) + self.assertEqual(mock_open_file.call_args[0][0], self.fake_path) + self.assertEqual(mock_open_file.call_args[0][1], 'r') + self.assertIsNone(mock_open_file.call_args[1]['newline']) + + @patch.object(fileutils.display, 'print_warning') + @patch.object(fileutils, '_open_file') + def test_read_file_continues_on_errors_without_displaying( + self, mock_open_file, mock_print_warning): + mock_open_file.side_effect = IOError() + contents = fileutils.read_file( + self.fake_path, continue_on_error=True, display_errors=False) + self.assertIsNone(contents) + self.assertFalse(mock_print_warning.called) + + @patch.object(fileutils.display, 'print_warning') + @patch.object(fileutils, '_open_file') + def test_read_file_displays_errors(self, mock_open_file, mock_print_warning): + mock_open_file.side_effect = IOError() + fileutils.read_file( + self.fake_path, continue_on_error=True, display_errors=True) + self.assertTrue(mock_print_warning.called) + + @patch.object(fileutils, '_open_file') + def test_read_file_exits_code_6_when_continue_on_error_is_false( + self, mock_open_file): + mock_open_file.side_effect = IOError() + with self.assertRaises(SystemExit) as context: + fileutils.read_file(self.fake_path, continue_on_error=False) + self.assertEqual(context.exception.code, 6) + + @patch.object(fileutils, '_open_file') + def test_read_file_exits_code_2_on_lookuperror(self, mock_open_file): + mock_open_file.return_value.__enter__().read.side_effect = LookupError() + with self.assertRaises(SystemExit) as context: + fileutils.read_file(self.fake_path) + self.assertEqual(context.exception.code, 2) + + @patch.object(fileutils, '_open_file') + def test_read_file_exits_code_2_on_unicodeerror(self, mock_open_file): + mock_open_file.return_value.__enter__().read.side_effect = UnicodeError() + with self.assertRaises(SystemExit) as context: + fileutils.read_file(self.fake_path) + self.assertEqual(context.exception.code, 2) + + @patch.object(fileutils, '_open_file') + def test_read_file_exits_code_2_on_unicodedecodeerror(self, mock_open_file): + fake_decode_error = UnicodeDecodeError('fake-encoding', b'fakebytes', 0, 1, + 'testing only') + mock_open_file.return_value.__enter__().read.side_effect = fake_decode_error + with self.assertRaises(SystemExit) as context: + fileutils.read_file(self.fake_path) + self.assertEqual(context.exception.code, 2) + + @patch.object(fileutils, '_open_file') + def test_write_file_writes_data_to_file(self, mock_open_file): + fake_data = 'some fake data' + fileutils.write_file(self.fake_path, fake_data) + self.assertEqual(mock_open_file.call_args[0][0], self.fake_path) + self.assertEqual(mock_open_file.call_args[0][1], 'w') + + opened_file = mock_open_file.return_value.__enter__() + self.assertTrue(opened_file.write.called) + self.assertEqual(opened_file.write.call_args[0][0], fake_data) + + @patch.object(fileutils.display, 'print_error') + @patch.object(fileutils, '_open_file') + def test_write_file_continues_on_errors_without_displaying( + self, mock_open_file, mock_print_error): + mock_open_file.side_effect = IOError() + status = fileutils.write_file( + self.fake_path, + 'foo data', + continue_on_error=True, + display_errors=False) + self.assertFalse(status) + self.assertFalse(mock_print_error.called) + + @patch.object(fileutils.display, 'print_error') + @patch.object(fileutils, '_open_file') + def test_write_file_displays_errors(self, mock_open_file, mock_print_error): + mock_open_file.side_effect = IOError() + fileutils.write_file( + self.fake_path, 'foo data', continue_on_error=True, display_errors=True) + self.assertTrue(mock_print_error.called) + + @patch.object(fileutils, '_open_file') + def test_write_file_exits_code_6_when_continue_on_error_is_false( + self, mock_open_file): + mock_open_file.side_effect = IOError() + with self.assertRaises(SystemExit) as context: + fileutils.write_file(self.fake_path, 'foo data', continue_on_error=False) + self.assertEqual(context.exception.code, 6) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/gam.py b/src/gam.py index 4257b7d8..44c26dd2 100755 --- a/src/gam.py +++ b/src/gam.py @@ -77,6 +77,7 @@ from cryptography.x509.oid import NameOID import controlflow import display +import fileutils import gapi.errors import gapi import utils @@ -459,83 +460,6 @@ def normalizeStudentGuardianEmailAddressOrUID(emailAddressOrUID): return emailAddressOrUID return normalizeEmailAddressOrUID(emailAddressOrUID) # -# Set file encoding to handle UTF8 BOM -# -def setEncoding(mode, encoding): - if 'b' in mode: - return {} - if not encoding: - encoding = GM_Globals[GM_SYS_ENCODING] - if 'r' in mode and encoding.lower().replace('-', '') == 'utf8': - encoding = UTF8_SIG - return {'encoding': encoding} -# -# Open a file -# -def openFile(filename, mode='r', encoding=None, newline=None, - stripUTFBOM=False): - try: - if filename != '-': - kwargs = setEncoding(mode, encoding) - f = open(os.path.expanduser(filename), mode, newline=newline, **kwargs) - if stripUTFBOM: - if 'b' in mode or not kwargs['encoding'].lower().startswith('utf'): - if f.read(3).encode('iso-8859-1', 'replace') != b'\xef\xbb\xbf': - f.seek(0) - else: - if f.read(1) != '\ufeff': - f.seek(0) - return f - if 'r' in mode: - return io.StringIO(str(sys.stdin.read())) - return sys.stdout - except IOError as e: - controlflow.system_error_exit(6, e) -# -# Close a file -# -def closeFile(f): - try: - f.close() - return True - except IOError as e: - display.print_error(e) - return False -# -# Read a file -# -def readFile(filename, mode='r', encoding=None, newline=None, - continueOnError=False, displayError=True): - try: - if filename != '-': - kwargs = setEncoding(mode, encoding) - with open(os.path.expanduser(filename), mode, newline=newline, **kwargs) as f: - return f.read() - return str(sys.stdin.read()) - except IOError as e: - if continueOnError: - if displayError: - display.print_warning(e) - return None - controlflow.system_error_exit(6, e) - except (LookupError, UnicodeDecodeError, UnicodeError) as e: - controlflow.system_error_exit(2, str(e)) -# -# Write a file -# -def writeFile(filename, data, mode='w', continueOnError=False, displayError=True): - try: - kwargs = setEncoding(mode, None) - with open(os.path.expanduser(filename), mode, **kwargs) as f: - f.write(data) - return True - except IOError as e: - if continueOnError: - if displayError: - display.print_error(e) - return False - controlflow.system_error_exit(6, e) -# # Set global variables # Check for GAM updates based on status of noupdatecheck.txt # @@ -752,7 +676,7 @@ def doGAMCheckForUpdates(forceCheck=False): if forceCheck: check_url = GAM_ALL_RELEASES # includes pre-releases else: - last_check_time_str = readFile(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], continueOnError=True, displayError=False) + last_check_time_str = fileutils.read_file(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], continue_on_error=True, display_errors=False) last_check_time = int(last_check_time_str) if last_check_time_str and last_check_time_str.isdigit() else 0 if last_check_time > now_time-604800: return @@ -777,7 +701,7 @@ def doGAMCheckForUpdates(forceCheck=False): if forceCheck or (latest_version > current_version): print('Version Check:\n Current: {0}\n Latest: {1}'.format(current_version, latest_version)) if latest_version <= current_version: - writeFile(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continueOnError=True, displayError=forceCheck) + fileutils.write_file(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continue_on_error=True, display_errors=forceCheck) return announcement = release_data.get('body_text', 'No details about this release') sys.stderr.write('\nGAM %s release notes:\n\n' % latest_version) @@ -789,7 +713,7 @@ def doGAMCheckForUpdates(forceCheck=False): webbrowser.open(release_data['html_url']) printLine(MESSAGE_GAM_EXITING_FOR_UPDATE) sys.exit(0) - writeFile(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continueOnError=True, displayError=forceCheck) + fileutils.write_file(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continue_on_error=True, display_errors=forceCheck) return except (httplib2.HttpLib2Error, httplib2.ServerNotFoundError, RuntimeError, socket.timeout): return @@ -879,7 +803,7 @@ def _getServerTLSUsed(location): def _getSvcAcctData(): if not GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]: - json_string = readFile(GC_Values[GC_OAUTH2SERVICE_JSON], continueOnError=True, displayError=True) + json_string = fileutils.read_file(GC_Values[GC_OAUTH2SERVICE_JSON], continueOnError=True, displayError=True) if not json_string: printLine(MESSAGE_INSTRUCTIONS_OAUTH2SERVICE_JSON) controlflow.system_error_exit(6, None) @@ -914,9 +838,9 @@ def readDiscoveryFile(api_version): else: pyinstaller_disc_file = None if os.path.isfile(disc_file): - json_string = readFile(disc_file) + json_string = fileutils.read_file(disc_file) elif pyinstaller_disc_file: - json_string = readFile(pyinstaller_disc_file) + json_string = fileutils.read_file(pyinstaller_disc_file) else: controlflow.system_error_exit(11, MESSAGE_NO_DISCOVERY_INFORMATION.format(disc_file)) try: @@ -926,7 +850,7 @@ def readDiscoveryFile(api_version): controlflow.invalid_json_exit(disc_file) def getOauth2TxtStorageCredentials(): - oauth_string = readFile(GC_Values[GC_OAUTH2_TXT], continueOnError=True, displayError=False) + oauth_string = fileutils.read_file(GC_Values[GC_OAUTH2_TXT], continue_on_error=True, display_errors=False) if not oauth_string: return oauth_data = json.loads(oauth_string) @@ -2918,11 +2842,11 @@ def changeCalendarAttendees(users): else: controlflow.system_error_exit(2, '%s is not a valid argument for "gam update calattendees"' % sys.argv[i]) attendee_map = {} - f = openFile(csv_file) + f = fileutils.open_file(csv_file) csvFile = csv.reader(f) for row in csvFile: attendee_map[row[0].lower()] = row[1].lower() - closeFile(f) + fileutils.close_file(f) for user in users: sys.stdout.write('Checking user %s\n' % user) user, cal = buildCalendarGAPIObject(user) @@ -3283,7 +3207,7 @@ def doPrintJobFetch(): jobid = job['id'] fileName = os.path.join(targetFolder, '{0}-{1}'.format(''.join(c if c in FILENAME_SAFE_CHARS else '_' for c in job['title']), jobid)) _, content = cp._http.request(uri=fileUrl, method='GET') - if writeFile(fileName, content, mode='wb', continueOnError=True): + if fileutils.write_file(fileName, content, mode='wb', continue_on_error=True): # ticket = gapi.call(cp.jobs(), u'getticket', jobid=jobid, use_cjt=True) result = gapi.call(cp.jobs(), 'update', jobid=jobid, semantic_state_diff=ssd) checkCloudPrintResult(result) @@ -3425,7 +3349,7 @@ def doPrintJobSubmit(): mimetype = mimetypes.guess_type(filepath)[0] if mimetype is None: mimetype = 'application/octet-stream' - filecontent = readFile(filepath, mode='rb') + filecontent = fileutils.read_file(filepath, mode='rb') form_files['content'] = {'filename': content, 'content': filecontent, 'mimetype': mimetype} #result = gapi.call(cp.printers(), u'submit', body=body) body, headers = encode_multipart(form_fields, form_files) @@ -3849,7 +3773,7 @@ def doPhoto(users): print(e) continue else: - image_data = readFile(filename, mode='rb', continueOnError=True, displayError=True) + image_data = fileutils.read_file(filename, mode='rb', continue_on_error=True, display_errors=True) if image_data is None: continue body = {'photoData': base64.urlsafe_b64encode(image_data).decode(UTF8)} @@ -3897,7 +3821,7 @@ def getPhoto(users): print(' no photo for %s' % user) continue decoded_photo_data = base64.urlsafe_b64decode(photo_data) - writeFile(filename, decoded_photo_data, mode='wb', continueOnError=True) + fileutils.write_file(filename, decoded_photo_data, mode='wb', continue_on_error=True) def deletePhoto(users): cd = buildGAPIObject('directory') @@ -4935,7 +4859,7 @@ def downloadDriveFile(users): if targetStdout and content[-1] != '\n': fh.write('\n') if not targetStdout: - closeFile(fh) + fileutils.close_file(fh) fileDownloaded = True break except (IOError, httplib2.HttpLib2Error) as e: @@ -4952,7 +4876,7 @@ def downloadDriveFile(users): fileDownloadFailed = True break if fh and not targetStdout: - closeFile(fh) + fileutils.close_file(fh) os.remove(filename) if not fileDownloaded and not fileDownloadFailed and not csvSheetNotFound: display.print_error('Format ({0}) not available'.format(','.join(exportFormatChoices))) @@ -5150,7 +5074,7 @@ def sendOrDropEmail(users, method='send'): elif myarg == 'file': filename = sys.argv[i+1] i, encoding = getCharSet(i+2) - body = readFile(filename, encoding=encoding) + body = fileutils.read_file(filename, encoding=encoding) elif myarg == 'subject': subject = sys.argv[i+1] i += 2 @@ -5459,7 +5383,7 @@ def addUpdateSendAs(users, i, addCmd): if signature.lower() == 'file': filename = sys.argv[i] i, encoding = getCharSet(i+1) - signature = readFile(filename, encoding=encoding) + signature = fileutils.read_file(filename, encoding=encoding) elif myarg == 'html': html = True i += 1 @@ -5730,7 +5654,7 @@ def addSmime(users): myarg = sys.argv[i].lower() if myarg == 'file': smimefile = sys.argv[i+1] - smimeData = readFile(smimefile, mode='rb') + smimeData = fileutils.read_file(smimefile, mode='rb') body['pkcs12'] = base64.urlsafe_b64encode(smimeData).decode(UTF8) i += 2 elif myarg == 'password': @@ -6595,7 +6519,7 @@ def doSignature(users): if sys.argv[i].lower() == 'file': filename = sys.argv[i+1] i, encoding = getCharSet(i+2) - signature = readFile(filename, encoding=encoding) + signature = fileutils.read_file(filename, encoding=encoding) else: signature = getString(i, 'String', minLen=0) i += 1 @@ -6663,7 +6587,7 @@ def doVacation(users): elif myarg == 'file': filename = sys.argv[i+1] i, encoding = getCharSet(i+2) - message = readFile(filename, encoding=encoding) + message = fileutils.read_file(filename, encoding=encoding) elif myarg == 'replace': matchTag = getString(i+1, 'Tag') matchReplacement = getString(i+2, 'String', minLen=0) @@ -7224,7 +7148,7 @@ def getUserAttributes(i, cd, updateCmd): if sys.argv[i].lower() == 'file': filename = sys.argv[i+1] i, encoding = getCharSet(i+2) - note['value'] = readFile(filename, encoding=encoding) + note['value'] = fileutils.read_file(filename, encoding=encoding) else: note['value'] = sys.argv[i].replace('\\n', '\n') i += 1 @@ -7584,7 +7508,7 @@ def _createClientSecretsOauth2service(httpObj, projectId): name=service_account['name'], body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE', 'keyAlgorithm': 'KEY_ALG_RSA_2048'}) _grantSARotateRights(iam, service_account['name'].rsplit('/', 1)[-1]) oauth2service_data = base64.b64decode(key['privateKeyData']).decode(UTF8) - writeFile(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continueOnError=False) + fileutils.write_file(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continue_on_error=False) console_credentials_url = 'https://console.developers.google.com/apis/credentials/consent/edit?createClient&newAppInternalUser=true&project=%s' % projectId while True: print('''Please go to: @@ -7623,7 +7547,7 @@ def _createClientSecretsOauth2service(httpObj, projectId): "token_uri": "https://oauth2.googleapis.com/token" } }''' % (client_id, client_secret, projectId) - writeFile(GC_Values[GC_CLIENT_SECRETS_JSON], cs_data, continueOnError=False) + fileutils.write_file(GC_Values[GC_CLIENT_SECRETS_JSON], cs_data, continue_on_error=False) print('That\'s it! Your GAM Project is created and ready to use.') VALIDEMAIL_PATTERN = re.compile(r'^[^@]+@[^@]+\.[^@]+$') @@ -7640,7 +7564,7 @@ def _getValidateLoginHint(login_hint=None): login_hint = None def _getCurrentProjectID(): - cs_data = readFile(GC_Values[GC_CLIENT_SECRETS_JSON], continueOnError=True, displayError=True) + cs_data = fileutils.read_file(GC_Values[GC_CLIENT_SECRETS_JSON], continue_on_error=True, display_errors=True) if not cs_data: controlflow.system_error_exit(14, 'Your client secrets file:\n\n%s\n\nis missing. Please recreate the file.' % GC_Values[GC_CLIENT_SECRETS_JSON]) try: @@ -7875,7 +7799,7 @@ def doUpdateProjects(): http=httpObj, cache_discovery=False, discoveryServiceUrl=googleapiclient.discovery.V2_DISCOVERY_URI) _getSvcAcctData() # needed to read in GM_OAUTH2SERVICE_JSON_DATA - sa_email = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['client_email'] + sa_email = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['client_email'] _grantSARotateRights(iam, sa_email) def _generatePrivateKeyAndPublicCert(client_id, key_size): @@ -7984,7 +7908,7 @@ def rotateServiceAccountKeys(): result = gapi.call(iam.projects().serviceAccounts().keys(), 'create', name=name, body=body) oauth2service_data = base64.b64decode(result['privateKeyData']).decode(UTF8) private_key_id = result.get('name').rsplit('/', 1)[-1] - writeFile(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continueOnError=False) + fileutils.write_file(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continueOnError=False) print(' Wrote new private key {0} to {1}'.format(private_key_id, GC_Values[GC_OAUTH2SERVICE_JSON])) if delete_existing: for akey in keys.get('keys'): @@ -8391,7 +8315,7 @@ def _getCloudStorageObject(s, bucket, object_, local_file=None, expectedMd5=None file_path = os.path.dirname(local_file) if not os.path.exists(file_path): os.makedirs(file_path) - f = openFile(local_file, 'wb') + f = fileutils.open_file(local_file, 'wb') downloader = googleapiclient.http.MediaIoBaseDownload(f, request) done = False while not done: @@ -8403,17 +8327,17 @@ def _getCloudStorageObject(s, bucket, object_, local_file=None, expectedMd5=None # https://stackoverflow.com/a/13762137/1503886 f.flush() os.fsync(f.fileno()) - closeFile(f) + fileutils.close_file(f) if expectedMd5: - f = openFile(local_file, 'rb') + f = fileutils.open_file(local_file, 'rb') sys.stdout.write(' Verifying file hash is %s...' % expectedMd5) sys.stdout.flush() md5MatchesFile(local_file, expectedMd5, True) print('VERIFIED') - closeFile(f) + fileutils.close_file(f) def md5MatchesFile(local_file, expected_md5, exitOnError): - f = openFile(local_file, 'rb') + f = fileutils.open_file(local_file, 'rb') hash_md5 = hashlib.md5() for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) @@ -8471,7 +8395,7 @@ def doDownloadVaultExport(): filename = os.path.join(targetFolder, s_object.replace('/', '-')) print('saving to %s' % filename) request = s.objects().get_media(bucket=bucket, object=s_object) - f = openFile(filename, 'wb') + f = fileutils.open_file(filename, 'wb') downloader = googleapiclient.http.MediaIoBaseDownload(f, request) done = False while not done: @@ -8483,7 +8407,7 @@ def doDownloadVaultExport(): # https://stackoverflow.com/a/13762137/1503886 f.flush() os.fsync(f.fileno()) - closeFile(f) + fileutils.close_file(f) if verifyFiles: expected_hash = s_file['md5Hash'] sys.stdout.write(' Verifying file hash is %s...' % expected_hash) @@ -10475,7 +10399,7 @@ def doGetCrosInfo(): if deviceFile: downloadfilename = os.path.join(targetFolder, 'cros-logs-{0}-{1}.zip'.format(deviceId, deviceFile['createTime'])) _, content = cd._http.request(deviceFile['downloadUrl']) - writeFile(downloadfilename, content, mode='wb', continueOnError=True) + fileutils.write_file(downloadfilename, content, mode='wb', continue_on_error=True) print('Downloaded: {0}'.format(downloadfilename)) elif downloadfile: print('ERROR: no files to download.') @@ -10562,7 +10486,7 @@ def doSiteVerifyShow(): webserver_file_record = gapi.call(verif.webResource(), 'getToken', body={'site':{'type':'SITE', 'identifier':'http://%s/' % a_domain}, 'verificationMethod':'FILE'}) webserver_file_token = webserver_file_record['token'] print('Saving web server verification file to: %s' % webserver_file_token) - writeFile(webserver_file_token, 'google-site-verification: {0}'.format(webserver_file_token), continueOnError=True) + fileutils.write_file(webserver_file_token, 'google-site-verification: {0}'.format(webserver_file_token), continue_on_error=True) print('Verification File URL: http://%s/%s' % (a_domain, webserver_file_token)) print() webserver_meta_record = gapi.call(verif.webResource(), 'getToken', body={'site':{'type':'SITE', 'identifier':'http://%s/' % a_domain}, 'verificationMethod':'META'}) @@ -13042,12 +12966,12 @@ def getUsersToModify(entity_type=None, entity=None, silent=False, member_type=No users = doPrintLicenses(returnFields='userId', skus=entity.split(',')) elif entity_type in ['file', 'crosfile']: users = [] - f = openFile(entity, stripUTFBOM=True) + f = fileutils.open_file(entity, strip_utf_bom=True) for row in f: user = row.strip() if user: users.append(user) - closeFile(f) + fileutils.close_file(f) if entity_type == 'crosfile': entity = 'cros' elif entity_type in ['csv', 'csvfile', 'croscsv', 'croscsvfile']: @@ -13055,7 +12979,7 @@ def getUsersToModify(entity_type=None, entity=None, silent=False, member_type=No if filenameColumn.find(':') == -1: controlflow.system_error_exit(2, 'Expected {0} FileName:FieldName'.format(entity_type)) (filename, column) = filenameColumn.split(':') - f = openFile(drive+filename) + f = fileutils.open_file(drive+filename) input_file = csv.DictReader(f, restval='') if column not in input_file.fieldnames: controlflow.csv_field_error_exit(column, input_file.fieldnames) @@ -13064,7 +12988,7 @@ def getUsersToModify(entity_type=None, entity=None, silent=False, member_type=No user = row[column].strip() if user: users.append(user) - closeFile(f) + fileutils.close_file(f) if entity_type in ['croscsv', 'croscsvfile']: entity = 'cros' elif entity_type in ['courseparticipants', 'teachers', 'students']: @@ -13241,7 +13165,7 @@ def writeCredentials(creds): controlflow.system_error_exit(13, 'Wrong OAuth 2.0 credentials issuer. Got %s, expected one of %s' % (_getValueFromOAuth('iss', creds), ', '.join(expected_iss))) creds_data['decoded_id_token'] = GC_Values[GC_DECODED_ID_TOKEN] data = json.dumps(creds_data, indent=2, sort_keys=True) - writeFile(GC_Values[GC_OAUTH2_TXT], data) + fileutils.write_file(GC_Values[GC_OAUTH2_TXT], data) def doRequestOAuth(login_hint=None): credentials = getOauth2TxtStorageCredentials() @@ -13265,7 +13189,7 @@ def getOAuthClientIDAndSecret(): gam create project ''' filename = GC_Values[GC_CLIENT_SECRETS_JSON] - cs_data = readFile(filename, continueOnError=True, displayError=True) + cs_data = fileutils.read_file(filename, continue_on_error=True, display_errors=True) if not cs_data: controlflow.system_error_exit(14, MISSING_CLIENT_SECRETS_MESSAGE) try: @@ -13958,7 +13882,7 @@ def ProcessGAMCommand(args): i = 2 filename = sys.argv[i] i, encoding = getCharSet(i+1) - f = openFile(filename, encoding=encoding, stripUTFBOM=True) + f = fileutils.open_file(filename, encoding=encoding, strip_utf_bom=True) items = [] errors = 0 for line in f: @@ -13981,7 +13905,7 @@ def ProcessGAMCommand(args): sys.stderr.write(utils.convertUTF8('Command: >>>{0}<<<\n'.format(line.strip()))) sys.stderr.write('{0}Invalid: Expected \n'.format(ERROR_PREFIX)) errors += 1 - closeFile(f) + fileutils.close_file(f) if errors == 0: run_batch(items) sys.exit(0) @@ -13993,7 +13917,7 @@ def ProcessGAMCommand(args): i = 2 filename = sys.argv[i] i, encoding = getCharSet(i+1) - f = openFile(filename, encoding=encoding) + f = fileutils.open_file(filename, encoding=encoding) csvFile = csv.DictReader(f) if (i == len(sys.argv)) or (sys.argv[i].lower() != 'gam') or (i+1 == len(sys.argv)): controlflow.system_error_exit(3, '"gam csv " must be followed by a full GAM command...') @@ -14002,7 +13926,7 @@ def ProcessGAMCommand(args): items = [] for row in csvFile: items.append(['gam']+processSubFields(GAM_argv, row, subFields)) - closeFile(f) + fileutils.close_file(f) run_batch(items) sys.exit(0) elif command == 'version':