DM-29818 and DM-29761 present very different solutions to the problem of how to handle optional PipelineTask outputs; we need a way to allow PipelineTasks to not produce something, while still distinguishing between (at least) a well-understood "nothing to do" case where there is no dataset, and an algorithmic or environmental failure that merits retries or debugging.  We would also like to allow slightly richer annotations, such as variants of a PipelineTask-specific Enum, and ideally allow those annotations to be present when the main dataset is present, too, to indicate "qualified success".  These approaches are very different, but they both have problems:

  • DM-29818 may introduce more complexity than the problem warrants (in part because it also attempts to address a few other existing - but much more minor - problems);
  • DM-29761 relies on getting some degree of per-quantum provenance implemented as well (I'll call these "executed quanta" from here on).  That is a big project (especially to finish, rather than just start) that we'd rather not have blocking this one, even if we plan to do it anyway.

So, if we decide that the "side problems" addressed by DM-29818 are worth solving in their own right (and we like this solution, or some simplification thereof), we can take that approach, and leave per-quantum provenance as a separate (still desirable) project.

If not (or we'd like to at least explore other options), I think it's worth trying to plan out how we will save and access per-quantum provenance in the future, to see if we can identify a path that solves the nothing-to-do propagation problem quickly.

Complications with saving Registry-based per-quantum provenance

We have always planned to eventually write executed quanta to tables in the Registry database, because that's the natural place to retrieve it from.  There are queries for datasets that would naturally use those tables, and it's exactly the kind of organizational information the Registry was designed for.

But it's hard to get the executed quanta into the Registry database for two reasons:

  1. It's a schema change, and we're not ready to do migrations yet (this will of course resolve itself on its own shortly, but we really want to be able to handle nothing-to-do propagation ASAP).
  2. In the no-shared-DB processing case, we can't write to the database while we're running, so we both have to work out some other way to transfer the information out of the jobs and then wait for it to be loaded into the database (which may be well after the quantum has finished running).  That's inconvenient for anything trying to use this provenance for status reporting, and it rules out using the executed quanta tables to propagate dataset annotations/status between running PipelineTasks.

Do we save per-quantum provenance to files instead?

DM-29818 proposes saving executed quanta to files mediated by the Datastore (or at least ButlerURI), instead.  That completely solves the problem of how to write them, but it yields a poor interface for using them.

Adding a Registry interface that's backed by these files is problematic both because it hard-codes certain dataset types, storage classes, or URIs into the Registry implementation(s) (which seems like bad layering) and because it means Registry needs to have a Datastore (which is already inconsistent with our current layering).

Transferring the provenance information from these datasets into Registry tables some time after execution (e.g. when the output datasets are actually inserted into the shared Registry) and then using those tables to back Registry interfaces seems like a good way to solve the problem of long-term access to provenance, though (we'll still need a migration, of course).  And, as it turns out, the kind of access to these files that's needed before that is quite different anyway, and doesn't really benefit from being mediated by Registry:

  • PipelineTasks don't use Registry or full Butler directly while running; they only have access to a special ButlerQuantumContext object that can really only do getDirect and put.  And since all these PipelineTasks need from the provenance files would be the annotations/status of their input datasets, a custom interface on ButlerQuantumContext that looks directly at the executed quantum datasets seems totally natural (and reasonably layered).  We would of course need to then fetch those provenance files to worker jobs, but I don't think this would result in substantially more fetches than usual, and not obviously more than at least some of the DM-29818 disassembly variants.  As a bonus, it's the ButlerQuantumContext object that also needs to have the interfaces to extract more provenance from the running PipelineTask (e.g. actual vs. predicted inputs), so it's quite natural for it to be able to read these files since it will be writing them anyway.
  • Code that wants to monitor in-progress QuantumGraph execution can't use Registry interfaces anyway because the datasets aren't going to be there (yet).  And similar code that wants to do the same on already-completed QuantumGraphs (e.g. to look at memory usage, execution times, etc.) doesn't really need the dimension-based organization that Registry provides either.  So this is one more place where it makes sense to have a different-from-Registry interface to the provenance information anyway, and it makes sense to use the executed quantum files to back this.  That probably makes sense even after we've loaded the executed quanta into the database, because it seems likely that we'd want to keep the files around so they could continue to store things that don't make sense for the database (maybe even logs).

So now we're using the datastore as a database?

Pretty much, because it's the only scalable shared thing we've got, and if we step back a bit, that seems a little crazy.  There are lots of scalable databases out there for the kinds of things we need to do during execution, because again, that's basically just getDirect  and put, and that doesn't need all of the relational SQL stuff that we use in QuantumGraph generation and ad-hoc analysis.  And in the broader world I gather it's a pretty common thing to stand up some kind of fast, scalable NoSQL database as essentially a fast staging point for "colder" storage in SQL (it's in the Redis FAQ; something like Redis or memcached seems like the right kind of NoSQL database for this problem, but this is a space that I'm very much not an expert in).

If we had such a system in hand, we wouldn't have had the scaling problems that led us to jump to our current no-shared-database approach to begin with, and we'd have a natural place to put executed quanta (and maybe even planned quanta) while execution is ongoing.  We'd also have a natural system to back a Datastore oriented toward smaller files (like metrics), and maybe even something we could use to help scale the client/server Butler to many users.

Of course, easier said than done:

  1. Even if there's prior art, standing up another third-party server and making it stage to and from our SQL database is a huge project, especially if we want to be rigorous about consistency.
  2. Having a fast NoSQL database doesn't help us that much with implementing a  Registry interface to it; despite the fact that we can't scale it up to back execution, I still think SQL is the right kind of database for all of the heavily-relational dimension stuff, and I don't want to get in the business of writing my own query planner and join system, even against something more table-oriented-yet-scalable like Cassandra.

Let's start with (2): it'd be nice to clarify what we would and would not be able to do (in terms of interfaces) with that kind of scalable staging database, since "everything Registry can do" is not a viable answer, and "only the things a (SQL-backed) Registry can already do" isn't a helpful one.  But we already discussed essentially the same problem above; during execution, we don't need any of the SQL-oriented things Registry can do; we just need getDirect  and put and do some provenance things; the Registry interface is already a bad match to that problem.  And that brings us back to (1): if we can define that execution-oriented Registry-replacement interface and implement it with file storage now, we could reimplement it with a scalable shared NoSQL database later (or not, if that never actually seems worthwhile).

This is an idea I've been mulling in one form or another for a while, and I think Nate Lust has been too; what was missing (at least for me) was a way to move forward on the "let's just use QuantumGraphs and files instead of Registry during execution" idea without eliminating or prejudging the "maybe we should just set up a new kind of database" possibility (and not wanting to distract from the current lightweight/read-only Registry work, which is still IMO the fastest path to solving our biggest scaling problems and hence discovering the next set of scaling problems).

What this might look like

Let's call the new interface I've been motivating "QuantumDirectory" for now (don't read much into this _temporary_ name; "directory" == "all the good nouns were taken"). It's an ABC that's a sibling to Registry and Datastore; it's maybe a bit more like Registry, but not enough that I think an inheritance relationship between those is an obviously good idea. A QuantumDirectory's configuration would live in ButlerConfig, and it would at least sometimes be a Butler instance attribute. The initial implementation would be backed by JSON files and ButlerURI, but we'd try to leave room for future implementations backed by shared NoSQL databases.

Here's what I think it'd be able to do:

  • Save predicted quanta during or after QuantumGraph generation (we would always store QGs here, rather than in arbitrary external files).
  • Retrieve predicted quanta at the start of execution.
  • Save provenance information (actual inputs, output annotations/status, resource usage) during and immediately after execution.
  • Retrieve dataset annotations/status within downstream running quanta.
  • Retrieve provenance information for status and run-analysis applications (outside the running QuantumGraph).
  • Provide opaque table storage for FileDatastore internal records (and perhaps a future tiny-object Datastore for e.g. metrics). In the JSON+ButlerURI backend, these would be held in memory until a quantum is done executing, and then saved in the same file as the executed quanta.  This will let us retire datastore modes that support `get` based on prediction and `put` with no records (and I think that's a good thing).  It's also a big piece of why I want the initial QuantumDirectory implementation to be backed directly by ButlerURI, not by delegating to a Datastore.  The other reason is that I think we want to leave room for some QuantumDirectory artifacts to not satisfy all of the conditions of being a Registry/Datastore Dataset, without weakening those conditions elsewhere.
  • Ingest logs and WMS-specific files into the data repository (could be in-place when WMS writes this directly within the ButlerURI root, or involve a transfer to a permanent home otherwise).
  • "Bring home" an executed quantum to the Registry, transferring opaque table information as well as datasets and (once we have updated the schema) provenance linkage. The degree to which this removes information from the QuantumDirectory at the same time is TBD.

That isn't a complete list; I think it'll also need to be able to support (with Datastore) one or more deletion operations to handle clobbering or cleaning up partially-executed runs, and probably some various kinds of queries to support (at least) various retries and restarts.
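
To make the list above a little more concrete, here is a minimal sketch of what the ABC and a JSON-file backend might look like. Everything in it is an assumption for illustration: the method names (`save_predicted`, `load_predicted`, `save_executed`, `load_executed`), the `JsonQuantumDirectory` class, and the record layout are all hypothetical, and `pathlib` stands in for ButlerURI so the example is self-contained. It covers only the first few capabilities (saving and retrieving predicted and executed quanta, with Datastore records stored in the same file as the executed quantum).

```python
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Optional


class QuantumDirectory(ABC):
    """Hypothetical interface; method names are illustrative only."""

    @abstractmethod
    def save_predicted(self, quantum_id: str, quantum: Dict[str, Any]) -> None:
        """Store a predicted quantum during or after QG generation."""

    @abstractmethod
    def load_predicted(self, quantum_id: str) -> Dict[str, Any]:
        """Retrieve a predicted quantum at the start of execution."""

    @abstractmethod
    def save_executed(self, quantum_id: str, provenance: Dict[str, Any],
                      datastore_records: Dict[str, Any]) -> None:
        """Store provenance and Datastore records once a quantum finishes."""

    @abstractmethod
    def load_executed(self, quantum_id: str) -> Optional[Dict[str, Any]]:
        """Return the executed-quantum record, or None if it has not run."""


class JsonQuantumDirectory(QuantumDirectory):
    """File-backed sketch: one JSON file per predicted/executed quantum."""

    def __init__(self, root: Path) -> None:
        self._root = Path(root)
        for kind in ("predicted", "executed"):
            (self._root / kind).mkdir(parents=True, exist_ok=True)

    def _path(self, kind: str, quantum_id: str) -> Path:
        return self._root / kind / f"{quantum_id}.json"

    def save_predicted(self, quantum_id, quantum):
        self._path("predicted", quantum_id).write_text(json.dumps(quantum))

    def load_predicted(self, quantum_id):
        return json.loads(self._path("predicted", quantum_id).read_text())

    def save_executed(self, quantum_id, provenance, datastore_records):
        # Datastore records ride along in the same file as the provenance.
        record = {"provenance": provenance,
                  "datastore_records": datastore_records}
        self._path("executed", quantum_id).write_text(json.dumps(record))

    def load_executed(self, quantum_id):
        path = self._path("executed", quantum_id)
        return json.loads(path.read_text()) if path.exists() else None


# Usage with made-up identifiers and a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    qd = JsonQuantumDirectory(Path(tmp))
    qd.save_predicted("q1", {"task": "TaskA", "inputs": ["raw/1"]})
    not_run_yet = qd.load_executed("q1")
    qd.save_executed("q1", {"status": "nothing_to_do"}, {"records": []})
    executed = qd.load_executed("q1")
    predicted = qd.load_predicted("q1")
```

Because callers only see the abstract methods, a later backend built on a shared NoSQL store could replace the file-based one without changing execution code, which is the point of defining the interface first.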

Butler zoology

Now that we've got three potential butler attributes, let's think about which combinations of these are useful. I'm not certain yet whether some or all of these should get their own specialized Butler class (if so, the base class would be more about code reuse than interfaces, I think). A big open question for these cases is how much responsibility QuantumDirectory will have for post-run provenance storage and queries.

  • Registry only: dimension manipulation (rw), most queries on datasets (ro).
  • Datastore only: not possible; needs a Registry or a QuantumDirectory to provide internal record storage in order to work.
  • QuantumDirectory only: status and execution analysis queries for currently-executing QGs (ro).
  • Registry+Datastore: post-run ad-hoc analysis (ro), ingest of truly external datasets (rw); data repository export and import (rw; only if all provenance is moved/copied out of QuantumDirectory after execution).
  • QuantumDirectory+Datastore: active QuantumGraph execution (rw).
  • Registry+QuantumDirectory: QuantumGraph generation (Registry is rw, QuantumDirectory is rw); dataset queries that involve provenance (ro; only if Registry DB still has little or no provenance after runs are "brought home").
  • Registry+Datastore+QuantumDirectory: ad-hoc analysis that involves currently-executing QGs and/or provenance inspection (ro); data repository export and import (rw; only if some provenance is never moved/copied into Registry or Datastore).

What this means for pipetask and BPS

PipelineTask execution will be backed by QuantumDirectory and Datastore (not Registry) even when run locally by pipetask. QuantumGraphs will now always be saved to the data repository upon creation, instead of to external files. And any logs or WMS artifacts that are worth keeping after execution completes will be part of the data repository - but in a way we can design to be suitable for the way they are created (which is not necessarily true of Registry-managed datasets).

That means the SQL database doesn't *need* to be modified at all until a run has been successfully completed (though in some cases it could be if requested by the user), and hence with our initial JSON+ButlerURI implementation of QuantumDirectory, users could clean up their work much more easily, or muck with CHAINED collections until they're done running and want to make the results available through the Registry.

Here's a sketch of how I see the execution workflow proceeding under the new system.  I've written this out as if there'd be a new qg tool to sort-of replace pipetask, but I'm not wedded to that; I just didn't want to imply any continuity with verbs pipetask uses, and I do think it's pedagogically useful to think of these as primarily operations on the graphs, not the pipelines or tasks.

[Figure: QuantumDirectory-workflow]


I see BPS fitting into this primarily as a tool that can be used to invoke qg run at scale, while providing at least some simple invocations of qg build (to submit jobs immediately after QG is built), qg reset (for simple automatic retries), and qg upload (when there's no ambiguity about what should be uploaded, e.g. because everything succeeded).

And I think pipetask - the high-level tool that can run multiple qg steps together on a single node - should go away as a separate command-line tool and just become a local-execution BPS plugin.

Back to propagating nothing-to-do

This proposal gives us lots of good ways to propagate nothing-to-do between PipelineTasks, and that means we have to decide what we actually want for the highest-priority problem on the table (at least from my perspective).  Let's be a little more clear about what we want from that, on different timescales:

  • In the very near term, we need to solve what I'll call the easy case, preferably in a future-proof way: PipelineTask TaskA has nothing to do and nothing to write, but this can only happen one way.  Downstream PipelineTask TaskB1 that consumes one of TaskA's outputs by definition has nothing to do and nothing to write when those outputs don't exist, while TaskB2 (which also consumes potentially the same TaskA output) still wants to be run, but as if its quantum never included that now-missing dataset as an input (while still potentially containing other datasets of the same type that were produced; usually this is a case where there are multiple input datasets of that dataset type).  Ideally, the easy case is implemented entirely by the execution harness and enabled by declarative statements in Connections classes (e.g. optional=True on a connection).  And whatever options we add to the Connections declarations should probably play a checking role in QuantumGraph generation, too, to make running a Pipeline in one QG vs. split up as similar as possible.
  • In the long term, we may want to solve the hard case: PipelineTask TaskC can experience a few different recoverable failure modes (maybe discrete, maybe only continuously different), and can't write one or more of its outputs (but maybe can still write some).  Downstream tasks need more than just boolean information about what went wrong to decide how to proceed.  The information about what went wrong needs to be propagated per-dataset, and needs to include dataset-specific information.  And Task code has to make the decision about what to do, not the execution harness.

It is tempting to say that we should implement the easy case as higher-level code on top of the hard case, but they are sufficiently different in detail that I'm not sure that's actually wise.  It's also totally possible to do the hard case with everything we have right now, if the file format for a dataset (and its formatter) already permits writing the alternate information desired.  We have done that in one case already (MakeWarpTask), which would really prefer to use the easy case when it's available, because that was particularly important for stringing the pipelines together.  We have not done it in other places, not because of the file format limitation (these are all FITS files with basically FITS-assuming pytypes in practice, and it's easy enough to Butler.put an empty primary HDU with a header where an image or catalog is expected) but because the automatic handling of the easy case would be so much more convenient for PipelineTask authors (and it's what they've come to expect from Gen2+pipe_drivers).

If you buy that last paragraph, it has a very important implication: adding extra expressiveness to storage class names and extra functionality to datastore to allow it to save qualitatively different kinds of Python objects with the same dataset type (as DM-29818 and variants thereof propose) does just two things here:

  1. it provides a syntax for Connections classes to declare an input or output optional (for the easy case);
  2. it provides a way to save alternative data without requiring the "regular" pytype to have a way to represent it (for the hard case).

But it's actually quite bad at (1): in order for the execution harness to handle the easy case automatically (with just declarative information from the Connections class), it has to go parse the StorageClass name to see if it's optional.  And so does QuantumGraph generation, if we want that to behave consistently (also yuck).

Instead, what we really want for the easy case are connection options that look something like this:

class ConnectionsA(...):
    output = Output(..., optional=True)   # might not be produced

class ConnectionsB1(...):
    input = Input(..., optional=False)   # if not present, do not generate Quantum for and/or do not run this Task

class ConnectionsB2(...):
    input = Input(..., optional=True)   # if not present, run this Quantum without it.

We could use that to modify the storage class name before creating the dataset type, and then continue to delegate to the datastore to save the alternative thing (in this case nothing).  But we'd need to be able to represent this optionality in the string storage class name anyway (so we could put it in database columns), and that starts to get confusing.  In fact, any system where the above kind of declarations on the Connections class map to something intrinsic to the dataset type is pretty confusing; consider:

  • optional=True on an output pretty clearly means "this dataset type is optional".
  • But how would we express "this dataset type might be optional for some tasks, but this one always produces it"?
  • And why should this task have to care that some other task doesn't produce it?
  • And optional=False doesn't mean "this dataset type isn't optional in general", it means "this dataset type isn't optional here".

I think this is yet more evidence that intrinsic-to-dataset-type solutions for the hard problem (including those involving storage classes and datastore) aren't a good path to solving the easy problem, and we really should consider "easy" and "hard" as separate problems.  So, let's keep those ideas in mind for the hard problem in the future, but for now, focus on a different solution to the easy problem.  Let's keep the connections syntax above, but make "optional" a property of a connection, not the dataset type itself.
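
As a rough illustration of "optional as a property of the connection", here is a sketch of the decision an execution harness could make from declarations alone. The `Input` dataclass and `resolve_inputs` function are hypothetical stand-ins, not existing pipe_base classes, and the dataset names are made up. The sketch also assumes one particular reading of the non-optional case: any missing predicted dataset on a non-optional connection means the quantum has nothing to do.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass(frozen=True)
class Input:
    """Hypothetical stand-in for an input connection declaration."""
    dataset_type: str
    optional: bool = False


def resolve_inputs(
    connections: List[Input],
    predicted: Dict[str, List[str]],
    existing: Set[str],
) -> Optional[Dict[str, List[str]]]:
    """Return the inputs to run with, or None if there is nothing to do.

    `predicted` maps dataset type to the dataset IDs in the predicted
    quantum; `existing` holds the IDs that were actually produced.
    """
    resolved: Dict[str, List[str]] = {}
    for conn in connections:
        wanted = predicted.get(conn.dataset_type, [])
        found = [ref for ref in wanted if ref in existing]
        if len(found) < len(wanted) and not conn.optional:
            # Assumed semantics: a required input is missing, so skip.
            return None
        # Optional connection: run as if missing datasets were never inputs.
        resolved[conn.dataset_type] = found
    return resolved


predicted = {"warp": ["warp/1", "warp/2", "warp/3"]}
existing = {"warp/1", "warp/3"}  # the producer had nothing to do for warp/2

b1 = resolve_inputs([Input("warp", optional=False)], predicted, existing)
b2 = resolve_inputs([Input("warp", optional=True)], predicted, existing)
```

Here `b1` is `None` (the TaskB1 behavior) and `b2` keeps only the two datasets that exist (the TaskB2 behavior), even though both consume the same dataset type. Nothing about the dataset type or its storage class had to change.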

And that brings us right back to DM-29761, which I'm going to reopen: I think it's the right solution to the easy problem, and essentially fully decoupled from DM-29818 (one way to solve the hard problem).  One final side thought on the latter: if we do decide to do something like that, the proposal to use QuantumDirectory to back Datastore's opaque tables during execution (instead of having predictive lookups in Datastore) gives us another way to avoid doing anything intrusive with Formatters; the Datastore could then dispatch on the file extension there, just as Tim Jenness wanted.

Starting small

This proposal got pretty ambitious by the end, and I think it's a good idea for a lot of reasons.  But it's very important that we:

  • Start this project by solving the nothing-to-do propagation problem, as that's currently a big problem for Science Pipelines in defining our concrete Gen3 pipelines.
  • Split all of the work up into small tickets, so it can't get fully stalled by either Jim Bosch or Nate Lust (who I see as the likely candidates to do most of the work) being sidetracked by other priorities.

With that in mind, if the middleware team is broadly in favor of this page as at least a vision document, I am inclined to:

  1. Reopen DM-29761 as actually the best path towards solving the nothing-to-do propagation problem.
  2. Define the simplest possible interface for QuantumDirectory that would let it save predicted and executed quanta.
  3. Plug that into pipetask by making it always save the predicted QG to the QuantumDirectory (but still save it to a file as well to not disrupt anything else).
  4. Add a way for connections to declare how to handle missing datasets (maybe just the boolean optional kwargs above, maybe a slightly richer enum).
  5. Add logic to pipetask (specifically SingleQuantumExecutor) and QG generation to obey those new connections declarations.  The former will need to read executed quanta to distinguish between nothing-to-do and failures, but nothing more, so if the executed-quantum schema isn't quite complete, it's not a big deal as long as we can extend it later.
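
For step 5, the only thing the executor needs from an executed-quantum record is enough to tell nothing-to-do apart from failure. A minimal sketch of that check follows, assuming a JSON record with a `status` field; the field name, the `QuantumStatus` values, and the function name are all hypothetical, since the executed-quantum schema is not defined here.

```python
import json
from enum import Enum
from typing import Optional


class QuantumStatus(Enum):
    """Hypothetical status values for an executed quantum."""
    SUCCEEDED = "succeeded"
    NOTHING_TO_DO = "nothing_to_do"
    FAILED = "failed"


def explains_missing_output(record_json: Optional[str]) -> bool:
    """Return True only if the producer explicitly reported nothing-to-do.

    A missing record, an unknown status, or a failure all count as
    "not explained", so the caller should treat them as upstream failures.
    """
    if record_json is None:
        return False  # producer never wrote a record
    record = json.loads(record_json)
    try:
        status = QuantumStatus(record.get("status"))
    except ValueError:
        return False  # status absent or not recognized
    return status is QuantumStatus.NOTHING_TO_DO


benign = explains_missing_output('{"status": "nothing_to_do", "extra": 1}')
failed = explains_missing_output('{"status": "failed"}')
absent = explains_missing_output(None)
```

Unknown keys such as `extra` are ignored, which is one way to leave room for extending the record later without breaking this check.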

We can keep DM-29818 open but lower priority as a way to potentially pass richer information along with datasets in the future.

Packaging and Dependencies

Any scheme to start saving quantum-like provenance anywhere in a data repository raises some packaging questions for us, because most of our Quantum and QuantumGraph infrastructure is in pipe_base, not daf_butler (which has the current Quantum class, but nothing else).  It's hard to keep anything that works with Quantum unaware of PipelineTask (and hence Task, pex_config, etc), and that includes persistence of Quanta to e.g. Registry, which definitely has to go in daf_butler.  And with this proposal a lot of QuantumGraph stuff, not just Quantum, would need to be in daf_butler.  So, it's worth thinking about our options:

  • Move as much Quantum  stuff as needed into daf_butler , and just try to keep dependencies on pipe_base things informal (i.e. opaque task class strings and occasional duck typing of PipelineTask or Connection interfaces).  This could involve making all Quantum stuff in daf_butler abstract, with concrete implementations in pipe_base, but that's tricky because whatever is in daf_butler needs to have all of any concrete object's persistent state in it already, so we can map it to various on-disk or in-database schemas.
  • Move as much Quantum  stuff as needed into daf_butler , but formalize its relationship with pipe_base via new ABC or Protocols that PipelineTask satisfies by building on top of Task and pex_config.  This probably moves some PipelineTask stuff down to daf_butler, but doesn't make daf_butler depend on Task or pex_config.
  • Leverage the ongoing task_base effort, and just have daf_butler depend on that so we can define PipelineTask in daf_butler.  That would make the residual pipe_base a lot smaller, but I think there would still be enough there to merit its continued existence as a package.
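
The second option (formalizing the relationship via Protocols) could look roughly like the sketch below. The protocol name, its two methods, and the example task are hypothetical; the point is only that the lower-level package can describe what it needs structurally, and the higher-level class satisfies it without either package importing the other's concrete types.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class QuantumTaskLike(Protocol):
    """Hypothetical interface that would live in the lower-level package."""

    def get_task_label(self) -> str: ...

    def get_persistable_config(self) -> Dict[str, Any]: ...


def serialize_task_node(task: QuantumTaskLike) -> Dict[str, Any]:
    """Persist only what the protocol exposes; no Task or config imports."""
    return {"label": task.get_task_label(),
            "config": task.get_persistable_config()}


class ExampleTask:
    """Stand-in for a higher-level task; it never references the Protocol."""

    def get_task_label(self) -> str:
        return "exampleTask"

    def get_persistable_config(self) -> Dict[str, Any]:
        return {"threshold": 5}


node = serialize_task_node(ExampleTask())
conforms = isinstance(ExampleTask(), QuantumTaskLike)
```

Note that `runtime_checkable` only checks that the methods exist, not their signatures, so static type checking would still be needed to catch mismatches.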

Import, Export, and Local Execution

I've already discussed how the execution workflow transfers information between the Registry and QuantumDirectory for a single data repository.  Even when that's a shared data repository, it already provides better support for cases where the user doesn't really care about sharing (at least most of) their processing results with others, and instead wants it to be easy to clean up any "messes" that processing creates: the default file-based implementations of QuantumDirectory+Datastore very cleanly segregate a particular processing run's output from the rest of the data repository until/unless it is "uploaded" back to the Registry.  And if we can provide basic butler get on top of QuantumDirectory+Datastore, too, we may be able to make uploading a rare operation.

But there are probably cases where the user wants their processing even more isolated, or their outputs brought home into a personal data repository rather than a shared one.  Let's examine some use cases, going from simple to complex.

1. Shared repository as input, completely local execution against a brand new local repository, no upload at all

Build QuantumGraph against shared repository, but write output QG to a new data repository.  The new data repository has a chained Datastore, writing to and reading from its own local space first, but reading from the shared repository's Datastore as well.  It also has a chained QuantumDirectory, working the same way (the new QG goes in the local space).  It doesn't need a Registry at all for this use case, but it might make sense to create one now so the same export-at-QG-gen functionality can support other use cases, and I'll comment on that in the next use case.
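
The chaining behavior described here (write locally, read locally first, fall back to the shared repository) has the same semantics as Python's `collections.ChainMap`, which makes for a compact illustration. This is only an analogy, not a proposed implementation, and the dataset keys and URIs are made up.

```python
from collections import ChainMap

# Stand-ins for the shared and local stores: dataset ID -> artifact URI.
shared = {"raw/1": "s3://shared-repo/raw/1.fits"}
local = {}

# Lookups search `local` first, then `shared`; writes go only to `local`.
chained = ChainMap(local, shared)

chained["calexp/1"] = "file:///scratch/local-repo/calexp/1.fits"

from_shared = chained["raw/1"]     # found by falling through to `shared`
from_local = chained["calexp/1"]   # found in `local`
```

After this runs, `shared` is unchanged, so cleaning up the local run amounts to discarding `local`.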

2. Shared repository as input, completely local execution against a brand new local repository, upload to local Registry

Same as the previous case, but here we need to initialize the new data repository's Registry with the datasets (and associated dimensions) for all pure inputs in the QG.  The QG itself naturally has almost all of the information needed to populate such a Registry - I think it's just missing the non-RUN collection memberships and CHAINED structure.  If we stuff that information into the QG, too, it becomes a self-sufficient export file, and we can choose whether to create and populate the local Registry when we set up the new data repository before execution, or only if/when the user chooses to upload at the end (or any time in between, I guess), all without going back to the shared Registry.  That's especially nice because it means we don't have to worry about the shared Registry changing in the meantime; we take a snapshot at QG-gen time and then we're done.

Note that once the local Registry is populated, the user can then re-run QG generation against the local data repository only (e.g. to work on code changes that affect that step), with the caveat that the new graph could never contain inputs that weren't in the original one.

3. Shared repository as input, completely local execution against an existing local repository, no upload at all

This works exactly like (1); the local repository's Datastore and QuantumDirectory are already chained, and we can add new datasets and quanta with no trouble at all.  That's true whether the outputs go into a new RUN or extend an existing one: in the first case, there are no clashes at all, and in the second, we extend the QG and Datastore the same way and conflicts are automatically avoided in the same way they would be when running against the original shared repo.

4. Shared repository as input, completely local execution against an existing local repository, upload to local Registry

If this is the first time we're uploading to the local Registry, this is still easy; all of the chaining just works and the Registry is brand new and hence there are no conflicts.  If the local Registry is not empty, we have a hard but perhaps not insurmountable merge problem, to combine the new dimensions, collections, dataset types, etc. with the old ones, because those can conflict.  And set-union operations on some things (at least CHAINED collections) are ill-defined.
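
A small example of why a union of CHAINED collections is ill-defined: a chain is an ordered search path, so two definitions with the same members can resolve the same dataset differently, and a set union discards exactly the information that distinguishes them. The collection and dataset names below are made up, and `find_first` is a hypothetical helper that mimics first-match lookup.

```python
from typing import Dict, List, Optional, Set


def find_first(chain: List[str], contents: Dict[str, Set[str]],
               dataset: str) -> Optional[str]:
    """Return the first collection in the chain that holds the dataset."""
    for collection in chain:
        if dataset in contents[collection]:
            return collection
    return None


# Both runs contain a dataset with the same type and data ID.
contents = {"runA": {"bias"}, "runB": {"bias"}}

old_chain = ["runA", "runB"]   # definition already in the local Registry
new_chain = ["runB", "runA"]   # definition arriving with the upload

old_result = find_first(old_chain, contents, "bias")
new_result = find_first(new_chain, contents, "bias")
same_members = set(old_chain) == set(new_chain)
```

The two chains have identical members but give different answers, so any merge has to pick an order by some rule beyond set union.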

5. Shared and local repositories both as input

This is the classic "chained Registry" problem, and it's still really hard; QG-gen relies on being able to join across tables that are split across different databases, and on the database query planner's heuristics to guess whether a particular dimension is constrained more by (say) a spatial join or the datasets present in a particular question.  To satisfy this use case, I think we have to tell users to either upload/export back to the shared Registry so QG-gen can run against that alone, or download/import more stuff into the local Registry so QG-gen can run against that alone.  QG-based export definitions will make that easier, but it will still require more manual effort from the user and probably some pain debugging empty QGs when it turns out they haven't transferred everything they need.

2 Comments

  1. Somehow I've lost the comment I'm replying to but:

    > "(instead of having a predictive lookups in Datastore)"

    The predictive lookups were a way to solve the problem of minimizing interactions with registry and not requiring any information to be passed from job to job by the workflow management system. Doesn't the QuantumDirectory approach mean that we now have a situation where every PipelineTask is doing a pseudo-registry lookup again? Is the idea that this is almost as efficient as direct object store access so won't have a problem scaling? 

    1. Yes; if we have a file+ButlerURI backend for QuantumDirectory then it scales exactly like Datastore, and we wouldn't write a NoSQL database backend unless it scaled at least that well.