from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer
from caffe2.python.net_builder import NetBuilder
from caffe2.python.schema import as_record, Field
from caffe2.python.task import Task, TaskGroup


class Output(object):
    """
    Represents the result of a processor function. A processor can either
    return an Output, or it can return a record, in which case an Output will
    be created for it afterwards.

    Attributes:
        nets: list of nets to run for this processor invocation.
        record: the output record (passed through `as_record`), or None.
        should_stop: optional stop signal supplied by the processor, or None.
    """
    def __init__(self, nets=None, record=None, should_stop=None):
        # Nets may come either from the explicit `nets` argument or from the
        # children collected by the currently active NetBuilder -- never both.
        builder_children = NetBuilder.current().get()
        assert nets is None or len(builder_children) == 0, (
            'Cannot both use `ops` syntax and return a list of nets.')
        if nets is None:
            nets = builder_children
        if isinstance(nets, core.Net):
            # A single net is accepted as shorthand for a one-element list.
            nets = [nets]
        self.nets = [] if nets is None else list(nets)
        self.record = None if record is None else as_record(record)
        self.should_stop = should_stop


DEFAULT_QUEUE_CAPACITY = 100


def _init_output(output, capacity, global_init_net, global_exit_net):
    """
    Resolve `output` into an `(out_queue, writer)` pair and set up the writer.

    Args:
        output: a Writer, an object exposing a `writer()` method (queue or
            stream), or None. When None, a new `queue_util.Queue` is created.
        capacity: capacity of the queue created when `output` is None;
            defaults to DEFAULT_QUEUE_CAPACITY. Must be None when `output`
            is provided, since it would not be used.
        global_init_net: net handed to `writer.setup_ex` as the init net.
        global_exit_net: net handed to `writer.setup_ex` as the exit net.

    Returns:
        Tuple `(out_queue, writer)`. `out_queue` is None when `output` is a
        bare Writer; otherwise it is the given or newly created queue/stream.

    Raises:
        ValueError: if `output` is none of the supported kinds.
    """
    if isinstance(output, Writer):
        assert capacity is None, 'capacity would not be used.'
        out_queue = None
        writer = output
    elif hasattr(output, 'writer'):
        assert capacity is None, 'capacity would not be used.'
        out_queue = output
        writer = output.writer()
    elif output is None:
        out_queue = queue_util.Queue(
            capacity=(
                capacity if capacity is not None
                else DEFAULT_QUEUE_CAPACITY))
        writer = out_queue.writer()
    else:
        # The accepted kinds are a Writer, or anything exposing `writer()`.
        raise ValueError('output must be a writer, queue or stream.')
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer


def make_processor(processor):
    """
    Normalize `processor` into a callable taking a record.

    None becomes an identity function, a `core.Net` is wrapped in a
    NetProcessor, and anything else is returned unchanged.
    """
    if processor is None:
        return lambda rec: rec
    elif isinstance(processor, core.Net):
        return NetProcessor(processor)
    else:
        return processor


def normalize_processor_output(output):
    """
    Allow for processors to return results in several formats.
    TODO(azzolini): simplify once all processors use NetBuilder API.

    Args:
        output: the value returned by a processor. One of: an Output, a
            record (Field), a `(record, stop_blob)` tuple, a
            `(nets, record, stop_blob)`-style tuple, or nets only.

    Returns:
        An Output instance equivalent to `output`.
    """
    if isinstance(output, Output):
        # Processor returned an Output.
        return output
    elif isinstance(output, Field):
        # Processor returned a record.
        return Output(record=output)
    elif isinstance(output, tuple):
        is_record_and_blob = (
            len(output) == 2 and
            isinstance(output[0], Field) and
            isinstance(output[1], core.BlobReference))
        if is_record_and_blob:
            # Processor returned (record, stop_blob).
            return Output(None, *output)
        else:
            # Processor returned (nets, record, stop_blob).
            return Output(*output)
    else:
        # Processor returned nets, no output.
        return Output(output)


