added parallel versions to cbow and namediff analyses, not used yet though

This commit is contained in:
Bill Zorn 2015-11-08 19:57:47 -08:00
parent b3860eb924
commit 18e1a66f88
2 changed files with 89 additions and 41 deletions

View file

@ -8,20 +8,18 @@ import subprocess
import os
import struct
import math
import multiprocessing
import utils
import cardlib
import transforms
# # this would be nice, but doing it naively makes things worse
# from joblib import Parallel, delayed
# import multiprocessing
import namediff
# directory containing this module, with any symlinks resolved
libdir = os.path.dirname(os.path.realpath(__file__))
# data directory, addressed as '../data' relative to libdir
datadir = os.path.realpath(os.path.join(libdir, '../data'))
# # multithreading control parameters
# cores = multiprocessing.cpu_count()
# segments = cores / 2 if cores / 2 > 0 else 1
# multithreading control parameters
# CPU count reported by the system; used as the default number of worker
# processes by nearest_par
cores = multiprocessing.cpu_count()
# max length of vocabulary entries
max_w = 50
@ -118,6 +116,33 @@ except ImportError:
def cosine_similarity_name(cardvec, v, name):
    """Pair the cosine similarity of cardvec and v with name.

    Returns a (similarity, name) tuple, so a list of results can be sorted
    by similarity while remembering which name each score belongs to.
    """
    similarity = cosine_similarity(cardvec, v)
    return (similarity, name)
# we need to put the logic in a regular function (as opposed to a method of an object)
# so that we can pass the function to multiprocessing
def f_nearest(card, vocab, vecs, cardvecs, n):
    """Return the n entries of cardvecs most similar to card.

    card is either a cardlib.Card or a string that is assumed to already be
    in vectorized form. cardvecs is an iterable of (name, vector) pairs.

    Returns a list of (similarity, name) tuples sorted from most to least
    similar. An empty word string yields an empty list. If card is a Card
    with a bside, the bside's nearest entries are appended after the front
    side's results.
    """
    is_card = isinstance(card, cardlib.Card)
    # for a Card, only the text before the first blank line of its
    # vectorized form is used
    words = card.vectorize().split('\n\n')[0] if is_card else card
    if not words:
        return []

    target = makevector(vocab, vecs, words)
    ranked = sorted(
        (cosine_similarity_name(target, vec, name) for (name, vec) in cardvecs),
        reverse=True)
    closest = ranked[:n]
    if is_card and card.bside:
        closest.extend(f_nearest(card.bside, vocab, vecs, cardvecs, n=n))
    return closest
def f_nearest_per_thread(workitem):
    """Run f_nearest over one worker's share of the cards.

    workitem is a (workcards, vocab, vecs, cardvecs, n) tuple; everything is
    packed into one argument because Pool.map passes a single item to the
    worker function.

    Returns a list holding one f_nearest result per card in workcards, in
    the same order.
    """
    (workcards, vocab, vecs, cardvecs, n) = workitem
    # build a concrete list instead of using map(): on Python 3 map() is a
    # lazy iterator, which cannot be pickled back to the parent process
    return [f_nearest(card, vocab, vecs, cardvecs, n) for card in workcards]
class CBOW:
def __init__(self, verbose = True,
@ -148,8 +173,6 @@ class CBOW:
self.vecs,
card.vectorize()))]
# self.par = Parallel(n_jobs=segments)
if self.verbose:
print '... Done.'
print ' vocab size: ' + str(len(self.vocab))
@ -157,25 +180,11 @@ class CBOW:
print ' card vecs: ' + str(len(self.cardvecs))
def nearest(self, card, n=5):
    """Return the n stored card vectors most similar to card.

    card is either a cardlib.Card or an already-vectorized string. The
    work is delegated to the module-level f_nearest, which holds the logic
    so that it can also be handed to multiprocessing.

    Returns a list of (similarity, name) tuples, most similar first.
    """
    return f_nearest(card, self.vocab, self.vecs, self.cardvecs, n)
def nearest_par(self, cards, n=5, threads=cores):
    """Parallel version of nearest() for a whole list of cards.

    cards is split into at most `threads` chunks, each chunk is processed
    by f_nearest_per_thread in a worker process, and the per-chunk results
    are flattened back into a single list.

    Returns a list with one nearest()-style result per card, in the same
    order as cards. An empty cards list returns [] without starting any
    worker processes.
    """
    if not cards:
        return []
    # each work item carries everything the worker needs, since the worker
    # function only receives a single argument
    worklist = [(chunk, self.vocab, self.vecs, self.cardvecs, n)
                for chunk in namediff.list_split(cards, threads)]
    workpool = multiprocessing.Pool(threads)
    try:
        donelist = workpool.map(f_nearest_per_thread, worklist)
    finally:
        # always release the worker processes, even if a worker raised
        workpool.close()
        workpool.join()
    return namediff.list_flatten(donelist)

