The steps listed here describe what happens under the hood; they do not necessarily describe steps that would be apparent to the user, and certainly do not describe steps that would normally be invoked separately by the user.

Step 1: Apply Algorithm Config Overrides

Apply traditional configuration overrides to a Pipeline, coming from the command-line or an obs_* package.

Inputs:

  • Pipeline
  • dictionary of {name: ConfigOverride}
  • camera (optional)

Outputs:

  • Pipeline

Implementation

Applying a dict of ConfigOverrides or a (name, ConfigOverride) pair should probably be a method on Pipeline itself.

A shared component should exist that augments a command-line argument parser with config-override options, yielding a {name: ConfigOverride} dictionary when arguments are parsed.

A shared component should exist that looks up and applies configuration overrides from the obs_* package associated with a provided camera.

Execution frameworks should always apply obs_* overrides before command-line overrides (assuming both are actually supported by that execution framework).
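
A rough sketch of how these pieces might fit together follows; every name here (addConfigOverrideArguments, applyAllOverrides, loadObsOverrides, Pipeline.applyConfigOverrides) is a hypothetical placeholder for interfaces that do not exist yet, not a proposal for the final API.

  def addConfigOverrideArguments(parser):
      # Hypothetical shared helper: augment a command-line argument parser with
      # config-override options; the parsed values would later be turned into a
      # {name: ConfigOverride} dictionary.
      parser.add_argument("-c", "--config", action="append", default=[],
                          metavar="LABEL.OPTION=VALUE")
      parser.add_argument("-C", "--config-file", action="append", default=[],
                          metavar="LABEL:PATH")
      return parser

  def applyAllOverrides(pipeline, cmdLineOverrides, camera=None):
      # Hypothetical shared helper enforcing the ordering above: obs_* overrides
      # (when a camera is known) are applied before command-line overrides.
      if camera is not None:
          obsOverrides = loadObsOverrides(camera, pipeline)    # assumed obs_* lookup hook
          pipeline.applyConfigOverrides(obsOverrides)          # assumed Pipeline method
      pipeline.applyConfigOverrides(cmdLineOverrides)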

Complications

Applying obs_* overrides is much simpler in Gen2, where each Data Repository only holds data from one camera.  That is no longer the case in Gen3, so a data expression that yields Quanta for multiple cameras doesn't have a single obs_* package from which overrides should be loaded.

The simplest solution to this problem is to just limit the capabilities of Gen3, by requiring any multi-Camera QuantumGraphs to not use obs_* overrides.  That would in practice require those graphs to be split up and run separately, which would be inconvenient for users.

I don't have a complete solution for this in mind, but because this is new functionality beyond what Gen2/CmdLineTask could do, I suggest we do not try to solve this problem until we've reached parity with Gen2/CmdLineTask functionality.  The best idea I've come up with so far involves somehow restricting some SuperTasks in a Pipeline to certain data IDs (e.g. those with a particular camera or filter), essentially allowing one sub-Pipeline for some data IDs and other sub-Pipelines for others.  I have no idea how difficult this would be to implement in Preflight, and that's why I'd like to put it off for now.

Step 2: QuantumGraph Generation

Inputs:

  • Pipeline (read-only)
  • Registry (read-only - unless temporary tables are needed?)
  • User-provided data expression and/or custom SQL statements

Outputs:

  • QuantumGraph

Implementation:

Should be a shared Python component that is unencumbered by the needs of any other steps (e.g. command-line parsing, configuration handling).  All execution frameworks are expected to use the same code to generate QuantumGraphs.
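
A hedged sketch of what that shared entry point might look like; the function name and the Registry/QuantumGraph methods used here are assumptions, not existing interfaces.

  def makeQuantumGraph(pipeline, registry, userQuery=None, sql=None):
      # Hypothetical shared component used identically by every execution
      # framework; Registry.selectDataIds, Quantum, and QuantumGraph.connect are
      # assumed interfaces for the purpose of this sketch.
      graph = QuantumGraph()
      for taskDef in pipeline:          # assumed: a Pipeline iterates over its SuperTasks
          for dataId in registry.selectDataIds(taskDef, userQuery, sql):
              graph.add(Quantum(taskDef, dataId))
      graph.connect()                   # link Quanta through shared input/output Datasets
      return graph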

Step 3: Apply Resource Configuration

"Resource configuration" is configuration that should not affect the scientific content of any final data products.  This includes (but is not limited to):

  • how many cores are assigned to each Quantum of a particular SuperTask;
  • whether intermediate/diagnostic datasets are retrieved from worker nodes, or even persisted at all;
  • what file format should be used for certain dataset types;
  • options specific to particular SuperTasks (e.g. subregion size in AssembleCoadd).

Resource configuration options should not be included in a SuperTask's ConfigClass; it must be possible to construct a SuperTask before resource configuration is applied.  In any case, a relatively small fraction of resource configuration options will apply to specific SuperTasks.

Some resource configuration is Butler configuration, especially for worker-node Butlers - note that while QuantumGraph generation requires a read-only Registry, a full Butler need not be constructed until after Apply Resource Configuration.

Given the above, I think it probably makes sense to use YAML instead of pex_config for all resource configuration, and to define a new mechanism to pass SuperTask-specific resource configuration values to those SuperTasks (perhaps kwargs passed to SuperTask.runQuantum and SuperTask.run).

Inputs:

  • QuantumGraph
  • Resource Configuration (YAML)

Outputs:

  • execution framework dependent: includes Butler configuration, options for specific SuperTasks, and options entirely specific to the execution framework, in whatever form is most useful for that execution framework.

Implementation:

I don't see any obvious way yet to share code between different execution frameworks for this step.

At least some of the structure of a resource configuration YAML file will be specific to the execution framework, but we should probably try to design a common configuration structure for components and ideally some top-level parts of the configuration tree that may be common (if not required) across different execution frameworks.  This should include:

  • Butler configuration for execution environment (includes file formats, dataset types for which I/O should be elided, etc)
  • SuperTask-specific resource configuration options (relevant for all execution frameworks, I think); a sketch of both pieces follows this list
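
As a hedged illustration of both points, the sketch below shows one possible (entirely hypothetical) YAML layout and how an execution framework might forward the SuperTask-specific snippet to runQuantum as keyword arguments; none of the keys, labels, or attribute names here are settled.

  import yaml

  # Hypothetical resource-configuration layout; every key and label is illustrative.
  RESOURCE_CONFIG = yaml.safe_load("""
  butler:
    formats:
      Exposure: fits            # file format choice for the execution environment
    elide:
      - deblendedSources        # intermediates whose I/O should be skipped
  supertasks:
    assembleCoadd:
      cores: 4
      subregionSize: 2000
  """)

  def executeQuantum(task, quantum, butler, resourceConfig):
      # Sketch of the proposed kwargs hand-off: pull out the snippet for this
      # SuperTask (task.label is an assumed attribute) and pass it through.
      kwargs = resourceConfig.get("supertasks", {}).get(task.label, {})
      task.runQuantum(quantum, butler, **kwargs)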

Step 4: Global Initial I/O

Prepare input and output Collections and Runs.  Write software versions, compute environment information, the Pipeline itself (together called "Launch Provenance" for now), etc. to a global (i.e. "not worker scratch") data repository.

Inputs:

  • QuantumGraph
  • Launch Provenance (includes the Pipeline)
  • (global) Butler configuration

Outputs:

  • Butler instance (optional; not needed after this step if execution will write to scratch Butlers)
  • (in data repository) a new or existing Run to be associated with all outputs
  • (in data repository) any new or existing custom Execution records to be associated with the Run by this particular execution framework.
  • (in data repository) a new or existing Collection to be associated with all outputs, now associated with all pre-existing inputs
  • (in data repository) persisted Datasets recording all Launch Provenance (some associated with Runs, some associated with custom Execution records)

Implementation:

While this step isn't the same for all execution frameworks, there is a lot in common, and it would be highly desirable to have some shared code here.

At present, we need a Run in order to create a Butler that can write, but we also need to be able to write to a data repository to be able to create a Run and save the Launch Provenance datasets already associated with it.  I think this means we need a shared high-level component for simultaneously creating a Run, associating it with a Collection, writing the Launch Provenance datasets associated with it (see Run.environment_id and Run.pipeline_id in DMTN-073), and creating a Butler with that Run and Collection.  Ideally, that component would be extensible to allow custom Executions and Launch Provenance Datasets to be created at the same time.
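
A hedged sketch of that shared component follows; every method and dataset type name here (makeRun, the Butler constructor arguments, the "pipeline"/"environment" dataset types) is an assumption about APIs that are still being designed.

  def setupOutputRun(registry, collection, pipeline, environment, run=None):
      # Hypothetical shared component: create (or reuse) a Run, associate it with
      # the output Collection, write the Launch Provenance datasets referenced by
      # Run.pipeline_id / Run.environment_id (DMTN-073), and return a Butler able
      # to write with that Run and Collection.
      if run is None:
          run = registry.makeRun(collection)                               # assumed Registry method
      butler = Butler(registry=registry, run=run, collection=collection)   # assumed constructor
      butler.put(pipeline, "pipeline")                                     # assumed dataset type names
      butler.put(environment, "environment")
      return butler, run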

Code to walk a QuantumGraph and add all input Datasets to the Collection that will be used for output should also be a shared component (maybe the same one, maybe not).  Note that it's desirable that the output Collection contain not just the outputs of the Pipeline and its direct inputs, but all indirect inputs going all the way back to raw data.  I think that means using the recorded Quantum provenance for the direct inputs to find these and associate them as well.
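
A hedged sketch of that walk, assuming per-Quantum input lists and a Registry method for looking up the Quantum that produced a Dataset (neither the attribute nor the method names are settled):

  def associateAllInputs(graph, registry, collection):
      # Add every direct input of the QuantumGraph to the output Collection, then
      # follow recorded Quantum provenance back toward raw data so indirect inputs
      # are associated as well.
      stack = [ref for quantum in graph for ref in quantum.inputs]    # assumed attributes
      seen = set()
      while stack:
          ref = stack.pop()
          if ref in seen:
              continue
          seen.add(ref)
          registry.associate(collection, [ref])                       # assumed Registry method
          producer = registry.getProducingQuantum(ref)                # assumed provenance lookup
          if producer is not None:
              stack.extend(producer.inputs)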

Step 5: Quantum Execution

Run all Quanta.  Stage data to and from workers as needed.

Inputs:

  • QuantumGraph
  • (worker) Butler configuration
  • results of step 3 (execution framework dependent)

Outputs:

  • (in data repository) output datasets, including any desired intermediates
  • (in data repository) QuantumGraph-level provenance

Implementation:

While different execution frameworks will do completely different things here, I think two shared components jump out as being generally useful:

  • A QuantumGraph-traversal helper that maintains a list of completed Quanta and can return a sequence of "ready" Quanta (a minimal sketch follows this list).
  • A helper class that uses Python multiprocessing to execute a QuantumGraph on a single node, suitable for use by a fully-featured "laptop" command-line execution framework and multi-node execution frameworks that want to use it to execute per-node subsets of a larger QuantumGraph.  Note that this means it must not do any of the previous steps described on this page (as those may be done differently by the higher-level execution framework), and it should expect to operate on snippets of the resource configuration, not the whole tree.
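
A minimal sketch of the traversal helper, assuming each Quantum exposes its input and output Dataset references (the class name and attribute names are invented):

  class QuantumGraphIterator:
      # Sketch only: tracks completed Quanta and reports which pending Quanta have
      # all of their inputs available.

      def __init__(self, graph, existingDatasets=()):
          self._pending = set(graph)                 # assumed: iterating a QuantumGraph yields Quanta
          self._available = set(existingDatasets)    # Dataset refs already present in the repository

      def markComplete(self, quantum):
          # Outputs of a successful Quantum become available to its consumers.
          self._available.update(quantum.outputs)    # assumed attribute

      def ready(self):
          # Hand out every pending Quantum whose inputs are all available; Quanta
          # downstream of a failed Quantum simply never become ready.
          out = [q for q in self._pending
                 if all(ref in self._available for ref in q.inputs)]   # assumed attribute
          self._pending.difference_update(out)
          return out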



12 Comments

  1. In our earlier meetings on Resource Configuration I think the idea was that resources will be a sort of two-way communication between SuperTasks and the execution framework. For example, some supertask may want to tell the framework:

    • I need an absolute minimum of 2GB of memory and 4 cores to run
    • But optimal resources for me are 8GB and 16 cores

    The framework would check that and, based on what is actually available, can tell the supertask:

    • You can use max of 6GB of memory and 10 cores.

    I guess that sort of implies that resource config should be defined by the supertask very much like a standard Config; it makes sense to keep that default config close to the task itself so that developers can update/extend it when needed.

    I'm a bit worried about multitudes of file formats (pex_config vs YAML) used for data which is sort of logically very similar in meaning. I'd probably prefer pex_config to YAML just because it is already there.

    1. Note that there is also a lot of Resource Configuration (e.g. Butler configuration for file formats) that is not connected to any SuperTask.

      I do agree that it makes sense to define the Resource Configuration that is associated with a SuperTask close to the SuperTask, and that that means pex_config would be a good way to do it.  However, I would like to keep that Config object distinct from the algorithmic Config object (not even nested).  I also think that most SuperTasks will have no Resource Configuration options, or perhaps just use a common Config that provides common options for all SuperTasks.

      Finally, I would ultimately like to have pex_config support YAML as an I/O format, so configuration options can still be defined using the existing Python pex_config syntax, but config files can be saved and nested more flexibly with YAML. That's how I envision saving an entire Pipeline to a single human-readable file in the future, for instance.
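
      A hedged illustration of keeping such a resource Config distinct (the class and field names here are invented for the sketch; only the pex_config usage itself is existing syntax):

        from lsst.pex.config import Config, Field

        class AssembleCoaddResourceConfig(Config):
            # Hypothetical resource Config defined next to the SuperTask but kept
            # separate from (and never nested inside) its algorithmic ConfigClass.
            cores = Field(dtype=int, default=1,
                          doc="number of cores to request per Quantum")
            subregionSize = Field(dtype=int, default=2000,
                                  doc="subregion size used to bound peak memory")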

  2. Andy Salnikov, when does the SuperTask know what its minimums and optimal values are?    Does SuperTask require pre-flight because we expect these values to be dependent upon the number of inputs?    Are the values dependent upon whether running in single pipeline (get 1 done fastest way possible) vs many/campaign (get many done where throughput matters over single done fastest)?     And what about number of cores dependent upon the type of machine being used (throw more cores at slower machine)?

    On the production side, it is expected that these resource requirements will always be set outside of the SuperTask either by a person or using information from prior runs.

    1. My guess is that some of those resource numbers could be guessed or "negotiated" with task authors; I can imagine something like the number of threads could be just a policy setting. But other numbers can be hard to guess right and they certainly depend on the size of the problem. But they could be measured rather than predicted, i.e. before a full campaign we could run a few typical jobs on typical data and get estimates from there.

      I'm not sure what "outside" means in a production case, and I think we agreed a long time ago that production indeed needs to be able to override resource requirements. But when production does that, I guess it still needs to communicate its decision back to the supertask.


  3. We consider that there may be at least 3 types of pipeline activators:   

    • Desktop: minimal/no workload system (e.g., no HTCondor), Butler repository exists locally, etc

    • Multi-Node: workload system (e.g., HTCondor), set of compute nodes, may or may not have the central dataset repository local to compute nodes, etc. (replacement for ctrl_pool)

    • Production: (which may turn out to be usable for “Multi-Node” with some features turned off): workload system (e.g., HTCondor), set of compute nodes, central dataset repository not initially local to compute nodes, etc.


    The Production use case and requirements are being fleshed out.    Do these exist for the other 2 types of pipeline activators?

    It would be best to flesh these all out before figuring out what code can be shared.    We are working through those for Production, focusing on pieces that may need to be done between SuperTask executions.

    One of the items we are wondering about is what state the repository is in after a SuperTask fails in order to understand how to bring information back for debugging purposes as well as how to handle automatic retries (in a limited set of cases).   Just to list a few different types of SuperTask (and Butler) failures at runtime:

    • Incomplete/inconsistent Butler Registry

    • Couldn’t read input files

    • Science error (not sure if different types of science errors look different to execution framework)

    • Could not write (all) output files

    • Problems updating Butler Registry

      • Can’t write sqlite file to disk

      • Missing metadata or provenance?

    • SuperTask killed by external process

    Is there planned behavior in the above cases (and any more not listed) around which we can design the execution framework?    We are currently planning on having to isolate each SuperTask execution from each other with regards to working repository in order to enable clean SuperTask retries.    We haven’t worked through yet how to cherry-pick good/complete information from the repository when bringing home for debugging (i.e., good enough values can be ingested into the Operations Data Backbone).

  4. The Production use case and requirements are being fleshed out.    Do these exist for the other 2 types of pipeline activators?

    I was not planning to write anything separate from the SuperTask WG requirements.


    Incomplete/inconsistent Butler Registry

    I assume this only happens when some previous (possibly internal) error is not recognized by the Butler or the execution framework?  In any case, I imagine it would manifest the same way as...

    Couldn’t read input files

    Could not write (all) output files

    Can’t write sqlite file to disk

    ...which I imagine would result in Python exceptions propagating up from the Butler (through the Tasks) to the execution framework.  I would expect the execution framework to then prune any dependent vertices from the QuantumGraph but continue to run any vertices that are not dependent on the failed Quantum.

    Science error (not sure if different types of science errors look different to execution framework)

    Some of these will also be Python exceptions that propagate up from the Tasks to the execution layer.  As we discussed back in the SuperTask WG, those that represent known failure modes that do not necessarily invalidate subsequent steps (e.g. "this visit doesn't actually have any good pixels that overlap this patch") will be represented by special output datasets that will be recognized by downstream SuperTasks, so they won't look any different to a file-based execution system like Pegasus.

    Missing metadata or provenance?

    This sounds like an exception that would be raised by the execution system or the butler - in any case, common code - that should bring down the execution of this Quantum and prune any dependent nodes from the QuantumGraph.

    SuperTask killed by external process

    I would also expect this to be an exception caught and handled by the execution framework by pruning dependencies from the QuantumGraph.  All that SuperTasks should be responsible for here is not trying to trap and recover from external signals internally.

  5. The Production use case and requirements are being fleshed out. Do these exist for the other 2 types of pipeline activators?

    I was not planning to write anything separate from the SuperTask WG requirements.

    I might have missed it, but I could not find descriptions in LDM-556 about what the Supervisory Framework should do with SuperTask failures or any other behavior other than executing each Quantum, etc.

    re: SuperTask failure discussion...

    The part of my question that didn't get answered was what the state of the repository is when a SuperTask fails. If a SuperTask fails, will we ever get any outputs other than logging? Can a SuperTask contain multiple subtasks which output their own datasets or is a SuperTask all or nothing?

    Should the Supervisory Framework/Pipeline make a repository for each SuperTask to sandbox failures from making bad state in the central repository?
    * Are we guaranteed that there will never be any data discovery inside SuperTask's runQuantum method? (How much does the activator have to guarantee the Butler Registry and DataStore have exactly the same data in order to guarantee reproducibility)

    ...which I imagine would result in Python exceptions propagating up from the Butler (through the Tasks) to the execution framework. I would expect the execution framework to then prune any dependent vertices from the QuantumGraph but continue to run any vertices that are not dependent on the failed Quantum.

    We haven't discussed any runtime pruning of the QuantumGraph, only pruning at pre-flight time.
    * Is SuperTask failure all or nothing (if SuperTask normally produces 2 distinct datasets and 1 is produced successfully, is there a way to tell and if so is it pruning based upon the output datasets individually)?
    * For the pre-flight pruning, we talked about SuperTasks which could take fewer inputs. When pruning during runtime, is the pruning based upon "should have been input", or, as with the pre-flight pruning, is the Quantum run as long as it has enough inputs?


    Production wants the ability to turn on/off automatic retries on a different machine for certain failure cases to work around machine-specific problems. These failure cases are either distinct from pruning cases, or the different machine would be tried first before pruning (I can't come up with a use case where pruning would be attempted before a different machine for the same SuperTask failure.)

  6. I might have missed it, but I could not find descriptions in LDM-556 about what the Supervisory Framework should do with SuperTask failures or any other behavior other than executing each Quantum, etc.

    While I think we discussed failure handling in the WG, I do believe this confluence page may be the first place we're writing anything on that subject down.  I think that reflects that we don't actually have high-level requirements for this; we just need to come up with a sensible design.

    The part of my question that didn't get answered was what the state of the repository is when a SuperTask fails. If a SuperTask fails, will we ever get any outputs other than logging?

    In general, yes, SuperTask failures may still result in some outputs being written.  In practice, unless the error is I/O related, it will be quite rare for any outputs to be written by a failing SuperTask, because most SuperTasks will do all of their output after all of the algorithmic code has been run.  This is not guaranteed, however.

    While they are not an on-disk output, the Python exception raised in the course of the vast majority of SuperTask failures should provide a much more precise and reliable indication of what went wrong than the logs; I would certainly expect any supervisory framework to catch those exceptions (and probably at least translate them into log messages).

    Can a SuperTask contain multiple subtasks which output their own datasets or is a SuperTask all or nothing?

    All I/O should be done by the SuperTask itself, but there is no guarantee it will do this at only one point in the code.

    Should the Supervisory Framework/Pipeline make a repository for each SuperTask to sandbox failures from making bad state in the central repository?

    This should not be necessary, as it's the Butler's responsibility to make sure (via transactions, atomic operations, etc) that it is impossible for callers to put a repository in a bad state.  Perhaps paranoia on this front is merited until we have demonstrated that we can meet those guarantees, but I think I'd characterize per-SuperTask or per-Quantum repositories as something we'd only want to consider as a fallback if we have trouble with corruption that we can't fix on the Butler side.

    Are we guaranteed that there will never be any data discovery inside SuperTask's runQuantum method? (How much does the activator have to guarantee the Butler Registry and DataStore have exactly the same data in order to guarantee reproducibility)

    runQuantum may only access the Datasets contained in the Quantum it is given (or a subset of these).  And it can only call a very limited set of Butler methods on these (get, put, datasetExists, markInputUsed, and the TBD method to create a virtual deferred composite are the only ones I can think of), and those don't involve any data discovery.

    I believe it would be best to support this by having a Registry method that identifies a Data Repository subset from a QuantumGraph; a supervisory framework could then pass the subset of the QuantumGraph (which is itself a QuantumGraph) to be run against a particular staging repository to the Registry and learn what it needs to transfer in terms of Datastore and Registry content.

    We haven't discussed any runtime pruning of the QuantumGraph, only pruning at pre-flight time.

    Maybe "pruning" was the wrong term.  Here's an example:

    1. We run a ProcessCcd-JointCal-MakeWarp Pipeline on all of the Visits that overlap 10 Tracts.  ProcessCcd fails on one Visit for unknown reasons.
    2. The next SuperTask, JointCal, requires all Visits in a Tract to have succeeded.
    3. We want to still run JointCal and MakeWarp on any Quantums that did not use the failed Visit as input (i.e. those corresponding to Tracts that did not overlap the bad Visit)
    4. We do not want to run JointCal and MakeWarp on any Quantums that did use the failed Visit as input.  I was calling these JointCal and MakeWarp Quantums "pruned".

    * Is SuperTask failure all or nothing (if SuperTask normally produces 2 distinct datasets and 1 is produced successfully, is there a way to tell and if so is it pruning based upon the output datasets individually)?

    I think the case where some output Datasets of a SuperTask have been written successfully while others have not will be quite rare, so I don't have a terribly strong opinion.  However, an implementation that just uses the individual output datasets to determine what to prune seems much, much cleaner (because QuantumGraphs only have Dataset→Quantum and Quantum→Dataset edges, not Quantum→Quantum edges).
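
    A hedged sketch of pruning on individual output Datasets, assuming the QuantumGraph can report the consumers of a Dataset (the function, method, and attribute names are invented):

      def pruneAfterFailure(graph, failedQuantum, writtenRefs):
          # Datasets the failed Quantum should have written but did not will never
          # exist; walk the Dataset→Quantum edges to find every Quantum that can
          # therefore never run.
          missing = set(failedQuantum.outputs) - set(writtenRefs)   # assumed attribute
          pruned = set()
          while missing:
              ref = missing.pop()
              for consumer in graph.consumersOf(ref):               # assumed Dataset→Quantum lookup
                  if consumer not in pruned:
                      pruned.add(consumer)
                      missing.update(consumer.outputs)              # these will never exist either
          return pruned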

    * For the pre-flight pruning, we talked about SuperTasks which could take fewer inputs. When pruning during runtime, is the pruning based upon "should have been input", or, as with the pre-flight pruning, is the Quantum run as long as it has enough inputs?

    I'm going to need some examples to understand what you mean here.  My recollection of preflight pruning is that it's an entirely different operation (and really an implementation detail of Preflight), but my memory is fuzzy.

    For SuperTasks that can take fewer inputs, my understanding was that the possibility of a Pegasus implementation imposed a very hard constraint on how we do this, essentially demanding that we write dummy files for the not-to-be-used inputs.  As a result, my plan was for the SuperTasks that are responsible for those optional inputs to write dummy Datasets as well that the downstream SuperTask can ignore internally.  That would essentially hide this behavior entirely from the supervisory framework.  I'm not sure that's the most natural approach for SuperTask developers, but I think it's something we could live with.

  7. Will SuperTasks need to output QC values to be saved/brought home even if the SuperTasks fail?    Is it still planned for the subtasks to output boost files?   If so, would they be written in any types of failure cases (and are they tracked by Butler)?

    All I/O should be done by the SuperTask itself, but there is no guarantee it will do this at only one point in the code.

    This should not be necessary, as it's the Butler's responsibility to make sure (via transactions, atomic operations, etc) that it is impossible for callers to put a repository in a bad state.

    The atomic operations are at the put level of a single dataset or the whole SuperTask?   For example, is the following allowed inside a single SuperTask:

    a = butler.get(A)
    b = subtask1.run(a)      # uses A, produces B
    butler.put(b, B)
    b = butler.get(B)        # (I'm not sure about this get)
    c = subtask2.run(b)      # uses B, produces C
    butler.put(c, C)


    Assuming the above is valid, if Subtask 2 has a science issue, will B exist in the Butler Registry and DataStore after the SuperTask stops running?


    The pruning example and discussion helps.   Still thinking it through.

    1. Will SuperTasks need to output QC values to be saved/brought home even if the SuperTasks fail?

      Probably.  Design for this is still TBD (I'm hoping the QAWG will at least say something about it).  I think the main question in this context is whether QA datasets are written by the SuperTask directly (which would make them indistinguishable from any other datasets written on failure, but make writing on failure more common), or written by the supervisory framework after being passed out of the SuperTask via some other mechanism.

      Is it still planned for the subtasks to output boost files?

      No.  As per RFC-482, Boost persistence will never be written by the Gen3 Butler.


      The atomic operations are at the put level of a single dataset or the whole SuperTask?

      I believe we will be supporting both.  Single dataset put operations are always atomic, but SuperTasks are permitted to wrap multiple puts in a single transaction.  I don't know whether we'll typically use that functionality or not; writing one dataset and not the others is not data repository corruption, and partial writes will probably always be useful for diagnostic purposes.


      Your example is valid (but would probably be rare), and B will exist in the Butler Registry and Datastore after the SuperTask stops running.  The get(B) call is permitted but I can't think of a reason right now why a SuperTask would make that call in practice.

  8. Do I understand RFC-482 correctly:   The subtask metadata will still be written.   It will just not be in boost format, but in a yaml file using normal Butler mechanisms.   Right?

    I agree partial writes will be useful for diagnostic purposes.    And it is not "data repository corruption".   However if we want to (automatically) rerun the SuperTask, don't we need to remove that output information and files from the repository (or start with a saved copy of the repository prior to the failed SuperTask)?

  9. Do I understand RFC-482 correctly:   The subtask metadata will still be written.   It will just not be in boost format, but in a yaml file using normal Butler mechanisms.   Right?

    Correct.  I would also like to move the actual writing of these from the SuperTask to the supervisory framework as well, since it's just boilerplate to have each SuperTask do exactly the same thing, and that could also give the supervisory framework flexibility in what it actually does with these.

    I agree partial writes will be useful for diagnostic purposes.    And it is not "data repository corruption".   However if we want to (automatically) rerun the SuperTask, don't we need to remove that output information and files from the repository (or start with a saved copy of the repository prior to the failed SuperTask)?

    Good point.  I was (perhaps naively) thinking that retries would always use a new Run, and hence there would be no conflict and no need to remove the old outputs, but I'm not sure that's desirable for automatic retries.  Using multiple Runs somehow (perhaps by moving orphaned partial outputs to a special diagnostic Run) seems like it would be better than just removing things, but we didn't previously have a use case for changing the Run of an existing Dataset, so I'm not sure if that would cause any trouble elsewhere.  Things might be simpler if we only support automatic retries when using local scratch output repos (then we could change the Run when bringing the orphaned outputs home), but that may be too limiting.