def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None):
    """
    Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
    Queue or DataStream in `output`, creates a Task that, when run, will
    pipe the input into the output, using multiple parallel threads.
    Additionally, if a processor is given, it will be called between reading
    and writing steps, allowing it to transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     written to as long as neither reader nor writer signal
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the parameters
        passed.
    """
    result, step = _pipe_step(
        input, output, num_threads, processor, name, capacity, group)
    # `step` is None when num_threads == 0; in that case no Task is created.
    if step is not None:
        Task(step=step, group=group)
    return result


def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, final_outputs=None):
    """
    Similar to `pipe`, with the additional ability for the pipe Task to
    return output values to the `Session` once done.

    Returns:
        Tuple (out_queue, *task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns.
    """
    result, step = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        final_outputs)
    # A Task is always created here, so num_threads == 0 is not supported.
    assert step is not None
    task = Task(step=step, group=group, outputs=final_outputs)
    task_outputs = None
    if final_outputs is not None:
        task_outputs = task.outputs()
        # A single (non list/tuple) `final_outputs` yields a single output
        # rather than a one-element sequence.
        if type(final_outputs) not in (list, tuple):
            task_outputs = task_outputs[0]
    return result, task_outputs


def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, final_outputs=None):
    """
    Build the execution step that pipes `input` into `output`.

    Shared implementation behind `pipe` and `pipe_and_output`; see `pipe`
    for the meaning of the common arguments. `final_outputs` is accepted for
    signature parity with `pipe_and_output` and is not used here.

    Returns:
        Tuple `(result, step)`. When `num_threads` is 0, `result` is the
        (possibly processing) reader and `step` is None. Otherwise `result`
        is the output queue (None if no writer was needed or `output` was a
        bare Writer) and `step` is the top-level execution step.

    Raises:
        ValueError: if `input` is neither a Reader nor exposes `reader()`.
    """
    group = TaskGroup.current(group)
    if name is None:
        name = 'processor:%d' % group.num_registered_tasks()

    if isinstance(input, Reader):
        reader = input
    elif hasattr(input, 'reader'):
        reader = input.reader()
    else:
        raise ValueError('input must be a reader, queue or stream.')

    if processor is not None:
        reader = ProcessingReader(reader, processor)

    if num_threads == 0:
        # Reader-only mode: nothing is written, so an output makes no sense.
        assert output is None
        return reader, None

    global_exit_net = core.Net(name + '_producer_global_exit')
    global_init_net = core.Net(name + '_producer_global_init')
    out_queue = None
    writer = None

    reader.setup_ex(global_init_net, global_exit_net)

    steps = []
    for thread_id in range(num_threads):
        init_net = core.Net(name + "_init_net_%d" % thread_id)
        exit_net = core.Net(name + "_exit_net_%d" % thread_id)

        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)

        if rec is not None:
            # The writer is created lazily, once, and only if the reader
            # (or processor) actually produces a record.
            if writer is None:
                out_queue, writer = _init_output(
                    output, capacity, global_init_net, global_exit_net)
            write_nets, _ = writer.write_record_ex(
                rec, init_net, exit_net, status)
        else:
            write_nets = []

        step = core.execution_step(
            name + "_thread_%d" % thread_id, [
                core.execution_step(name + "_init_step", init_net),
                core.execution_step(
                    name + "_worker_step",
                    list(read_nets) + list(write_nets),
                    should_stop_blob=status
                ),
                core.execution_step(name + "_exit_step", exit_net)
            ]
        )
        steps.append(step)

    # Global init, then all per-thread steps concurrently, then global exit.
    step = core.execution_step(
        "sender_step", [
            core.execution_step('init_step', global_init_net),
            core.execution_step(
                "sender_steps", steps, concurrent_substeps=True),
            core.execution_step('finish_step', global_exit_net),
        ])
    return out_queue, step