View file

@ -1,11 +1,54 @@
# This module is misleadingly named, as it has other utilities as well
# that are generally necessary when trying to postprocess output by
# comparing it against existing cards.
import difflib
import os
import multiprocessing
import jdecode
import cardlib
# directory containing this module, with any symlinks resolved
libdir = os.path.dirname(os.path.realpath(__file__))
# data directory, addressed as '../data' relative to libdir
datadir = os.path.realpath(os.path.join(libdir, '../data'))
# multithreading control parameters
# CPU count reported by the system; used as the default number of worker
# processes by nearest_par
cores = multiprocessing.cpu_count()
# split a list into n pieces; return a list of these lists
# note that the piece size is rounded up, so if n is large relative to
# the length of the list it can run out of elements early and return
# fewer than n lists
def list_split(l, n):
    """Split the list l into at most n contiguous pieces.

    Returns a list of sublists that, concatenated in order, reproduce l.
    The piece size is len(l) / n rounded up, so fewer than n pieces come
    back when n is large relative to len(l). An empty l returns [].

    If n <= 0 no splitting is attempted and l itself is returned unchanged.
    """
    if n <= 0:
        return l
    if len(l) == 0:
        # a piece size of 0 would make range() raise ValueError
        return []
    # ceiling division; '//' keeps the piece size an integer regardless of
    # whether '/' means integer or true division
    split_size = (len(l) + n - 1) // n
    return [l[i:i + split_size] for i in range(0, len(l), split_size)]
# flatten a list of lists into a single list of all their contents, in order
def list_flatten(l):
    """Concatenate the sublists of l into one flat list, preserving order."""
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat
# isolated logic for multiprocessing
def f_nearest(name, matchers, n):
    """Return the closest matches for name among matchers.

    matchers is a list of difflib.SequenceMatcher objects whose second
    sequence (b) is a candidate name; each one has its first sequence set
    to name, so the matchers are modified in place.

    Returns a list of (ratio, candidate) tuples sorted best first. If the
    best candidate has a ratio of at least 1, only that single entry is
    returned; otherwise the top n entries are returned. An empty matchers
    list returns [].
    """
    if not matchers:
        # nothing to compare against; indexing ratios[0] below would fail
        return []
    ratios = []
    for m in matchers:
        m.set_seq1(name)
        ratios.append((m.ratio(), m.b))
    ratios.sort(reverse=True)
    if ratios[0][0] >= 1:
        return ratios[:1]
    return ratios[:n]
def f_nearest_per_thread(workitem):
    """Run f_nearest over one worker's share of the names.

    workitem is a (worknames, names, n) tuple; everything is packed into
    one argument because Pool.map passes a single item to the worker
    function.

    Returns a list holding one f_nearest result per name in worknames, in
    the same order.
    """
    (worknames, names, n) = workitem
    # each thread (well, process) needs to generate its own matchers
    matchers = [difflib.SequenceMatcher(b=name, autojunk=False) for name in names]
    # build a concrete list instead of using map(): on Python 3 map() is a
    # lazy iterator, which cannot be pickled back to the parent process
    return [f_nearest(workname, matchers, n) for workname in worknames]
class Namediff:
def __init__(self, verbose = True,
json_fname = os.path.join(datadir, 'AllSets.json')):
@ -43,15 +86,11 @@ class Namediff:
print '... Done.'
def nearest(self, name, n=3):
    """Return the closest matches for name among this object's matchers.

    The work is delegated to the module-level f_nearest, which holds the
    logic so that it can also be handed to multiprocessing.

    Returns a list of (ratio, candidate) tuples, best first: a single
    entry when the best ratio is at least 1, otherwise the top n entries.
    """
    return f_nearest(name, self.matchers, n)
def nearest_par(self, names, n=3, threads=cores):
    """Parallel version of nearest() for a whole list of names.

    names is split into at most `threads` chunks, each chunk is processed
    by f_nearest_per_thread in a worker process, and the per-chunk results
    are flattened back into a single list. Each work item carries
    self.names rather than self.matchers.

    Returns a list with one nearest()-style result per name, in the same
    order as names. An empty names list returns [] without starting any
    worker processes.
    """
    if not names:
        return []
    worklist = [(chunk, self.names, n) for chunk in list_split(names, threads)]
    workpool = multiprocessing.Pool(threads)
    try:
        donelist = workpool.map(f_nearest_per_thread, worklist)
    finally:
        # always release the worker processes, even if a worker raised
        workpool.close()
        workpool.join()
    return list_flatten(donelist)