use multiprocessing instead of threading to take advantage of multi CPU systems

This commit is contained in:
Jay Lee
2016-11-18 15:45:57 -05:00
parent 349f2801c5
commit b0b3c18e99

View File

@ -126,8 +126,6 @@ GM_GAM_PATH = u'gpth'
GM_WINDOWS = u'wndo' GM_WINDOWS = u'wndo'
# Encodings # Encodings
GM_SYS_ENCODING = u'syen' GM_SYS_ENCODING = u'syen'
# Shared by batch_worker and run_batch
GM_BATCH_QUEUE = u'batq'
# Extra arguments to pass to GAPI functions # Extra arguments to pass to GAPI functions
GM_EXTRA_ARGS_DICT = u'exad' GM_EXTRA_ARGS_DICT = u'exad'
# Current API user # Current API user
@ -153,7 +151,6 @@ GM_Globals = {
GM_GAM_PATH: os.path.dirname(os.path.realpath(__file__)) if not getattr(sys, u'frozen', False) else os.path.dirname(sys.executable), GM_GAM_PATH: os.path.dirname(os.path.realpath(__file__)) if not getattr(sys, u'frozen', False) else os.path.dirname(sys.executable),
GM_WINDOWS: os.name == u'nt', GM_WINDOWS: os.name == u'nt',
GM_SYS_ENCODING: DEFAULT_CHARSET, GM_SYS_ENCODING: DEFAULT_CHARSET,
GM_BATCH_QUEUE: None,
GM_EXTRA_ARGS_DICT: {u'prettyPrint': False}, GM_EXTRA_ARGS_DICT: {u'prettyPrint': False},
GM_CURRENT_API_USER: None, GM_CURRENT_API_USER: None,
GM_CURRENT_API_SCOPES: [], GM_CURRENT_API_SCOPES: [],
@ -10159,36 +10156,26 @@ gam create project
else: else:
print u'It looks like you\'ve already authorized GAM. Refusing to overwrite existing file:\n\n%s' % GC_Values[GC_OAUTH2_TXT] print u'It looks like you\'ve already authorized GAM. Refusing to overwrite existing file:\n\n%s' % GC_Values[GC_OAUTH2_TXT]
def batch_worker():
while True:
item = GM_Globals[GM_BATCH_QUEUE].get()
ProcessGAMCommand(item)
GM_Globals[GM_BATCH_QUEUE].task_done()
def run_batch(items): def run_batch(items):
import Queue from multiprocessing import Pool
import threading
total_items = len(items) total_items = len(items)
current_item = 0 current_item = 0
gam_cmd = [u'gam'] gam_cmd = [u'gam']
num_worker_threads = min(total_items, GC_Values[GC_NUM_THREADS]) num_worker_threads = min(total_items, GC_Values[GC_NUM_THREADS])
GM_Globals[GM_BATCH_QUEUE] = Queue.Queue(maxsize=num_worker_threads) # GM_Globals[GM_BATCH_QUEUE].put() gets blocked when trying to create more items than there are workers pool = Pool(processes=num_worker_threads)
sys.stderr.write(u'starting %s worker threads...\n' % num_worker_threads) sys.stderr.write(u'Using %s processes...\n' % num_worker_threads)
for _ in range(num_worker_threads):
t = threading.Thread(target=batch_worker)
t.daemon = True
t.start()
for item in items: for item in items:
current_item += 1
if not current_item % 100:
sys.stderr.write(u'starting job %s / %s\n' % (current_item, total_items))
if item[0] == u'commit-batch': if item[0] == u'commit-batch':
sys.stderr.write(u'commit-batch - waiting for running processes to finish before proceeding...') sys.stderr.write(u'commit-batch - waiting for running processes to finish before proceeding...')
GM_Globals[GM_BATCH_QUEUE].join() pool.close()
pool.join()
pool = Pool(processes=num_worker_threads)
sys.stderr.write(u'done with commit-batch\n') sys.stderr.write(u'done with commit-batch\n')
continue continue
GM_Globals[GM_BATCH_QUEUE].put(gam_cmd+item) pool.apply_async(ProcessGAMCommand, [item])
GM_Globals[GM_BATCH_QUEUE].join() pool.close()
pool.join()
# #
# Process command line arguments, find substitutions # Process command line arguments, find substitutions
# An argument containing instances of ~~xxx~~ has xxx replaced by the value of field xxx from the CSV file # An argument containing instances of ~~xxx~~ has xxx replaced by the value of field xxx from the CSV file
@ -10285,7 +10272,7 @@ def ProcessGAMCommand(args):
if (not cmd) or cmd.startswith(u'#') or ((len(argv) == 1) and (cmd != u'commit-batch')): if (not cmd) or cmd.startswith(u'#') or ((len(argv) == 1) and (cmd != u'commit-batch')):
continue continue
if cmd == u'gam': if cmd == u'gam':
items.append([arg.encode(GM_Globals[GM_SYS_ENCODING]) for arg in argv[1:]]) items.append([arg.encode(GM_Globals[GM_SYS_ENCODING]) for arg in argv])
elif cmd == u'commit-batch': elif cmd == u'commit-batch':
items.append([cmd]) items.append([cmd])
else: else:
@ -10309,7 +10296,7 @@ def ProcessGAMCommand(args):
GAM_argv, subFields = getSubFields(i, csvFile.fieldnames) GAM_argv, subFields = getSubFields(i, csvFile.fieldnames)
items = [] items = []
for row in csvFile: for row in csvFile:
items.append(processSubFields(GAM_argv, row, subFields)) items.append([u'gam']+processSubFields(GAM_argv, row, subFields))
closeFile(f) closeFile(f)
run_batch(items) run_batch(items)
sys.exit(0) sys.exit(0)