rough draft work to move GAM off oob auth #1483

This commit is contained in:
Jay Lee
2022-02-18 16:06:40 -05:00
committed by GitHub
parent 4a9d571cb1
commit 0b7a79bce0

View File

@@ -1,11 +1,19 @@
"""OAuth2.0 user credentials."""
import datetime
import ipaddress
import json
import multiprocessing
import os
import re
from socket import gethostbyname
import sys
from time import sleep
import threading
from urllib.parse import urlencode
from urllib.parse import urlencode, urlparse, parse_qs
import wsgiref.simple_server
import wsgiref.util
import webbrowser
from filelock import FileLock
import google_auth_oauthlib.flow
@@ -14,10 +22,10 @@ import google.oauth2.id_token
from gam import fileutils
from gam import transport
from gam.var import GM_Globals
from gam.var import GM_WINDOWS
from gam.var import GM_Globals, GM_WINDOWS
from gam import utils
MESSAGE_CONSOLE_AUTHORIZATION_PROMPT = ('\nGo to the following link in your '
'browser:\n\n\t{url}\n')
MESSAGE_CONSOLE_AUTHORIZATION_CODE = 'Enter verification code: '
@@ -295,19 +303,8 @@ class Credentials(google.oauth2.credentials.Credentials):
if login_hint:
flow_kwargs['login_hint'] = login_hint
# TODO: Move code for browser detection somewhere in this file so that the
# messaging about `nobrowser.txt` is co-located with the logic that uses it.
if use_console_flow:
flow.run_console(
authorization_prompt_message=
MESSAGE_CONSOLE_AUTHORIZATION_PROMPT,
authorization_code_message=MESSAGE_CONSOLE_AUTHORIZATION_CODE,
flow.run_dual(use_console_flow,
**flow_kwargs)
else:
flow.run_local_server(authorization_prompt_message=
MESSAGE_LOCAL_SERVER_AUTHORIZATION_PROMPT,
success_message=MESSAGE_LOCAL_SERVER_SUCCESS,
**flow_kwargs)
return cls.from_google_oauth2_credentials(flow.credentials,
filename=filename)
@@ -516,18 +513,117 @@ class Credentials(google.oauth2.credentials.Credentials):
http.request(revoke_uri, 'GET')
def _localhost_to_ip():
'''returns IPv4 or IPv6 loopback address which localhost resolves to.
If localhost does not resolve to valid loopback IP address then returns
127.0.0.1'''
# TODO gethostbyname() will only ever return ipv4
# find a way to support IPv6 here and get preferred IP
# note that IPv6 may be broken on some systems also :-(
# for now IPv4 should do.
local_ip = gethostbyname('localhost')
local_ipaddress = ipaddress.ip_address(local_ip)
ip4_local_range = ipaddress.ip_network('127.0.0.0/8')
ip6_local_range = ipaddress.ip_network('::1/128')
if local_ipaddress not in ip4_local_range and \
local_ipaddress not in ip6_local_ranage:
local_ip = '127.0.0.1'
return local_ip
def _wait_for_http_client(d):
wsgi_app = google_auth_oauthlib.flow._RedirectWSGIApp('')
wsgiref.simple_server.WSGIServer.allow_reuse_address = False
# Convert hostn to IP since apparently binding to the IP
# reduces odds of firewall blocking us
local_ip = _localhost_to_ip()
for port in range(8080, 8099):
try:
local_server = wsgiref.simple_server.make_server(
local_ip,
port,
wsgi_app,
handler_class=wsgiref.simple_server.WSGIRequestHandler
)
break
except OSError:
pass
redirect_uri_format = (
"http://{}:{}/" if d['trailing_slash'] else "http://{}:{}"
)
# provide redirect_uri to main process so it can formulate auth_url
d['redirect_uri'] = redirect_uri_format.format(*local_server.server_address)
# wait until main process provides auth_url
# so we can open it in web browser.
while 'auth_url' not in d:
sleep(0.1)
if d['open_browser']:
webbrowser.open(d['auth_url'], new=1, autoraise=True)
local_server.handle_request()
authorization_response = wsgi_app.last_request_uri.replace("http", "https")
d['code'] = authorization_response
local_server.server_close()
def _wait_for_user_input(d):
sys.stdin = open(0)
code = input('enter the code:')
d['code'] = code
class _ShortURLFlow(google_auth_oauthlib.flow.InstalledAppFlow):
"""InstalledAppFlow which utilizes a URL shortener for authorization URLs."""
URL_SHORTENER_ENDPOINT = 'https://gam-shortn.appspot.com/create'
def authorization_url(self, http=None, **kwargs):
"""Gets a shortened authorization URL."""
long_url, state = super().authorization_url(**kwargs)
short_url = utils.shorten_url(long_url)
print(short_url)
return short_url, state
def run_dual(self,
use_console_flow,
authorization_prompt_message='',
console_prompt_message='',
web_success_message='',
open_browser=True,
redirect_uri_trailing_slash=True,
**kwargs):
mgr = multiprocessing.Manager()
d = mgr.dict()
d['trailing_slash'] = redirect_uri_trailing_slash
d['open_browser'] = use_console_flow
http_client = multiprocessing.Process(target=_wait_for_http_client,
args=(d,))
user_input = multiprocessing.Process(target=_wait_for_user_input,
args=(d,))
http_client.start()
# we need to wait until web server starts on avail port
# so we know redirect_uri to use
while 'redirect_uri' not in d:
sleep(0.1)
self.redirect_uri = d['redirect_uri']
d['auth_url'], _ = self.authorization_url(**kwargs)
self.auth_url = d['auth_url']
print(f'URL is: {self.auth_url}')
user_input.start()
while True:
sleep(0.1)
if not http_client.is_alive():
user_input.terminate()
break
elif not user_input.is_alive():
http_client.terminate()
break
code = d['code']
if code.startswith('http'):
parsed_url = urlparse(code)
parsed_params = parse_qs(parsed_url.query)
code = parsed_params.get('code', [None])[0]
self.fetch_token(code=code)
return self.credentials
class _FileLikeThreadLock:
"""A threading.lock which has the same interface as filelock.Filelock."""