Utilities

The framework provides a number of convenience and utility functions. These are not fundamental to the operation of the framework itself, but rather provide functionality which is commonly needed in pipeline recipes.

Parset handling

Parsets (“parameter sets”) are files containing key-value pairs commonly used for the configuration of LOFAR tools. Many pipeline recipes will, at heart, run a standard tool over a number of datasets in parallel by substituting per-dataset values into a template parset. These routines are designed to simplify that process.

class lofarpipe.support.parset.Parset(filename=None, caseInsensitive=False)

Bases: lofar.parameterset.parameterset

This wraps lofar.parameterset to provide a convenient means of iterating over the parameterset’s keys.

It should be replaced (eventually) by rolling this functionality directly into the C++ implementation.

lofarpipe.support.parset.get_parset(parset_filename)

Returns an instance of Parset with the given file loaded.
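
For illustration, a minimal sketch of loading a parset and iterating over its keys; the file name is hypothetical, and value access via getString is assumed from the underlying lofar.parameterset.parameterset interface:

    from lofarpipe.support.parset import get_parset

    # Load an existing parset file (hypothetical path).
    parset = get_parset("observation.parset")

    # Parset adds key iteration on top of lofar.parameterset.parameterset.
    for key in parset:
        print("%s = %s" % (key, parset.getString(key)))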

lofarpipe.support.parset.patch_parset(parset, data, output_dir=None)

Generate a parset file by adding the contents of the data dictionary to the specified parset object. Write it to file, and return the filename.

parset may either be the filename of a parset-file or an instance of lofar.parameterset.parameterset.
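
A sketch of typical usage, with hypothetical template and key names; the return value is the name of the newly written parset file:

    from lofarpipe.support.parset import patch_parset

    # Per-dataset values to substitute into the template (illustrative keys).
    overrides = {
        "msin": "/data/L12345_SB000.MS",
        "msout": "/data/L12345_SB000.dppp.MS",
    }

    # Write a patched copy of the template and return its filename.
    new_parset_file = patch_parset("template.parset", overrides)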

lofarpipe.support.parset.patched_parset(*args, **kwds)

Wrap patch_parset() in a contextmanager which removes the generated parset when it finishes.

The never_unlink flag is to facilitate debugging – one can leave a patched parset in place for manual checking if required.
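
A sketch of use as a context manager, assuming it yields the filename of the patched parset (file and key names are hypothetical):

    from lofarpipe.support.parset import patched_parset

    with patched_parset("template.parset", {"msin": "/data/input.MS"}) as fname:
        # fname points at the patched copy; run a tool against it here.
        print(fname)
    # On exit the patched file is removed, unless never_unlink was set.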

Cluster descriptions (clusterdesc) handling

Clusterdesc files (see distproc) describe the layout of the compute cluster. They can be used within the pipeline to help choose nodes to which jobs may be dispatched.

class lofarpipe.support.clusterdesc.ClusterDesc(filename)

Wrap a clusterdesc file, providing a more convenient, Pythonic interface for accessing its contents.

lofarpipe.support.clusterdesc.get_compute_nodes(clusterdesc)

Return a list of all compute nodes defined (under the key “Compute.Nodes”) in the ClusterDesc object clusterdesc.

lofarpipe.support.clusterdesc.get_head_node(clusterdesc)

Return the head node, defined by the key “Head.Nodes” in the ClusterDesc object clusterdesc.

Always return the first head node, even if there are several defined.
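
A brief sketch of reading a clusterdesc file and inspecting its nodes (the file name is hypothetical):

    from lofarpipe.support.clusterdesc import (
        ClusterDesc, get_compute_nodes, get_head_node)

    clusterdesc = ClusterDesc("cep2.clusterdesc")
    print(get_compute_nodes(clusterdesc))  # all defined compute nodes
    print(get_head_node(clusterdesc))      # the first head node defined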

Grouping input data

Often, a long list of input datasets must be grouped according to some criterion for processing. These routines provide some useful ways of batching up data.

lofarpipe.support.group_data.group_files(logger, clusterdesc, node_directory, group_size, filenames)

Group a list of files into blocks suitable for simultaneous processing, such that a limited number of processes run on any given host at a time.

The directory node_directory on each of the compute nodes specified in clusterdesc is searched for any of the files listed in filenames. A generator is produced; on each iteration, it returns no more than group_size files per node.
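
A hedged sketch of consuming the generator; the logger, clusterdesc file and file names are placeholders, and the exact structure of each group follows the description above:

    import logging
    from lofarpipe.support.clusterdesc import ClusterDesc
    from lofarpipe.support.group_data import group_files

    logger = logging.getLogger("pipeline")
    clusterdesc = ClusterDesc("cep2.clusterdesc")
    filenames = ["L12345_SB%03d.MS" % sb for sb in range(10)]

    # Each iteration yields at most group_size files per compute node.
    for group in group_files(logger, clusterdesc, "/data", 4, filenames):
        print(group)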

lofarpipe.support.group_data.gvds_iterator(gvds_file, nproc=4)

Reads a GVDS file.

Provides a generator which successively returns the contents of the GVDS file in the form (host, filename), in chunks suitable for processing across the cluster; i.e., no more than nproc files per host at a time.
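
For example (the GVDS file name is hypothetical, and the precise chunk structure is an assumption based on the description above):

    from lofarpipe.support.group_data import gvds_iterator

    for chunk in gvds_iterator("observation.gvds", nproc=4):
        # Each chunk holds (host, filename) pairs, no more than nproc per host.
        print(chunk)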

lofarpipe.support.group_data.load_data_map(filename)

Load the map file filename, which contains tuples of (host, filepath).
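
For example, assuming the tuples are returned as a list (the map file name is hypothetical):

    from lofarpipe.support.group_data import load_data_map

    data_map = load_data_map("input.mapfile")
    for host, filepath in data_map:
        print("%s: %s" % (host, filepath))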

Process control

Many pipeline recipes spawn an external executable and wait for it to complete. These routines can assist the recipe author by simplifying this process and automatically recovering from transient errors.

lofarpipe.support.utilities.spawn_process(cmd, logger, cwd=None, env=None, max_tries=2, max_timeout=30)

Tries to spawn a process.

If it hits an OSError due to lack of memory or too many open files, it will keep trying up to max_tries times, waiting up to max_timeout seconds between attempts.

If successful, the process object is returned. Otherwise, we eventually propagate the exception.
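
A sketch of typical usage, assuming cmd is an argument list as accepted by subprocess.Popen and that the returned object behaves like a Popen instance:

    import logging
    from lofarpipe.support.utilities import spawn_process

    logger = logging.getLogger("pipeline")

    # Retries automatically on transient OSErrors (out of memory,
    # too many open files).
    process = spawn_process(["ls", "-l"], logger, cwd="/tmp", max_tries=3)
    sout, serr = process.communicate()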

lofarpipe.support.utilities.catch_segfaults(cmd, cwd, env, logger, max=1, cleanup=<lambda>)

Run cmd in cwd with env, sending output to logger.

If it segfaults, retry up to max times.
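
A hedged sketch; the executable and parset names are hypothetical, and the cleanup callable is assumed to be invoked before a retry (for instance to remove partial output):

    import os
    import logging
    from lofarpipe.support.utilities import catch_segfaults

    logger = logging.getLogger("pipeline")

    catch_segfaults(
        ["makeimage", "image.parset"],  # hypothetical executable and parset
        "/data/scratch",                # working directory
        dict(os.environ),               # environment for the child process
        logger,
        max=2,
        cleanup=lambda: None,           # assumed to run before each retry
    )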

File and directory manipulation

lofarpipe.support.utilities.get_mountpoint(path)

Return the path to the mount point containing the given path.

Parameters: path – Path to check

lofarpipe.support.utilities.create_directory(dirname)

Recursively create a directory, without failing if it already exists.
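
Both are straightforward to use; a brief sketch with hypothetical paths:

    from lofarpipe.support.utilities import create_directory, get_mountpoint

    # Safe even if the directory already exists.
    create_directory("/data/scratch/L12345/logs")

    # Returns, e.g., "/data" if /data is a separate filesystem.
    print(get_mountpoint("/data/scratch/L12345/logs"))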

Iterators and generators

lofarpipe.support.utilities.is_iterable(obj)

Return True if the given object is iterable, else False.

lofarpipe.support.utilities.group_iterable(iterable, size)

Group the iterable into tuples of given size. Returns a generator.

Example:

>>> for x in group_iterable([1, 2, 3, 4, 5], 3):
...     print x
(1, 2, 3)
(4, 5)

Miscellaneous

lofarpipe.support.utilities.read_initscript(logger, filename, shell='/bin/sh')

Return a dict of the environment after sourcing the given script in a shell.

lofarpipe.support.utilities.string_to_list(my_string)

Convert a list-like string (as in pipeline.cfg) to a list of values.
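
A sketch of both helpers; the init script path is hypothetical, and the exact list syntax accepted by string_to_list is an assumption based on the pipeline.cfg format:

    import logging
    from lofarpipe.support.utilities import read_initscript, string_to_list

    logger = logging.getLogger("pipeline")

    # Environment as it would be after ". /opt/lofar/lofarinit.sh".
    env = read_initscript(logger, "/opt/lofar/lofarinit.sh")

    # A pipeline.cfg style list such as "[a, b, c]" becomes ["a", "b", "c"].
    values = string_to_list("[a, b, c]")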

Exceptions

The following exceptions may be raised by pipeline components.

exception lofarpipe.support.lofarexceptions.ClusterError
exception lofarpipe.support.lofarexceptions.ExecutableMissing
exception lofarpipe.support.lofarexceptions.PipelineException
exception lofarpipe.support.lofarexceptions.PipelineQuit

If this exception is raised during a pipeline run, we skip over all subsequent steps and exit cleanly.
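
For instance, a recipe might raise it to abandon the rest of a run without flagging an error; a minimal sketch, with a hypothetical precondition check:

    from lofarpipe.support.lofarexceptions import PipelineQuit

    def check_inputs(input_files, logger):
        # If there is nothing to process, skip all subsequent pipeline
        # steps and exit cleanly rather than failing.
        if not input_files:
            logger.error("No input data; quitting pipeline")
            raise PipelineQuit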

exception lofarpipe.support.lofarexceptions.PipelineReceipeNotFound
exception lofarpipe.support.lofarexceptions.PipelineRecipeFailed