One of the DRP team's top priorities in S15 is synchronization with the HSC fork of the stack, both because the divergence between the codebases has made joint development much more difficult, and because the HSC stack is now considerably ahead of the LSST stack in the quality and sophistication of the pipeline itself.  The HSC stack has large-scale parallelization capabilities that are not present on the LSST side, and while these may not be future-proof for LSST in either interface or implementation, they have been critical for rapid testing and development of algorithms on the HSC stack, and could play an important role in some upcoming LSST needs (including integration testing and algorithm development).  This page describes those capabilities and the way we use them on the HSC side, in the hope that we can find a way to provide equivalent functionality on the LSST side and put together a migration plan that will allow us to transition smoothly to an approach based on the newer parallelization technologies the LSST middleware team is considering.

This page makes no attempt to account for more sophisticated parallelization needs that DRP may have in the future.  I think the capabilities described here will be sufficient to get DRP through the next year of development, and we'll likely need very little beyond that for the next three years.

Requirements

Multi-node scatter/gather in Python.

For example: we want to be able to run Task1 at a {visit, ccd} level, then run Task2 at {visit} level, gathering the per-CCD outputs from Task1 and passing them as a list to Task2.
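
The Task1/Task2 example above can be sketched in a few lines of Python.  This is only an illustration of the pattern, not the HSC implementation: task1, task2, scatter_gather and the map_func argument are hypothetical names, and the "results" are made-up numbers.  The mapping function is passed in so that the same flow can run serially or on some kind of process pool.

```python
from collections import defaultdict


def task1(data_id):
    """Hypothetical per-CCD step: returns a small result for one {visit, ccd}."""
    return {"visit": data_id["visit"], "ccd": data_id["ccd"],
            "n_sources": 100 + data_id["ccd"]}


def task2(visit, ccd_results):
    """Hypothetical per-visit step: receives the per-CCD outputs as a list."""
    return {"visit": visit, "n_ccds": len(ccd_results),
            "n_sources": sum(r["n_sources"] for r in ccd_results)}


def scatter_gather(data_ids, map_func=map):
    # Scatter: run task1 once per {visit, ccd}; map_func decides where it runs.
    ccd_results = list(map_func(task1, data_ids))
    # Gather: group the per-CCD outputs by visit.
    by_visit = defaultdict(list)
    for result in ccd_results:
        by_visit[result["visit"]].append(result)
    # Less-parallel step: run task2 once per visit on the gathered list.
    return [task2(visit, results) for visit, results in sorted(by_visit.items())]


data_ids = [{"visit": v, "ccd": c} for v in (1, 2) for c in range(3)]
results = scatter_gather(data_ids)  # serial run using the built-in map
```

Passing the map method of a multiprocessing.Pool as map_func would spread task1 over the cores of a single node; spreading it over multiple nodes needs something with the same shape but a different transport, which is the capability this page is asking for.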

The vast majority of the work will be done at the most-scattered level, and we are currently content to ignore CPU time losses due to parallelization overheads or time spent in less-parallel processing (e.g. in Task2).

We have no cases where the gathered outputs from scattered tasks will exceed the memory available on a typical core, so there's no need to run less-parallel steps in any special way.

In some cases, the number of scattered jobs will exceed the number of cores per node, and we would like to be able to run them all simultaneously, even if this results in more idle CPUs in the less-parallel steps, because a quick turnaround for moderate-size jobs is more important during development than maximizing CPU usage.  I think we'd still be willing to consider approaches that supported only intra-node scatter/gather (and hence just queued some jobs until cores became available on the same node), if that brought some other advantages.  In that case, however, we'd still definitely need to be able to submit jobs that are parallelized over multiple nodes with intra-node sequence points (for instance, in the example above we'd need to be able to process multiple visits in parallel on different nodes).

We will need to be able to pass basic afw types (geometries, ExposureCatalogs, etc.) as arguments between Tasks with different parallelization axes.  Either IPC or temporary shared storage that simulates it would likely be acceptable (I don't know much about the tradeoffs), but we don't want the overhead of needing a Butler dataset for every intermediate data product.

Consistent interface for multiple batch systems

We'd like a command-line interface for submitting batch jobs with the same options as CmdLineTask for passing data IDs and configuration parameters, with only a small amount of additional information needed to switch from intra-node multiprocessing to a large scale batch job - typically a job name, a queue name, the number of nodes and cores requested, and an estimate of the time per data unit (from which each top-level script can estimate the total processing time, given the data IDs passed).  Obviously, some of this is in the top-level scripts themselves, which should be written and maintained by Science Pipelines team members.
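
As a rough sketch of what "only a small amount of additional information" could look like, here is a parser that takes a data ID plus the handful of batch options listed above.  All option names below (--batch-type, --job, --queue, --nodes, --procs, --time) are assumptions made for illustration and should not be read as the existing HSC or CmdLineTask interface.

```python
import argparse


def make_parser():
    """Build a parser with data-ID options plus a few batch options.

    All option names here are illustrative assumptions, not an existing interface.
    """
    parser = argparse.ArgumentParser(description="Hypothetical batch-capable driver script")
    parser.add_argument("--id", nargs="*", default=[], metavar="KEY=VALUE",
                        help="data ID, e.g. visit=100 ccd=5")
    parser.add_argument("--batch-type", choices=["smp", "pbs", "slurm"], default="smp",
                        help="smp = run locally on a single node")
    parser.add_argument("--job", default="driver", help="job name")
    parser.add_argument("--queue", default=None, help="queue to submit to")
    parser.add_argument("--nodes", type=int, default=1, help="number of nodes")
    parser.add_argument("--procs", type=int, default=1, help="cores per node")
    parser.add_argument("--time", type=float, default=0.0,
                        help="estimated seconds per data unit")
    return parser


def parse_data_id(pairs):
    """Turn ["visit=100", "ccd=5"] into {"visit": "100", "ccd": "5"}."""
    return dict(pair.split("=", 1) for pair in pairs)


# Local run: only the data ID is needed.
local = make_parser().parse_args(["--id", "visit=100", "ccd=5"])

# Batch run: the same data ID, plus a handful of batch options.
batch = make_parser().parse_args(
    ["--id", "visit=100", "ccd=5", "--batch-type", "slurm", "--job", "test",
     "--queue", "normal", "--nodes", "4", "--procs", "16", "--time", "600"])
```

The point of the sketch is that the two invocations differ only in the batch options, so a user who already knows how to run the script locally has very little extra to learn.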

In order to avoid maintaining distinct parallelization systems on HSC and LSST, we need to be able to support at least PBS and Slurm, which are used on HSC clusters.
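
To make the "consistent interface" idea concrete, the sketch below turns one set of generic job parameters into the header lines of either a PBS or a Slurm submission script.  The function names are hypothetical and this is not how parallel.py is written; the directives themselves are standard ones, with the caveat that the nodes=N:ppn=M resource syntax is the Torque-style form and some PBS installations expect a different one.

```python
def format_walltime(seconds):
    """Format a duration in seconds as HH:MM:SS."""
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def batch_header(system, job, queue, nodes, procs, walltime_seconds):
    """Return scheduler directive lines for a hypothetical submission script."""
    walltime = format_walltime(walltime_seconds)
    if system == "pbs":
        # Torque-style resource request; other PBS flavors may differ.
        return [
            f"#PBS -N {job}",
            f"#PBS -q {queue}",
            f"#PBS -l nodes={nodes}:ppn={procs}",
            f"#PBS -l walltime={walltime}",
        ]
    if system == "slurm":
        return [
            f"#SBATCH --job-name={job}",
            f"#SBATCH --partition={queue}",
            f"#SBATCH --nodes={nodes}",
            f"#SBATCH --ntasks-per-node={procs}",
            f"#SBATCH --time={walltime}",
        ]
    raise ValueError(f"unsupported batch system: {system!r}")


pbs_lines = batch_header("pbs", "test", "default", 2, 8, 5400)
slurm_lines = batch_header("slurm", "test", "normal", 2, 8, 3600)
```

Adding another batch system in this style means adding one more branch, while the top-level scripts that supply the job parameters stay unchanged.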

The same top-level script should be usable both for batch-job submission and intra-node processing of smaller jobs (using e.g. Python's multiprocessing module), with (ideally) only a slight difference in command-line options.

Current HSC Implementation

From Paul Price:

The implementation for these we use for HSC is in the hscPipeBase package (https://github.com/HyperSuprime-Cam/hscPipeBase). There are two main features:

1. A process pool using MPI (pool.py) so you can do scatter/gather over multiple nodes (or on a single node).  This is mainly inspired by multiprocessing.Pool, and the idea is to make it as easy for the user as I could.  We used to have code where the master and slave nodes took the same path, and had "if rank == 0" scattered all over, which made it hard to follow and keep track of what was defined where.  With the process pool, you run functions (or methods) on the nodes, so everything's self-contained, and the program flow is much easier to follow.  This file also has some additional conveniences for doing the MPI (e.g., a pickleSniffer context manager that catches a pickling error and attempts to figure out which object caused it).

2. Facilities to submit jobs to batch systems (parallel.py).  This currently supports PBS, Slurm and SMP (for a single node), and I think Condor support could be added easily.  There is a Task class to support this, analogous to CmdLineTask.  There's also a Task class that does this and starts up the process pool, which is what we use for our exposure and stack processing.

While I'm sure this implementation isn't perfect, it serves our needs for HSC, and it might be useful for LSST, if only as a starting point for discussion.

I'll go further and suggest that it's probably easiest if we just bring this implementation over to LSST, while focusing on changes that will remove any assumptions about the implementation from the interface.  While we're using MPI in the implementation (just because it's what we're familiar with, and it's ubiquitous on existing clusters), I'd guess the concepts we're presenting to Task writers are already compatible with other IPC tools we may be considering.  I think it's also worth keeping in mind that most algorithm code will be quite decoupled from even the interfaces here, and I don't think there's any danger of being "locked in" to a particular interface with whatever we come up with here; there will only ever be a handful of relatively small scripts that make use of it.

Note that mpi.py in hscPipeBase has been retired and would not need to be ported to LSST.
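
For readers unfamiliar with the kind of convenience the pickleSniffer description refers to, here is a minimal sketch of the idea: a context manager that catches a pickling failure and reports which nested object is responsible.  This is written from the one-line description above, not from the hscPipeBase source, so the names pickle_sniffer and find_unpicklable and the search strategy are assumptions.

```python
import pickle
import threading
from contextlib import contextmanager


def find_unpicklable(obj, name="obj"):
    """Return a path to the innermost object that fails to pickle, or None."""
    try:
        pickle.dumps(obj)
        return None
    except Exception:
        pass
    # The object fails as a whole; look for a child that fails on its own.
    if isinstance(obj, dict):
        children = [(f"{name}[{key!r}]", value) for key, value in obj.items()]
    elif isinstance(obj, (list, tuple)):
        children = [(f"{name}[{i}]", value) for i, value in enumerate(obj)]
    elif hasattr(obj, "__dict__"):
        children = [(f"{name}.{key}", value) for key, value in vars(obj).items()]
    else:
        children = []
    for child_name, child in children:
        culprit = find_unpicklable(child, child_name)
        if culprit is not None:
            return culprit
    return name  # no child is to blame, so the object itself is the culprit


@contextmanager
def pickle_sniffer(**suspects):
    """Re-raise pickling failures with the name of the offending object."""
    try:
        yield
    except (pickle.PicklingError, TypeError, AttributeError) as exc:
        for name, obj in suspects.items():
            culprit = find_unpicklable(obj, name)
            if culprit is not None:
                raise RuntimeError(f"Cannot pickle {culprit}") from exc
        raise


# A lock cannot be pickled, so sending this payload to a worker would fail.
payload = {"config": {"threshold": 5.0}, "lock": threading.Lock()}
message = None
try:
    with pickle_sniffer(payload=payload):
        pickle.dumps(payload)
except RuntimeError as err:
    message = str(err)
```

Since everything passed between the master and worker processes has to be pickled, a helper of this sort turns an opaque failure deep inside the transport into a message that names the offending attribute.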

Current Use Cases

HSC has several top-level BatchPoolTasks that make use of the parallelization interfaces, which essentially just aggregate regular CmdLineTasks that are present on the LSST side (or will be soon).  These component CmdLineTasks can still be run in serial or with the "-j" single-node Python multiprocessing option.  The top-level BatchPoolTasks include:

ProcessExposureTask

For each visit, this runs ProcessCcdTask in parallel over CCDs, then gathers all CCDs from a visit together to compute visit-wide astrometric and photometric solutions.  In the near future, we will probably need to modify this to scatter again to multiple CCDs after the visit-wide processing.  Because the per-CCD processing dominates the compute time and the number of CCDs in a visit is larger than the typical number of cores per node, this is the main driver for inter-node scatter/gather.

StackTask

Data unit: tract+filter (automatically parallelizes over patches).

For each patch+filter, this runs MakeCoaddTempExpTask in parallel over visits, then gathers all visits in the patch+filter and runs AssembleCoaddTask and ProcessCoaddTask.  We'll likely remove most of ProcessCoaddTask from this stage in the future, as it's being replaced by the new multi-band processing.  This will automatically 

MultiBandTask

For each patch, this merges the per-filter parent Footprints and Peaks from ProcessCoaddTask, generating a new set of cross-band-consistent parent sources.  We then scatter to deblend and measure separately in each band, then gather to create a unified catalog to use for forced photometry, then scatter once more to run forced photometry (on coadds) in different bands.
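
The alternating gather/scatter structure of this task can be sketched as follows.  Everything in the sketch is a placeholder: the SNR table stands in for real measurements, "pick the band with the highest signal-to-noise" is an invented rule for building the unified catalog, and none of the function names come from the HSC code.  Only the order of the sequence points is meant to mirror the description above.

```python
# Made-up signal-to-noise per band and peak id, for illustration only.
SNR = {
    "g": {1: 5.0, 2: 9.0},
    "r": {2: 4.0, 3: 7.0},
}


def merge_peaks(peaks_by_band):
    """Gather: merge per-band peak ids into one cross-band parent list."""
    return sorted(set().union(*peaks_by_band.values()))


def measure_band(job):
    """Scatter (one call per band): placeholder for deblending and measurement."""
    band, parents = job
    return band, {parent: SNR[band].get(parent, 0.0) for parent in parents}


def build_reference(measurements):
    """Gather: pick, per parent, the band with the highest placeholder SNR."""
    parents = measurements[0][1].keys()
    return {p: max(measurements, key=lambda m: m[1][p])[0] for p in parents}


def forced_band(job):
    """Scatter (one call per band): placeholder for forced photometry."""
    band, reference = job
    return band, sorted(reference)


def run_multiband(peaks_by_band, map_func=map):
    bands = sorted(peaks_by_band)
    parents = merge_peaks(peaks_by_band)                                        # gather
    measurements = list(map_func(measure_band, [(b, parents) for b in bands]))  # scatter
    reference = build_reference(measurements)                                   # gather
    forced = dict(map_func(forced_band, [(b, reference) for b in bands]))       # scatter
    return reference, forced


peaks = {band: set(ids) for band, ids in SNR.items()}
reference, forced = run_multiband(peaks)
```

Each scatter passes its inputs as plain function arguments and each gather receives plain return values, which is why passing intermediate products between Tasks without a Butler dataset matters for this use case.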

Possible Future Use Cases

Integration Testing

Critical pieces of our codebase are essentially not being tested in an automated way right now, because they require a significant amount of input data and earlier processing to be exercised.  While we ultimately need to come up with a "smarter" test data package that includes many of the pipeline's intermediate outputs for small-scale tests of algorithms, it's both easier and more important to start running moderately large quantities of data through the single-frame, coaddition, and coadd-processing pipeline on a regular basis.  While we could build scripts that essentially hard-wire the parallelization and processing flow for a test dataset, running the long sequence of CmdLineTasks in the same way a human would run them manually, it'd be a much better use of developer time to have the integration test harness delegate to the driver scripts enabled by these parallelization capabilities.

S15 and W16 Algorithm Development

Near the end of S15 we're scheduled to start working on full focal-plane PSF estimation, which involves precisely the kind of scatter/gather we currently do on the HSC side in ProcessExposureTask.  W16 work on the deblender and background matching will require similar operations.  At present, we don't really have a plan for how to do this without constructs like those described here (I could imagine workarounds involving Python multiprocessing and intra-node-only scatter/gather, but we'd rather have a better option available).

S15 Galaxy Shear Precision Experiments

The work in DM-1108 will require running our galaxy-fitting code on a large suite of postage-stamp simulations.  While none of the processing flow will look anything like a pipeline for real data, we'll likely need to come up with some way to parallelize it over multiple nodes on large clusters, and we'd almost certainly make use of the capabilities described here if they were available on the LSST side.

Timeline

If we agree on an approach that involves mostly just moving over the HSC implementation, we'd like to start on that immediately (and the DRP team can and probably should do much of the work, depending on where exactly this lands in the LSST codebase and how much modification is needed).  We're already moving over multi-band processing code that would be much easier to test if it and the steps that precede it could be run at larger scale in a more automated way (and we have plenty of compute resources at Princeton to do this right now, so that's not a limiting factor).  If the approach we agree on requires significant new code on the LSST side, without the HSC implementation being available as a stop-gap measure, we'd really like to have that in place by the end of S15 at the latest; we're still hoping to be able to completely synchronize with the HSC stack by then and essentially move all HSC-driven algorithm development to the main LSST codebase.

Comparison with ctrl_execute

The existing ctrl_execute and ctrl_platform_* packages provide some batch submission functionality and another approach to allow the same code to be run on differently configured systems.  These existing LSST tools are largely focused on different technologies (condor glide-in and orca, with some use of PBS to allocate nodes in advance on certain systems) than the HSC tools (direct submission to PBS or slurm, or local execution on a single node, with MPI controlling the program flow).  While we probably want to unify these at some point, it looks like it would require significant changes to ctrl_execute and ctrl_platform_* to support non-condor/orca jobs.

The ctrl_platform_* packages provide configuration that insulates users from having to know the details of how to submit a batch job on a particular cluster.  On the HSC side, these are mostly command-line arguments that have to be provided by the user, requiring such users to remember a bit more about the platform than they'd ideally like to know.  I think it would make sense to create new directories in the ctrl_platform_* packages to provide defaults for these command-line arguments, while still allowing them to be overridden on the command line (e.g. some clusters may have multiple queues we'd want to submit to, depending on the size of the job, so the default may not always be best).  This would be entirely parallel to the configuration in ctrl_platform_* that's used by ctrl_execute for now.
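
One way the "per-platform defaults, overridable on the command line" idea could work is sketched below.  The platform names, default values and option names are all invented for the example; in practice the defaults would presumably be read from files in the ctrl_platform_* packages instead of a dictionary in the script.

```python
import argparse

# Hypothetical per-platform defaults; real values would come from configuration.
PLATFORM_DEFAULTS = {
    "cluster_a": {"batch_type": "slurm", "queue": "general", "procs": 16},
    "cluster_b": {"batch_type": "pbs", "queue": "default", "procs": 8},
}


def parse_with_platform_defaults(argv):
    """Parse argv, filling unspecified batch options from the chosen platform."""
    # First pass: find out which platform was requested, ignoring everything else.
    pre = argparse.ArgumentParser(add_help=False)
    pre.add_argument("--platform", choices=sorted(PLATFORM_DEFAULTS), default="cluster_a")
    known, _ = pre.parse_known_args(argv)

    # Second pass: the full parser, with that platform's values as defaults.
    parser = argparse.ArgumentParser(parents=[pre])
    parser.add_argument("--batch-type")
    parser.add_argument("--queue")
    parser.add_argument("--procs", type=int)
    parser.set_defaults(**PLATFORM_DEFAULTS[known.platform])
    return parser.parse_args(argv)


# The user names the platform and overrides only the queue.
args = parse_with_platform_defaults(["--platform", "cluster_b", "--queue", "long"])
```

With this layering a user on a familiar cluster only has to name the platform, while someone who needs a non-default queue for a large job can still say so explicitly.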

However, if we bring the HSC tools to LSST with little or no merging with ctrl_execute, I don't think we'll have a way to use those new tools on any NCSA machines, which, to my knowledge, all use condor for batch submission.  Paul Price has stated that it should be fairly trivial to add condor support to the HSC tools, and if it's roughly comparable to PBS and slurm in functionality, I think that's right; the HSC tools provide a pretty good abstraction layer for batch systems.  But I'm also not certain what sort of technologies LSST will be using for this in the future; I think it may have been a while since anyone used ctrl_execute (it certainly hasn't seen any recent development), and our hardware platform plans seem to be evolving rapidly, so I'm not sure if we even want to be targeting condor in order to be able to run on non-XSEDE NCSA machines in the future.

With a future merging of the HSC and ctrl_execute functionality in mind (or the development of a completely separate approach that supersedes both), here are a few more points of comparison that may be useful:

  • The HSC tools tie more naturally into the command-line parsing capabilities provided by CmdLineTask than ctrl_execute does, reducing code duplication in those areas and providing a slightly more familiar interface to users.  However, this requires inheriting from a particular base class, limiting the command-line drivers that can be submitted to batch systems to a few higher-level drivers (lower-level CmdLineTasks cannot be submitted to batch systems at all).  This is very closely related to the work Unknown User (mcarras2) and Gregory Dubois-Felsmann are planning to separate argument parsing from CmdLineTask; I suspect that work will make it much easier to get the best of both worlds here.
  • The HSC tools support much more complex parallelization, but seem to have little concern for load-balancing, which may result in much larger idle times than we've seen while using ctrl_execute, at least for very large jobs.  Of course, with more complex parallelization, minimizing idle time is a much harder problem.
  • The HSC tools provide very convenient functionality to estimate the total walltime of a job (a requirement for most batch queues) from a user-specified time for just one part of the processing.  This isn't quite right; we need multiple knobs for the user to be able to turn, and the driver task should provide sensible defaults for these.  But it's vastly better than having the user estimate the wall time on their own.
  • The HSC tools assume the worker nodes have access to at least some of the same filesystems as the head node, and moreover assume that both the EUPS-managed software stack and the data are available to both in the same location.  These assumptions appear not to be valid for some XSEDE systems.
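
To illustrate the walltime point in the list above, here is one simple way a driver could turn a per-unit time into a total walltime request, with a couple of extra knobs.  The formula is an assumption made for this sketch (all units take the same time and run in "waves" across the available cores), not a description of what the HSC tools compute, and the function and parameter names are hypothetical.

```python
import math


def estimate_walltime(n_units, time_per_unit, nodes, procs, overhead=0.0, safety=1.0):
    """Estimate total walltime in seconds for n_units equal-cost data units.

    Assumes units run in waves of (nodes * procs) at a time; overhead covers
    start-up and less-parallel steps, and safety pads the scattered part.
    """
    n_cores = nodes * procs
    waves = math.ceil(n_units / n_cores)
    return overhead + safety * time_per_unit * waves


# 1040 data units at 600 s each on 4 nodes x 16 cores, plus 300 s of overhead:
# 1040 / 64 = 16.25, so 17 waves are needed.
walltime = estimate_walltime(1040, 600.0, nodes=4, procs=16, overhead=300.0)
```

A driver task could supply sensible defaults for overhead and safety and leave only the per-unit time to the user, which is the "multiple knobs with sensible defaults" arrangement suggested above.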

 

 


1 Comment

  1. I believe these packages may still be in active use both on the LSST HTCondor pool and in some of the Docker work; I know they were previously used to run jobs on Blue Waters as well.  Once these were released, I know that there were a couple of instances of people running jobs using these tools on the local pool without me being aware of it until much later.

    It should be possible to take the templating mechanisms that ctrl_execute/ctrl_platform_* use and extend them to additional tools to support non-condor/orca jobs.  These packages are used together to take templates of scripts and configuration files, substitute keywords in those files, and then write out the results.   While most of this is HTCondor specific, it should be possible to use this same idea to add additional tools to support other use cases.   There probably should be some refactoring to separate the getenv/setup portions so we have the option to write setup scripts that will execute faster when starting up jobs.

    We're currently using HTCondor for AP, and when we were sketching out the DRP tasks, had prototyped really large DAGs using Pegasus, but at the time of testing Pegasus couldn't handle what we were throwing at it.  There have been improvements to Pegasus since that time, but we haven't gone back to look at this.