YubiKey improvements and PIV reset

This commit is contained in:
Jay Lee
2021-07-27 09:24:34 -04:00
parent ed20fe252e
commit a3d560a8a2
2 changed files with 104 additions and 17 deletions

View File

@@ -7912,6 +7912,10 @@ def doCreateOrRotateServiceAccountKeys(iam=None,
new_data['yubikey_key_type'] = f'RSA{local_key_size}' new_data['yubikey_key_type'] = f'RSA{local_key_size}'
new_data.pop('private_key', None) new_data.pop('private_key', None)
yk = yubikey.YubiKey(new_data) yk = yubikey.YubiKey(new_data)
if 'yubikey_serial_number' not in new_data:
new_data['yubikey_serial_number'] = yk.get_serial_number()
if 'yubikey_slot' not in new_data:
new_data['yubikey_slot'] = 'AUTHENTICATION'
publicKeyData = yk.get_certificate() publicKeyData = yk.get_certificate()
elif local_key_size: elif local_key_size:
# Generate private key locally, store in file # Generate private key locally, store in file
@@ -11847,6 +11851,12 @@ def ProcessGAMCommand(args):
elif command == 'getcommand': elif command == 'getcommand':
gapi_directory_cros.get_command() gapi_directory_cros.get_command()
sys.exit(0) sys.exit(0)
elif command in ['yubikey']:
action = sys.argv[2].lower().replace('_', '')
if action == 'resetpiv':
yk = yubikey.YubiKey()
yk.reset_piv()
sys.exit(0)
users = getUsersToModify() users = getUsersToModify()
command = sys.argv[3].lower() command = sys.argv[3].lower()
if command == 'print' and len(sys.argv) == 4: if command == 'print' and len(sys.argv) == 4:

View File

@@ -1,34 +1,57 @@
from base64 import b64encode from base64 import b64encode
import datetime
from secrets import SystemRandom
import string
import sys import sys
from threading import Timer from threading import Timer
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import padding
from ykman.device import connect_to_device from ykman.device import connect_to_device
from yubikit.piv import KEY_TYPE, SLOT, InvalidPinError, PivSession from ykman.piv import generate_self_signed_certificate, \
generate_chuid
from yubikit.piv import DEFAULT_MANAGEMENT_KEY, \
InvalidPinError, \
KEY_TYPE, \
MANAGEMENT_KEY_TYPE, \
PIN_POLICY, \
PivSession, \
OBJECT_ID, \
SLOT, \
TOUCH_POLICY
from yubikit.core.smartcard import ApduError from yubikit.core.smartcard import ApduError
from gam import controlflow from gam import controlflow
class YubiKey(): class YubiKey():
def __init__(self, service_account_info): def __init__(self, service_account_info=None):
key_type = service_account_info.get('yubikey_key_type', 'RSA2048') self.key_type = None
try: self.slot = None
self.key_type = getattr(KEY_TYPE, key_type.upper()) self.serial_number = None
except AttributeError: self.pin = None
controlflow.system_error_exit(6, f'{key_type} is not a valid value for yubikey_key_type') self.key_id = None
slot = service_account_info.get('yubikey_slot', 'AUTHENTICATION') if service_account_info:
try: key_type = service_account_info.get('yubikey_key_type', 'RSA2048')
self.slot = getattr(SLOT, slot.upper()) try:
except AttributeError: self.key_type = getattr(KEY_TYPE, key_type.upper())
controlflow.system_error_exit(6, f'{slot} is not a valid value for yubikey_slot') except AttributeError:
self.serial_number = service_account_info.get('yubikey_serial_number') controlflow.system_error_exit(6, f'{key_type} is not a valid value for yubikey_key_type')
self.pin = service_account_info.get('yubikey_pin') slot = service_account_info.get('yubikey_slot', 'AUTHENTICATION')
self.key_id = service_account_info.get('private_key_id') try:
self.slot = getattr(SLOT, slot.upper())
except AttributeError:
controlflow.system_error_exit(6, f'{slot} is not a valid value for yubikey_slot')
self.serial_number = service_account_info.get('yubikey_serial_number')
self.pin = service_account_info.get('yubikey_pin')
self.key_id = service_account_info.get('private_key_id')
def _connect(self):
conn, _, _ = connect_to_device(self.serial_number)
return conn
def get_certificate(self): def get_certificate(self):
try: try:
conn, _, _ = connect_to_device(self.serial_number) conn = self._connect()
with conn: with conn:
session = PivSession(conn) session = PivSession(conn)
if self.pin: if self.pin:
@@ -49,11 +72,65 @@ class YubiKey():
except ValueError as err: except ValueError as err:
controlflow.system_error_exit(9, f'YubiKey - {err}') controlflow.system_error_exit(9, f'YubiKey - {err}')
def get_serial_number(self):
try:
_, _, info = connect_to_device(self.serial_number)
return info.serial
except ValueError as err:
controlflow.system_error_exit(9, f'YubikKey = {err}')
def reset_piv(self):
'''Resets YubiKey PIV app and generates new key for GAM to use.'''
reply = str(input('This will wipe all PIV keys and configuration from your YubiKey. Are you sure? (y/N) ').lower().strip())
if reply != 'y':
sys.exit(1)
try:
conn = self._connect()
with conn:
piv = PivSession(conn)
piv.reset()
rnd = SystemRandom()
pin_puk_chars = string.ascii_letters + string.digits + string.punctuation
new_puk = ''.join(rnd.choice(pin_puk_chars) for _ in range(8))
new_pin = ''.join(rnd.choice(pin_puk_chars) for _ in range(8))
piv.change_puk('12345678', new_puk)
piv.change_pin('123456', new_pin)
print(f'PIN set to: {new_pin}')
piv.authenticate(MANAGEMENT_KEY_TYPE.TDES,
DEFAULT_MANAGEMENT_KEY)
piv.verify_pin(new_pin)
print('Yubikey is generating a non-exportable private key...')
pubkey = piv.generate_key(SLOT.AUTHENTICATION,
KEY_TYPE.RSA2048,
PIN_POLICY.ALWAYS,
TOUCH_POLICY.NEVER)
now = datetime.datetime.utcnow()
valid_to = now + datetime.timedelta(days=36500)
subject = 'CN=GAM Created Key'
piv.authenticate(MANAGEMENT_KEY_TYPE.TDES,
DEFAULT_MANAGEMENT_KEY)
piv.verify_pin(new_pin)
cert = generate_self_signed_certificate(piv,
SLOT.AUTHENTICATION,
pubkey,
subject,
now,
valid_to)
piv.put_certificate(SLOT.AUTHENTICATION,
cert)
piv.put_object(OBJECT_ID.CHUID,
generate_chuid())
except ValueError as err:
controlflow.system_error_exit(8, f'Yubikey - {err}')
def sign(self, message): def sign(self, message):
if 'mplock' in globals(): if 'mplock' in globals():
mplock.acquire() mplock.acquire()
try: try:
conn, _, _ = connect_to_device(self.serial_number) conn = self._connect()
with conn: with conn:
session = PivSession(conn) session = PivSession(conn)
if self.pin: if self.pin: