**********************
Recipe design tutorial
**********************

The pipeline framework provides a simple system for dispatching
compute-intensive jobs to remote hosts. This includes sending and receiving
complex data as part of the job. This section provides a tutorial
introduction to writing a recipe which takes advantage of this capability.

Problem specification
=====================

This tutorial addresses a simple real-world use case. This example was
suggested by Anastasia Alexov, and addresses a requirement of the LOFAR
Pulsar Pipeline.

The pulsar pipeline runs across multiple compute nodes, generating a series
of thumbnail plots on the local storage of each one -- that is, the plots
are only accessible by a process running on the compute node, and are not
exported via NFS or similar. The aim is to combine all the thumbnails on a
given host into a single image, using the ``montage`` command provided by
`ImageMagick <http://www.imagemagick.org/>`_. It is assumed that the
thumbnails reside in the same path on each node.

An initial implementation of the code which runs on each node was provided
as a ``ksh`` script.

.. code-block:: ksh
   :linenos:

   #!/bin/ksh
   # find all the th.png files and convert them into a list to paste together using "montage".

   if [ -f combined.th.png ]
   then
      echo "WARNING: deleting previous version of results: combined.th.png"
      rm combined.th.png
   fi

   find ./ -name "*.th.png" -print > /tmp/$$_combine_col1.txt
   find ./ -name "*.th.png" -print | sed -e 's/\// /g' -e 's/^.* //g' -e 's/.*_RSP/RSP/g' -e 's/\..*//g' -e 's/_PSR//g' > /tmp/$$_combine_col2.txt
   paste /tmp/$$_combine_col1.txt /tmp/$$_combine_col2.txt | awk '{print "-label "$2" "$1" "}' | tr -d '\n' | awk '{print "montage -background none "$0" combined.th.png"}' > combine_png.sh
   rm /tmp/$$_combine_col1.txt /tmp/$$_combine_col2.txt

   wc_convert=`wc -l combine_png.sh | awk '{print $1}'`

   if [[ $wc_convert > 0 ]]
   then
      chmod 777 combine_png.sh
      echo "Executing the following command: "
      cat combine_png.sh
      ./combine_png.sh
      echo ""
      echo "Results: combined.th.png"
      echo ""
   else
      echo ""
      echo "No thumbnail (*.th.png) files were found to combine."
      echo ""
   fi

   exit 0

Per-node script
===============

First, we will consider the processing that must be done on each of the
remote hosts. We start by converting the ``ksh`` script to a native Python
version, then refining it to best take advantage of the framework
capabilities.

It may be worth emphasising that the conversion to Python is optional: an
alternative approach would be to run code on each node which simply spawned
a copy of ``ksh`` and executed the script directly. In general, though,
minimising forking is a wise approach -- and the Python code provides a
better opportunity to demonstrate the framework capabilities.
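For comparison, a node implementation following that alternative approach
might look like the sketch below. This is purely illustrative: the script
path is a hypothetical placeholder, and nothing in the framework requires
this layout.

.. code-block:: python

   # Minimal sketch of the "just spawn ksh" alternative described above.
   # The path to the original script is a hypothetical placeholder.
   import subprocess

   def run_ksh_script(script="/path/to/combine_thumbnails.ksh"):
       # Run the unmodified ksh script and propagate its exit status.
       return subprocess.call(['/bin/ksh', script])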
First Python implementation
---------------------------

A simple Python implementation of functionality similar to that provided by
the ``ksh`` script is shown below.

.. code-block:: python
   :linenos:

   import glob
   import subprocess
   import os

   def run(file_pattern, input_dir, output_file, clobber):
       # Returns 0 for success, 1 for failure

       # Sanity checking
       if not os.path.exists(input_dir):
           return 1
       if os.path.exists(output_file):
           if clobber:
               os.unlink(output_file)
           else:
               return 1

       # Build list of input files
       input_files = glob.glob(os.path.join(input_dir, file_pattern))

       try:
           # Run "montage" command
           subprocess.check_call(['/usr/bin/montage'] + input_files + [output_file])
       except Exception, e:
           return 1

       return 0

Note the following:

- The Python version has been implemented as a function (``run()``).
- Success or failure is indicated by the return value of the function: in
  true Unix fashion, ``0`` represents success.
- We allow the user to specify whether the output should be overwritten
  using the ``clobber`` argument.
- The user can also specify the pattern of filenames to be searched for (so
  this code can be more generic than the simple ``*.th.png`` in the ``ksh``
  version).
- Arguments also enable the user to specify both the directory to search
  for thumbnail files, and the directory into which the output file should
  be written.
- For simplicity, we have not implemented the logic used to add titles to
  the images (but extending the code to do so would be trivial).
- Standard Python code is used to implement all the required functionality,
  with no added complexity. In particular, Python's `subprocess
  <http://docs.python.org/library/subprocess.html>`_ module is used to
  spawn the ``montage`` command.

Using the :class:`~lofarpipe.support.lofarnode.LOFARnodeTCP` class
------------------------------------------------------------------

To integrate the Python code developed above into the framework, some
minimal changes are required. First, we take our ``run()`` function, and
make it a method of a class derived from
:class:`lofarpipe.support.lofarnode.LOFARnodeTCP`. Secondly, we add some
boilerplate such that, when the script is run from the command line, it
takes three arguments, then instantiates the class we have defined and
executes its
:meth:`~lofarpipe.support.lofarnode.LOFARnodeTCP.run_with_stored_arguments`
method. Note that the script then exits with the value returned by that
method. The result is shown below.

.. code-block:: python
   :linenos:

   import sys
   import subprocess
   import glob
   import os

   from lofarpipe.support.lofarnode import LOFARnodeTCP

   class thumbnail_combine(LOFARnodeTCP):
       def run(self, file_pattern, input_dir, output_file, clobber):
           # Returns 0 for success, 1 for failure

           # Sanity checking
           if not os.path.exists(input_dir):
               return 1
           if os.path.exists(output_file):
               if clobber:
                   os.unlink(output_file)
               else:
                   return 1

           # Build list of input files
           input_files = glob.glob(os.path.join(input_dir, file_pattern))

           try:
               # Run "montage" command
               subprocess.check_call(['/usr/bin/montage'] + input_files + [output_file])
           except Exception, e:
               return 1

           return 0

   if __name__ == "__main__":
       jobid, jobhost, jobport = sys.argv[1:4]
       sys.exit(thumbnail_combine(jobid, jobhost, jobport).run_with_stored_arguments())
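In normal operation the framework supplies the job identifier, host, and
port, and drives the class via
:meth:`~lofarpipe.support.lofarnode.LOFARnodeTCP.run_with_stored_arguments`.
During development, though, it can be convenient to exercise the ``run()``
method directly. The sketch below makes several assumptions: that the node
script above was saved as ``thumbnail_combine.py`` and is importable, that
the constructor only stores its arguments, and that the dummy job ID, host,
port, and paths are placeholders.

.. code-block:: python

   # Hypothetical development-time check: bypass the framework's TCP
   # machinery and call run() directly. The constructor arguments (job
   # ID, host, port) and the paths below are dummies for illustration.
   from thumbnail_combine import thumbnail_combine

   node = thumbnail_combine("1", "localhost", "0")
   result = node.run("*.th.png", "/data/thumbnails", "/data/combined.th.png", True)
   assert result == 0  # 0 indicates success, as in the original script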
Logging
-------

Within the :class:`lofarpipe.support.lofarnode.LOFARnode` environment, we
now have access to some other framework-provided services. Chief among
these is logging. The script is therefore updated to be more robust against
failures and to report progress to the logger.

.. code-block:: python
   :linenos:

   import sys
   import subprocess
   import glob
   import os

   from lofarpipe.support.lofarnode import LOFARnodeTCP

   class thumbnail_combine(LOFARnodeTCP):
       def run(self, file_pattern, input_dir, output_file, clobber):
           if not os.path.exists(input_dir):
               self.logger.error("Input directory (%s) not found" % input_dir)
               return 1

           self.logger.info("Processing %s" % input_dir)

           if os.path.exists(output_file):
               if clobber:
                   self.logger.warn(
                       "Deleting previous version of results: %s" % output_file
                   )
                   os.unlink(output_file)
               else:
                   self.logger.error(
                       "Refusing to overwrite existing file %s" % output_file
                   )
                   return 1

           input_files = glob.glob(os.path.join(input_dir, file_pattern))

           try:
               # Run "montage" command
               subprocess.check_call(['/usr/bin/montage'] + input_files + [output_file])
           except Exception, e:
               self.logger.error(str(e))
               return 1

           if not os.path.exists(output_file):
               self.logger.error(
                   "Output file %s not created by montage executable" % output_file
               )
               return 1

           return 0

   if __name__ == "__main__":
       jobid, jobhost, jobport = sys.argv[1:4]
       sys.exit(thumbnail_combine(jobid, jobhost, jobport).run_with_stored_arguments())

Note that ``self.logger`` in the above is an instance of
:class:`logging.Logger` from the `Python standard library
<http://docs.python.org/library/logging.html>`_, with all the features that
implies. Any messages sent to the logger will be automatically integrated
with the overall pipeline logging system.

Helper functions
----------------

The pipeline framework provides some (entirely optional!) convenience
functions which can help the recipe author address common use cases. The
:func:`~lofarpipe.support.utilities.catch_segfaults` function, for example,
can automatically recover and re-run an external command in the event that
it results in a segmentation fault. This can be integrated into our
existing script as follows.

.. code-block:: python
   :linenos:

   import sys
   import glob
   import os

   from lofarpipe.support.lofarnode import LOFARnodeTCP
   from lofarpipe.support.utilities import catch_segfaults

   class thumbnail_combine(LOFARnodeTCP):
       def run(self, executable, file_pattern, input_dir, output_file, clobber):
           if not os.path.exists(input_dir):
               self.logger.error("Input directory (%s) not found" % input_dir)
               return 1

           self.logger.info("Processing %s" % input_dir)

           if os.path.exists(output_file):
               if clobber:
                   self.logger.warn(
                       "Deleting previous version of results: %s" % output_file
                   )
                   os.unlink(output_file)
               else:
                   self.logger.error(
                       "Refusing to overwrite existing file %s" % output_file
                   )
                   return 1

           input_files = glob.glob(os.path.join(input_dir, file_pattern))
           command_line = [executable] + input_files + [output_file]

           try:
               catch_segfaults(command_line, None, None, self.logger)
           except Exception, e:
               self.logger.error(str(e))
               return 1

           if not os.path.exists(output_file):
               self.logger.error(
                   "Output file %s not created by montage executable" % output_file
               )
               return 1

           return 0

   if __name__ == "__main__":
       jobid, jobhost, jobport = sys.argv[1:4]
       sys.exit(thumbnail_combine(jobid, jobhost, jobport).run_with_stored_arguments())

Note that we have also added the ``executable`` argument to define which
external command should actually be run. There is no reason to avoid making
the code as generic and reusable as possible!
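To make the behaviour of
:func:`~lofarpipe.support.utilities.catch_segfaults` concrete, the sketch
below shows roughly the kind of retry loop it provides. This is an
illustration only, not the framework's actual implementation: the function
name ``rerun_on_segfault`` and the retry limit are invented for the
example.

.. code-block:: python

   # Conceptual sketch only -- NOT the real catch_segfaults() from
   # lofarpipe.support.utilities. It shows the general idea: re-run a
   # command that died with SIGSEGV, up to some limit.
   import signal
   import subprocess

   def rerun_on_segfault(command_line, cwd, env, logger, max_tries=3):
       for attempt in range(max_tries):
           process = subprocess.Popen(command_line, cwd=cwd, env=env)
           returncode = process.wait()
           if returncode == -signal.SIGSEGV:
               logger.warn("%s segfaulted; retrying" % command_line[0])
               continue
           return returncode
       raise RuntimeError("%s segfaulted %d times" % (command_line[0], max_tries))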
At this point, our node script is complete (at least in this simple form).
To be useful, though, it needs to be executed across many different nodes
as part of a pipeline. This is where the *recipe* needs to be defined.

Defining the recipe
===================

As described in the :ref:`overview <framework-overview>`, recipes are the
basic building blocks of pipelines: they describe how to perform an
individual unit of pipeline processing. In this case, our recipe will
specify the inputs for the node script we have written above, dispatch the
jobs to a number of compute nodes, and finally collect the results.

A basic recipe
--------------

All pipeline recipes ultimately derive from
:class:`lofarpipe.support.baserecipe.BaseRecipe`. A trivial example is
shown below.

.. code-block:: python
   :linenos:

   import sys
   from lofarpipe.support.baserecipe import BaseRecipe

   class thumbnail_combine(BaseRecipe):
       def go(self):
           self.logger.info("Starting thumbnail_combine run")
           super(thumbnail_combine, self).go()
           self.logger.info("This recipe does nothing")

   if __name__ == '__main__':
       sys.exit(thumbnail_combine().main())

This recipe does nothing except print a couple of lines to the log.
However, note the following key features:

- The control code for the recipe is all implemented within the ``go()``
  method of a class derived from
  :class:`lofarpipe.support.baserecipe.BaseRecipe`.
- Within that environment, we have access to a logger, which works in
  exactly the same way as it does on the node. (Enthusiasts may wish to
  note that this is actually an instance of
  :class:`lofarpipe.support.pipelinelogging.SearchingLogger`, but the
  practical difference is minimal.)
- It is important to call the ``go()`` method of the superclass (as shown
  at line 7) to ensure all the necessary initialisation is performed.
- If called from the command line, we instantiate the object, call its
  ``main()`` method, and exit with its return value.

Dispatching remote jobs
-----------------------

One of the most fundamental aspects of the framework is its ability to
dispatch jobs to remote hosts, and this is absolutely necessary for the
problem under discussion. We can add this to the recipe as follows.

.. code-block:: python
   :linenos:

   import sys

   from lofarpipe.support.baserecipe import BaseRecipe
   from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
   from lofarpipe.support.remotecommand import ComputeJob

   class thumbnail_combine(BaseRecipe, RemoteCommandRecipeMixIn):
       def go(self):
           self.logger.info("Starting thumbnail_combine run")
           super(thumbnail_combine, self).go()

           # Hosts on which to execute
           hosts = ['lce019']

           # Path to node script
           command = "python %s" % (self.__file__.replace('master', 'nodes'))

           # Build a list of jobs
           jobs = []
           for host in hosts:
               jobs.append(
                   ComputeJob(
                       host, command,
                       arguments=[
                           "/usr/bin/montage",    # executable
                           "*.th.png",            # file_pattern
                           "/path/to/png/files",  # input_dir
                           "/path/to/output.png", # output_file
                           True                   # clobber
                       ]
                   )
               )

           # And run them
           self._schedule_jobs(jobs)

           # The error flag is set if a job failed
           if self.error.isSet():
               self.logger.warn("Failed compute job process detected")
               return 1
           else:
               return 0

   if __name__ == '__main__':
       sys.exit(thumbnail_combine().main())
This raises a number of relevant points to note:

- The distribution system is activated for a given recipe by mixing the
  :class:`~lofarpipe.support.remotecommand.RemoteCommandRecipeMixIn` class
  into its definition.
- In this case, we execute on only one remote host (``lce019``, as defined
  at line 13). However, as many hosts as necessary could be defined.
- Each remote processing job is defined as an instance of
  :class:`~lofarpipe.support.remotecommand.ComputeJob`. It takes three
  arguments: the name of the host on which to execute, the name of the
  command to be run, and any arguments which should be passed to that
  command. These are provided in lines 23 to 30.
- The command to run can be any Python script. By convention, node scripts
  are named such that the name can be derived from the recipe name as shown
  at line 16, but this is entirely up to the author.
- The arguments provided to
  :class:`~lofarpipe.support.remotecommand.ComputeJob` correspond exactly
  to those defined in the node script, above.
- After all the jobs have been defined, they are passed (as a list) to
  :meth:`~lofarpipe.support.remotecommand.RemoteCommandRecipeMixIn._schedule_jobs`.
  This blocks until all jobs have finished.
- If a job fails, the ``error`` attribute (an instance of
  :class:`threading.Event` from `Python's standard library
  <http://docs.python.org/library/threading.html>`_) is set. The recipe
  should check for this and act appropriately.

Ingredients
-----------

The recipe shown in the previous section contains many hard-coded elements:
all the arguments to the compute job, the host on which to run, and so on.
This is obviously inflexible and undesirable. We can overcome this using
the *ingredients* system provided by the framework. An example is shown
below.

.. code-block:: python
   :linenos:

   import sys

   import lofarpipe.support.lofaringredient as ingredient
   from lofarpipe.support.baserecipe import BaseRecipe
   from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
   from lofarpipe.support.remotecommand import ComputeJob

   class thumbnail_combine(BaseRecipe, RemoteCommandRecipeMixIn):
       inputs = {
           'executable': ingredient.ExecField(
               '--executable',
               default="/usr/bin/montage",
               help="montage executable"
           ),
           'file_pattern': ingredient.StringField(
               '--file-pattern',
               default="*.th.png",
               help="File search pattern (glob)",
           ),
           'input_dir': ingredient.StringField(
               '--input-dir',
               help="Directory containing input files"
           ),
           'output_file': ingredient.StringField(
               '--output-file',
               help="Output filename"
           ),
           'clobber': ingredient.BoolField(
               '--clobber',
               default=False,
               help="Clobber pre-existing output files"
           ),
           'target_hosts': ingredient.ListField(
               '--target-hosts',
               help="Remote hosts on which to execute"
           )
       }

       def go(self):
           self.logger.info("Starting thumbnail_combine run")
           super(thumbnail_combine, self).go()

           hosts = self.inputs['target_hosts']
           command = "python %s" % (self.__file__.replace('master', 'nodes'))

           jobs = []
           for host in hosts:
               jobs.append(
                   ComputeJob(
                       host, command,
                       arguments=[
                           self.inputs['executable'],
                           self.inputs['file_pattern'],
                           self.inputs['input_dir'],
                           self.inputs['output_file'],
                           self.inputs['clobber']
                       ]
                   )
               )

           self._schedule_jobs(jobs)

           if self.error.isSet():
               self.logger.warn("Failed compute job process detected")
               return 1
           else:
               return 0

   if __name__ == '__main__':
       sys.exit(thumbnail_combine().main())

Using this system, the recipe author defines a list of inputs to the
recipe. Each input is an instance of a class descended from
:class:`lofarpipe.support.lofaringredients.Field`: the various sub-types of
field enable the user to perform sanity-checking of inputs. For example, in
the above, we can check that the executable provided really is an
executable by making the relevant field an instance of
:class:`~lofarpipe.support.lofaringredients.ExecField`, and that the
``clobber`` value is really a bool by making its field a
:class:`~lofarpipe.support.lofaringredients.BoolField`.
The :ref:`developer's guide <lofarpipe-ingredients>` provides a lot more
information about the types of field available.

Each of the ingredients is associated with a name in the ``inputs`` dict.
Within the recipe, the values of the inputs are available as
``self.inputs[FIELDNAME]``, as seen (for example) at line 43.

The various inputs can take their values from a number of sources. For
example, as we will see, inputs can be read from the command line, provided
in a configuration file, or take the default value specified in their
definition. Whatever the source, though, they are always made available to
the recipe in a consistent way: a
:class:`~lofarpipe.support.lofaringredients.BoolField` *always* contains a
bool, and so on.

User-defined ingredients
------------------------

The ingredients system is designed to take care of as much error and sanity
checking for the developer as possible. It is therefore extensible: as well
as checking for basic types as shown above, we can construct specialist
fields to (for example) check that a given input falls within a particular
range.

In this case, we know that ``target_hosts`` should be a list of hostnames
of machines to which jobs may be dispatched. Above, we used
:class:`~lofarpipe.support.lofaringredients.ListField` to simply check that
it is a list. However, with a little imagination, we can define a list that
is guaranteed to contain only resolvable hostnames. For example:

.. code-block:: python
   :linenos:

   import socket

   import lofarpipe.support.lofaringredient as ingredient

   class HostNameList(ingredient.ListField):
       @classmethod
       def is_valid(cls, value):
           for hostname in value:
               try:
                   socket.gethostbyname(hostname)
               except socket.error:
                   return False
           return True

This checks that every element within the list is resolvable (using
Python's standard :func:`socket.gethostbyname` function). We could
incorporate it into the above recipe by simply changing the
``target_hosts`` definition at line 33 to:

.. code-block:: python

   'target_hosts': HostNameList(
       '--target-hosts',
       help="Remote hosts on which to execute"
   )

Configuration file access
-------------------------

In the above, we have expected the user to supply a list of hosts on which
to run jobs directly. However, in general, the cluster layout is already
known: this can, therefore, be determined automatically. As part of the
:ref:`pipeline configuration <config-file>`, the user is able to specify a
``clusterdesc`` parameter. This contains the full path to a file which
describes the cluster layout (see :ref:`the note on distproc
<distproc-blurb>` for details). The recipe can access the pipeline
configuration and extract the information from this file directly.

We can simply drop the ``target_hosts`` input from our recipe, and replace
line 43 with:

.. code-block:: python

   from lofarpipe.support.clusterdesc import ClusterDesc, get_compute_nodes
   hosts = get_compute_nodes(
       ClusterDesc(
           self.config.get('cluster', "clusterdesc")
       )
   )

There are a number of points to note here.

The pipeline configuration file is available as the ``self.config``
attribute in the recipe. This is an instance of
:class:`ConfigParser.SafeConfigParser` from the `standard library
<http://docs.python.org/library/configparser.html>`_, and can be accessed
exactly as described in the Python documentation. Here, we simply extract
the value of ``clusterdesc`` from the ``cluster`` section.

The framework provides some convenience routines for working with
clusterdesc files. Here, we use
:class:`lofarpipe.support.clusterdesc.ClusterDesc` and
:func:`~lofarpipe.support.clusterdesc.get_compute_nodes` to extract a list
of all the compute nodes defined in the cluster, and then proceed to use
the list of hosts in the recipe exactly as before.
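Since ``self.config`` is a standard
:class:`ConfigParser.SafeConfigParser`, all of its usual methods are
available. For example, a recipe might check that the option is present
before relying on it. The fragment below (which would sit inside ``go()``)
is a sketch only, assuming the same ``cluster``/``clusterdesc``
configuration layout described above.

.. code-block:: python

   # Illustrative sketch: defensive access to the pipeline configuration.
   # Assumes the configuration file contains a [cluster] section with a
   # clusterdesc option, as described above.
   from lofarpipe.support.clusterdesc import ClusterDesc, get_compute_nodes

   if self.config.has_option('cluster', 'clusterdesc'):
       clusterdesc = self.config.get('cluster', 'clusterdesc')
       hosts = get_compute_nodes(ClusterDesc(clusterdesc))
   else:
       self.logger.error("No clusterdesc file specified in configuration")
       return 1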
Additional notes
================

Some important aspects of recipe design were not covered in the above
discussion.

Assigning jobs to specific hosts
--------------------------------

The example we have considered above is, in one important respect, simpler
than many pipeline recipes: it runs exactly the same code on each of the
remote hosts. A more general situation is processing a large number of
similar, but not identical, datasets (such as independent subbands of an
observation). Due to limited storage capacities on the remote hosts, it is
usually the case that each host only stores a subset of the total number of
datasets locally. Therefore, when dispatching jobs to a host, the recipe
author must be careful to send only those jobs which refer to data the host
can reasonably process.

From the recipe point of view, this procedure is straightforward. The
recipe we developed earlier contains code like:

.. code-block:: python

   jobs = []
   for host in hosts:
       jobs.append(
           ComputeJob(
               host, command, arguments=[...]
           )
       )

When specifying a job which must run on a specific host, the pipeline
author can use a mapping of the form:

.. code-block:: python

   job_list = [
       ("hostname1", [arguments for job 1]),
       ("hostname2", [arguments for job 2]),
       ...
   ]

And our earlier code can then simply be modified to:

.. code-block:: python

   jobs = []
   for host, arguments in job_list:
       jobs.append(
           ComputeJob(
               host, command, arguments=arguments
           )
       )

In general, the recipe author must define the mapping between hostnames and
job arguments themselves: this will depend on the details of the problem
the recipe is addressing. A convenient approach is to store the mapping on
disk in a :ref:`parset file <parset-handling>` formatted as follows:

.. code-block:: none

   hostname1 = [ /path/to/filename1, /path/to/filename2 ]
   hostname2 = [ /path/to/filename3, /path/to/filename4 ]
   ...

The :func:`lofarpipe.support.group_data.load_data_map` function makes it
easy to read this parset back from disk and iterate over the values to
dispatch compute jobs: see the imaging pipeline's :ref:`dppp-recipe` recipe
for an example, and the sketch below.
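By way of illustration, a dispatch loop driven by such a parset might look
like the following sketch (here ``command`` is the node script invocation,
as in the earlier examples, and the mapfile path is a placeholder). It
assumes that :func:`~lofarpipe.support.group_data.load_data_map` yields
hostname/filename pairs, as the parset layout above suggests; check the
framework source for the precise return type.

.. code-block:: python

   # Hedged sketch: dispatch one job per (host, data file) pair read back
   # from the parset. Assumes load_data_map yields host/file pairs, as
   # the layout above suggests; verify against the framework source.
   from lofarpipe.support.group_data import load_data_map
   from lofarpipe.support.remotecommand import ComputeJob

   data_map = load_data_map("/path/to/mapfile.parset")

   jobs = []
   for host, infile in data_map:
       jobs.append(
           ComputeJob(
               host, command, arguments=[infile]
           )
       )
   self._schedule_jobs(jobs)

.. todo:: Recipe outputs

.. todo:: Combining recipes into a pipeline

.. todo:: Testing this recipe by running it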