Related epic: DM-2077

 

The following is a straw man proposal for implementing a revised shared scan. The proposal seeks simplicity. I also point out some optimizations and, consequently, more complicated solutions. Finally, I end with some open questions. Anyway, I call this a single cursor shared scan (SCSS).

An SCSS is implemented as a shared scan manager object, say class SSManager. It does not actually run queries but is used to sequence queries for optimal memory and I/O utilization. The SCSS manager is initialized with the following pieces of information:

  1. std::vector<std:string> of chunk file paths
  2. maximum number of queries to run at once, and
  3. transfer rate (default is 50MB/Sec).

Once initialized, it can be used to control the running of queries across all of the registered chunks. Each query is represented by a query object, say class SSQuery, that has a single callback method SSQuery::Run(std::string chunkname, bool lastchunk). This callback is used to tell the query creator to run the query on a particular chunk. The name of the chunk is passed and if the query is running on the last chunk, “lastchunk” is set to true.

When qserv wants to run a shared scan query, it creates an SSQuery object and calls SSManager::AddQuery(SSQuery *). This method adds the query to the queue of queries to be run. There are actually two logical queues: a) the active queue containing all currently running queries; and b) the pending queue which contains all queries that need to run. Assume that variable k is the current cursor position (i.e. the index into the vector of chunk file paths). The execution sequence is as follows:

AddQuery() simply places the SSQuery at the end of the pending queue. Pending queries are considered for inclusion only when the cursor advances. The cursor advances when all of the queries at position k complete. When this occurs, the memory occupied by chunk k is unlocked. Then the file for chunk k+1 is memory mapped, locked in memory and optionally pre-population is started. All SSQuery objects that have run on the last chunk are removed from the active queue. The cursor is advanced. If the number of active queries is less than the maximum allowed, sufficient queries are moved from the pending queue to the active queue to reach the maximum. Then SSQuery::Run(std::string chunkname, bool lastchunk) is called for each query in the active queue.

That’s pretty much it. So, yes, it is very simple. Here are optimizations and open questions:

Optimization: Chunk piggy-backing.
When AddQuery() is called, the new query is not added to the currently running active set even if the active set is not full. An optimization would be to add the query to the active set. However, if this is done we need to make sure that we do not unduly delay the current set with the new query. We can do this (sort of) by seeing if there is sufficient time remaining to fully process the new query. We can do this by estimating the percentage of the expected deadline has gone by. The estimator can be the transfer rate. So, the deadline is t+(size_of_chunk/transfer_rate). If less than 20% (arbitrary) of the time has elapsed then we can add the query, otherwise the query must wait for the cursor to advance. It is not clear how much this actually buys us since the deadline is rather crude and there is no good way to do a good approximation without knowing the actual query speed.

Optimization: Fast Forwarding or Dual Cursor Shared Scan (DCSS)
The SCSS suffers from the problem that the fastest query is no faster than the slowest active query. This is because we do not advance the cursor until all of the current queries are done. This problem can be mitigated to some extend by allowing faster queries to race ahead of the slower queries. Here we implement two cursors, k and k’. If we find that 50% or more queries have completed, those queries are assigned to cursor k’ which is one ahead of k and is allowed to independently advance. Thus, slow queries have a much smaller impact on fast queries. The side-effects are significant as this requires more memory to be locked and may substantially increase disk I/O. The benefits are somewhat dubious as it is not clear that a query executes at the same rate for all chunks. If it does not, then this optimization is not particularly good (see the next optimization).

Optimization: Query Bagging
When a query is added it could be tagged as “slow”, “average”, or “fast”. The idea here is to add queries to the active set that have roughly the same execution speed. So, when running a set of “slow” queries, only “slow” queries would be added to the active set if at all possible. Needless to say, the scheduling is rather complex because we need to prevent query starvation. Depending on how the scheduling is actually implemented, we could get into a situation where less than the maximum number of queries would run from time to time as we transition from one bag to another. This can be mitigated by simply having three independent query managers (one for each type of query) and would be much simpler to implement though it would triple memory usage and disk I/O. Query bagging is better than the DCSS optimization but requires that queries be accurately categorized.

Optimization: Non-sequential Ordering
The simplicity of the SCSS is founded on the premise that we always process chunks in a sequential order. This really makes for uncomplicated code, but suffers from turbulent execution speed (or let’s says it is chunky). If we allow queries to enter and leave the active set in arbitrary order we can, over time, smooth out the flow. However, that means we need to keep track of all the chunks the query has touched and all the chunks that still remain (e.g. using a bit vector). It is not clear this actually buys us enough to warrant the complication.

Optimization: Double Buffering
This optimization is rather simple and usually quite effective. When all the queries for chunk k are started, the manager prepares chunk k+1. The idea is to make sure the pages of chunk k+1 are in memory before any query actually starts running on that chunk (ideally anyway). This of course increases memory usage. While there is no additional disk I/O, the disk I/O that there is may interfere with the current running queries. Double buffering becomes more intrusive as we add cursors and is rather complicated when using non-sequential ordering.

Below are some of the open questions.

Question: How does the manager know that a query completed?
There are two obvious approaches here. We can simply say that a query completes when SSQuery::Run() returns. This means that the callback needs to be executed on a new thread. Furthermore, it makes a great deal of assumptions on how mySQL queries are actually run. Alternatively, if we assume that SSQuery::Run() does a simple setup of the query and spawns a new thread if need be. In other words, it does very little actual processing and a return means nothing other than the query has been dispatched. This would require a new method, SSManager::QueryDone(SSQuery *), to be added and must be called when the query completes. Whether or not this is automatic when the last chunk is dispatched is an open question.

Question: How would one abort a query?
The simplest approach is to add an SSManager::RemoveQuery(SSQuery *) method. The idea here is that one should be able to remove a query at any time if, for example, the query failed on a particular chunk and there is no reason to continue. If query completion notification is done by a return from SSQuery::Run() then the call would need to return a value indicating whether one should proceed to the next chunk or remove the query from all queues.

Question: Can queries be suspended?
There is no reason why a query could not be suspended. All this means that it is moved to a suspend queue via a SSManager::SuspendQuery(SSQuery *) call and placed back to the top of the pending queue when SSManager::ResumeQuery(SSQuery *) is called. Suspension takes place when the cursor is advanced. In the sequential model, a resumed query may need to wait for the cursor to go fully around before it actually resumes. Whether or not this is a useful feature is questionable.

Question: What about multiple table scans?
This is a very sticky question. It seems to be spawned by the possibility that a query needs access to multiple tables which reside in different chunks. As far as I can see, the concurrency requirements are arbitrary, solely dependent on the query. In such a circumstance, if we require that all necessary chunks be in memory before the query starts then it essentially leads us to a bin-packing problem (i.e. NP-hard). So, I don’t have a good solution for this one. Perhaps I misunderstand what is really needed.

  • No labels