1. Scope

This document should not be treated as a working example due to changes made to the API of the Ingest System. The latest version of the API is documented in The workflow developer's guide for the Qserv Ingest system.

This document is a transcript of the very first use of the new Ingest system to upload a sample of the HSC data into Qserv at lsp-int (formerly PDAC). The input represents a single tract of an Object-like catalog produced/processed in the context of the LSST rc2 effort. The test was meant to demonstrate the typical workflow for ingesting catalogs into Qserv using the newly developed tools.


2. Notes on the setup of the test

A Qserv instance lsp-int (formerly PDAC) deployed at NCSA was used for the test. This instance runs an unreleased version of the Replication and Ingest System as per the following tickets:

The input data to be ingested into Qserv were prepared on the LSST development machine lsst-dev.ncsa.illinois.edu. The input and intermediate data were placed in the GPFS folder:

/datasets/gapon/hsc_rc2/

The main reason for keeping the data here was that the very same filesystem is also mounted on all nodes of the Qserv cluster. This made it simpler to transfer the files to the Qserv master node lsst-qserv-master01.ncsa.illinois.edu from where all catalog loading operations were run.

2.1. Run-time environment at lsst-dev

The only application which is really needed here is sph-partition. The tool partitions CSV/TSV files produced by the Parquet-to-CSV/TSV translators into the so-called chunks ready to be uploaded into Qserv. It turns out the current version of the LSST Stack doesn't include the GitHub package partition. Hence it had to be built before the partitioning stage. Here is how this was done during the demo (at lsst-dev.ncsa.illinois.edu):

% exec scl enable devtoolset-6 rh-git29 bash
% source /lsst/software/lsstsw/stack/loadLSST.bash
% mkdir /datasets/gapon/hsc_rc2
% cd /datasets/gapon/hsc_rc2
% git clone https://github.com/lsst/partition.git
% cd partition/
% setup -r .
% scons
% which sph-partition
/datasets/gapon/hsc_rc2/partition/bin/sph-partition

2.2. Run-time environment at lsst-qserv-master01

The machine has an old version of the LSST Stack which doesn't require devtoolset-6. Hence it's sufficient to do just this to have tools like mysql in the environment:

% source /datasets/gapon/stack/loadLSST.bash
% setup -t qserv-dev qserv_distrib

We're also going to use the Replication System's binary tools for uploading the TSV/CSV files into Qserv. The tools are available in the following Docker container which can be pulled onto the node by:

% docker pull qserv/replica:tools

And of course, one must have proper privileges in order to run Docker containers on this (or any other) node of the Qserv cluster. In the specific environment of the cluster this means being a member of the POSIX group docker.
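
A quick way to verify that membership before trying any of the Docker-based commands shown below (a hypothetical check, not part of the original demo):

% id -nG | grep -qw docker && echo "OK: member of docker" || echo "NOT a member of docker"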

The applications packaged into that container could be used directly (which would require passing tons of parameters to the container launcher), or via the convenience script run-no-config.sh located at:

% ls -al /home/gapon/development/DM-17113/qserv/admin/tools/docker/replication/deployment/ncsa/
...
-rwxr-xr-x 1 gapon grp_202 1982 Sep 16 17:13 run-no-config.sh

The script approach was taken in this demo, as will be seen shortly below.

3. Notes on the Web services used during the Ingest

The new Ingest workflow relies on specially developed REST services provided by the built-in Web service of the Master Replication Controller running on the Qserv master node lsst-qserv-master01.ncsa.illinois.edu. The services are accessible at the following base URL:

http://localhost:25080/ingest/v1/

All services (in addition to the usual HTTP status codes) return JSON objects which include (among other, request-specific fields) these two fields:

{
...
"error":"",
"success":1
}

In the example shown above the operation was successful. Otherwise, the success flag would be set to 0 and the error field would carry a specific message explaining why the operation failed.
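
For example, a workflow script could check the success flag of every response before proceeding. A minimal sketch (it assumes the jq utility is available on the node, which was not a requirement of the demo itself):

% response='{"error":"","success":1}'    # the response of any of the requests described below
% echo "${response}" | jq -e '.success == 1' > /dev/null \
      || echo "request failed: $(echo "${response}" | jq -r '.error')" >&2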

4. Preparing data

NOTE: In real production scenarios, catalog ingest workflows using the Qserv Ingest & Replication Framework are going to deal with files in the Apache Parquet format and catalog descriptions in the felis format. The Parquet data files will have to be translated into the TSV/CSV-formatted files supported by the Ingest tools, and the catalog descriptions will be used to automatically generate various configuration and schema files needed during the ingest. The current demo skipped those stages and used a pre-translated file which was already in the CSV format, along with a schema definition file in a simplified JSON format. These files were further manually (or semi-manually) processed into the well-formed input required by the Ingest framework. Each of these steps is explained further down in this document.

4.1. Input files

Two files were copied from:

% ls -al /project/hchiang2/qserv/
total 32000
drwxr-sr-x  2 hchiang2 lsst_users     4096 Sep 25 18:09 .
drwxr-sr-x 15 hchiang2 lsst_users     4096 Sep 26 19:21 ..
-rw-r--r--  1 hchiang2 lsst_users    11232 Sep 25 18:09 schema
-rw-r--r--  1 hchiang2 grp_202    32633616 Sep 25 17:14 small2.csv

Into the temporary folder to allow further post-processing:

% cp /project/hchiang2/qserv/* /datasets/gapon/hsc_rc2

4.2. Analyzing and post-processing the input files

The goal of this stage was to perform some manual QA on the contents and mutual compatibility of the files:

% wc -l small2.csv
10000 small2.csv

The first line of the file also contained the names of the columns:

% head -1 small2.csv
id,objectId,parentObjectId,coord_ra,coord_dec,Ra,Dec ... yPsFlux,yPsFluxErr

These names were extracted from the file (and removed from it) and put into a separate file with the following structure:

% cat columns.txt
id
objectId
parentObjectId
coord_ra
coord_dec
Ra
Dec
...
yPsFlux
yPsFluxErr

This file will be injected at the next stage into a configuration file of the partitioning tool sph-partition.
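
For reference, such a column list could have been produced from the CSV header along these lines (a sketch; the exact commands used in the demo were not recorded):

% head -1 small2.csv | tr ',' '\n' > columns.txt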

After eliminating the first line, the CSV file had 9999 data rows. Further analysis of the content of that file showed the following problems:

  • the database NULL values in that file were represented by the absence of any symbol between two commas (or after the very last comma, or before the very first comma) in a line, i.e. as ,, instead of the symbol \N
  • boolean values were represented by the literals True and False, which would be a problem for MySQL's LOAD DATA INFILE ...

These two problems were corrected using regular expressions. Finally, the file was renamed to:

Object.csv
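
The exact regular expressions were not recorded; the post-processing could have looked roughly like this (a sketch assuming GNU sed, that True/False only ever appear as whole field values, and that the boolean literals were mapped to 1/0):

% sed -i '1d' small2.csv                           # drop the header line with the column names
% sed -i 's/,,/,\\N,/g; s/,,/,\\N,/g' small2.csv   # empty fields between commas -> \N (the second pass catches adjacent empties)
% sed -i 's/^,/\\N,/; s/,$/,\\N/' small2.csv       # empty first/last fields -> \N
% sed -i 's/,True/,1/g; s/,False/,0/g' small2.csv  # boolean literals -> 1/0
% mv small2.csv Object.csv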

The second input file schema has the following structure:

schema
% cat schema 
{'name': 'objectId', 'type': 'INT64'}
{'name': 'parentObjectId', 'type': 'INT64'}
{'name': 'coord_ra', 'type': 'DOUBLE'}
{'name': 'coord_dec', 'type': 'DOUBLE'}
{'name': 'Ra', 'type': 'DOUBLE'}
{'name': 'Dec', 'type': 'DOUBLE'}
...
{'name': 'ra', 'type': 'DOUBLE'}
{'name': 'dec', 'type': 'DOUBLE'}
..
{'name': 'yPsFlux', 'type': 'DOUBLE'}
{'name': 'yPsFluxErr', 'type': 'DOUBLE'}
{'name': 'id', 'type': 'INT64'}

It turned out this file had the following problems:

  • column id is listed at the very end of the file, which does not match the column order of the data (CSV) file
  • columns Dec and dec are reserved words in the SQL (and MySQL) standards, and this may cause problems for the Qserv query parser/processor
  • columns Dec, dec, Ra, and ra would pose a problem for the Qserv parser because it's configured to be case-insensitive for the names of the columns
  • and finally, the types of the columns are not valid MySQL types; these are the low-level (storage) types of the Parquet file format

Based on these observations, the following transformations were performed on the content of the file:

  • moving the last line (with column id) of the file to its very beginning
  • renaming column Dec into Decl (to avoid the reserved word)
  • renaming columns dec and ra into decl1 and ra1 (to avoid the case-insensitive name collisions)
  • replacing the Parquet data types with the corresponding MySQL types
  • adding a comma at the end of each (but the last) line
  • adding two extra columns which will be injected by the partitioning tool into the output chunk files

The resulting file now looks like this:

schema
% cat schema 
{'name': 'id', 'type': 'bigint(20) NOT NULL'},
{'name': 'objectId', 'type': 'bigint(20) NOT NULL'},
{'name': 'parentObjectId', 'type': 'bigint(20) DEFAULT NULL'},
{'name': 'coord_ra', 'type': 'DOUBLE NOT NULL'},
{'name': 'coord_dec', 'type': 'DOUBLE NOT NULL'},
{'name': 'Ra', 'type': 'DOUBLE DEFAULT NULL'},
{'name': 'Decl', 'type': 'DOUBLE DEFAULT NULL'},
..
{'name': 'ra1', 'type': 'DOUBLE DEFAULT NULL'},
{'name': 'decl1', 'type': 'DOUBLE DEFAULT NULL'},
..
{'name': 'yPsFlux', 'type': 'DOUBLE DEFAULT NULL'},
{'name': 'yPsFluxErr', 'type': 'DOUBLE DEFAULT NULL'},
{"name":"chunkId","type":"int(11) NOT NULL"},
{"name":"subChunkId","type":"int(11) NOT NULL"}

Here is the map for the type translations:

Parquet       MySQL
INT64         BIGINT(20)
INT32         INT
DOUBLE        DOUBLE
FLOAT         FLOAT
BOOLEAN       BOOLEAN
BYTE_ARRAY    TEXT

Also note the additional DEFAULT NULL qualifier added for most columns of the schema, while NOT NULL is required for the mandatory columns such as objectId, coord_ra, coord_dec, chunkId, and subChunkId.
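
For the record, the bulk of this type translation could be scripted as a simple substitution on the schema file (a sketch; the demo edits were made semi-manually, and the NOT NULL/DEFAULT NULL qualifiers still have to be assigned by hand):

% sed -i -e "s/'INT64'/'BIGINT(20)'/g" \
         -e "s/'INT32'/'INT'/g" \
         -e "s/'BYTE_ARRAY'/'TEXT'/g" schema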

4.3. Partitioning the CSV file

The first step here was to prepare a configuration file Object.cfg for the partitioner:

Object.cfg
mr = {
    # Using 4 cores, 8 GB of memory
    num-workers = 4
    block-size  = 128
    pool-size   = 8192
}
part = {
    # Common partitioning parameters.
    num-stripes     = 340
    num-sub-stripes = 12
    chunk           = chunkId
    sub-chunk       = subChunkId
    # The partitioning position is the object's point-source model position.
    pos = 'coord_ra, coord_dec'
    # Overlap radius in degrees.
    overlap = 0.01667
}
in.csv = {
    # input file format
    null      = '\\N'
    delimiter = ','
    escape    = '\\'
}
# Output CSV format.
out.csv = {
    null      = '\\N'
    delimiter = ','
    escape    = '\\'
    no-quote  = true
}
in.csv = {
    # List of Object table column names, in order of occurrence.
    field = [
       id
       objectId
       parentObjectId
       coord_ra
       coord_dec
       Ra
       Dec
       ..
       yPsFlux
       yPsFluxErr
    ]
}

NOTES:

  • keep in mind that if the new database is expected to be used in JOIN-type queries involving this and other databases (already existing in Qserv), then the key partitioning parameters of the new database (num-stripes, num-sub-stripes, and overlap) must match the ones of the production database family in Qserv.
  • the last section (in.csv) of the configuration was pre-populated from the previously mentioned file columns.txt (a sketch of how this could be scripted is shown right below).
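
A hypothetical way that field list could have been appended to the configuration from columns.txt instead of being typed in by hand:

{
    echo 'in.csv = {'
    echo '    # List of Object table column names, in order of occurrence.'
    echo '    field = ['
    sed 's/^/       /' columns.txt
    echo '    ]'
    echo '}'
} >> Object.cfg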

Before running the partitioner, the following folder for the output chunk files was created:

$ pwd
/datasets/gapon/hsc_rc2
% mkdir Object

Now run the partitioner as:

% sph-partition --verbose -c Object.cfg --out.dir=Object/ --in=Object.csv >& partition.log &

Inspect the log file partition.log to see if there were any problems. In this case everything was fine, and two chunks were created:

partition.log
{
	"chunkStats": {
		"nrec":      9999,
		"n":         2,
		"min":       4636,
		"max":       5363,
		"quartile": [5363, 5363, 5363],
		"mean":      5e+03,
		"sigma":     364,
		"skewness":  0,
		"kurtosis":  -2
	},
	"overlapChunkStats": {
		"nrec":      20764,
		"n":         2,
		"min":       9455,
		"max":       11309,
		"quartile": [11309, 11309, 11309],
		"mean":      1.04e+04,
		"sigma":     927,
		"skewness":  0,
		"kurtosis":  -2
	},
	"subChunkStats": {
		"nrec":      9999,
		"n":         36,
		"min":       0,
		"max":       618,
		"quartile": [0, 313, 509],
		"mean":      278,
		"sigma":     235,
		"skewness":  -0.0304,
		"kurtosis":  -1.74
	},
	"overlapSubChunkStats": {
		"nrec":      20764,
		"n":         36,
		"min":       21,
		"max":       1217,
		"quartile": [245, 513, 941],
		"mean":      577,
		"sigma":     367,
		"skewness":  0.331,
		"kurtosis":  -1.17
	},
	"chunks": [
		{"id":  115329, "nrec": [5363, 11309]},
		{"id":  116009, "nrec": [4636, 9455]}
	]
}

And the output chunks were found at:

% ls -al Object/
total 99328
drwxr-xr-x 2 gapon grp_202     4096 Sep 25 17:43 .
drwxr-xr-x 4 gapon grp_202     4096 Sep 27 15:20 ..
-rw-r--r-- 1 gapon grp_202 37285035 Sep 25 17:43 chunk_115329_overlap.txt
-rw-r--r-- 1 gapon grp_202 17692583 Sep 25 17:43 chunk_115329.txt
-rw-r--r-- 1 gapon grp_202 31209547 Sep 25 17:43 chunk_116009_overlap.txt
-rw-r--r-- 1 gapon grp_202 15278560 Sep 25 17:43 chunk_116009.txt
-rw-r--r-- 1 gapon grp_202      864 Sep 25 17:43 chunk_index.bin

Note the chunk numbers: 115329 and 116009. We will need these at later stages of the ingest process.
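
With a larger number of chunks the identifiers could be extracted from the log programmatically, for example (assuming jq is available and that the JSON summary is the only content of partition.log):

% jq -r '.chunks[].id' partition.log
115329
116009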

5. Ingesting catalogs into Qserv

All these steps were performed from the Qserv master node lsst-qserv-master01 where the Master Replication Controller is running, and from where the worker nodes are accessible. The run-time environment should be set up as explained earlier in this document. The current working directory was:

% pwd
/home/gapon/development/DM-17113/qserv

5.1. Creating new database in Qserv

% echo '{"database":"hsc_rc2","num_stripes":340,"num_sub_stripes":12,"overlap":0.01667}' | \
    curl 'http://localhost:25080/ingest/v1/database' \
      -X POST \
      -H "Content-Type: application/json" \
      -d @-

Note how the key parameters of the new database are presented to the Ingest system. The three parameters num_stripes, num_sub_stripes, and overlap must have the same values as the ones used earlier in the configuration file Object.cfg when partitioning the input CSV file into chunks.

5.2. Registering table Object of the new database

This step required a configuration file in the JSON  format:

/datasets/gapon/hsc_rc2/hsc_rc2.json
{"database":"hsc_rc2",
 "table":"Object",
 "is_partitioned":1,
 "chunk_id_key":"chunkId",
 "sub_chunk_id_key":"subChunkId",
 "is_director":1,
 "director_key":"id",
 "latitude_key":"coord_dec",
 "longitude_key":"coord_ra",
 "schema":[
  {"name": "id", "type": "bigint(20) NOT NULL"},
  {"name": "objectId", "type": "bigint(20) NOT NULL"},
  ..
  {"name": "yPsFluxErr", "type": "DOUBLE DEFAULT NULL"},
  {"name":"chunkId","type":"int(11) NOT NULL"},
  {"name":"subChunkId","type":"int(11) NOT NULL"}
 ]
}

Note that the schema section of that file was based on the column definitions prepared earlier in the file schema.
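
Since the post-processed schema file already has one column definition per line (with trailing commas), preparing that array mostly amounts to switching from single to double quotes, e.g. (a sketch; schema.json is a hypothetical intermediate file, and this step was done semi-manually in the demo):

% sed "s/'/\"/g" schema > schema.json
% head -2 schema.json
{"name": "id", "type": "bigint(20) NOT NULL"},
{"name": "objectId", "type": "bigint(20) NOT NULL"},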

% cat /datasets/gapon/hsc_rc2/hsc_rc2.json | \
    curl 'http://localhost:25080/ingest/v1/table' \
      -X POST \
      -H "Content-Type: application/json" \
      -d @-

5.3. Starting a super-transaction in the context of the new database ingest

% echo '{"database":"hsc_rc2"}' | \
    curl 'http://localhost:25080/ingest/v1/trans' \
      -X POST \
      -H "Content-Type: application/json" \
      -d @-

This method returns a JSON object which (in case of the successful completion of the operation) carries a unique identifier of the new transaction. We need to memorize this number (it's 37 in this case) as it will be used later for uploading chunks into the database:

{
 ..
 "transaction": {
  "id":37",
  "state":"STARTED",
  ..
 }
}
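
In a scripted workflow the transaction identifier could be captured directly from the response of the request shown above, e.g. (a sketch; assumes jq is available):

% TRANSACTION_ID=$(echo '{"database":"hsc_rc2"}' | \
      curl -s 'http://localhost:25080/ingest/v1/trans' \
        -X POST \
        -H "Content-Type: application/json" \
        -d @- | jq -r '.transaction.id')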

5.4. Loading chunks into Qserv

5.4.1. Registering new chunks in the Ingest/Replication system

As explained earlier, the partitioning stage resulted in two chunks with the numbers:

115329
116009

Before proceeding to the actual data ingest we need to ask the Ingest system at which workers each of these chunks will be loaded. The system makes its decision based on various criteria, such as the current occupancy of the worker nodes, both in terms of the total number of chunks (per worker) and the amount of free space in their data filesystems, or some other factors. This step has to be done just ONCE per each unique chunk, regardless of how many files contributing rows to a particular chunk we might have as a result of the input data partitioning. Note that today we're exploring the simple case of one input Parquet file translated into one CSV file. In more realistic scenarios there would (most likely) be many files with contributions to the same chunk. It's also worth mentioning that subsequent queries for the location of a particular chunk will always return the same result. It's only the very first request for each unique chunk that maps the chunk to a specific worker.

Do this for chunk 115329:

% echo '{"transaction_id":37,"chunk":115329}' | \
    curl 'http://localhost:25080/ingest/v1/chunk' \
      -X POST \
      -H "Content-Type: application/json" \
      -d @-

The request returns:

{"location":{
   "worker": "db13,
   "host": "lsst-qserv-db13",
   "port": 25002
 },
 ..
}

And the same request for chunk 116009:

% echo '{"transaction_id":37,"chunk":116009}' | \
    curl 'http://localhost:25080/ingest/v1/chunk' \
      -X POST \
      -H "Content-Type: application/json" \
      -d @-

The request returns:

{"location":{
   "worker": "db28,
   "host": "lsst-qserv-db28",
   "port": 25002
 },
 ..
}

The final map:

chunk      host               port
115329     lsst-qserv-db13    25002
116009     lsst-qserv-db28    25002
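
With more chunks, these allocation requests could be issued in a loop and the map collected automatically, e.g. (a sketch; assumes jq is available):

for chunk in 115329 116009; do
    echo "{\"transaction_id\":37,\"chunk\":${chunk}}" | \
        curl -s 'http://localhost:25080/ingest/v1/chunk' \
          -X POST \
          -H "Content-Type: application/json" \
          -d @- | \
        jq -r '[.location.worker, .location.host, .location.port] | @tsv'
done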

5.4.2. Loading chunk data

This step requires launching the binary loading tool qserv-replica-file-ingest which employs an efficient binary protocol to communicate with the Ingest System's services running at the ports recorded in the chunk disposition map at the end of the previous subsection. Each worker node runs its own instance of the service. This application could be built locally (with all the complications of setting up the LSST Stack and other required dependencies on each node from which the loading will be attempted), or it could be used from the pre-built Docker container qserv/replica:tools. This demo uses the second approach. Moreover, in order to avoid passing numerous parameters to that container, the handy launcher script mentioned earlier is used here:

% pwd
/home/gapon/development/DM-17113/qserv
% ls admin/tools/docker/replication/deployment/ncsa/run-no-config.sh
admin/tools/docker/replication/deployment/ncsa/run-no-config.sh

A problem with this wrapper is that it can't read the input chunk data files directly from the GPFS filesystem where the partitioning was done earlier. The only data folder which the container instance launched by the script can read is:

/qserv/replication/work/

Besides, all applications packaged into that container have the above-mentioned folder as their CWD. That's why, instead of modifying the wrapper to mount an additional volume when launching the container, a simple workaround was applied for the chunk files: they were copied into the work directory of the container:

% \sudo -u qserv cp -rvf /datasets/gapon/hsc_rc2/Object /qserv/replication/work/
% ls -al /qserv/replication/work/Object/
total 90168
drwxr-xr-x 2 qserv qserv      141 Sep 25 18:20 .
drwxr-xr-x 6 qserv qserv     4096 Sep 25 18:16 ..
-rw-r--r-- 1 qserv qserv 33920551 Sep 25 18:20 chunk_115329_overlap.txt
-rw-r--r-- 1 qserv qserv 16097161 Sep 25 18:20 chunk_115329.txt
-rw-r--r-- 1 qserv qserv 28397642 Sep 25 18:20 chunk_116009_overlap.txt
-rw-r--r-- 1 qserv qserv 13901013 Sep 25 18:20 chunk_116009.txt
-rw-r--r-- 1 qserv qserv      864 Sep 25 18:16 chunk_index.bin

After that, the following 4 loading requests (one per chunk file) were launched:

% admin/tools/docker/replication/deployment/ncsa/run-no-config.sh \
    qserv-replica-file-ingest \
      lsst-qserv-db13 \
      25002 \
      37 \
      Object \
      /qserv/work/Object/chunk_115329.txt

% admin/tools/docker/replication/deployment/ncsa/run-no-config.sh \
    qserv-replica-file-ingest \
      lsst-qserv-db13 \
      25002 \
      37 \
      Object \
      /qserv/work/Object/chunk_115329_overlap.txt

% admin/tools/docker/replication/deployment/ncsa/run-no-config.sh \
    qserv-replica-file-ingest \
      lsst-qserv-db28 \
      25002 \
      37 \
      Object \
      /qserv/work/Object/chunk_116009.txt

% admin/tools/docker/replication/deployment/ncsa/run-no-config.sh \
    qserv-replica-file-ingest \
      lsst-qserv-db28 \
      25002 \
      37 \
      Object \
      /qserv/work/Object/chunk_116009_overlap.txt

NOTES:

  • each invocation of the loader takes the name of a host (lsst-qserv-db13 or lsst-qserv-db28), the port number (25002) of the Ingest service, the unique identifier of the started super-transaction (37), the name of the table (Object), and a path to the corresponding input file relative to the application's CWD. Note that the chunk number is derived from the name of the file. A scripted form of these invocations is sketched right after these notes.
  • mistakes in the super-transaction number, or using the wrong (for a given chunk) host name, would be intercepted by the Ingest Service and reported back via the resulting JSON object as explained earlier.
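
For completeness, here is how the four invocations could be wrapped into a loop driven by the chunk-to-worker map from the previous subsection (a sketch; bash 4 associative arrays are assumed):

declare -A worker=( [115329]=lsst-qserv-db13 [116009]=lsst-qserv-db28 )
for chunk in "${!worker[@]}"; do
    for file in chunk_${chunk}.txt chunk_${chunk}_overlap.txt; do
        admin/tools/docker/replication/deployment/ncsa/run-no-config.sh \
            qserv-replica-file-ingest \
              "${worker[$chunk]}" \
              25002 \
              37 \
              Object \
              "/qserv/work/Object/${file}"
    done
done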

5.5. Committing the super-transaction

After all chunks were successfully loaded we do (once):

% curl 'http://localhost:25080/ingest/v1/trans/37?abort=0&build-secondary-index=1' -X PUT

Note that this request has two parameters:

  • abort=0 tells the Ingest System that the transaction is supposed to be COMMITTED rather than ABORTED
  • build-secondary-index=1 asks the system to also add entries corresponding to the loaded objects into the so-called secondary index

5.6. Publishing the new database

This is the final stage of the ingest. Despite the seemingly simple interface for invoking the operation, it actually does a lot, including:

  • turning the database into the PUBLISHED state, thus allowing the Replication System to take it over and create extra replicas if needed
  • registering the database and the table in the Qserv metadata system CSS
  • creating the so-called empty chunk list file for the database
  • finalizing the secondary index
  • granting privileges to the MySQL accounts of the Qserv service at both master and worker nodes
  • enabling the database in Qserv so that it can accept and process queries against the new database
  • as well as some other operations

% curl 'http://localhost:25080/ingest/v1/database/hsc_rc2' -X PUT

6. Testing the new catalog

NOTE: after the initial ingest was done, Qserv failed to process queries made against the new catalog. The problem was later identified as two minor bugs in the implementation of the new Ingest system, and workarounds were applied directly at the Qserv workers' databases to fix it. After that the following tests passed successfully.

% mysql --protocol=tcp -hlocalhost -P 4040 -uqsmaster \
   -e "SELECT COUNT(*) FROM hsc_rc2.Object"
+----------+
| COUNT(*) |
+----------+
|     9999 |
+----------+
% mysql --protocol=tcp -hlocalhost -P 4040 -uqsmaster \
   -e "SELECT objectId,coord_ra,coord_dec FROM hsc_rc2.Object LIMIT 1"
+-------------------+--------------------+--------------------+
| objectId          | coord_ra           | coord_dec          |
+-------------------+--------------------+--------------------+
| 42287217204273872 | 216.94955988259971 | 0.0532926097673182 |
+-------------------+--------------------+--------------------+
% mysql --protocol=tcp -hlocalhost -P 4040 -uqsmaster \
   -e "SELECT objectId,coord_ra,coord_dec,chunkId FROM hsc_rc2.Object WHERE objectId=42287217204268674"
+-------------------+--------------------+-----------------------+---------+
| objectId          | coord_ra           | coord_dec             | chunkId |
+-------------------+--------------------+-----------------------+---------+
| 42287217204268674 | 217.04367836854124 | -0.034688399931665045 |  115329 |
+-------------------+--------------------+-----------------------+---------+



