mirror of
https://github.com/GAM-team/GAM.git
synced 2026-06-28 09:51:36 +00:00
Support for YubiKey private key storage
This commit is contained in:
@@ -27,6 +27,7 @@ import webbrowser
|
||||
import zipfile
|
||||
import http.client as http_client
|
||||
from multiprocessing import Pool as mp_pool
|
||||
from multiprocessing import Lock as mp_lock
|
||||
from urllib.parse import quote, urlencode, urlparse
|
||||
import dateutil.parser
|
||||
|
||||
@@ -45,6 +46,7 @@ from cryptography.x509.oid import NameOID
|
||||
|
||||
import gam.auth.oauth
|
||||
from gam import auth
|
||||
from gam.auth import yubikey
|
||||
from gam import controlflow
|
||||
from gam import display
|
||||
from gam import fileutils
|
||||
@@ -822,8 +824,14 @@ def _getSvcAcctData():
|
||||
def getSvcAcctCredentials(scopes, act_as):
|
||||
try:
|
||||
_getSvcAcctData()
|
||||
credentials = google.oauth2.service_account.Credentials.from_service_account_info(
|
||||
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
|
||||
sign_method = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA].get('key_type', 'default')
|
||||
if sign_method == 'default':
|
||||
credentials = google.oauth2.service_account.Credentials.from_service_account_info(
|
||||
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
|
||||
elif sign_method == 'yubikey':
|
||||
yksigner = yubikey.YubiKey(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
|
||||
credentials = google.oauth2.service_account.Credentials._from_signer_and_info(yksigner,
|
||||
GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
|
||||
credentials = credentials.with_scopes(scopes)
|
||||
if act_as:
|
||||
credentials = credentials.with_subject(act_as)
|
||||
@@ -10849,7 +10857,9 @@ Append an 'r' to grant read-only access or an 'a' to grant action-only access.
|
||||
)
|
||||
|
||||
|
||||
def init_gam_worker():
|
||||
def init_gam_worker(l):
|
||||
global mplock
|
||||
mplock = l
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
|
||||
@@ -10857,7 +10867,8 @@ def run_batch(items):
|
||||
if not items:
|
||||
return
|
||||
num_worker_threads = min(len(items), GC_Values[GC_NUM_THREADS])
|
||||
pool = mp_pool(num_worker_threads, init_gam_worker, maxtasksperchild=200)
|
||||
l = mp_lock()
|
||||
pool = mp_pool(num_worker_threads, init_gam_worker, maxtasksperchild=200, initargs=(l,))
|
||||
sys.stderr.write(f'Using {num_worker_threads} processes...\n')
|
||||
try:
|
||||
results = []
|
||||
@@ -10868,7 +10879,7 @@ def run_batch(items):
|
||||
)
|
||||
pool.close()
|
||||
pool.join()
|
||||
pool = mp_pool(num_worker_threads, init_gam_worker)
|
||||
pool = mp_pool(num_worker_threads, init_gam_worker, maxtasksperchild=200, initargs=(1,))
|
||||
sys.stderr.write(
|
||||
'commit-batch - running processes finished, proceeding\n')
|
||||
continue
|
||||
|
||||
@@ -5,7 +5,9 @@ import os
|
||||
|
||||
from google.auth.jwt import Credentials as JWTCredentials
|
||||
|
||||
import gam
|
||||
from gam.auth import oauth
|
||||
from gam.auth import yubikey
|
||||
from gam.var import _FN_OAUTH2_TXT
|
||||
from gam.var import _FN_OAUTH2SERVICE_JSON
|
||||
from gam.var import GC_OAUTH2_TXT
|
||||
@@ -36,10 +38,17 @@ def get_admin_credentials(api=None):
|
||||
with open(credential_file, 'r') as f:
|
||||
creds_data = json.load(f)
|
||||
# Validate that enable DASA matches content of authorization file
|
||||
if GC_Values[GC_ENABLE_DASA] and 'private_key' in creds_data:
|
||||
if GC_Values[GC_ENABLE_DASA] and 'private_key_id' in creds_data:
|
||||
audience = f'https://{api}.googleapis.com/'
|
||||
return JWTCredentials.from_service_account_info(creds_data,
|
||||
audience=audience)
|
||||
key_type = creds_data.get('key_type', 'default')
|
||||
if key_type == 'default':
|
||||
return JWTCredentials.from_service_account_info(creds_data,
|
||||
audience=audience)
|
||||
elif key_type == 'yubikey':
|
||||
yksigner = yubikey.YubiKey(creds_data)
|
||||
return JWTCredentials._from_signer_and_info(yksigner,
|
||||
creds_data,
|
||||
audience=audience)
|
||||
elif not GC_Values[GC_ENABLE_DASA] and 'token' in creds_data:
|
||||
return oauth.Credentials.from_credentials_file(credential_file)
|
||||
else:
|
||||
|
||||
63
src/gam/auth/yubikey.py
Normal file
63
src/gam/auth/yubikey.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import sys
|
||||
from threading import Timer
|
||||
|
||||
# hack to avoid ImportError on unneccessary libraries
|
||||
class fake_open():
|
||||
open_devices = None
|
||||
sys.modules['ykman.driver_otp'] = fake_open
|
||||
|
||||
import ykman.descriptor
|
||||
from ykman.piv import SLOT, ALGO, PivController, DEFAULT_MANAGEMENT_KEY
|
||||
from ykman.util import TRANSPORT
|
||||
|
||||
from gam import controlflow
|
||||
|
||||
class YubiKey():
|
||||
|
||||
def __init__(self, service_account_info):
|
||||
algo = service_account_info.get('yubikey_algo', 'RSA2048')
|
||||
try:
|
||||
self.algo = getattr(ALGO, algo.upper())
|
||||
except AttributeError:
|
||||
controlflow.system_error_exit(6, f'{algo} is not a valid value for yubikey_algo')
|
||||
slot = service_account_info.get('yubikey_slot', 'AUTHENTICATION')
|
||||
try:
|
||||
self.slot = getattr(SLOT, slot.upper())
|
||||
except AttributeError:
|
||||
controlflow.system_error_exit(6, f'{slot} is not a valid value for yubikey_slot')
|
||||
self.pin = service_account_info.get('yubikey_pin')
|
||||
self.key_id = service_account_info.get('private_key_id')
|
||||
|
||||
def touch_callback(self):
|
||||
sys.stderr.write('\nTouch your YubiKey...\n')
|
||||
|
||||
def sign(self, message):
|
||||
timer = Timer(0.5, self.touch_callback)
|
||||
if 'mplock' in globals():
|
||||
mplock.acquire()
|
||||
try:
|
||||
with ykman.descriptor.open_device(transports=TRANSPORT.CCID) as yk:
|
||||
controller = PivController(yk.driver)
|
||||
if self.pin:
|
||||
controller.verify(self.pin)
|
||||
timer.start() # if sign() takes more than .5 sec we need touch
|
||||
try:
|
||||
signed = controller.sign(self.slot, self.algo, message)
|
||||
except ykman.driver_ccid.APDUError: # We need PIN
|
||||
timer.cancel() # reset timer while user enters PIN
|
||||
sys.stderr.write('\nEnter your YubiKey PIN:\n')
|
||||
self.pin = input()
|
||||
timer = Timer(0.5, self.touch_callback)
|
||||
controller.verify(self.pin)
|
||||
timer.start()
|
||||
signed = controller.sign(self.slot, self.algo, message)
|
||||
timer.cancel()
|
||||
except ykman.descriptor.FailedOpeningDeviceException:
|
||||
controlflow.system_error_exit(5, 'No YubiKey found. Is it plugged in?')
|
||||
except ykman.piv.WrongPin:
|
||||
controlflow.system_error_exit(7, 'Wrong PIN for YubiKey.')
|
||||
except ykman.piv.AuthenticationBlocked:
|
||||
controlflow.system_error_exit(8, 'YubiKey PIN is blocked.')
|
||||
if 'mplock' in globals():
|
||||
mplock.release()
|
||||
return signed
|
||||
@@ -8,3 +8,4 @@ google-auth>=1.11.2
|
||||
httplib2>=0.17.0
|
||||
passlib>=1.7.2; sys_platform == 'win32'
|
||||
python-dateutil
|
||||
yubikey-manager
|
||||
|
||||
Reference in New Issue
Block a user