# Licensed under a MIT style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
digestor.sdss
=============
Convert SDSS SQL (MS SQL Server) table definitions to Data Lab SQL (PostgreSQL).
"""
import os
import re
import sys
import time
# from datetime import datetime
from argparse import ArgumentParser
from pkg_resources import resource_filename
# from pytz import utc
from jinja2 import Environment, PackageLoader, select_autoescape
import numpy as np
from astropy.table import Table
from .base import Digestor
[docs]
class SDSS(Digestor):
    """Convert SDSS FITS+SQL files into Data Lab-compatible forms.

    The class-level tables below drive the parsing of SQL Server table
    definitions: the regular expressions that recognize interesting lines,
    the translation of SQL Server type names to PostgreSQL type names,
    the columns that are ignored, and the pattern that identifies
    photometric flag columns.
    """
    #
    # Regular expressions matched against each (stripped) line of a SQL
    # definition file.  'comment' captures the tag letter (H or T) and the
    # text that follows ``--/``; 'column' captures the column name, its
    # type, the remainder of the definition and the trailing ``--`` comment.
    #
    _SQLre = {
        'comment': re.compile(r'\s*--/(H|T)\s+(.*)$'),
        'column': re.compile(r'\s*(\S+)\s+(\S+)\s*([^,]+),?\s*(--.*)$'),
    }
    #
    # SQL Server type name -> PostgreSQL type name.  Types that are not
    # listed here are passed through unchanged by parseLine().
    #
    _server2post = {
        'float': 'double precision',
        'int': 'integer',
        'tinyint': 'smallint',
        'bit': 'boolean',
    }
    #
    # Columns that only make sense inside the SDSS CAS system; these are
    # never added to the table definition.
    #
    _skip_columns = ('htmid', 'loadversion')
    #
    # Case-insensitive pattern for photometric flag columns: ``flags`` or
    # ``flags_<band>`` where <band> is one of u, g, r, i, z.
    #
    _flagre = re.compile(r'flags(|_[ugriz])$', re.I)
def __init__(self, *args, **kwargs):
    """Set up SDSS-specific state on top of the generic digestor.

    Parameters
    ----------
    *args
        Positional arguments, forwarded unchanged to the superclass.
    **kwargs
        Keyword arguments.  ``join`` (default ``False``) is consumed here
        and controls whether a ``sdss_joinid`` column is added; everything
        else is forwarded to the superclass.
    """
    # 'join' must be removed before the superclass sees the keywords.
    self.join = kwargs.pop('join', False)
    super().__init__(*args, **kwargs)
    self.NOFITS = {}
    self.env = Environment(loader=PackageLoader('digestor'),
                           autoescape=select_autoescape(),
                           trim_blocks=True)
    if not self.join:
        return
    #
    # sdss_joinid is SDSS-specific, so it is added here rather than
    # being initialized in the superclass.
    #
    joinid = self.tapColumn(
        'sdss_joinid',
        description="Unique ID based on PLATE, MJD, FIBERID for joining across data releases",
        datatype='bigint',
        indexed=1,
        ucd='meta.id;src')
    self.tapSchema['columns'] += [joinid]
[docs]
def parseSQL(self, filename):
    """Read a SQL definition file and hand every line to :meth:`parseLine`.

    Parameters
    ----------
    filename : :class:`str`
        Name of the SQL file.
    """
    with open(filename) as sql_file:
        for text in sql_file:
            self.parseLine(text)
[docs]
def parseLine(self, line):
    """Interpret one line of a SQL definition file.

    The stripped line is tried against each pattern in ``_SQLre``; the
    first pattern that matches decides what happens:

    * ``comment``: a ``--/H`` line is appended to the description of the
      current table; a ``--/T`` line is only logged.
    * ``column``: unless the column is listed in ``_skip_columns``, a
      column definition is appended to ``self.tapSchema['columns']`` and,
      when :meth:`parseColumnMetadata` returns a non-``None`` second
      value, that value is recorded in ``self.mapping``.

    Lines matching neither pattern are ignored.

    Parameters
    ----------
    line : :class:`str`
        A single line from a SQL file.

    Notes
    -----
    * Currently, the long description (``--/T``) is thrown out.
    """
    log = self.logName('sdss.SDSS.parseLine')
    stripped = line.strip()
    for kind, pattern in self._SQLre.items():
        found = pattern.match(stripped)
        if found is None:
            continue
        groups = found.groups()
        if kind == 'comment':
            ti = self.tableIndex()
            tag, text = groups[0], groups[1]
            if tag == 'H':
                log.debug("self.tapSchema['tables'][%d]['description'] += '%s'", ti, text)
                self.tapSchema['tables'][ti]['description'] += text
            if tag == 'T':
                # Logged only; the long description is not stored.
                log.debug("self.tapSchema['tables'][%d]['long_description'] += '%s'", ti, text)
            return
        elif kind == 'column':
            name = groups[0].lower()
            if name in self._skip_columns:
                log.debug("Skipping column %s.", name)
                return
            # SQL Server may wrap type names in brackets, e.g. [int].
            server_type = groups[1].strip('[]').lower()
            post_type = self._server2post.get(server_type, server_type)
            log.debug(" %s %s %s,", name, post_type, groups[2])
            log.debug("metadata = '%s'", groups[3])
            column, mapped = self.parseColumnMetadata(name, groups[3])
            column['table_name'] = self.table
            if post_type == 'double precision':
                column['datatype'] = 'double'
            elif post_type.startswith('varchar'):
                # varchar(N) becomes 'character' with size N.
                column['datatype'] = 'character'
                column['size'] = int(post_type.split('(')[1].strip(')'))
            else:
                column['datatype'] = post_type
            self.tapSchema['columns'].append(column)
            if mapped is not None:
                log.debug("self.mapping['%s'] = '%s'", name, mapped)
                self.mapping[name] = mapped
            return
    return
[docs]
def fixNOFITS(self, filename):
    """Load the ``NOFITS`` instructions for the current table.

    The instructions describe what to do with columns designated by
    ``--/F NOFITS``, and are looked up under
    ``[schema][table]['NOFITS']`` in the YAML configuration file
    `filename`.  When no such entry exists, ``self.NOFITS`` is left
    untouched.

    Parameters
    ----------
    filename : :class:`str`
        Name of the YAML configuration file.
    """
    log = self.logName('sdss.SDSS.fixNOFITS')
    config = self._getYAML(filename)
    if config is None:
        return
    try:
        self.NOFITS = config[self.schema][self.table]['NOFITS']
    except KeyError:
        log.debug("No instructions found.")
    return
[docs]
def fixMapping(self, filename):
    """Override FITS to SQL mappings from a YAML configuration file.

    Every entry found under ``[schema][table]['mapping']`` in `filename`
    is copied into ``self.mapping``, replacing any existing entry for
    the same SQL column.

    Parameters
    ----------
    filename : :class:`str`
        Name of the YAML configuration file.
    """
    log = self.logName('sdss.SDSS.fixMapping')
    config = self._getYAML(filename)
    if config is None:
        return
    try:
        overrides = config[self.schema][self.table]['mapping']
    except KeyError:
        log.debug("No mappings found.")
        return
    for sql_column in overrides:
        fits_column = overrides[sql_column]
        log.debug("self.mapping['%s'] = '%s'", sql_column, fits_column)
        self.mapping[sql_column] = fits_column
    return
[docs]
def mapColumns(self):
    """Complete mapping of FITS table columns to SQL columns.

    For every SQL column in ``self.colNames``:

    * If a mapping already exists, verify that the FITS column it names
      is present in ``self.FITS`` (an optional ``[N]`` array index is
      ignored for the lookup).  If the exact name is absent, a
      case-insensitive comparison, with and without underscores, is
      tried and the mapping is corrected to the FITS spelling.
    * Otherwise, search ``self.FITS`` for a column whose lower-case name,
      with or without underscores, equals the SQL column name.
    * Columns that still have no mapping are acceptable only if they are
      ``random_id`` (when ``self.random`` is set) or have a ``drop`` or
      ``defer`` instruction in ``self.NOFITS``.

    Columns marked ``drop`` are then removed from
    ``self.tapSchema['columns']``, and a warning is logged for every
    FITS column that is not the target of any mapping.

    Raises
    ------
    :exc:`KeyError`
        If an expected mapping cannot be found, or a ``NOFITS``
        instruction is not recognized.
    """
    log = self.logName('sdss.SDSS.mapColumns')
    drop = list()
    for sc in self.colNames:
        if sc in self.mapping:
            #
            # Make sure the column actually exists.
            #
            verify_mapping = False
            mc = self.mapping[sc]
            index = ''
            if '[' in mc:
                # Separate an array index, e.g. 'FLAGS[2]' -> 'FLAGS', '[2]'.
                parts = mc.split('[')
                mc = parts[0]
                index = '[' + parts[1]
            if mc in self.FITS:
                log.debug("FITS: %s -> SQL: %s", self.mapping[sc], sc)
                verify_mapping = True
            else:
                #
                # See if there is a column containing underscores that
                # could correspond to this mapping.
                #
                for fc in self.FITS:
                    for fcl in (fc.lower(), fc.lower().replace('_', ''),):
                        if fcl == mc.lower():
                            log.debug("FITS: %s%s -> SQL: %s", fc, index, sc)
                            self.mapping[sc] = fc + index
                            verify_mapping = True
            if not verify_mapping:
                msg = "Could not find a FITS column corresponding to %s!"
                log.error(msg, sc)
                raise KeyError(msg % sc)
        else:
            for fc in self.FITS:
                for fcl in (fc.lower(), fc.lower().replace('_', ''),):
                    if fcl == sc:
                        log.debug("FITS: %s -> SQL: %s", fc, sc)
                        self.mapping[sc] = fc
                        break
                if sc in self.mapping:
                    break
            if sc not in self.mapping:
                if self.random and sc == 'random_id':
                    log.info("Skipping %s which will be added by FITS2DB.",
                             sc)
                elif sc in self.NOFITS:
                    if self.NOFITS[sc] == 'drop':
                        log.info("Dropping %s as requested.", sc)
                        drop.append(sc)
                    elif self.NOFITS[sc] == 'defer':
                        log.info("Column %s will be added in post-processing.", sc)
                    else:
                        # NOTE(review): the message is filled with the
                        # column name, not the offending instruction.
                        msg = "Unknown NOFITS instruction: %s!"
                        log.error(msg, sc)
                        raise KeyError(msg % sc)
                else:
                    msg = "Could not find a FITS column corresponding to %s!"
                    log.error(msg, sc)
                    raise KeyError(msg % sc)
    #
    # Remove SQL columns that were requested to be dropped.
    #
    for sc in drop:
        i = self.columnIndex(sc)
        log.debug("del self.tapSchema['columns'][%d]", i)
        del self.tapSchema['columns'][i]
    #
    # Check for FITS columns that are NOT mapped to the SQL file.
    # The mapping is no longer modified, so collect its targets once.
    #
    mapped = set(self.mapping.values())
    for col in self.FITS:
        if col in mapped:
            log.debug("FITS column %s will be transferred to SQL.", col)
        else:
            #
            # The column may be used only through array elements, e.g.
            # 'FLAGS[0]'.  Escape the name so that any regular expression
            # metacharacters in it are matched literally.
            #
            col_array = re.compile(re.escape(col) + r'\[\d+\]')
            if any(col_array.match(target) is not None for target in mapped):
                log.debug("FITS column %s will be transferred to SQL.", col)
            else:
                log.warning("FITS column %s will be dropped from SQL!", col)
    return
[docs]
def _photoFlag(self, column, table):
    """Combine the two 32-bit photometric flag words into one 64-bit value.

    Columns named ``flags_<band>`` are built from element ``band`` of
    the ``FLAGS`` and ``FLAGS2`` columns of `table`; a column named
    ``flags`` is built from ``OBJC_FLAGS`` and ``OBJC_FLAGS2``.  In both
    cases the "2" word is shifted left by 32 bits and OR-ed with the
    other word.

    Parameters
    ----------
    column : :class:`dict`
        A TapSchema column definition.
    table : :class:`astropy.table.Table`
        Table containing the input data.

    Returns
    -------
    :class:`numpy.ndarray`
        The combined flags and flags2 data, or ``None`` if the
        column did not match.

    Raises
    ------
    :exc:`AssertionError`
        If the required columns are not present in the FITS file, or
        the column definition or mapping is not what is expected.
    """
    log = self.logName('sdss.SDSS._photoFlag')
    name = column['column_name']
    matched = self._flagre.match(name)
    if matched is None:
        return None
    # Empty for 'flags', a single band letter for 'flags_<band>'.
    band_letter = matched.groups()[0].replace('_', '')
    if not band_letter:
        #
        # Ensure OBJC_FLAGS and OBJC_FLAGS2 are present.
        #
        assert column['datatype'] == 'bigint'
        assert self.mapping[name].lower() == 'objc_flags'
        assert 'OBJC_FLAGS' in table.colnames
        assert 'OBJC_FLAGS2' in table.colnames
        log.debug("np.left_shift(table['OBJC_FLAGS2'].astype(np.int64), 32) | table['OBJC_FLAGS'].astype(np.int64)")
        upper = np.left_shift(table['OBJC_FLAGS2'].astype(np.int64), 32)
        return upper | table['OBJC_FLAGS'].astype(np.int64)
    #
    # Ensure FLAGS and FLAGS2 are present.
    #
    band = 'ugriz'.index(band_letter)
    assert column['datatype'] == 'bigint'
    assert self.mapping[name].lower() == 'flags[{0:d}]'.format(band)
    assert 'FLAGS' in table.colnames
    assert 'FLAGS2' in table.colnames
    log.debug("np.left_shift(table['FLAGS2'][:, %d].astype(np.int64), 32) | table['FLAGS'][:, %d].astype(np.int64)", band, band)
    upper = np.left_shift(table['FLAGS2'][:, band].astype(np.int64), 32)
    return upper | table['FLAGS'][:, band].astype(np.int64)
[docs]
def processFITS(self, hdu=1, overwrite=False):
    """Convert a pre-processed FITS file into one ready for database loading.

    A new table is assembled column by column, in the order of the
    TapSchema columns belonging to ``self.table``, and written to
    ``<schema>.<table>.fits`` in the current directory.  Each column is
    produced by the first rule that applies:

    1. ``random_id`` (when ``self.random`` is set) is filled with random
       numbers scaled by 100.
    2. Columns listed in ``self.NOFITS`` become zero-filled placeholders.
    3. Photometric flag columns are combined by :meth:`_photoFlag`.
    4. Everything else is copied from the mapped FITS column, converting
       the data type when that can be done safely.

    Non-finite values in columns read from FITS types ``D`` or ``E`` are
    replaced with -9999.0.

    Parameters
    ----------
    hdu : :class:`int`, optional
        Read data from this HDU (default 1).
    overwrite : :class:`bool`, optional
        If ``True``, remove any existing file.

    Returns
    -------
    :class:`str`
        The name of the file written.

    Raises
    ------
    :exc:`ValueError`
        If the FITS data type cannot be converted to SQL.
    """
    log = self.logName('sdss.SDSS.processFITS')
    out = "{0.schema}.{0.table}.fits".format(self)
    if os.path.exists(out) and not overwrite:
        log.info("Using existing file: %s.", out)
        return out
    if os.path.exists(out):
        log.info("Removing existing file: %s.", out)
        os.remove(out)
    #
    # For each SQL type, the FITS format codes that can be stored in it.
    # The first code is the exact match; the others need a (widening)
    # conversion.  Every value must be a tuple so that the membership
    # tests below compare whole codes rather than substrings.
    #
    type_map = {'bigint': ('K', 'J', 'I', 'B'),
                'integer': ('J', 'I', 'B'),
                'smallint': ('I', 'B'),
                'boolean': ('L',),
                'double': ('D', 'E'),
                'real': ('E',),
                'character': ('A',)}
    np_map = {'bigint': np.int64,
              'integer': np.int32,
              'smallint': np.int16,
              'boolean': bool,
              'double': np.float64,
              'real': np.float32}
    #
    # Conversions that are only safe when every value lies in
    # [-limit, limit - 1].
    #
    safe_conversion = {('J', 'smallint'): 2**15,
                       ('A', 'smallint'): 2**15,
                       ('A', 'integer'): 2**31,
                       ('A', 'bigint'): 2**63}
    # Strips a leading repeat count from a FITS format, e.g. '5J' -> 'J'.
    rebase = re.compile(r'^(\d+)(\D+)')
    columns = [c for c in self.tapSchema['columns']
               if c['table_name'] == self.table]
    old = Table.read(self._inputFITS, hdu=hdu)
    new = Table()
    for col in columns:
        name = col['column_name']
        datatype = col['datatype']
        if self.random and name == 'random_id':
            log.info("Creating %s column using numpy.random.random().",
                     name)
            stime = int(time.time())
            log.debug('np.random.seed(%s)', stime)
            np.random.seed(stime)
            log.debug("new['%s'] = np.random.random((%d,)).astype(%s)",
                      name, len(old), str(np_map[datatype]))
            new[name] = 100.0*np.random.random((len(old),)).astype(np_map[datatype])
            continue
        if name in self.NOFITS:
            log.info("Creating placeholder column %s for post-processing.",
                     name)
            log.debug("new['%s'] = np.zeros((%d,), dtype=%s)",
                      name, len(old), str(np_map[datatype]))
            new[name] = np.zeros((len(old),), dtype=np_map[datatype])
            continue
        if 'flags' in name:
            flags64 = self._photoFlag(col, old)
            if flags64 is not None:
                log.info("Combining photo flags for %s", name)
                new[name] = flags64
                continue
        fcol = self.mapping[name]
        index = None
        if '[' in fcol:
            # 'NAME[N]' selects element N of an array-valued column.
            parts = fcol.split('[')
            fcol = parts[0]
            index = int(parts[1].strip(']'))
        ftype = self.FITS[fcol]
        fbasetype = rebase.sub(r'\2', ftype)
        if fbasetype == type_map[datatype][0]:
            log.debug("Type match for %s -> %s.", fcol, name)
            if index is not None:
                log.debug("new['%s'] = old['%s'][:, %d]",
                          name, fcol, index)
                new[name] = old[fcol][:, index]
            else:
                log.debug("new['%s'] = old['%s']", name, fcol)
                new[name] = old[fcol]
        elif fbasetype in type_map[datatype]:
            log.debug("Safe type conversion possible for %s (%s) -> %s (%s).",
                      fcol, fbasetype, name, datatype)
            if index is not None:
                log.debug("new['%s'] = old['%s'][:, %d].astype(%s)",
                          name, fcol, index,
                          str(np_map[datatype]))
                new[name] = old[fcol][:, index].astype(np_map[datatype])
            else:
                log.debug("new['%s'] = old['%s'].astype(%s)",
                          name, fcol,
                          str(np_map[datatype]))
                new[name] = old[fcol].astype(np_map[datatype])
        else:
            if (fbasetype, datatype) in safe_conversion:
                limit = safe_conversion[(fbasetype, datatype)]
                if fbasetype == 'A':
                    try:
                        old[fcol].fill_value = b'0'
                    except AttributeError:  # This can happen during testing.
                        pass
                    log.debug("String to integer conversion required for %s -> %s.", fcol, name)
                    # Field width is the number after the kind letter
                    # in the dtype string.
                    width = int(str(old[fcol].dtype).split(old[fcol].dtype.kind)[1])
                    blank = ' '*width
                    #
                    # All-blank strings cannot be parsed as integers;
                    # replace them with a right-justified '0'.
                    #
                    w = np.nonzero(old[fcol] == blank)[0]
                    if len(w) > 0:
                        log.debug("old['%s'][old['%s'] == blank] = blank[0:%d] + '0'",
                                  fcol, fcol, width - 1)
                        old[fcol][w] = blank[0:(width-1)] + '0'
                    log.debug("test_old = old['%s'].astype(np.int64)", fcol)
                    try:
                        test_old = old[fcol].astype(np.int64)
                    except OverflowError:
                        #
                        # Values of 2**63 or more do not fit in a signed
                        # 64-bit integer.  Parse as unsigned and map the
                        # upper half onto negative numbers by subtracting
                        # 2**64 in two steps.
                        #
                        log.debug("Attempting string to quasi-unsigned integer conversion for %s -> %s.",
                                  fcol, name)
                        uold = old[fcol].astype(np.uint64)
                        hi = np.nonzero(uold >= 2**63)[0]
                        lo = np.nonzero(uold < 2**63)[0]
                        test_old = np.zeros(uold.shape, dtype=np.int64)
                        test_old[lo] = uold[lo]
                        test_old[hi] = (uold[hi] - 2**63).astype(np.int64) - 2**63
                else:
                    if index is not None:
                        test_old = old[fcol][:, index]
                    else:
                        test_old = old[fcol]
                if ((test_old >= -limit) & (test_old <= limit - 1)).all():
                    if (fbasetype, datatype) == ('A', 'bigint'):
                        log.debug("new['%s'] = test_old # quasi-unsigned integer", name)
                        new[name] = test_old
                    else:
                        if index is not None:
                            log.debug("new['%s'] = old['%s'][:, %d].astype(%s)", name, fcol, index, str(np_map[datatype]))
                            new[name] = old[fcol][:, index].astype(np_map[datatype])
                        else:
                            log.debug("new['%s'] = old['%s'].astype(%s)", name, fcol, str(np_map[datatype]))
                            new[name] = old[fcol].astype(np_map[datatype])
                else:
                    msg = "Values too large for safe data type conversion for %s (%s) -> %s (%s)!"
                    log.error(msg, fcol, fbasetype, name, datatype)
                    raise ValueError(msg % (fcol, fbasetype, name, datatype))
            else:
                msg = "No safe data type conversion possible for %s (%s) -> %s (%s)!"
                log.error(msg, fcol, fbasetype, name, datatype)
                raise ValueError(msg % (fcol, fbasetype, name, datatype))
        if fbasetype in ('D', 'E'):
            # Replace NaN and infinities with a sentinel value.
            new[name][~np.isfinite(new[name])] = -9999.0
    log.debug("new.write('%s')", out)
    new.write(out)
    return out
[docs]
def writeSQL(self, filename):
    """Write the pre-load SQL commands and the CREATE TABLE statement.

    The output consists of the rendered ``sdss_preload.sql`` template,
    a short comment header naming the table, and the result of
    :meth:`createSQL`.

    Parameters
    ----------
    filename : :class:`str`
        Name of the SQL file.
    """
    preload = self.env.get_template('sdss_preload.sql')
    with open(filename, 'w') as sql_file:
        emit = sql_file.write
        emit(preload.render(schema=self.schema))
        emit('\n--\n-- Create {0.schema}.{0.table}\n--\n'.format(self))
        emit(self.createSQL())
[docs]
def writePOSTSQL(self, filename, pkey='objid'):
    """Write additional SQL commands needed after loading the table itself.

    The commands are produced by rendering the ``sdss_postload.sql``
    template with the schema, table, primary key and join setting.

    Parameters
    ----------
    filename : :class:`str`
        Name of the SQL file.
    pkey : :class:`str`, optional
        Name of the PRIMARY KEY column (default 'objid').
    """
    postload = self.env.get_template('sdss_postload.sql')
    with open(filename, 'w') as sql_file:
        rendered = postload.render(schema=self.schema,
                                   table=self.table,
                                   pkey=pkey,
                                   join=self.join)
        sql_file.write(rendered)
[docs]
def get_options():
    """Parse command-line options.

    The program description is taken from the second-to-last line of the
    module docstring, and the default configuration file is the
    ``data/sdss.yaml`` resource of the ``digestor`` package.

    Returns
    -------
    :class:`argparse.Namespace`
        The parsed options.
    """
    summary = __doc__.split("\n")[-2]
    parser = ArgumentParser(description=summary,
                            prog=os.path.basename(sys.argv[0]))
    add = parser.add_argument
    add('-c', '--configuration', dest='config', metavar='FILE',
        default=resource_filename('digestor', 'data/sdss.yaml'),
        help='Read table-specific configuration from FILE.')
    add('-d', '--schema-description', dest='description', metavar='TEXT',
        default='Sloan Digital Sky Survey Data Release 14',
        help='Short description of the schema.')
    add('-E', '--no-ecliptic', dest='ecliptic', action='store_false',
        help='Do not add ecliptic coordinates.')
    add('-e', '--extension', dest='hdu', metavar='N', type=int, default=1,
        help='Read data from FITS HDU N (default %(default)s).')
    add('-G', '--no-galactic', dest='galactic', action='store_false',
        help='Do not add galactic coordinates.')
    add('-J', '--no-join', dest='join', action='store_false',
        help='Do not add sdss_joinid column.')
    add('-j', '--output-json', dest='output_json', metavar='FILE',
        help='Write table metadata to FILE.')
    add('-k', '--keep', action='store_true',
        help='Do not overwrite any existing intermediate files.')
    add('-l', '--log', dest='log', metavar='FILE',
        help='Log operations to FILE.')
    add('-m', '--merge', dest='merge_json', metavar='FILE',
        help='Merge metadata in FILE into final metadata output.')
    add('-o', '--output-sql', dest='output_sql', metavar='FILE',
        help='Write table definition to FILE.')
    add('-p', '--primary-key', dest='pkey', metavar='COLUMN',
        default='objid',
        help='COLUMN is primary key (default %(default)s).')
    add('-P', '--no-pixels', dest='pixels', action='store_false',
        help='Do not add HTM & HEALPix columns.')
    add('-r', '--ra', dest='ra', metavar='COLUMN', default='ra',
        help='Right Ascension is in COLUMN (default %(default)s).')
    add('-R', '--no-random', dest='random', action='store_false',
        help='Do not add a random_id column.')
    add('-s', '--schema', metavar='SCHEMA', default='sdss_dr14',
        help='Define table with this schema (default %(default)s).')
    add('-t', '--table', metavar='TABLE',
        help='Set the table name.')
    add('-v', '--verbose', action='store_true',
        help='Print extra information.')
    # Positional arguments.
    add('fits', help='FITS file to convert.')
    add('sql', help='SQL file to convert.')
    return parser.parse_args()
[docs]
def main():
    """Entry-point for command-line script.

    The FITS file is pre-processed, the SQL definition is parsed, FITS
    columns are mapped to SQL columns, and then the SQL, post-load SQL
    and JSON metadata files are written.  The FITS file ready for loading
    is written last.

    Unless given explicitly, the JSON and log file names are derived from
    the output SQL file name by replacing its extension; the post-load
    SQL file name is formed by appending ``_post`` to the name before the
    extension.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    options = get_options()
    if not os.path.exists(options.fits):
        print("%s does not exist!" % options.fits, file=sys.stderr)
        return 1
    if not os.path.exists(options.sql):
        # Fall back to a SQL file shipped with the package.
        p = resource_filename('digestor', 'data/' + options.sql)
        if os.path.exists(p):
            options.sql = p
        else:
            print("%s does not exist!" % options.sql, file=sys.stderr)
            return 1
    if options.table is None:
        options.table = os.path.splitext(os.path.basename(options.fits))[0]
    if options.output_sql is None:
        options.output_sql = os.path.join(os.path.dirname(options.fits),
                                          "%s.%s.sql" % (options.schema, options.table))
    #
    # Derive the companion file names from the extension only.  A plain
    # substring replacement would also rewrite 'sql' inside directory or
    # schema names, and would leave the name unchanged (so the outputs
    # would overwrite each other) if the extension is not '.sql'.
    #
    sql_root, sql_ext = os.path.splitext(options.output_sql)
    if options.output_json is None:
        options.output_json = sql_root + '.json'
    if options.log is None:
        options.log = sql_root + '.log'
    output_post_sql = sql_root + '_post' + sql_ext
    try:
        sdss = SDSS(options.schema, options.table,
                    description=options.description,
                    merge=options.merge_json,
                    pixels=options.pixels,
                    random=options.random,
                    ecliptic=options.ecliptic,
                    galactic=options.galactic,
                    join=options.join)
    except ValueError as e:
        #
        # ValueError indicates failure to process a merge file.
        #
        print(str(e))
        return 1
    sdss.configureLog(options.log, options.verbose)
    log = sdss.logName('sdss.main')
    log.debug("options.fits = '%s'", options.fits)
    log.debug("options.sql = '%s'", options.sql)
    log.debug("options.schema = '%s'", options.schema)
    log.debug("options.table = '%s'", options.table)
    log.debug("options.output_sql = '%s'", options.output_sql)
    log.debug("options.output_json = '%s'", options.output_json)
    log.debug("options.log = '%s'", options.log)
    #
    # Preprocess the FITS file.
    #
    sdss.customSTILTS(options.config)
    try:
        dlfits = sdss.addDLColumns(options.fits, ra=options.ra,
                                   overwrite=(not options.keep))
    except ValueError as e:
        log.error(str(e))
        return 1
    sdss.parseFITS(dlfits, hdu=options.hdu)
    #
    # Read the SQL file.
    #
    sdss.parseSQL(options.sql)
    #
    # Map the FITS columns to table columns.
    #
    sdss.fixNOFITS(options.config)
    sdss.fixMapping(options.config)
    try:
        sdss.mapColumns()
    except KeyError:
        # mapColumns() logs the error before raising.
        return 1
    #
    # Fix any table definition problems and sort the columns.
    #
    sdss.fixColumns(options.config)
    try:
        sdss.sortColumns()
    except AssertionError as e:
        log.error(str(e))
        return 1
    #
    # Write the SQL files.
    #
    sdss.writeSQL(options.output_sql)
    sdss.writePOSTSQL(output_post_sql, pkey=options.pkey)
    #
    # Write the JSON file.
    #
    sdss.writeTapSchema(options.output_json)
    #
    # Sort the FITS data table to match the columns.  Do this last so that
    # if it crashes, we at least have the SQL and JSON files.
    #
    try:
        sdss.processFITS(hdu=options.hdu,
                         overwrite=(not options.keep))
    except ValueError:
        # processFITS() logs the error before raising.
        return 1
    return 0