mirror of
https://github.com/GAM-team/GAM.git
synced 2026-06-28 18:01:36 +00:00
Allow "rotating" to a YubiKey private key
This commit is contained in:
@@ -7694,32 +7694,14 @@ def _generatePrivateKeyAndPublicCert(client_id, key_size):
|
||||
return private_pem, publicKeyData
|
||||
|
||||
|
||||
def _formatOAuth2ServiceData(project_id, client_email, client_id, private_key,
|
||||
private_key_id):
|
||||
quoted_email = quote(client_email)
|
||||
key_json = {
|
||||
'auth_provider_x509_cert_url':
|
||||
'https://www.googleapis.com/oauth2/v1/certs',
|
||||
'auth_uri':
|
||||
'https://accounts.google.com/o/oauth2/auth',
|
||||
'client_email':
|
||||
client_email,
|
||||
'client_id':
|
||||
client_id,
|
||||
'client_x509_cert_url':
|
||||
f'https://www.googleapis.com/robot/v1/metadata/x509/{quoted_email}',
|
||||
'private_key':
|
||||
private_key,
|
||||
'private_key_id':
|
||||
private_key_id,
|
||||
'project_id':
|
||||
project_id,
|
||||
'token_uri':
|
||||
'https://oauth2.googleapis.com/token',
|
||||
'type':
|
||||
'service_account',
|
||||
}
|
||||
return json.dumps(key_json, indent=2, sort_keys=True)
|
||||
def _formatOAuth2ServiceData(service_data):
|
||||
quoted_email = quote(service_data.get('client_email', ''))
|
||||
service_data['auth_provider_x509_cert_url'] = 'https://www.googleapis.com/oauth2/v1/certs'
|
||||
service_data['auth_uri'] = 'https://accounts.google.com/o/oauth2/auth'
|
||||
service_data['client_x509_cert_url'] = f'https://www.googleapis.com/robot/v1/metadata/x509/{quoted_email}'
|
||||
service_data['token_uri'] = 'https://oauth2.googleapis.com/token'
|
||||
service_data['type'] = 'service_account'
|
||||
return json.dumps(service_data, indent=2, sort_keys=True)
|
||||
|
||||
|
||||
def doShowServiceAccountKeys():
|
||||
@@ -7765,10 +7747,22 @@ def doCreateOrRotateServiceAccountKeys(iam=None,
|
||||
client_email=None,
|
||||
client_id=None):
|
||||
local_key_size = 2048
|
||||
mode = 'retainexisting'
|
||||
body = {}
|
||||
if iam:
|
||||
mode = 'retainexisting'
|
||||
new_data = {
|
||||
'client_email': client_email,
|
||||
'project_id': project_id,
|
||||
'client_id': client_id,
|
||||
'key_type': 'default'
|
||||
}
|
||||
else:
|
||||
_getSvcAcctData()
|
||||
# dict() ensures we have a real copy, not pointer
|
||||
new_data = dict(GM_Globals[GM_OAUTH2SERVICE_JSON_DATA])
|
||||
oldPrivateKeyId = new_data.get('private_key_id')
|
||||
# assume default key type unless we are told otherwise
|
||||
new_data['key_type'] = 'default'
|
||||
mode = 'retainnone'
|
||||
i = 3
|
||||
iam = buildGAPIServiceObject('iam', None)
|
||||
@@ -7793,38 +7787,59 @@ def doCreateOrRotateServiceAccountKeys(iam=None,
|
||||
'localkeysize must be 1024, 2048 or 4096. 1024 is weak and dangerous. 2048 is recommended. 4096 is slow.'
|
||||
)
|
||||
i += 2
|
||||
elif myarg == 'yubikey':
|
||||
new_data['key_type'] = 'yubikey'
|
||||
i += 1
|
||||
elif myarg == 'yubikeyslot':
|
||||
new_data['yubikey_slot'] = sys.argv[i+1].upper()
|
||||
i =+ 2
|
||||
elif myarg == 'yubikeypin':
|
||||
new_data['yubikey_pin'] = input('Enter your YubiKey PIN: ')
|
||||
i += 1
|
||||
elif myarg == 'yubikeyserialnumber':
|
||||
new_data['yubikey_serial_number'] = sys.argv[i+1]
|
||||
i += 2
|
||||
elif myarg in ['retainnone', 'retainexisting', 'replacecurrent']:
|
||||
mode = myarg
|
||||
i += 1
|
||||
else:
|
||||
controlflow.invalid_argument_exit(myarg, 'gam rotate sakeys')
|
||||
currentPrivateKeyId = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA][
|
||||
'private_key_id']
|
||||
project_id = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['project_id']
|
||||
client_email = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['client_email']
|
||||
client_id = GM_Globals[GM_OAUTH2SERVICE_JSON_DATA]['client_id']
|
||||
clientId = GM_Globals[GM_OAUTH2SERVICE_ACCOUNT_CLIENT_ID]
|
||||
name = f'projects/-/serviceAccounts/{clientId}'
|
||||
if mode != 'retainexisting':
|
||||
keys = gapi.get_items(iam.projects().serviceAccounts().keys(),
|
||||
'list',
|
||||
'keys',
|
||||
name=name,
|
||||
keyTypes='USER_MANAGED')
|
||||
sa_name = f'projects/-/serviceAccounts/{new_data["client_id"]}'
|
||||
if new_data.get('key_type') == 'yubikey':
|
||||
# Use yubikey private key
|
||||
new_data['yubikey_key_type'] = f'RSA{local_key_size}'
|
||||
new_data.pop('private_key', None)
|
||||
yk = yubikey.YubiKey(new_data)
|
||||
publicKeyData = yk.get_certificate()
|
||||
elif local_key_size:
|
||||
# Generate private key locally, store in file
|
||||
new_data['private_key'], publicKeyData = _generatePrivateKeyAndPublicCert(
|
||||
sa_name, local_key_size)
|
||||
new_data['key_type'] = 'default'
|
||||
for key in list(new_data):
|
||||
if key.startswith('yubikey_'):
|
||||
new_data.pop(key, None)
|
||||
if local_key_size:
|
||||
private_key, publicKeyData = _generatePrivateKeyAndPublicCert(
|
||||
name, local_key_size)
|
||||
# Upload public cert for yubikey or local generated
|
||||
print(' Uploading new public certificate to Google...')
|
||||
throw_reasons = [
|
||||
gapi_errors.ErrorReason.FOUR_O_O,
|
||||
gapi_errors.ErrorReason.NOT_FOUND
|
||||
]
|
||||
max_retries = 10
|
||||
for i in range(1, max_retries + 1):
|
||||
try:
|
||||
result = gapi.call(
|
||||
iam.projects().serviceAccounts().keys(),
|
||||
'upload',
|
||||
throw_reasons=[gapi_errors.ErrorReason.NOT_FOUND],
|
||||
name=name,
|
||||
throw_reasons=throw_reasons,
|
||||
name=sa_name,
|
||||
body={'publicKeyData': publicKeyData})
|
||||
break
|
||||
except googleapiclient.errors.HttpError:
|
||||
print('WARNING: that key already exists.')
|
||||
result = {'name': oldPrivateKeyId}
|
||||
break
|
||||
except gapi_errors.GapiNotFoundError as e:
|
||||
if i == max_retries:
|
||||
raise e
|
||||
@@ -7834,31 +7849,36 @@ def doCreateOrRotateServiceAccountKeys(iam=None,
|
||||
f'Waiting for Service Account creation to complete. Sleeping {sleep_time} seconds\n'
|
||||
)
|
||||
time.sleep(sleep_time)
|
||||
private_key_id = result['name'].rsplit('/', 1)[-1]
|
||||
oauth2service_data = _formatOAuth2ServiceData(project_id, client_email,
|
||||
client_id, private_key,
|
||||
private_key_id)
|
||||
newPrivateKeyId = result['name'].rsplit('/', 1)[-1]
|
||||
new_data['private_key_id'] = newPrivateKeyId
|
||||
new_data_str = _formatOAuth2ServiceData(new_data)
|
||||
else:
|
||||
# Ask Google to generate private key, store locally
|
||||
result = gapi.call(iam.projects().serviceAccounts().keys(),
|
||||
'create',
|
||||
name=name,
|
||||
body=body)
|
||||
oauth2service_data = base64.b64decode(
|
||||
new_data_str = base64.b64decode(
|
||||
result['privateKeyData']).decode(UTF8)
|
||||
private_key_id = result['name'].rsplit('/', 1)[-1]
|
||||
newPrivateKeyId = result['name'].rsplit('/', 1)[-1]
|
||||
fileutils.write_file(GC_Values[GC_OAUTH2SERVICE_JSON],
|
||||
oauth2service_data,
|
||||
new_data_str,
|
||||
continue_on_error=False)
|
||||
print(
|
||||
f' Wrote new private key {private_key_id} to {GC_Values[GC_OAUTH2SERVICE_JSON]}'
|
||||
f' Wrote new service account data for {newPrivateKeyId} to {GC_Values[GC_OAUTH2SERVICE_JSON]}'
|
||||
)
|
||||
if mode != 'retainexisting':
|
||||
keys = gapi.get_items(iam.projects().serviceAccounts().keys(),
|
||||
'list',
|
||||
'keys',
|
||||
name=sa_name,
|
||||
keyTypes='USER_MANAGED')
|
||||
count = len(keys) if mode == 'retainnone' else 1
|
||||
print(
|
||||
f' Revoking {count} existing key(s) for Service Account {clientId}')
|
||||
f' Revoking {count} existing key(s) for Service Account {new_data["client_id"]}')
|
||||
for key in keys:
|
||||
keyName = key['name'].rsplit('/', 1)[-1]
|
||||
if mode == 'retainnone' or keyName == currentPrivateKeyId:
|
||||
if (mode == 'retainnone' or keyName == oldPrivateKeyId) and keyName != newPrivateKeyId:
|
||||
print(f' Revoking existing key {keyName} for service account')
|
||||
gapi.call(iam.projects().serviceAccounts().keys(),
|
||||
'delete',
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
from base64 import b64encode
|
||||
import sys
|
||||
from threading import Timer
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from ykman.device import connect_to_device
|
||||
from yubikit.piv import KEY_TYPE, SLOT, InvalidPinError, PivSession
|
||||
@@ -25,6 +26,28 @@ class YubiKey():
|
||||
self.pin = service_account_info.get('yubikey_pin')
|
||||
self.key_id = service_account_info.get('private_key_id')
|
||||
|
||||
def get_certificate(self):
|
||||
try:
|
||||
conn, _, _ = connect_to_device(self.serial_number)
|
||||
session = PivSession(conn)
|
||||
if self.pin:
|
||||
try:
|
||||
session.verify_pin(self.pin)
|
||||
except InvalidPinError as err:
|
||||
controlflow.system_error_exit(7, f'YubiKey - {err}')
|
||||
try:
|
||||
cert = session.get_certificate(self.slot)
|
||||
cert_pem = cert.public_bytes(
|
||||
serialization.Encoding.PEM).decode()
|
||||
publicKeyData = b64encode(cert_pem.encode())
|
||||
if isinstance(publicKeyData, bytes):
|
||||
publicKeyData = publicKeyData.decode()
|
||||
return publicKeyData
|
||||
except ApduError as err:
|
||||
controlflow.system_error_exit(8, f'YubiKey - {err}')
|
||||
except ValueError as err:
|
||||
controlflow.system_error_exit(9, f'YubiKey - {err}')
|
||||
|
||||
def sign(self, message):
|
||||
if 'mplock' in globals():
|
||||
mplock.acquire()
|
||||
|
||||
Reference in New Issue
Block a user