mirror of
https://github.com/GAM-team/GAM.git
synced 2026-06-28 18:01:36 +00:00
gam copy vaultexport/storagebucket commands
This commit is contained in:
@@ -2,20 +2,148 @@ import base64
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import googleapiclient
|
||||
|
||||
import gam
|
||||
from gam.gapi import errors as gapi_errors
|
||||
from gam.var import *
|
||||
from gam import controlflow
|
||||
from gam import fileutils
|
||||
from gam import gapi
|
||||
from gam import utils
|
||||
|
||||
|
||||
def build():
    """Return the 'storage' API object created by gam.buildGAPIObject."""
    return gam.buildGAPIObject('storage')


# This helper appears under both the name build_gapi and the name build.
# Keep the older name bound so any caller still using it keeps working.
build_gapi = build
|
||||
|
||||
|
||||
def copy_bucket():
    """Copy the objects of one Cloud Storage bucket into another bucket.

    Options are read from sys.argv starting at index 3 as keyword/value
    pairs (keywords are case-insensitive and underscores are ignored):

      sourcebucket <bucket>  bucket to list objects from (required)
      targetbucket <bucket>  bucket to copy objects into (required)
      sourceprefix <prefix>  only list source objects with this prefix
      targetprefix <prefix>  string prepended to every target object name

    The matching objects are listed with the Storage API and handed to
    copy_objects() for the actual copy.
    """
    s = build()
    options = {
        'sourcebucket': None,
        'targetbucket': None,
        'sourceprefix': None,
        # Must have a default: it is always passed to copy_objects(), which
        # interpolates it into the target object name.
        'targetprefix': '',
    }
    i = 3
    while i < len(sys.argv):
        myarg = sys.argv[i].lower().replace('_', '')
        if myarg in options:
            options[myarg] = sys.argv[i + 1]
            i += 2
        else:
            controlflow.invalid_argument_exit(sys.argv[i],
                                              'gam copy storagebucket')
    source_bucket = options['sourcebucket']
    target_bucket = options['targetbucket']
    if not target_bucket:
        controlflow.missing_argument_exit('target_bucket',
                                          'gam copy storagebucket')
    if not source_bucket:
        controlflow.missing_argument_exit('source_bucket',
                                          'gam copy storagebucket')
    page_message = gapi.got_total_items_msg('Storage Objects', '...\n')
    objects = gapi.get_all_pages(
        s.objects(),
        'list',
        items='items',
        page_message=page_message,
        prefix=options['sourceprefix'],
        bucket=source_bucket,
        fields='items(name,bucket,md5Hash),nextPageToken')
    copy_objects(objects, target_bucket, options['targetprefix'])
