Move file utilities and add tests (#1059)

* Move file utilities and add tests

Separates common file operations into their own component with
appropriate unit tests.

* Remove use of unittest.mock.mock_open

Its behavior is slightly different across Python 3.5, 3.6, and 3.7
making it difficult to test across Python versions.
This commit is contained in:
ejochman
2019-12-28 07:47:04 -08:00
committed by Jay Lee
parent cb9f5eab14
commit 6a14964f8f
3 changed files with 454 additions and 122 deletions

174
src/fileutils.py Normal file
View File

@@ -0,0 +1,174 @@
"""Common file operations."""
import io
import os
import sys
import controlflow
import display
from var import GM_Globals
from var import GM_SYS_ENCODING
from var import UTF8_SIG
def _open_file(filename, mode, encoding=None, newline=None):
"""Opens a file with no error handling."""
# Determine which encoding to use
if 'b' in mode:
encoding = None
elif not encoding:
encoding = GM_Globals[GM_SYS_ENCODING]
elif 'r' in mode and encoding.lower().replace('-', '') == 'utf8':
encoding = UTF8_SIG
return open(
os.path.expanduser(filename), mode, newline=newline, encoding=encoding)
def open_file(filename,
mode='r',
encoding=None,
newline=None,
strip_utf_bom=False):
"""Opens a file.
Args:
filename: String, the name of the file to open, or '-' to use stdin/stdout,
to read/write, depending on the mode param, respectively.
mode: String, the common file mode to open the file with. Default is read.
encoding: String, the name of the encoding used to decode or encode the
file. This should only be used in text mode.
newline: See param description in
https://docs.python.org/3.7/library/functions.html#open
strip_utf_bom: Boolean, True if the file being opened should seek past the
UTF Byte Order Mark before being returned.
See more: https://en.wikipedia.org/wiki/UTF-8#Byte_order_mark
Returns:
The opened file.
"""
try:
if filename == '-':
# Read from stdin, rather than a file
if 'r' in mode:
return io.StringIO(str(sys.stdin.read()))
return sys.stdout
# Open a file on disk
f = _open_file(filename, mode, newline=newline, encoding=encoding)
if strip_utf_bom:
utf_bom = u'\ufeff'
has_bom = False
if 'b' in mode:
has_bom = f.read(3).decode('UTF-8') == utf_bom
elif f.encoding and not f.encoding.lower().startswith('utf'):
# Convert UTF BOM into ISO-8859-1 via Bytes
utf8_bom_bytes = utf_bom.encode('UTF-8')
iso_8859_1_bom = utf8_bom_bytes.decode('iso-8859-1').encode(
'iso-8859-1')
has_bom = f.read(3).encode('iso-8859-1', 'replace') == iso_8859_1_bom
else:
has_bom = f.read(1) == utf_bom
if not has_bom:
f.seek(0)
return f
except IOError as e:
controlflow.system_error_exit(6, e)
def close_file(f):
"""Closes a file.
Args:
f: The file to close
Returns:
Boolean, True if the file was successfully closed. False if an error
was encountered while closing.
"""
try:
f.close()
return True
except IOError as e:
display.print_error(e)
return False
def read_file(filename,
mode='r',
encoding=None,
newline=None,
continue_on_error=False,
display_errors=True):
"""Reads a file from disk.
Args:
filename: String, the path of the file to open from disk, or "-" to read
from stdin.
mode: String, the mode in which to open the file.
encoding: String, the name of the encoding used to decode or encode the
file. This should only be used in text mode.
newline: See param description in
https://docs.python.org/3.7/library/functions.html#open
continue_on_error: Boolean, If True, suppresses any IO errors and returns to
the caller without any externalities.
display_errors: Boolean, If True, prints error messages when errors are
encountered and continue_on_error is True.
Returns:
The contents of the file, or stdin if filename == "-". Returns None if
an error is encountered and continue_on_errors is True.
"""
try:
if filename == '-':
# Read from stdin, rather than a file.
return str(sys.stdin.read())
with _open_file(filename, mode, newline=newline, encoding=encoding) as f:
return f.read()
except IOError as e:
if continue_on_error:
if display_errors:
display.print_warning(e)
return None
controlflow.system_error_exit(6, e)
except (LookupError, UnicodeDecodeError, UnicodeError) as e:
controlflow.system_error_exit(2, str(e))
def write_file(filename,
data,
mode='w',
continue_on_error=False,
display_errors=True):
"""Writes data to a file.
Args:
filename: String, the path of the file to write to disk.
data: Serializable data to write to the file.
mode: String, the mode in which to open the file and write to it.
continue_on_error: Boolean, If True, suppresses any IO errors and returns to
the caller without any externalities.
display_errors: Boolean, If True, prints error messages when errors are
encountered and continue_on_error is True.
Returns:
Boolean, True if the write operation succeeded, or False if not.
"""
try:
with _open_file(filename, mode) as f:
f.write(data)
return True
except IOError as e:
if continue_on_error:
if display_errors:
display.print_error(e)
return False
else:
controlflow.system_error_exit(6, e)

234
src/fileutils_test.py Normal file
View File

@@ -0,0 +1,234 @@
"""Tests for fileutils."""
import io
import os
import unittest
from unittest.mock import MagicMock
from unittest.mock import patch
import fileutils
class FileutilsTest(unittest.TestCase):
def setUp(self):
self.fake_path = '/some/path/to/file'
super(FileutilsTest, self).setUp()
@patch.object(fileutils.sys, 'stdin')
def test_open_file_stdin(self, mock_stdin):
mock_stdin.read.return_value = 'some stdin content'
f = fileutils.open_file('-', mode='r')
self.assertIsInstance(f, fileutils.io.StringIO)
self.assertEqual(f.getvalue(), mock_stdin.read.return_value)
def test_open_file_stdout(self):
f = fileutils.open_file('-', mode='w')
self.assertEqual(fileutils.sys.stdout, f)
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_opens_correct_path(self, mock_open):
f = fileutils.open_file(self.fake_path)
self.assertEqual(self.fake_path, mock_open.call_args[0][0])
self.assertEqual(mock_open.return_value, f)
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_expands_user_file_path(self, mock_open):
file_path = '~/some/path/containing/tilde/shortcut/to/home'
fileutils.open_file(file_path)
opened_path = mock_open.call_args[0][0]
home_path = os.environ.get('HOME')
self.assertIsNotNone(home_path)
self.assertIn(home_path, opened_path)
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_opens_correct_mode(self, mock_open):
fileutils.open_file(self.fake_path)
self.assertEqual('r', mock_open.call_args[0][1])
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_encoding_for_binary(self, mock_open):
fileutils.open_file(self.fake_path, mode='b')
self.assertIsNone(mock_open.call_args[1]['encoding'])
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_default_system_encoding(self, mock_open):
fileutils.open_file(self.fake_path)
self.assertEqual(fileutils.GM_Globals[fileutils.GM_SYS_ENCODING],
mock_open.call_args[1]['encoding'])
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_utf8_encoding_specified(self, mock_open):
fileutils.open_file(self.fake_path, encoding='UTF-8')
self.assertEqual(fileutils.UTF8_SIG, mock_open.call_args[1]['encoding'])
def test_open_file_strips_utf_bom_in_utf(self):
bom_prefixed_data = u'\ufefffoobar'
fake_file = io.StringIO(bom_prefixed_data)
mock_open = MagicMock(spec=open, return_value=fake_file)
with patch.object(fileutils, 'open', mock_open):
f = fileutils.open_file(self.fake_path, strip_utf_bom=True)
self.assertEqual('foobar', f.read())
def test_open_file_strips_utf_bom_in_non_utf(self):
bom_prefixed_data = b'\xef\xbb\xbffoobar'.decode('iso-8859-1')
# We need to trick the method under test into believing that a StringIO
# instance is a file with an encoding. Since StringIO does not usually have,
# an encoding, we'll mock it and add our own encoding, but send the other
# methods in use (read and seek) back to the real StringIO object.
real_stringio = io.StringIO(bom_prefixed_data)
mock_file = MagicMock(spec=io.StringIO)
mock_file.read.side_effect = real_stringio.read
mock_file.seek.side_effect = real_stringio.seek
mock_file.encoding = 'iso-8859-1'
mock_open = MagicMock(spec=open, return_value=mock_file)
with patch.object(fileutils, 'open', mock_open):
f = fileutils.open_file(self.fake_path, strip_utf_bom=True)
self.assertEqual('foobar', f.read())
def test_open_file_strips_utf_bom_in_binary(self):
bom_prefixed_data = u'\ufefffoobar'.encode('UTF-8')
fake_file = io.BytesIO(bom_prefixed_data)
mock_open = MagicMock(spec=open, return_value=fake_file)
with patch.object(fileutils, 'open', mock_open):
f = fileutils.open_file(self.fake_path, mode='rb', strip_utf_bom=True)
self.assertEqual(b'foobar', f.read())
def test_open_file_strip_utf_bom_when_no_bom_in_data(self):
no_bom_data = 'This data has no BOM'
fake_file = io.StringIO(no_bom_data)
mock_open = MagicMock(spec=open, return_value=fake_file)
with patch.object(fileutils, 'open', mock_open):
f = fileutils.open_file(self.fake_path, strip_utf_bom=True)
# Since there was no opening BOM, we should be back at the beginning of
# the file.
self.assertEqual(fake_file.tell(), 0)
self.assertEqual(f.read(), no_bom_data)
@patch.object(fileutils, 'open', new_callable=unittest.mock.mock_open)
def test_open_file_exits_on_io_error(self, mock_open):
mock_open.side_effect = IOError('Fake IOError')
with self.assertRaises(SystemExit) as context:
fileutils.open_file(self.fake_path)
self.assertEqual(context.exception.code, 6)
def test_close_file_closes_file_successfully(self):
mock_file = MagicMock()
self.assertTrue(fileutils.close_file(mock_file))
self.assertEqual(mock_file.close.call_count, 1)
def test_close_file_with_error(self):
mock_file = MagicMock()
mock_file.close.side_effect = IOError()
self.assertFalse(fileutils.close_file(mock_file))
self.assertEqual(mock_file.close.call_count, 1)
@patch.object(fileutils.sys, 'stdin')
def test_read_file_from_stdin(self, mock_stdin):
mock_stdin.read.return_value = 'some stdin content'
self.assertEqual(fileutils.read_file('-'), mock_stdin.read.return_value)
@patch.object(fileutils, '_open_file')
def test_read_file_default_params(self, mock_open_file):
fake_content = 'some fake content'
mock_open_file.return_value.__enter__().read.return_value = fake_content
self.assertEqual(fileutils.read_file(self.fake_path), fake_content)
self.assertEqual(mock_open_file.call_args[0][0], self.fake_path)
self.assertEqual(mock_open_file.call_args[0][1], 'r')
self.assertIsNone(mock_open_file.call_args[1]['newline'])
@patch.object(fileutils.display, 'print_warning')
@patch.object(fileutils, '_open_file')
def test_read_file_continues_on_errors_without_displaying(
self, mock_open_file, mock_print_warning):
mock_open_file.side_effect = IOError()
contents = fileutils.read_file(
self.fake_path, continue_on_error=True, display_errors=False)
self.assertIsNone(contents)
self.assertFalse(mock_print_warning.called)
@patch.object(fileutils.display, 'print_warning')
@patch.object(fileutils, '_open_file')
def test_read_file_displays_errors(self, mock_open_file, mock_print_warning):
mock_open_file.side_effect = IOError()
fileutils.read_file(
self.fake_path, continue_on_error=True, display_errors=True)
self.assertTrue(mock_print_warning.called)
@patch.object(fileutils, '_open_file')
def test_read_file_exits_code_6_when_continue_on_error_is_false(
self, mock_open_file):
mock_open_file.side_effect = IOError()
with self.assertRaises(SystemExit) as context:
fileutils.read_file(self.fake_path, continue_on_error=False)
self.assertEqual(context.exception.code, 6)
@patch.object(fileutils, '_open_file')
def test_read_file_exits_code_2_on_lookuperror(self, mock_open_file):
mock_open_file.return_value.__enter__().read.side_effect = LookupError()
with self.assertRaises(SystemExit) as context:
fileutils.read_file(self.fake_path)
self.assertEqual(context.exception.code, 2)
@patch.object(fileutils, '_open_file')
def test_read_file_exits_code_2_on_unicodeerror(self, mock_open_file):
mock_open_file.return_value.__enter__().read.side_effect = UnicodeError()
with self.assertRaises(SystemExit) as context:
fileutils.read_file(self.fake_path)
self.assertEqual(context.exception.code, 2)
@patch.object(fileutils, '_open_file')
def test_read_file_exits_code_2_on_unicodedecodeerror(self, mock_open_file):
fake_decode_error = UnicodeDecodeError('fake-encoding', b'fakebytes', 0, 1,
'testing only')
mock_open_file.return_value.__enter__().read.side_effect = fake_decode_error
with self.assertRaises(SystemExit) as context:
fileutils.read_file(self.fake_path)
self.assertEqual(context.exception.code, 2)
@patch.object(fileutils, '_open_file')
def test_write_file_writes_data_to_file(self, mock_open_file):
fake_data = 'some fake data'
fileutils.write_file(self.fake_path, fake_data)
self.assertEqual(mock_open_file.call_args[0][0], self.fake_path)
self.assertEqual(mock_open_file.call_args[0][1], 'w')
opened_file = mock_open_file.return_value.__enter__()
self.assertTrue(opened_file.write.called)
self.assertEqual(opened_file.write.call_args[0][0], fake_data)
@patch.object(fileutils.display, 'print_error')
@patch.object(fileutils, '_open_file')
def test_write_file_continues_on_errors_without_displaying(
self, mock_open_file, mock_print_error):
mock_open_file.side_effect = IOError()
status = fileutils.write_file(
self.fake_path,
'foo data',
continue_on_error=True,
display_errors=False)
self.assertFalse(status)
self.assertFalse(mock_print_error.called)
@patch.object(fileutils.display, 'print_error')
@patch.object(fileutils, '_open_file')
def test_write_file_displays_errors(self, mock_open_file, mock_print_error):
mock_open_file.side_effect = IOError()
fileutils.write_file(
self.fake_path, 'foo data', continue_on_error=True, display_errors=True)
self.assertTrue(mock_print_error.called)
@patch.object(fileutils, '_open_file')
def test_write_file_exits_code_6_when_continue_on_error_is_false(
self, mock_open_file):
mock_open_file.side_effect = IOError()
with self.assertRaises(SystemExit) as context:
fileutils.write_file(self.fake_path, 'foo data', continue_on_error=False)
self.assertEqual(context.exception.code, 6)
if __name__ == '__main__':
unittest.main()

View File

@@ -77,6 +77,7 @@ from cryptography.x509.oid import NameOID
import controlflow
import display
import fileutils
import gapi.errors
import gapi
import utils
@@ -459,83 +460,6 @@ def normalizeStudentGuardianEmailAddressOrUID(emailAddressOrUID):
return emailAddressOrUID
return normalizeEmailAddressOrUID(emailAddressOrUID)
#
# Set file encoding to handle UTF8 BOM
#
def setEncoding(mode, encoding):
if 'b' in mode:
return {}
if not encoding:
encoding = GM_Globals[GM_SYS_ENCODING]
if 'r' in mode and encoding.lower().replace('-', '') == 'utf8':
encoding = UTF8_SIG
return {'encoding': encoding}
#
# Open a file
#
def openFile(filename, mode='r', encoding=None, newline=None,
stripUTFBOM=False):
try:
if filename != '-':
kwargs = setEncoding(mode, encoding)
f = open(os.path.expanduser(filename), mode, newline=newline, **kwargs)
if stripUTFBOM:
if 'b' in mode or not kwargs['encoding'].lower().startswith('utf'):
if f.read(3).encode('iso-8859-1', 'replace') != b'\xef\xbb\xbf':
f.seek(0)
else:
if f.read(1) != '\ufeff':
f.seek(0)
return f
if 'r' in mode:
return io.StringIO(str(sys.stdin.read()))
return sys.stdout
except IOError as e:
controlflow.system_error_exit(6, e)
#
# Close a file
#
def closeFile(f):
try:
f.close()
return True
except IOError as e:
display.print_error(e)
return False
#
# Read a file
#
def readFile(filename, mode='r', encoding=None, newline=None,
continueOnError=False, displayError=True):
try:
if filename != '-':
kwargs = setEncoding(mode, encoding)
with open(os.path.expanduser(filename), mode, newline=newline, **kwargs) as f:
return f.read()
return str(sys.stdin.read())
except IOError as e:
if continueOnError:
if displayError:
display.print_warning(e)
return None
controlflow.system_error_exit(6, e)
except (LookupError, UnicodeDecodeError, UnicodeError) as e:
controlflow.system_error_exit(2, str(e))
#
# Write a file
#
def writeFile(filename, data, mode='w', continueOnError=False, displayError=True):
try:
kwargs = setEncoding(mode, None)
with open(os.path.expanduser(filename), mode, **kwargs) as f:
f.write(data)
return True
except IOError as e:
if continueOnError:
if displayError:
display.print_error(e)
return False
controlflow.system_error_exit(6, e)
#
# Set global variables
# Check for GAM updates based on status of noupdatecheck.txt
#
@@ -752,7 +676,7 @@ def doGAMCheckForUpdates(forceCheck=False):
if forceCheck:
check_url = GAM_ALL_RELEASES # includes pre-releases
else:
last_check_time_str = readFile(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], continueOnError=True, displayError=False)
last_check_time_str = fileutils.read_file(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], continue_on_error=True, display_errors=False)
last_check_time = int(last_check_time_str) if last_check_time_str and last_check_time_str.isdigit() else 0
if last_check_time > now_time-604800:
return
@@ -777,7 +701,7 @@ def doGAMCheckForUpdates(forceCheck=False):
if forceCheck or (latest_version > current_version):
print('Version Check:\n Current: {0}\n Latest: {1}'.format(current_version, latest_version))
if latest_version <= current_version:
writeFile(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continueOnError=True, displayError=forceCheck)
fileutils.write_file(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continue_on_error=True, display_errors=forceCheck)
return
announcement = release_data.get('body_text', 'No details about this release')
sys.stderr.write('\nGAM %s release notes:\n\n' % latest_version)
@@ -789,7 +713,7 @@ def doGAMCheckForUpdates(forceCheck=False):
webbrowser.open(release_data['html_url'])
printLine(MESSAGE_GAM_EXITING_FOR_UPDATE)
sys.exit(0)
writeFile(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continueOnError=True, displayError=forceCheck)
fileutils.write_file(GM_Globals[GM_LAST_UPDATE_CHECK_TXT], str(now_time), continue_on_error=True, display_errors=forceCheck)
return
except (httplib2.HttpLib2Error, httplib2.ServerNotFoundError, RuntimeError, socket.timeout):
return
@@ -879,7 +803,7 @@ def _getServerTLSUsed(location):
def _getSvcAcctData():
if not GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]:
json_string = readFile(GC_Values[GC_OAUTH2SERVICE_JSON], continueOnError=True, displayError=True)
json_string = fileutils.read_file(GC_Values[GC_OAUTH2SERVICE_JSON], continueOnError=True, displayError=True)
if not json_string:
printLine(MESSAGE_INSTRUCTIONS_OAUTH2SERVICE_JSON)
controlflow.system_error_exit(6, None)
@@ -914,9 +838,9 @@ def readDiscoveryFile(api_version):
else:
pyinstaller_disc_file = None
if os.path.isfile(disc_file):
json_string = readFile(disc_file)
json_string = fileutils.read_file(disc_file)
elif pyinstaller_disc_file:
json_string = readFile(pyinstaller_disc_file)
json_string = fileutils.read_file(pyinstaller_disc_file)
else:
controlflow.system_error_exit(11, MESSAGE_NO_DISCOVERY_INFORMATION.format(disc_file))
try:
@@ -926,7 +850,7 @@ def readDiscoveryFile(api_version):
controlflow.invalid_json_exit(disc_file)
def getOauth2TxtStorageCredentials():
oauth_string = readFile(GC_Values[GC_OAUTH2_TXT], continueOnError=True, displayError=False)
oauth_string = fileutils.read_file(GC_Values[GC_OAUTH2_TXT], continue_on_error=True, display_errors=False)
if not oauth_string:
return
oauth_data = json.loads(oauth_string)
@@ -2918,11 +2842,11 @@ def changeCalendarAttendees(users):
else:
controlflow.system_error_exit(2, '%s is not a valid argument for "gam <users> update calattendees"' % sys.argv[i])
attendee_map = {}
f = openFile(csv_file)
f = fileutils.open_file(csv_file)
csvFile = csv.reader(f)
for row in csvFile:
attendee_map[row[0].lower()] = row[1].lower()
closeFile(f)
fileutils.close_file(f)
for user in users:
sys.stdout.write('Checking user %s\n' % user)
user, cal = buildCalendarGAPIObject(user)
@@ -3283,7 +3207,7 @@ def doPrintJobFetch():
jobid = job['id']
fileName = os.path.join(targetFolder, '{0}-{1}'.format(''.join(c if c in FILENAME_SAFE_CHARS else '_' for c in job['title']), jobid))
_, content = cp._http.request(uri=fileUrl, method='GET')
if writeFile(fileName, content, mode='wb', continueOnError=True):
if fileutils.write_file(fileName, content, mode='wb', continue_on_error=True):
# ticket = gapi.call(cp.jobs(), u'getticket', jobid=jobid, use_cjt=True)
result = gapi.call(cp.jobs(), 'update', jobid=jobid, semantic_state_diff=ssd)
checkCloudPrintResult(result)
@@ -3425,7 +3349,7 @@ def doPrintJobSubmit():
mimetype = mimetypes.guess_type(filepath)[0]
if mimetype is None:
mimetype = 'application/octet-stream'
filecontent = readFile(filepath, mode='rb')
filecontent = fileutils.read_file(filepath, mode='rb')
form_files['content'] = {'filename': content, 'content': filecontent, 'mimetype': mimetype}
#result = gapi.call(cp.printers(), u'submit', body=body)
body, headers = encode_multipart(form_fields, form_files)
@@ -3849,7 +3773,7 @@ def doPhoto(users):
print(e)
continue
else:
image_data = readFile(filename, mode='rb', continueOnError=True, displayError=True)
image_data = fileutils.read_file(filename, mode='rb', continue_on_error=True, display_errors=True)
if image_data is None:
continue
body = {'photoData': base64.urlsafe_b64encode(image_data).decode(UTF8)}
@@ -3897,7 +3821,7 @@ def getPhoto(users):
print(' no photo for %s' % user)
continue
decoded_photo_data = base64.urlsafe_b64decode(photo_data)
writeFile(filename, decoded_photo_data, mode='wb', continueOnError=True)
fileutils.write_file(filename, decoded_photo_data, mode='wb', continue_on_error=True)
def deletePhoto(users):
cd = buildGAPIObject('directory')
@@ -4935,7 +4859,7 @@ def downloadDriveFile(users):
if targetStdout and content[-1] != '\n':
fh.write('\n')
if not targetStdout:
closeFile(fh)
fileutils.close_file(fh)
fileDownloaded = True
break
except (IOError, httplib2.HttpLib2Error) as e:
@@ -4952,7 +4876,7 @@ def downloadDriveFile(users):
fileDownloadFailed = True
break
if fh and not targetStdout:
closeFile(fh)
fileutils.close_file(fh)
os.remove(filename)
if not fileDownloaded and not fileDownloadFailed and not csvSheetNotFound:
display.print_error('Format ({0}) not available'.format(','.join(exportFormatChoices)))
@@ -5150,7 +5074,7 @@ def sendOrDropEmail(users, method='send'):
elif myarg == 'file':
filename = sys.argv[i+1]
i, encoding = getCharSet(i+2)
body = readFile(filename, encoding=encoding)
body = fileutils.read_file(filename, encoding=encoding)
elif myarg == 'subject':
subject = sys.argv[i+1]
i += 2
@@ -5459,7 +5383,7 @@ def addUpdateSendAs(users, i, addCmd):
if signature.lower() == 'file':
filename = sys.argv[i]
i, encoding = getCharSet(i+1)
signature = readFile(filename, encoding=encoding)
signature = fileutils.read_file(filename, encoding=encoding)
elif myarg == 'html':
html = True
i += 1
@@ -5730,7 +5654,7 @@ def addSmime(users):
myarg = sys.argv[i].lower()
if myarg == 'file':
smimefile = sys.argv[i+1]
smimeData = readFile(smimefile, mode='rb')
smimeData = fileutils.read_file(smimefile, mode='rb')
body['pkcs12'] = base64.urlsafe_b64encode(smimeData).decode(UTF8)
i += 2
elif myarg == 'password':
@@ -6595,7 +6519,7 @@ def doSignature(users):
if sys.argv[i].lower() == 'file':
filename = sys.argv[i+1]
i, encoding = getCharSet(i+2)
signature = readFile(filename, encoding=encoding)
signature = fileutils.read_file(filename, encoding=encoding)
else:
signature = getString(i, 'String', minLen=0)
i += 1
@@ -6663,7 +6587,7 @@ def doVacation(users):
elif myarg == 'file':
filename = sys.argv[i+1]
i, encoding = getCharSet(i+2)
message = readFile(filename, encoding=encoding)
message = fileutils.read_file(filename, encoding=encoding)
elif myarg == 'replace':
matchTag = getString(i+1, 'Tag')
matchReplacement = getString(i+2, 'String', minLen=0)
@@ -7224,7 +7148,7 @@ def getUserAttributes(i, cd, updateCmd):
if sys.argv[i].lower() == 'file':
filename = sys.argv[i+1]
i, encoding = getCharSet(i+2)
note['value'] = readFile(filename, encoding=encoding)
note['value'] = fileutils.read_file(filename, encoding=encoding)
else:
note['value'] = sys.argv[i].replace('\\n', '\n')
i += 1
@@ -7584,7 +7508,7 @@ def _createClientSecretsOauth2service(httpObj, projectId):
name=service_account['name'], body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE', 'keyAlgorithm': 'KEY_ALG_RSA_2048'})
_grantSARotateRights(iam, service_account['name'].rsplit('/', 1)[-1])
oauth2service_data = base64.b64decode(key['privateKeyData']).decode(UTF8)
writeFile(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continueOnError=False)
fileutils.write_file(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continue_on_error=False)
console_credentials_url = 'https://console.developers.google.com/apis/credentials/consent/edit?createClient&newAppInternalUser=true&project=%s' % projectId
while True:
print('''Please go to:
@@ -7623,7 +7547,7 @@ def _createClientSecretsOauth2service(httpObj, projectId):
"token_uri": "https://oauth2.googleapis.com/token"
}
}''' % (client_id, client_secret, projectId)
writeFile(GC_Values[GC_CLIENT_SECRETS_JSON], cs_data, continueOnError=False)
fileutils.write_file(GC_Values[GC_CLIENT_SECRETS_JSON], cs_data, continue_on_error=False)
print('That\'s it! Your GAM Project is created and ready to use.')
VALIDEMAIL_PATTERN = re.compile(r'^[^@]+@[^@]+\.[^@]+$')
@@ -7640,7 +7564,7 @@ def _getValidateLoginHint(login_hint=None):
login_hint = None
def _getCurrentProjectID():
cs_data = readFile(GC_Values[GC_CLIENT_SECRETS_JSON], continueOnError=True, displayError=True)
cs_data = fileutils.read_file(GC_Values[GC_CLIENT_SECRETS_JSON], continue_on_error=True, display_errors=True)
if not cs_data:
controlflow.system_error_exit(14, 'Your client secrets file:\n\n%s\n\nis missing. Please recreate the file.' % GC_Values[GC_CLIENT_SECRETS_JSON])
try:
@@ -7875,7 +7799,7 @@ def doUpdateProjects():
http=httpObj, cache_discovery=False,
discoveryServiceUrl=googleapiclient.discovery.V2_DISCOVERY_URI)
_getSvcAcctData() # needed to read in GM_OAUTH2SERVICE_JSON_DATA
sa_email = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['client_email']
sa_email = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['client_email']
_grantSARotateRights(iam, sa_email)
def _generatePrivateKeyAndPublicCert(client_id, key_size):
@@ -7984,7 +7908,7 @@ def rotateServiceAccountKeys():
result = gapi.call(iam.projects().serviceAccounts().keys(), 'create', name=name, body=body)
oauth2service_data = base64.b64decode(result['privateKeyData']).decode(UTF8)
private_key_id = result.get('name').rsplit('/', 1)[-1]
writeFile(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continueOnError=False)
fileutils.write_file(GC_Values[GC_OAUTH2SERVICE_JSON], oauth2service_data, continueOnError=False)
print(' Wrote new private key {0} to {1}'.format(private_key_id, GC_Values[GC_OAUTH2SERVICE_JSON]))
if delete_existing:
for akey in keys.get('keys'):
@@ -8391,7 +8315,7 @@ def _getCloudStorageObject(s, bucket, object_, local_file=None, expectedMd5=None
file_path = os.path.dirname(local_file)
if not os.path.exists(file_path):
os.makedirs(file_path)
f = openFile(local_file, 'wb')
f = fileutils.open_file(local_file, 'wb')
downloader = googleapiclient.http.MediaIoBaseDownload(f, request)
done = False
while not done:
@@ -8403,17 +8327,17 @@ def _getCloudStorageObject(s, bucket, object_, local_file=None, expectedMd5=None
# https://stackoverflow.com/a/13762137/1503886
f.flush()
os.fsync(f.fileno())
closeFile(f)
fileutils.close_file(f)
if expectedMd5:
f = openFile(local_file, 'rb')
f = fileutils.open_file(local_file, 'rb')
sys.stdout.write(' Verifying file hash is %s...' % expectedMd5)
sys.stdout.flush()
md5MatchesFile(local_file, expectedMd5, True)
print('VERIFIED')
closeFile(f)
fileutils.close_file(f)
def md5MatchesFile(local_file, expected_md5, exitOnError):
f = openFile(local_file, 'rb')
f = fileutils.open_file(local_file, 'rb')
hash_md5 = hashlib.md5()
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
@@ -8471,7 +8395,7 @@ def doDownloadVaultExport():
filename = os.path.join(targetFolder, s_object.replace('/', '-'))
print('saving to %s' % filename)
request = s.objects().get_media(bucket=bucket, object=s_object)
f = openFile(filename, 'wb')
f = fileutils.open_file(filename, 'wb')
downloader = googleapiclient.http.MediaIoBaseDownload(f, request)
done = False
while not done:
@@ -8483,7 +8407,7 @@ def doDownloadVaultExport():
# https://stackoverflow.com/a/13762137/1503886
f.flush()
os.fsync(f.fileno())
closeFile(f)
fileutils.close_file(f)
if verifyFiles:
expected_hash = s_file['md5Hash']
sys.stdout.write(' Verifying file hash is %s...' % expected_hash)
@@ -10475,7 +10399,7 @@ def doGetCrosInfo():
if deviceFile:
downloadfilename = os.path.join(targetFolder, 'cros-logs-{0}-{1}.zip'.format(deviceId, deviceFile['createTime']))
_, content = cd._http.request(deviceFile['downloadUrl'])
writeFile(downloadfilename, content, mode='wb', continueOnError=True)
fileutils.write_file(downloadfilename, content, mode='wb', continue_on_error=True)
print('Downloaded: {0}'.format(downloadfilename))
elif downloadfile:
print('ERROR: no files to download.')
@@ -10562,7 +10486,7 @@ def doSiteVerifyShow():
webserver_file_record = gapi.call(verif.webResource(), 'getToken', body={'site':{'type':'SITE', 'identifier':'http://%s/' % a_domain}, 'verificationMethod':'FILE'})
webserver_file_token = webserver_file_record['token']
print('Saving web server verification file to: %s' % webserver_file_token)
writeFile(webserver_file_token, 'google-site-verification: {0}'.format(webserver_file_token), continueOnError=True)
fileutils.write_file(webserver_file_token, 'google-site-verification: {0}'.format(webserver_file_token), continue_on_error=True)
print('Verification File URL: http://%s/%s' % (a_domain, webserver_file_token))
print()
webserver_meta_record = gapi.call(verif.webResource(), 'getToken', body={'site':{'type':'SITE', 'identifier':'http://%s/' % a_domain}, 'verificationMethod':'META'})
@@ -13042,12 +12966,12 @@ def getUsersToModify(entity_type=None, entity=None, silent=False, member_type=No
users = doPrintLicenses(returnFields='userId', skus=entity.split(','))
elif entity_type in ['file', 'crosfile']:
users = []
f = openFile(entity, stripUTFBOM=True)
f = fileutils.open_file(entity, strip_utf_bom=True)
for row in f:
user = row.strip()
if user:
users.append(user)
closeFile(f)
fileutils.close_file(f)
if entity_type == 'crosfile':
entity = 'cros'
elif entity_type in ['csv', 'csvfile', 'croscsv', 'croscsvfile']:
@@ -13055,7 +12979,7 @@ def getUsersToModify(entity_type=None, entity=None, silent=False, member_type=No
if filenameColumn.find(':') == -1:
controlflow.system_error_exit(2, 'Expected {0} FileName:FieldName'.format(entity_type))
(filename, column) = filenameColumn.split(':')
f = openFile(drive+filename)
f = fileutils.open_file(drive+filename)
input_file = csv.DictReader(f, restval='')
if column not in input_file.fieldnames:
controlflow.csv_field_error_exit(column, input_file.fieldnames)
@@ -13064,7 +12988,7 @@ def getUsersToModify(entity_type=None, entity=None, silent=False, member_type=No
user = row[column].strip()
if user:
users.append(user)
closeFile(f)
fileutils.close_file(f)
if entity_type in ['croscsv', 'croscsvfile']:
entity = 'cros'
elif entity_type in ['courseparticipants', 'teachers', 'students']:
@@ -13241,7 +13165,7 @@ def writeCredentials(creds):
controlflow.system_error_exit(13, 'Wrong OAuth 2.0 credentials issuer. Got %s, expected one of %s' % (_getValueFromOAuth('iss', creds), ', '.join(expected_iss)))
creds_data['decoded_id_token'] = GC_Values[GC_DECODED_ID_TOKEN]
data = json.dumps(creds_data, indent=2, sort_keys=True)
writeFile(GC_Values[GC_OAUTH2_TXT], data)
fileutils.write_file(GC_Values[GC_OAUTH2_TXT], data)
def doRequestOAuth(login_hint=None):
credentials = getOauth2TxtStorageCredentials()
@@ -13265,7 +13189,7 @@ def getOAuthClientIDAndSecret():
gam create project
'''
filename = GC_Values[GC_CLIENT_SECRETS_JSON]
cs_data = readFile(filename, continueOnError=True, displayError=True)
cs_data = fileutils.read_file(filename, continue_on_error=True, display_errors=True)
if not cs_data:
controlflow.system_error_exit(14, MISSING_CLIENT_SECRETS_MESSAGE)
try:
@@ -13958,7 +13882,7 @@ def ProcessGAMCommand(args):
i = 2
filename = sys.argv[i]
i, encoding = getCharSet(i+1)
f = openFile(filename, encoding=encoding, stripUTFBOM=True)
f = fileutils.open_file(filename, encoding=encoding, strip_utf_bom=True)
items = []
errors = 0
for line in f:
@@ -13981,7 +13905,7 @@ def ProcessGAMCommand(args):
sys.stderr.write(utils.convertUTF8('Command: >>>{0}<<<\n'.format(line.strip())))
sys.stderr.write('{0}Invalid: Expected <gam|commit-batch>\n'.format(ERROR_PREFIX))
errors += 1
closeFile(f)
fileutils.close_file(f)
if errors == 0:
run_batch(items)
sys.exit(0)
@@ -13993,7 +13917,7 @@ def ProcessGAMCommand(args):
i = 2
filename = sys.argv[i]
i, encoding = getCharSet(i+1)
f = openFile(filename, encoding=encoding)
f = fileutils.open_file(filename, encoding=encoding)
csvFile = csv.DictReader(f)
if (i == len(sys.argv)) or (sys.argv[i].lower() != 'gam') or (i+1 == len(sys.argv)):
controlflow.system_error_exit(3, '"gam csv <filename>" must be followed by a full GAM command...')
@@ -14002,7 +13926,7 @@ def ProcessGAMCommand(args):
items = []
for row in csvFile:
items.append(['gam']+processSubFields(GAM_argv, row, subFields))
closeFile(f)
fileutils.close_file(f)
run_batch(items)
sys.exit(0)
elif command == 'version':