Refactor scope selection menu (#882)

-Add flexibility to menu creation and feature customization
-Colorize menu error messages on supported platforms
-Add 'email' as a required scope for increased transparency to the user
-Improve readability of menu creation and operation
This commit is contained in:
ejochman
2019-04-12 03:04:16 -07:00
committed by Jay Lee
parent e602d3fef0
commit 2ce5915e70

View File

@@ -209,6 +209,41 @@ def getCharSet(i):
return (i, GC_Values.get(GC_CHARSET, GM_Globals[GM_SYS_ENCODING]))
return (i+2, sys.argv[i+1])
def supportsColoredText():
"""Determines if the current terminal environment supports colored text.
Returns:
Bool, True if the current terminal environment supports colored text via
ANSI escape characters.
"""
# Make a rudimentary check for Windows. Though Windows does seem to support
# colorization with VT100 emulation, it is disabled by default. Therefore,
# we'll simply disable it in GAM on Windows for now.
return not GM_Globals[GM_WINDOWS]
def createColoredText(text, color):
"""Uses ANSI escape characters to create colored text in supported terminals.
See more at https://en.wikipedia.org/wiki/ANSI_escape_code#Colors
Args:
text: String, The text to colorize using ANSI escape characters.
color: String, An ANSI escape sequence denoting the color of the text to be
created. See more at https://en.wikipedia.org/wiki/ANSI_escape_code#Colors
Returns:
The input text with appropriate ANSI escape characters to create
colorization in a supported terminal environment.
"""
END_COLOR_SEQUENCE = '\033[0m' # Ends the applied color formatting
if supportsColoredText():
return color + text + END_COLOR_SEQUENCE
return text # Hand back the plain text, uncolorized.
def createRedText(text):
"""Uses ANSI encoding to create red colored text, if supported."""
return createColoredText(text, '\033[91m')
COLORHEX_PATTERN = re.compile(r'^#[0-9a-fA-F]{6}$')
def getColor(color):
@@ -12626,111 +12661,480 @@ OAUTH2_SCOPES = [
{u'name': u'Cloud Storage (Vault Export - read only)',
u'subscopes': [],
u'scopes': u'https://www.googleapis.com/auth/devstorage.read_only'},
{u'name': u'User Profile (Email address - read only)',
u'subscopes': [],
u'scopes': u'email',
u'required': True},
]
OAUTH2_MENU = u'''
def getScopesFromUser(menu_options=None):
"""Prompts the user to choose from a list of scopes to authorize.
Args:
menu_options: An optional list of ScopeMenuOptions to be presented in the
menu. If no menu_options are provided, menu options will be generated
from static OAUTH2_SCOPES definitions.
Returns:
A list of user-selected scopes to authorize.
"""
if not menu_options:
menu_options = [ScopeMenuOption.from_gam_oauth2scope(definition)
for definition in OAUTH2_SCOPES]
menu = ScopeSelectionMenu(menu_options)
try:
menu.run()
except ScopeSelectionMenu.UserRequestedExitException:
systemErrorExit(0, '')
return menu.get_selected_scopes()
class ScopeMenuOption():
"""A single GAM API/feature with scopes that can be turned on/off."""
def __init__(self, oauth_scopes, description,
is_required=False,
is_selected=False,
supported_restrictions=None,
restriction=None):
"""A data structure for storing and toggling feature/API scope attributes.
Args:
oauth_scopes: A list of Google OAuth scope strings required for the
feature or API. If the applicable scopes can vary in permission level,
the scopes provided in this list should contain the highest level of
permissions. More restrictive scopes are implemented by utilizing the
`supported_restrictions` argument.
description: String, a name or brief description of this API/feature.
is_required: Bool, whether this API/feature is required for GAM. If True,
the ScopeMenuOption cannot be unselected/disabled.
is_selected: Bool, whether the ScopeMenuOption is currently
selected/enabled.
supported_restrictions: A list of strings that can be appended to the
oauth_scopes, separated by '.', to restrict their permissions.
For example, the directory API supports a 'readonly' mode on most
scopes such that
https://www.googleapis.com/auth/admin.directory.domain
becomes
https://www.googleapis.com/auth/admin.directory.domain.readonly
restriction: String, the currently enabled restriction on all scopes in
this ScopeMenuOption. Default is no restrictions (highest permission).
"""
self.scopes = oauth_scopes
self.description = description
self.is_required = is_required
# Required scopes must be selected
self.is_selected = is_required or is_selected
self.supported_restrictions = (
supported_restrictions if supported_restrictions is not None else [])
self._restriction = restriction
def select(self, restriction=None):
"""Selects/enables the ScopeMenuOption with an optional restriction."""
if restriction is not None:
self.restrict_to(restriction)
self.is_selected = True
def unselect(self):
"""Unselects/disables the ScopeMenuOption."""
self.is_selected = False
@property
def is_selected(self):
return self._is_selected
@is_selected.setter
def is_selected(self, is_selected):
if self.is_required and not is_selected:
raise ValueError('Required scope cannot be unselected')
if not is_selected:
# Disable all applied restrictions
self.unrestrict()
self._is_selected = is_selected
@property
def is_restricted(self):
return self._restriction is not None
def supports_restriction(self, restriction):
"""Determines if a scope restriction is supported by this ScopeMenuOption.
Args:
restriction: String, the text appended to a full permission scope which
will restrict its permissiveness. e.g. 'readonly' or 'action'.
Returns:
Bool, True if the scope restriction can be applied to this option.
"""
return restriction in self.supported_restrictions
@property
def restriction(self):
return self._restriction
@restriction.setter
def restriction(self, restriction):
self.restrict_to(restriction)
def restrict_to(self, restriction):
"""Applies a scope restriction to all scopes associated with this option.
Args:
restriction: String, a scope restriction which is appended to the
full-permission scope with a leading '.'. e.g. if the full scope is
https://www.googleapis.com/auth/admin.directory.domain
providing 'readonly' here will make the effective scope
https://www.googleapis.com/auth/admin.directory.domain.readonly
"""
if self.supports_restriction(restriction):
self._restriction = restriction
else:
error = 'Scope does not support a %s restriction.' % restriction
if self.supported_restrictions is not None:
restriction_list = ', '.join(self.supported_restrictions)
error.append(' Supported restrictions are: %s' % restriction_list)
raise ValueError(error)
def unrestrict(self):
"""Removes all scope restrictions currently applied."""
self._restriction = None
def get_effective_scopes(self):
"""Gets all scopes for this option, including their restrictions.
Restrictions are applied in the form of trailing text which limit the
scope's capabilities.
"""
effective_scopes = []
for scope in self.scopes:
if self.is_restricted:
scope = '%s.%s' % (scope, self._restriction)
effective_scopes.append(scope)
return effective_scopes
@classmethod
def from_gam_oauth2scope(cls, scope_definition):
"""Generates a ScopeMenuOption from a dict-style OAUTH2_SCOPES definition.
Dict fields:
name: Some description of the API/feature.
subscopes: A list of compatible scope restrictions such as 'action' or
'readonly'. Each scope in the scopes list must support this
restriction text appended to the end of its normal scope text.
scopes: A list of scopes that are required for the API/feature.
offByDefault: A bool indicating whether this feature/scope should be off
by default (when no prior selection has been made). Default is False
(the item will be on/selected by default).
required: A bool indicating the API/feature is required. This scope
cannot be unselected. Default is False.
Example:
{
'name': 'Made up API',
'subscopes': ['action'],
'scopes': ['https://www.googleapis.com/auth/some.scope'],
}
Args:
scope_definition: A dict following the syntax of scopes defined in
OAUTH2_SCOPES.
Returns:
A ScopeMenuOption object.
"""
scopes = scope_definition.get('scopes', [])
# If the scope is a single string, make it into a list.
scope_list = scopes if isinstance(scopes, list) else [scopes]
return cls(
oauth_scopes=scope_list,
description=scope_definition.get('name'),
is_selected=not scope_definition.get('offByDefault'),
supported_restrictions=scope_definition.get('subscopes', []),
is_required=scope_definition.get('required', False))
class ScopeSelectionMenu():
"""A text menu which prompts the user to select the scopes to authorize."""
class MenuChoiceError(Exception):
"""Error when an invalid or incompatible user choice is made."""
pass
class UserRequestedExitException(Exception):
"""Exception when the user requests immediate exit."""
pass
def __init__(self, options):
"""A menu of scope options to choose from.
Args:
options: A list of ScopeMenuOption objects from which to generate the menu
Options will be presented on screen in the same order as the provided
list.
""""
self._options = options
def get_options(self):
"""Returns all options that are available on this menu."""
return self._options
def get_selected_options(self):
"""Returns all currently selected ScopeMenuOptions."""
return [option for option in self._options if option.is_selected]
def get_selected_scopes(self):
"""Returns the aggregate set of oauth scopes currently selected."""
selected_scopes = [scope for option in self.get_selected_options()
for scope in option.get_effective_scopes()]
return set(selected_scopes)
MENU_CHOICE = {
'SELECT_ALL_SCOPES': 's',
'UNSELECT_ALL_SCOPES': 'u',
'EXIT': 'e',
'CONTINUE': 'c'
}
_MENU_DISPLAY_TEXT = '''
Select the authorized scopes by entering a number.
Append an 'r' to grant read-only access or an 'a' to grant action-only access.
'''
for a_scope in OAUTH2_SCOPES:
OAUTH2_MENU += u'[%%%%s] %%2d) %s' % (a_scope[u'name'])
if a_scope[u'subscopes']:
OAUTH2_MENU += u' (supports %s)' % (u' and '.join(a_scope[u'subscopes']))
OAUTH2_MENU += '\n'
OAUTH2_MENU += '''
%s
s) Select all scopes
u) Unselect all scopes
e) Exit without changes
c) Continue to authorization
'''
OAUTH2_CMDS = [u's', u'u', u'e', u'c']
MAXIMUM_SCOPES = 48 # max of 50 - 2 for email scope always included
def getScopesFromUser():
"""Prompts the user to choose from a list of scopes to authorize."""
def _checkMakeScopesList(scopes):
del scopes[:]
for i in range(num_scopes):
if selected_scopes[i] == u'*':
if not isinstance(OAUTH2_SCOPES[i][u'scopes'], list):
scopes.append(OAUTH2_SCOPES[i][u'scopes'])
else:
scopes += OAUTH2_SCOPES[i][u'scopes']
elif selected_scopes[i] == u'R':
scopes.append(u'%s.readonly' % OAUTH2_SCOPES[i][u'scopes'])
elif selected_scopes[i] == u'A':
scopes.append(u'%s.action' % OAUTH2_SCOPES[i][u'scopes'])
if len(scopes) > MAXIMUM_SCOPES:
return (False, u'ERROR: {0} scopes selected, maximum is {1}, please unselect some.\n'.format(len(scopes), MAXIMUM_SCOPES))
if len(scopes) == 0:
return (False, u'ERROR: No scopes selected, please select at least one.\n')
scopes.insert(0, u'email') # Email Display Scope, always included
return (True, u'')
def get_menu_text(self):
"""Returns a text menu with numbered options."""
scope_menu_items = [
self._build_scope_menu_item(option, counter)
for counter, option in enumerate(self._options)
]
return ScopeSelectionMenu._MENU_DISPLAY_TEXT % '\n'.join(scope_menu_items)
num_scopes = len(OAUTH2_SCOPES)
menu = OAUTH2_MENU % tuple(range(num_scopes))
selected_scopes = []
for scope in OAUTH2_SCOPES:
if scope.get(u'offByDefault', False):
selected_scopes.append(u' ')
else:
selected_scopes.append(u'*')
scopes = []
prompt = u'Please enter 0-{0}[a|r] or {1}: '.format(num_scopes-1, u'|'.join(OAUTH2_CMDS))
message = u''
while True:
os.system([u'clear', u'cls'][GM_Globals[GM_WINDOWS]])
if message:
sys.stdout.write(message)
message = u''
sys.stdout.write(menu % tuple(selected_scopes))
def _build_scope_menu_item(self, scope_option, option_number):
"""Builds a text line representing a single scope selection in the menu.
The returned line is in the format:
[<*>] <##>) <Feature description> (<supported scope modifiers>) [required]
* = A single character indicating the option's selection status.
'*' - All scopes are selected with full permissions.
' ' - No scopes are selected.
'X' - Where 'X' is the first letter of the selected scope restriction,
such as 'R' for readonly or 'A' for action, indicates scopes are
selected with the corresponding restriction.
## = The line item number associated to this option.
Feature description = The ScopeMenuOption description.
scope modifiers = The supported scope restrictions for the ScopeMenuOption.
When appended to the unrestricted, full oauth scope, these strings
modify or restrict the access level of the given scope.
[required] = Will only appear if the ScopeMenuOption is required and cannot
be unselected.
e.g. [*] 2) Directory API - Domains (supports 'readonly')
Args:
scope_option: The ScopeMenuOption associated with this line item.
option_number: The selectable option number that is associated with
modifying this line item.
Returns:
A string containing the line item text without a trailing newline.
"""
SELECTION_INDICATOR = {
'ALL_SELECTED': '*',
'UNSELECTED': ' ',
}
indicator = SELECTION_INDICATOR['UNSELECTED']
if scope_option.is_selected:
if scope_option.is_restricted:
# Use the first letter of the restriction as the indicator.
indicator = scope_option.restriction[0].upper()
else:
indicator = SELECTION_INDICATOR['ALL_SELECTED']
item_description = [
'[%s]' % indicator,
'%2d)' % option_number,
scope_option.description,
]
if scope_option.supported_restrictions:
item_description.append(
'(supports %s)' % ' and '.join(scope_option.supported_restrictions))
if scope_option.is_required:
item_description.append('[required]')
return ' '.join(item_description)
def get_prompt_text(self):
"""Builds and returns a prompt requesting user input."""
# Get all the available restrictions and create the list of available
# commands that the user can input.
restrictions = set([
restriction for option in self._options
for restriction in option.supported_restrictions
])
restriction_choices = [
restriction[0].lower() for restriction in restrictions
]
return ('Please enter 0-%d[%s] or %s: ' %
(len(self._options)-1, # Keep the menu options 0-based
'|'.join(restriction_choices),
'|'.join(ScopeSelectionMenu.MENU_CHOICE.values())))
def run(self):
"""Displays the ScopeSelectionMenu to the user and prompts for input.
The menu will continue to display until the user finishes adjusting all
desired items and requests to return.
After the menu is run, callers may use `get_selected_scopes()` or
`get_selected_options()` methods to understand what the user's final choice
was.
Raises: ScopeSelectionMenu.UserRequestedExitException if the user chooses
to exit the application entirely, rather than continue execution. This
allows callers to decide how to handle the exit.
"""
error_message = None
while True:
choice = raw_input(prompt)
if choice:
selection = choice.lower()
if selection.find(u'r') >= 0:
mode = u'R'
selection = selection.replace(u'r', u'')
elif selection.find(u'a') >= 0:
mode = u'A'
selection = selection.replace(u'a', u'')
else:
mode = u' '
if selection and selection.isdigit():
selection = int(selection)
if isinstance(selection, int) and selection < num_scopes:
if mode == u'R':
if u'readonly' not in OAUTH2_SCOPES[selection][u'subscopes']:
sys.stdout.write(u'{0}Scope {1} does not support read-only mode!\n'.format(ERROR_PREFIX, selection))
continue
elif mode == u'A':
if u'action' not in OAUTH2_SCOPES[selection][u'subscopes']:
sys.stdout.write(u'{0}Scope {1} does not support action-only mode!\n'.format(ERROR_PREFIX, selection))
continue
elif selected_scopes[selection] != u'*':
mode = u'*'
else:
mode = u' '
selected_scopes[selection] = mode
break
elif isinstance(selection, str) and selection in OAUTH2_CMDS:
if selection == u's':
for i in range(num_scopes):
selected_scopes[i] = u'*'
elif selection == u'u':
for i in range(num_scopes):
selected_scopes[i] = u' '
elif selection == u'e':
return None
break
sys.stdout.write(u'{0}Invalid input "{1}"\n'.format(ERROR_PREFIX, choice))
if selection == u'c':
status, message = _checkMakeScopesList(scopes)
if status:
break
return scopes
os.system(['clear', 'cls'][GM_Globals[GM_WINDOWS]])
sys.stdout.write(self.get_menu_text())
if error_message is not None:
colored_error = createRedText(ERROR_PREFIX + error_message + '\n')
sys.stdout.write(colored_error)
error_message = None # Clear the pending error message
user_input = raw_input(self.get_prompt_text())
try:
prompt_again = self._process_menu_input(user_input)
if not prompt_again:
return
except ScopeSelectionMenu.MenuChoiceError as e:
error_message = e.message
_SINGLE_SCOPE_CHANGE_REGEX = re.compile(
'\s*(?P<scope_number>\d{1,2})\s*(?P<restriction>[a-z]?)', re.IGNORECASE)
# Google-defined maximum number of scopes that can be authorized on a single
# access token.
MAXIMUM_NUM_SCOPES = 50
def _process_menu_input(self, raw_menu_input):
"""Processes the raw user input provided to the menu prompt.
Args:
raw_menu_input: The raw, unaltered string provided by the user in response
to the menu prompt.
Returns:
True, if the user should be prompted for further input. False, if the
user has finished input and requested to continue execution of the
program.
Raises:
ScopeSelectionMenu.UserRequestedExitException if the user requests to exit
the application immediately.
ScopeSelectionMenu.MenuChoiceError upon invalid user input.
"""
user_input = raw_menu_input.lower().strip()
single_scope_change = (
ScopeSelectionMenu._SINGLE_SCOPE_CHANGE_REGEX.match(user_input))
if single_scope_change:
scope_number, restriction_command = single_scope_change.group(
'scope_number', 'restriction')
# Make sure we get an actual number to deal with.
scope_number = int(scope_number)
# Scope option numbers displayed in the menu are 0-based and map directly
# to the indices in the list of scopes.
if scope_number < 0 or scope_number > len(self._options) - 1:
raise ScopeSelectionMenu.MenuChoiceError(
'Invalid scope number "%d"' % scope_number)
selected_option = self._options[scope_number]
# Find the restriction that the user intended to apply.
if restriction_command != '':
matching_restrictions = filter(
lambda r: r.startswith(restriction_command),
selected_option.supported_restrictions)
if len(matching_restrictions) < 1:
raise ScopeSelectionMenu.MenuChoiceError(
'Scope "%s" does not support "%s" mode!' % (
selected_option.description, restriction_command))
restriction = matching_restrictions[0]
else:
restriction = None
self._update_option(selected_option, restriction=restriction)
elif user_input == ScopeSelectionMenu.MENU_CHOICE['SELECT_ALL_SCOPES']:
for option in self._options:
self._update_option(option, selected=True)
elif user_input == ScopeSelectionMenu.MENU_CHOICE['UNSELECT_ALL_SCOPES']:
for option in self._options:
# Force-select required options
self._update_option(option, selected=option.is_required)
elif user_input == ScopeSelectionMenu.MENU_CHOICE['CONTINUE']:
return False
elif user_input == ScopeSelectionMenu.MENU_CHOICE['EXIT']:
raise ScopeSelectionMenu.UserRequestedExitException()
else:
raise ScopeSelectionMenu.MenuChoiceError(
'Invalid input "%s"' % user_input)
return True
def _update_option(self, option, selected=None, restriction=None):
"""Validates changes and updates the internal state of options on the menu.
Args:
option: The ScopeMenuOption to update
selected: If provided, updates the "selected" status of the option. If
not provided, the "selected" status will be toggled to its opposite
state.
restriction: If provided, applies a restriction to the provided option.
Raises:
ScopeSelectionMenu.MenuChoiceError on change validation errors.
"""
if option.is_required and (not selected or selected is None):
raise ScopeSelectionMenu.MenuChoiceError(
'Scope "%s" is required and cannot be unselected!' %
option.description)
elif selected and not option.is_selected:
# Make sure we're not about to exceed the maximum number of scopes
# authorized on a single token.
num_scopes_to_add = len(option.get_effective_scopes())
num_selected_scopes = len(self.get_selected_scopes())
expected_num_scopes = num_scopes_to_add + num_selected_scopes
if expected_num_scopes > ScopeSelectionMenu.MAXIMUM_NUM_SCOPES:
raise ScopeSelectionMenu.MenuChoiceError(
'Too many scopes selected (%d). Maximum is %d. Please remove some '
'scopes and try again.' % (
expected_num_scopes, ScopeSelectionMenu.MAXIMUM_NUM_SCOPES))
if restriction is None:
if selected is None:
# Toggle the option on/off
option.is_selected = not option.is_selected
else:
option.is_selected = selected
else:
if option.supports_restriction(restriction):
option.select(restriction)
else:
raise ScopeSelectionMenu.MenuChoiceError(
'Scope "%s" does not support %s mode!' % (
option.description, restriction))
def init_gam_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)