From 18e1a66f885de79ca2a873893e68b9d7e570fb18 Mon Sep 17 00:00:00 2001
From: Bill Zorn
Date: Sun, 8 Nov 2015 19:57:47 -0800
Subject: [PATCH] added parallel versions to cbow and namediff analyses, not
 used yet though

---
 lib/cbow.py     | 69 ++++++++++++++++++++++++++++---------------------
 lib/namediff.py | 61 +++++++++++++++++++++++++++++++++++--------
 2 files changed, 89 insertions(+), 41 deletions(-)

diff --git a/lib/cbow.py b/lib/cbow.py
index ce18780..8899bae 100644
--- a/lib/cbow.py
+++ b/lib/cbow.py
@@ -8,20 +8,18 @@ import subprocess
 import os
 import struct
 import math
+import multiprocessing
+
 import utils
 import cardlib
 import transforms
-
-# # this would be nice, but doing it naively makes things worse
-# from joblib import Parallel, delayed
-# import multiprocessing
+import namediff
 
 libdir = os.path.dirname(os.path.realpath(__file__))
 datadir = os.path.realpath(os.path.join(libdir, '../data'))
 
-# # multithreading control parameters
-# cores = multiprocessing.cpu_count()
-# segments = cores / 2 if cores / 2 > 0 else 1
+# multithreading control parameters
+cores = multiprocessing.cpu_count()
 
 # max length of vocabulary entries
 max_w = 50
@@ -118,6 +116,33 @@ except ImportError:
 def cosine_similarity_name(cardvec, v, name):
     return (cosine_similarity(cardvec, v), name)
 
+# we need to put the logic in a regular function (as opposed to a method of an object)
+# so that we can pass the function to multiprocessing
+def f_nearest(card, vocab, vecs, cardvecs, n):
+    if isinstance(card, cardlib.Card):
+        words = card.vectorize().split('\n\n')[0]
+    else:
+        # assume it's a string (that's already a vector)
+        words = card
+
+    if not words:
+        return []
+
+    cardvec = makevector(vocab, vecs, words)
+
+    comparisons = [cosine_similarity_name(cardvec, v, name) for (name, v) in cardvecs]
+
+    comparisons.sort(reverse = True)
+    comp_n = comparisons[:n]
+
+    if isinstance(card, cardlib.Card) and card.bside:
+        comp_n += f_nearest(card.bside, vocab, vecs, cardvecs, n=n)
+
+    return comp_n
+
+def f_nearest_per_thread(workitem):
+    (workcards, vocab, vecs, cardvecs, n) = workitem
+    return map(lambda card: f_nearest(card, vocab, vecs, cardvecs, n), workcards)
 
 class CBOW:
     def __init__(self, verbose = True,
@@ -147,8 +172,6 @@ class CBOW:
             self.cardvecs += [(name, makevector(self.vocab,
                                                 self.vecs,
                                                 card.vectorize()))]
-
-        # self.par = Parallel(n_jobs=segments)
 
         if self.verbose:
             print '... Done.'
@@ -157,25 +180,11 @@ class CBOW:
             print '  card vecs:  ' + str(len(self.cardvecs))
 
     def nearest(self, card, n=5):
-        if isinstance(card, cardlib.Card):
-            words = card.vectorize().split('\n\n')[0]
-        else:
-            # assume it's a string (that's already a vector)
-            words = card
-
-        if not words:
-            return []
+        return f_nearest(card, self.vocab, self.vecs, self.cardvecs, n)
 
-        cardvec = makevector(self.vocab, self.vecs, words)
-
-        comparisons = [cosine_similarity_name(cardvec, v, name) for (name, v) in self.cardvecs]
-        # comparisons = self.par(delayed(cosine_similarity_name)(cardvec, v, name)
-        #                        for (name, v) in self.cardvecs)
-
-        comparisons.sort(reverse = True)
-        comp_n = comparisons[:n]
-
-        if isinstance(card, cardlib.Card) and card.bside:
-            comp_n += self.nearest(card.bside)
-
-        return comp_n
+    def nearest_par(self, cards, n=5, threads=cores):
+        workpool = multiprocessing.Pool(threads)
+        proto_worklist = namediff.list_split(cards, threads)
+        worklist = map(lambda x: (x, self.vocab, self.vecs, self.cardvecs, n), proto_worklist)
+        donelist = workpool.map(f_nearest_per_thread, worklist)
+        return namediff.list_flatten(donelist)

diff --git a/lib/namediff.py b/lib/namediff.py
index 71cdf2e..2707ea1 100644
--- a/lib/namediff.py
+++ b/lib/namediff.py
@@ -1,11 +1,54 @@
+# This module is misleadingly named, as it has other utilities as well
+# that are generally necessary when trying to postprocess output by
+# comparing it against existing cards.
+
 import difflib
 import os
+import multiprocessing
+
 import jdecode
 import cardlib
 
 libdir = os.path.dirname(os.path.realpath(__file__))
 datadir = os.path.realpath(os.path.join(libdir, '../data'))
 
+# multithreading control parameters
+cores = multiprocessing.cpu_count()
+
+# split a list into n pieces; return a list of these lists
+# has slightly interesting behavior, in that if n is large, it can
+# run out of elements early and return less than n lists
+def list_split(l, n):
+    if n <= 0:
+        return l
+    split_size = len(l) / n
+    if len(l) % n > 0:
+        split_size += 1
+    return [l[i:i+split_size] for i in range(0, len(l), split_size)]
+
+# flatten a list of lists into a single list of all their contents, in order
+def list_flatten(l):
+    return [item for sublist in l for item in sublist]
+
+
+# isolated logic for multiprocessing
+def f_nearest(name, matchers, n):
+    for m in matchers:
+        m.set_seq1(name)
+    ratios = [(m.ratio(), m.b) for m in matchers]
+    ratios.sort(reverse = True)
+
+    if ratios[0][0] >= 1:
+        return ratios[:1]
+    else:
+        return ratios[:n]
+
+def f_nearest_per_thread(workitem):
+    (worknames, names, n) = workitem
+    # each thread (well, process) needs to generate its own matchers
+    matchers = [difflib.SequenceMatcher(b=name, autojunk=False) for name in names]
+    return map(lambda name: f_nearest(name, matchers, n), worknames)
+
 class Namediff:
     def __init__(self, verbose = True,
                  json_fname = os.path.join(datadir, 'AllSets.json')):
@@ -43,15 +86,11 @@ class Namediff:
             print '... Done.'
 
     def nearest(self, name, n=3):
-        for m in self.matchers:
-            m.set_seq1(name)
-        ratios = [(m.ratio(), m.b) for m in self.matchers]
-        ratios.sort(reverse = True)
-
-        if ratios[0][0] >= 1:
-            return ratios[:1]
-        else:
-            return ratios[:n]
-
-
+        return f_nearest(name, self.matchers, n)
+
+    def nearest_par(self, names, n=3, threads=cores):
+        workpool = multiprocessing.Pool(threads)
+        proto_worklist = list_split(names, threads)
+        worklist = map(lambda x: (x, self.names, n), proto_worklist)
+        donelist = workpool.map(f_nearest_per_thread, worklist)
+        return list_flatten(donelist)
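
A quick usage sketch of the new entry points (not part of the patch itself; the input strings below are made up, and everything else uses only names defined in this diff). The parallel variants take a whole batch and return one result list per input, in order, so a per-item loop becomes a single call:

    # Python 2, matching the repo; assumes the default data files are in place.
    import namediff

    nd = namediff.Namediff()   # builds a SequenceMatcher per known card name

    names = ['storm crow', 'shivan dragon']   # hypothetical generated names

    # serial: one call per name
    for name in names:
        print nd.nearest(name, n=3)

    # parallel: one call for the whole batch; results come back in input order
    for ratios in nd.nearest_par(names, n=3):
        print ratios

    # the list helpers behave like this (note Python 2 integer division):
    print namediff.list_split(range(10), 3)            # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    print namediff.list_flatten([[1, 2], [3], [4, 5]]) # [1, 2, 3, 4, 5]

cbow.CBOW().nearest_par(cards, n=5) follows the same batch-in, batch-out pattern for cardlib.Card objects (or pre-vectorized strings). Note the design choice: f_nearest_per_thread rebuilds the matchers inside each worker process rather than shipping self.matchers through the Pool, and list_split hands each worker one contiguous chunk, so that rebuild cost is paid once per process rather than once per name.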