|
||||
|
||||
|
||||
def copy_objects(objects,
                 target_bucket,
                 target_prefix):
    """Copies objects to target_bucket.

    Every object is copied with the Storage API objects().rewrite method.
    Requests are sent in batches of at most 100. A rewrite that is not yet
    done, or that failed with a temporary error (HTTP 429 or >= 500), is
    queued again in the following batch until it completes. When both the
    source and the target md5Hash are known they are compared and a mismatch
    aborts the program; when either is missing the copy is reported as
    unverified.

    Args:
      objects: list of object dicts
        [
          {
            bucket: source bucket,
            name: source object name,
            (optional) md5Hash: source file hash value
          },
          ...
        ]
      target_bucket: target bucket id
      target_prefix: prefix name to prepend to target object

    Raises:
      The exception reported by the batch for any request that fails with a
      status other than 429 or >= 500.
    """

    def process_rewrite(request_id, response, exception):
        """Batch callback handling the outcome of a single rewrite call."""
        file_ptr = int(request_id)
        entry = files_to_copy[file_ptr]
        if exception:
            # Poor man's backoff/retry
            if exception.status_code == 429 or exception.status_code > 499:
                print(f'Temporary error {exception.status_code}. Sleeping 10 seconds...')
                time.sleep(10)
                next_batch.add(s.objects().rewrite(**entry['method']),
                               request_id=request_id)
                return
            raise exception
        file_count = file_ptr + 1
        source_displayname = entry['source_displayname']
        target_displayname = entry['target_displayname']
        if response.get('done'):
            # md5Hash is optional on the source dict and may be absent from
            # the rewritten resource; only verify when both are available.
            source_md5 = entry.get('md5Hash')
            target_md5 = response.get('resource', {}).get('md5Hash')
            if source_md5 is None or target_md5 is None:
                print(f'[ {file_count} / {total_files} ] 100% UNVERIFIED (no md5Hash available) - finished copying:\n source: {source_displayname}\n dest: {target_displayname}')
            elif source_md5 != target_md5:
                controlflow.system_error_exit(99, f'Target file {target_displayname} checksum {target_md5} does not match source {source_md5}. This should not happen')
            else:
                print(f'[ {file_count} / {total_files} ] 100% VERIFIED - finished copying:\n source: {source_displayname}\n dest: {target_displayname}')
        else:
            total_bytes = float(response.get('objectSize'))
            done_bytes = float(response.get('totalBytesRewritten'))
            pct = (done_bytes / total_bytes) * 100 if total_bytes else 0.0
            print(f'[ {file_count} / {total_files} ] {pct:.2f}%\n source: {source_displayname}\n dest:{target_displayname}')
            # Continue the same rewrite in the next batch using the token
            # returned by this call.
            entry['method']['rewriteToken'] = response.get('rewriteToken')
            next_batch.add(s.objects().rewrite(**entry['method']),
                           request_id=request_id)

    max_batch_size = 100
    s = build()
    sbatch = s.new_batch_http_request(callback=process_rewrite)
    files_to_copy = []
    for object_ in objects:
        target_name = f'{target_prefix}{object_["name"]}'
        files_to_copy.append({
            'md5Hash': object_.get('md5Hash'),
            'source_displayname': f'{object_["bucket"]}:{object_["name"]}',
            'target_displayname': f'{target_bucket}:{target_name}',
            'method': {
                'destinationBucket': target_bucket,
                'destinationObject': target_name,
                'sourceBucket': object_['bucket'],
                'sourceObject': object_['name'],
                # 'maxBytesRewrittenPerCall': 1048576, # uncomment to easily test multiple rewrite API calls per object
            },
        })
    total_files = len(files_to_copy)
    for file_ptr, entry in enumerate(files_to_copy):
        # A full batch is executed before adding more. The callbacks may
        # refill next_batch completely with retries/continuations, hence the
        # loop rather than a single check. (_order is the batch's internal
        # list of queued request ids.)
        while len(sbatch._order) >= max_batch_size:
            next_batch = s.new_batch_http_request(callback=process_rewrite)
            sbatch.execute()
            sbatch = next_batch
        sbatch.add(s.objects().rewrite(**entry['method']),
                   request_id=str(file_ptr))
    # Drain whatever is left, including requests re-queued by the callbacks.
    while len(sbatch._order) > 0:
        next_batch = s.new_batch_http_request(callback=process_rewrite)
        sbatch.execute()
        sbatch = next_batch
    print('All done!')
