mtgencode/scripts/streamcards.py

124 lines
4.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -- STOLEN FROM torch-rnn/scripts/streamfile.py -- #
import os
import threading
import time
import signal
import traceback
import psutil
# correctly setting up a stream that won't get orphaned and left clutting the operating
# system proceeds in 3 parts:
# 1) invoke install_suicide_handlers() to ensure correct behavior on interrupt
# 2) get threads by invoking spawn_stream_threads
# 3) invoke wait_and_kill_self_noreturn(threads)
# or, use the handy wrapper that does it for you
def spawn_stream_threads(fds, runthread, mkargs):
threads = []
for i, fd in enumerate(fds):
stream_thread = threading.Thread(target=runthread, args=mkargs(i, fd))
stream_thread.daemon = True
stream_thread.start()
threads.append(stream_thread)
return threads
def force_kill_self_noreturn():
# We have a strange issue here, which is that our threads will refuse to die
# to a normal exit() or sys.exit() because they're all blocked in write() calls
# on full pipes; the simplest workaround seems to be to ask the OS to terminate us.
# This kinda works, but...
#os.kill(os.getpid(), signal.SIGTERM)
# psutil might have useful features like checking if the pid has been reused before killing it.
# Also we might have child processes like l2e luajits to think about.
me = psutil.Process(os.getpid())
for child in me.children(recursive=True):
child.terminate()
me.terminate()
def handler_kill_self(signum, frame):
if signum != signal.SIGQUIT:
traceback.print_stack(frame)
print(
('caught signal {:d} - streamer sending SIGTERM to self'.format(signum)))
force_kill_self_noreturn()
def install_suicide_handlers():
for sig in [signal.SIGHUP, signal.SIGINT, signal.SIGQUIT]:
signal.signal(sig, handler_kill_self)
def wait_and_kill_self_noreturn(threads):
running = True
while running:
running = False
for thread in threads:
if thread.is_alive():
running = True
if(os.getppid() <= 1):
# exit if parent process died (and we were reparented to init)
break
time.sleep(1)
force_kill_self_noreturn()
def streaming_noreturn(fds, write_stream, mkargs):
install_suicide_handlers()
threads = spawn_stream_threads(fds, write_stream, mkargs)
wait_and_kill_self_noreturn(threads)
assert False, 'should not return from streaming'
# -- END STOLEN FROM torch-rnn/scripts/streamfile.py -- #
import sys
import random
libdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../lib')
sys.path.append(libdir)
import utils
import jdecode
import transforms
def main(args):
fds = args.fds
fname = args.fname
block_size = args.block_size
main_seed = args.seed if args.seed != 0 else None
# simple default encoding for now, will add more options with the curriculum
# learning feature
cards = jdecode.mtg_open_file(fname, verbose=True, linetrans=True)
def write_stream(i, fd):
local_random = random.Random(main_seed)
local_random.jumpahead(i)
local_cards = [card for card in cards]
with open('/proc/self/fd/'+str(fd), 'wt') as f:
while True:
local_random.shuffle(local_cards)
for card in local_cards:
f.write(card.encode(randomize_mana=True, randomize_lines=True))
f.write(utils.cardsep)
def mkargs(i, fd):
return i, fd
streaming_noreturn(fds, write_stream, mkargs)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('fds', type=int, nargs='+',
help='file descriptors to write streams to')
parser.add_argument('-f', '--fname', default=os.path.join(libdir, '../data/output.txt'),
help='file to read cards from')
parser.add_argument('-n', '--block_size', type=int, default=10000,
help='number of characters each stream should read/write at a time')
parser.add_argument('-s', '--seed', type=int, default=0,
help='random seed')
args = parser.parse_args()
main(args)