Source code for blendersynth.run.blender_threading

import sys
from tqdm import tqdm
from time import perf_counter, sleep
import numpy as np
from subprocess import Popen
import atexit

from datetime import datetime, timedelta
import os


def _list_split(list, chunks):
    """Split a list into chunks"""
    return [[*x] for x in np.array_split(list, chunks)]


[docs] class BlenderThread: """Manages a list of jobs, which it feeds into sequential Blender instances in a single thread.""" def __init__( self, command, jobs, log_loc, progress_loc, name="", timeout: int = 100, to_stdout: bool = False, MAX_PER_JOB: int = 100, script_directory: str = None, ): """ :param progress_loc: .log file to write progress to :param timeout: longest time in (s) without render after which process is finished/failed. :param to_stdout: If True, print to stdout instead of to a log file. :param MAX_PER_JOB: Split command into jobs of size MAX_PER_JOB, and run each job in a separate process. :param script_directory: If given, add this to `sys.path` before running the script. """ self.command = command self.jobs = _list_split( jobs, np.ceil(len(jobs) / MAX_PER_JOB) ) # split jobs into chunks of MAX_PER_JOB self.njobs = len(self.jobs) self.size = len(jobs) self.timeout = timeout self.script_directory = script_directory self.prev_n = ( 0 # store how many rendered to check for updates for timeout purposes ) self.timer = perf_counter() self.to_stdout = to_stdout if not self.to_stdout and log_loc is not None: self.log_loc = log_loc self.logfile = open(self.log_loc, "a") else: self.log_loc, self.logfile = None, None self.logger_loc = progress_loc self.process = None self.job = -1 self.finished = False self.name = name self.status = f"STARTED THREAD {self.name}"
[docs] def check_in(self): """Checks if current job still running, if not move to next job. If all jobs complete, set self.finished = True""" if self.is_running: return self.job += 1 if self.job >= self.njobs: self.finished = True self.status = f"✓ THREAD {self.name} COMPLETED." return self.status = f"THREAD {self.name} RUNNING JOB {self.job + 1}/{self.njobs}..." self.start_job(self.job)
[docs] def start_job(self, job: int = 0): """ :param job: Index of job to start """ job_list = self.jobs[job] command = self.command.set_job(job_list).set_logger(self.logger_loc).command stdout = sys.stdout if self.to_stdout else self.logfile stderr = sys.stderr if self.to_stdout else self.logfile env = os.environ.copy() if self.script_directory is not None: env["PYTHONPATH"] = ( self.script_directory + os.pathsep + env.get("PYTHONPATH", "") ) self.process = Popen( command, universal_newlines=True, stdout=stdout, stderr=stderr, env=env )
[docs] def terminate(self): """End process""" self.process.kill()
def kill(self): self.terminate() @property def is_running(self): if self.process: return self.process.poll() is None return False @property def complete(self): if not self.is_running: self.logfile.flush() return True def __len__(self): return self.size @property def num_rendered(self): """Read through Log file to find how many renders have been completed""" if self.process is None or not os.path.isfile(self.logger_loc): return 0 # process not started yet x = 0 with open(self.logger_loc, "r") as f: for line in f.readlines(): x += 1 # currently only 1 type of message to logger - render complete return x def remaining_idxs(self): raise NotImplementedError() @property def success(self): return self.num_rendered == self.size
[docs] def check_status(self): """True if still running successfully, False if exceeded timeout""" n = self.num_rendered if n > self.prev_n: self.prev_n = n self.timer = perf_counter() elif (perf_counter() - self.timer) >= self.timeout: self.status = f"✖ THREAD {self.name} FAILED [TIMEOUT]." return False return True
[docs] class BlenderThreadManager: """Manager of multiple :class:`~blendersynth.run.blender_threading.BlenderThread` instances""" def __init__( self, command, jsons, output_directory, print_to_stdout=False, MAX_PER_JOB=100, script_directory=None, ): """ :param commands: Base Blender command to run :param jsons: A list of num_threads size, each element is a list of jsons to render from :param log_locs: :param MAX_PER_JOB: To prevent memory issues, split up jobs into chunks of MAX_PER_JOB :param script_directory: If given, add this to `sys.path` before running the script. """ self.num_threads = len(jsons) self.command = command # create logs session_name = datetime.now().strftime(r"%y%m%d-%H%M%S") log_dir = os.path.join(output_directory, "logs", session_name) progress_dir = os.path.join(log_dir, "_progress") # for storing progress os.makedirs(log_dir, exist_ok=True) os.makedirs(progress_dir, exist_ok=True) logs = [ os.path.join(log_dir, f"log_{i:02d}.txt") for i in range(self.num_threads) ] progresses = [ os.path.join(progress_dir, f"progress_{i:02d}.log") for i in range(self.num_threads) ] self.log_locs = logs # Set report name as report_xx, incrementing by 1 each report report_fname = f"report_{len([f for f in os.listdir(output_directory) if 'report' in f]):02d}.txt" self.report_loc = os.path.join(output_directory, report_fname) self.t0 = 0 self.session_start = None self.threads = [] for i in range(self.num_threads): thread = BlenderThread( command, jobs=jsons[i], log_loc=None if logs is None else logs[i], progress_loc=progresses[i], name=str(i), to_stdout=print_to_stdout, MAX_PER_JOB=MAX_PER_JOB, script_directory=script_directory, ) self.threads.append(thread) def __len__(self): return sum(map(len, self.threads))
[docs] def start( self, progress_bars: bool = True, tick: float = 0.5, report_every: float = 15.0, offset: float = 1.0, ): """Start all threads and job progress :param progress_bars: Show progress bars for each thread and overall progress :param tick: How often to update progress bars :param report_every: How often to print status updates to log :param offset: How long to wait before starting each thread (to avoid memory issues) """ self.t0 = perf_counter() # log start time self.session_start = datetime.now() for thread in self.threads: thread.check_in() sleep(offset) # register atexit handler atexit.register(self.terminate) last_report_time = perf_counter() if progress_bars: pbars = [] bar_format = "{l_bar}{bar:10}{r_bar}{bar:-10b}" for i, thread in enumerate(self.threads): p = tqdm( total=len(thread), initial=thread.num_rendered, bar_format=bar_format, position=i + 1, ) pbars.append(p) with tqdm(total=len(self), bar_format=bar_format, position=0) as pbar: while any(t.is_running for t in self.threads): sleep(tick) # Update progress bar rendered_images = self.num_rendered desc = "Session rendering..." desc += f" [Dataset: {rendered_images}/{len(self)}]" pbar.set_description(desc) pbar.n = self.num_rendered pbar.refresh() if (perf_counter() - last_report_time) >= report_every: self.update_report() last_report_time = perf_counter() # Update sub-progress bars, restart threads if needed for t, thread in enumerate(self.threads): if thread.finished: continue thread.check_in() if thread.success: pbars[t].n = thread.num_rendered else: thread_running = thread.check_status() if thread_running: pbars[t].n = thread.num_rendered else: # Thread failed, currently will only notify the user pass pbars[t].set_description(thread.status) self.update_report()
@property def num_rendered(self): return sum(t.num_rendered for t in self.threads) def update_report(self): t1 = perf_counter() elapsed = t1 - self.t0 report = [] if self.num_rendered > 0: # calculate number of seconds remaining s_remaining = (len(self) - self.num_rendered) * ( elapsed / self.num_rendered ) eta = datetime.now() + timedelta(seconds=s_remaining) report += [ f"Number of images rendered: {self.num_rendered}\n", f"Total session quota: {len(self)}\n", f"Time elapsed: {timedelta(seconds=round(elapsed))}\n", f"Time per render (s): {elapsed / self.num_rendered:.2f}\n\n", f"Session start: {self.session_start.strftime('%I:%M %p %d/%m/%y')}\n" f"Estimated End: {eta.strftime('%I:%M %p %d/%m/%y')}", ] with open(self.report_loc, "w") as outfile: outfile.writelines(report)
[docs] def terminate(self): """End all threads""" for thread in self.threads: thread.terminate()