class ProcessingReader(Reader):
    """
    Reader that reads from an upstream reader, calls the processor, and
    returns the processed record.
    """
    def __init__(self, reader, processor):
        Reader.__init__(self)
        self.reader = reader
        self.processor = make_processor(processor)

    def setup_ex(self, init_net, finish_net):
        """Delegate setup to the upstream reader."""
        self.reader.setup_ex(init_net, finish_net)

    def read_ex(self, init_net, exit_net):
        """
        Read a record from the upstream reader and run the processor on it.

        Returns:
            Tuple `(read_nets, status, fields)`: the upstream read nets
            followed by the processor's nets (and a stop net if the processor
            supplied a stop signal), the upstream status blob, and the field
            blobs of the processed record (None if there is no record).
        """
        read_nets, status, rec = self.reader.read_record_ex(init_net, exit_net)
        with NetBuilder():
            # Current NetBuilder is optionally used inside the processor,
            # then its children are retrieved inside of
            # normalize_processor_output.
            # Once readers and writers also use NetBuilder,
            # this logic will be more natural.
            result = normalize_processor_output(self.processor(rec))
        # Build a new list instead of extending in place: the upstream reader
        # may still hold a reference to the sequence it returned, and it may
        # not be a list at all.
        read_nets = list(read_nets) + result.nets
        if result.should_stop is not None:
            # NOTE(review): this copies the processor's stop signal over
            # `status`, replacing whatever the upstream reader wrote there --
            # confirm an upstream stop cannot be cleared by this.
            stop_net = core.Net('stop_net')
            stop_net.Copy([result.should_stop], [status])
            read_nets.append(stop_net)
        if hasattr(self.processor, 'setup'):
            init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor)
        self._set_schema(result.record)
        fields = result.record.field_blobs() if result.record else None
        return read_nets, status, fields


class NetProcessor(object):
    """
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    """
    def __init__(self, net, stop_signal=None, thread_init_nets=None):
        assert isinstance(net, core.Net)
        assert stop_signal is None or isinstance(
            stop_signal, core.BlobReference)
        self.thread_init_nets = thread_init_nets or []
        self.net = net
        self._stop_signal = stop_signal
        # One remapping dict per call, in call order.
        self._blob_maps = []
        # Once frozen, the processor can no longer be called.
        self._frozen = False
        self._cloned_init_nets = []

    def setup(self, init_net):
        """
        Freeze the processor and hand over the cloned init nets.

        Returns the init nets cloned so far and clears the internal list, so
        a subsequent call returns an empty list. `init_net` is not used.
        """
        self._frozen = True
        cloned_init_nets = self._cloned_init_nets
        self._cloned_init_nets = []
        return cloned_init_nets

    def __call__(self, rec):
        """
        Clone the net (and any thread init nets) bound to input record `rec`.

        Returns:
            An Output holding the cloned net, its output record and the stop
            signal (remapped to the clone when it appears in the remappings).
        """
        assert not self._frozen
        # Each call gets its own prefix, indexed by the number of prior calls.
        prefix = '/worker:%d/' % len(self._blob_maps)
        blob_remap = {}
        for net in self.thread_init_nets:
            new_net, _ = core.clone_and_bind_net(
                net, str(net) + prefix, prefix, blob_remap)
            self._cloned_init_nets.append(new_net)

        new_net, remappings = core.clone_and_bind_net(
            self.net, str(self.net) + prefix, prefix, blob_remap, rec)

        if self._stop_signal is None:
            stop_signal = None
        elif str(self._stop_signal) in remappings:
            stop_signal = core.BlobReference(
                remappings[str(self._stop_signal)],
                net=new_net)
        else:
            stop_signal = self._stop_signal

        self._blob_maps.append(remappings)
        return Output([new_net], new_net.output_record(), stop_signal)

    def blob_maps(self):
        """Freeze the processor and return the per-call blob remappings."""
        self._frozen = True
        return self._blob_maps