|
||||
|
||||
|
||||
def get_cloud_storage_object(s,
|
||||
bucket,
|
||||
object_,
|
||||
@@ -60,7 +188,7 @@ def get_cloud_storage_object(s,
|
||||
|
||||
def download_bucket():
|
||||
bucket = sys.argv[3]
|
||||
s = build_gapi()
|
||||
s = build()
|
||||
page_message = gapi.got_total_items_msg('Files', '...')
|
||||
fields = 'nextPageToken,items(name,id,md5Hash)'
|
||||
objects = gapi.get_all_pages(s.objects(),
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from base64 import b64encode
|
||||
import datetime
|
||||
import json
|
||||
import sys
|
||||
@@ -519,7 +520,6 @@ def getHoldInfo():
|
||||
|
||||
|
||||
def convertExportNameToID(v, nameOrID, matterId):
|
||||
nameOrID = nameOrID.lower()
|
||||
cg = UID_PATTERN.match(nameOrID)
|
||||
if cg:
|
||||
return cg.group(1)
|
||||
@@ -530,7 +530,7 @@ def convertExportNameToID(v, nameOrID, matterId):
|
||||
matterId=matterId,
|
||||
fields=fields)
|
||||
for export in exports:
|
||||
if export['name'].lower() == nameOrID:
|
||||
if export['name'].lower() == nameOrID.lower():
|
||||
return export['id']
|
||||
controlflow.system_error_exit(
|
||||
4, f'could not find export name {nameOrID} '
|
||||
@@ -797,11 +797,49 @@ def getMatterInfo():
|
||||
display.print_json(result)
|
||||
|
||||
|
||||
def copyExport():
    """Copy the files of a Vault export into a Cloud Storage bucket.

    sys.argv[3] identifies the matter and sys.argv[4] the export (resolved
    through getMatterItem and convertExportNameToID). Remaining arguments
    are keyword/value pairs (case-insensitive, underscores ignored):

      targetbucket <bucket>  bucket to copy the export files into (required)
      targetprefix <prefix>  string prepended to every target object name

    The export's file list is converted to the object dicts expected by
    gapi_storage.copy_objects(), which performs the copy.
    """
    v = buildGAPIObject()
    # NOTE(review): s is not used below; copy_objects() builds its own
    # client. The call is kept as-is in case the early build is relied on.
    s = gapi_storage.build()
    matterId = getMatterItem(v, sys.argv[3])
    exportId = convertExportNameToID(v, sys.argv[4], matterId)
    target_bucket = None
    target_prefix = ''
    i = 5
    while i < len(sys.argv):
        myarg = sys.argv[i].lower().replace('_', '')
        if myarg == 'targetbucket':
            target_bucket = sys.argv[i + 1]
            i += 2
        elif myarg == 'targetprefix':
            target_prefix = sys.argv[i + 1]
            i += 2
        else:
            controlflow.invalid_argument_exit(sys.argv[i], 'gam copy export')
    if not target_bucket:
        controlflow.missing_argument_exit('target_bucket', 'gam copy export')
    export = gapi.call(v.matters().exports(),
                       'get',
                       matterId=matterId,
                       exportId=exportId)
    # Without a cloudStorageSink there is nothing to copy; fail with a
    # readable message instead of a KeyError traceback.
    sink_files = export.get('cloudStorageSink', {}).get('files')
    if not sink_files:
        controlflow.system_error_exit(
            4, f'export {sys.argv[4]} has no files to copy '
            f'(status: {export.get("status", "UNKNOWN")})')
    # Convert to md5Hash format Storage API uses
    # because OF COURSE they differ
    # (hex digest string -> base64 of the raw digest bytes).
    objects = [{
        'bucket': s_file['bucketName'],
        'name': s_file['objectName'],
        'md5Hash': b64encode(bytes.fromhex(s_file['md5Hash'])).decode(),
    } for s_file in sink_files]
    gapi_storage.copy_objects(objects, target_bucket, target_prefix)
|
||||
|
||||
|
||||
def downloadExport():
|
||||
verifyFiles = True
|
||||
extractFiles = True
|
||||
v = buildGAPIObject()
|
||||
s = gapi_storage.build_gapi()
|
||||
s = gapi_storage.build()
|
||||
matterId = getMatterItem(v, sys.argv[3])
|
||||
exportId = convertExportNameToID(v, sys.argv[4], matterId)
|
||||
targetFolder = GC_Values[GC_DRIVE_DIR]
|
||||
|
||||
Reference in New Issue
Block a user