diff --git a/py/specprodDB/attic/zpix_target.txt b/py/specprodDB/attic/zpix_target.txt new file mode 100644 index 0000000..11176b4 --- /dev/null +++ b/py/specprodDB/attic/zpix_target.txt @@ -0,0 +1,155 @@ +def zpix_target(specprod): + """Replace targeting bitmasks in the redshift tables for `specprod`. + + Parameters + ---------- + specprod : :class:`str` + The spectroscopic production, normally the value of :envvar:`SPECPROD`. + """ + specprod_survey_program = {'fuji': {'cmx': ('other', ), + 'special': ('dark', ), + 'sv1': ('backup', 'bright', 'dark', 'other'), + 'sv2': ('backup', 'bright', 'dark'), + 'sv3': ('backup', 'bright', 'dark')}, + 'guadalupe': {'special': ('bright', 'dark'), + 'main': ('bright', 'dark')}, + 'iron': {'cmx': ('other', ), + 'main': ('backup', 'bright', 'dark'), + 'special': ('backup', 'bright', 'dark', 'other'), + 'sv1': ('backup', 'bright', 'dark', 'other'), + 'sv2': ('backup', 'bright', 'dark'), + 'sv3': ('backup', 'bright', 'dark')}} + target_bits = {'cmx': {'cmx': Target.cmx_target}, + 'sv1': {'desi': Target.sv1_desi_target, 'bgs': Target.sv1_bgs_target, 'mws': Target.sv1_mws_target}, + 'sv2': {'desi': Target.sv2_desi_target, 'bgs': Target.sv2_bgs_target, 'mws': Target.sv2_mws_target}, + 'sv3': {'desi': Target.sv3_desi_target, 'bgs': Target.sv3_bgs_target, 'mws': Target.sv3_mws_target}, + 'main': {'desi': Target.desi_target, 'bgs': Target.bgs_target, 'mws': Target.mws_target}, + 'special': {'desi': Target.desi_target, 'bgs': Target.bgs_target, 'mws': Target.mws_target}} + # + # Find targetid assigned to multiple tiles. + # + assigned_multiple_tiles = dict() + for survey in specprod_survey_program[specprod]: + assigned_multiple_tiles[survey] = dict() + for program in specprod_survey_program[specprod][survey]: + assigned_multiple_tiles[survey][program] = dbSession.query(Target.targetid).join(Fiberassign, + and_(Target.targetid == Fiberassign.targetid, + Target.tileid == Fiberassign.tileid)).filter(Target.survey == survey).filter(Target.program == program).group_by(Target.targetid).having(func.count(Target.tileid) > 1) + # + # From that set, find cases targetid and a targeting bit are distinct pairs. + # + distinct_target = dict() + for survey in assigned_multiple_tiles: + distinct_target[survey] = dict() + for program in assigned_multiple_tiles[survey]: + distinct_target[survey][program] = dict() + for bits in target_bits[survey]: + distinct_target[survey][program][bits] = dbSession.query(Target.targetid, target_bits[survey][bits]).filter(Target.targetid.in_(assigned_multiple_tiles[survey][program])).filter(Target.survey == survey).filter(Target.program == program).distinct().subquery() + # + # Obtain the list of targetids where a targeting bit appears more than once with different values. 
+ # + multiple_target = dict() + for survey in distinct_target: + multiple_target[survey] = dict() + for program in distinct_target[survey]: + multiple_target[survey][program] = dict() + for bits in distinct_target[survey][program]: + if survey.startswith('sv'): + column = getattr(distinct_target[survey][program][bits].c, f"{survey}_{bits}_target") + elif survey == 'cmx': + column = distinct_target[survey][program][bits].c.cmx_target + else: + column = getattr(distinct_target[survey][program][bits].c, f"{bits}_target") + multiple_target[survey][program][bits] = [row[0] for row in dbSession.query(distinct_target[survey][program][bits].c.targetid).group_by(distinct_target[survey][program][bits].c.targetid).having(func.count(column) > 1).all()] + # + # Consolidate the list of targetids. + # + targetids_to_fix = dict() + for survey in multiple_target: + for program in multiple_target[survey]: + for bits in multiple_target[survey][program]: + if multiple_target[survey][program][bits]: + if survey not in targetids_to_fix: + targetids_to_fix[survey] = dict() + if program in targetids_to_fix[survey]: + log.debug("targetids_to_fix['%s']['%s'] += multiple_target['%s']['%s']['%s']", + survey, program, survey, program, bits) + targetids_to_fix[survey][program] += multiple_target[survey][program][bits] + else: + log.debug("targetids_to_fix['%s']['%s'] = multiple_target['%s']['%s']['%s']", + survey, program, survey, program, bits) + targetids_to_fix[survey][program] = multiple_target[survey][program][bits] + # + # ToO observations that had targeting bits zeroed out. + # + if specprod == 'fuji': + # + # Maybe this problem only affects fuji, but need to confirm that. + # + zero_ToO = dict() + for survey in specprod_survey_program[specprod]: + zero_ToO[survey] = dict() + for program in specprod_survey_program[specprod][survey]: + zero_ToO[survey][program] = [row[0] for row in dbSession.query(Zpix.targetid).filter((Zpix.targetid.op('&')((2**16 - 1) << 42)).op('>>')(42) == 9999).filter(Zpix.survey == survey).filter(Zpix.program == program).all()] + for survey in zero_ToO: + for program in zero_ToO[survey]: + if zero_ToO[survey][program]: + if survey not in targetids_to_fix: + targetids_to_fix[survey] = dict() + if program in targetids_to_fix[survey]: + log.debug("targetids_to_fix['%s']['%s'] += zero_ToO['%s']['%s']", + survey, program, survey, program) + targetids_to_fix[survey][program] += zero_ToO[survey][program] + else: + log.debug("targetids_to_fix['%s']['%s'] = zero_ToO['%s']['%s']", + survey, program, survey, program) + targetids_to_fix[survey][program] = zero_ToO[survey][program] + # + # Generate the query to obtain the bitwise-or of each targeting bit. 
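For reference, the zero_ToO filter above relies on the bit layout of TARGETID: bits 42-57 hold the imaging RELEASE number, and ToO (Target of Opportunity) rows carry the sentinel release 9999. A minimal pure-Python sketch of the same test, assuming the standard encoding (OBJID in bits 0-21, BRICKID in bits 22-41, RELEASE in bits 42-57); the example TARGETID below is fabricated for illustration:

    RELEASE_SHIFT = 42
    RELEASE_MASK = (2**16 - 1) << RELEASE_SHIFT

    def is_too(targetid):
        # ToO rows carry the sentinel release number 9999.
        return (targetid & RELEASE_MASK) >> RELEASE_SHIFT == 9999

    # Fabricated example: pack release 9999 with arbitrary brickid/objid.
    example_targetid = (9999 << 42) | (54321 << 22) | 12345
    assert is_too(example_targetid)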
+ # + # table = 'zpix' + surveys = ('', 'sv1', 'sv2', 'sv3') + programs = ('desi', 'bgs', 'mws', 'scnd') + masks = ['cmx_target'] + [('_'.join(p) if p[0] else p[1]) + '_target' + for p in itertools.product(surveys, programs)] + bit_or_query = dict() + for survey in targetids_to_fix: + bit_or_query[survey] = dict() + for program in targetids_to_fix[survey]: + log.debug("SELECT t.targetid, " + + ', '.join([f"BIT_OR(t.{m}) AS {m}" for m in masks]) + + f" FROM {specprod}.target AS t WHERE t.targetid IN ({', '.join(map(str, set(targetids_to_fix[survey][program])))}) AND t.survey = '{survey}' AND t.program = '{program}' GROUP BY t.targetid;") + bq = ("dbSession.query(Target.targetid, " + + ', '.join([f"func.bit_or(Target.{m}).label('{m}')" for m in masks]) + + f").filter(Target.targetid.in_([{', '.join(map(str, set(targetids_to_fix[survey][program])))}])).filter(Target.survey == '{survey}').filter(Target.program == '{program}').group_by(Target.targetid)") + log.debug(bq) + bit_or_query[survey][program] = eval(bq) + # + # Apply the updates + # + # update_string = '{' + ', '.join([f"Zpix.{m}: {{0.{m}:d}}" for m in masks]) + '}' + for survey in bit_or_query: + for program in bit_or_query[survey]: + for row in bit_or_query[survey][program].all(): + zpix_match = dbSession.query(Zpix).filter(Zpix.targetid == row.targetid).filter(Zpix.survey == survey).filter(Zpix.program == program).one() + for m in masks: + log.info("%s.%s = %s", zpix_match, m, str(getattr(row, m))) + zpix_match.cmx_target = row.cmx_target + zpix_match.desi_target = row.desi_target + zpix_match.bgs_target = row.bgs_target + zpix_match.mws_target = row.mws_target + zpix_match.scnd_target = row.scnd_target + zpix_match.sv1_desi_target = row.sv1_desi_target + zpix_match.sv1_bgs_target = row.sv1_bgs_target + zpix_match.sv1_mws_target = row.sv1_mws_target + zpix_match.sv1_scnd_target = row.sv1_scnd_target + zpix_match.sv2_desi_target = row.sv2_desi_target + zpix_match.sv2_bgs_target = row.sv2_bgs_target + zpix_match.sv2_mws_target = row.sv2_mws_target + zpix_match.sv2_scnd_target = row.sv2_scnd_target + zpix_match.sv3_desi_target = row.sv3_desi_target + zpix_match.sv3_bgs_target = row.sv3_bgs_target + zpix_match.sv3_mws_target = row.sv3_mws_target + zpix_match.sv3_scnd_target = row.sv3_scnd_target + dbSession.commit() + return diff --git a/py/specprodDB/data/load_specprod_db.ini b/py/specprodDB/data/load_specprod_db.ini index 40f9f09..192b8c9 100644 --- a/py/specprodDB/data/load_specprod_db.ini +++ b/py/specprodDB/data/load_specprod_db.ini @@ -7,9 +7,9 @@ # This is intended to configure (meta)data releated to the load process itself. 
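An aside on the query construction in the attic listing above: the bitwise-OR aggregation that zpix_target assembles as a string and executes with eval() could also be composed directly from SQLAlchemy constructs, avoiding eval() entirely. A sketch assuming the same Target model, dbSession, and masks list used in that listing:

    from sqlalchemy import func

    def bit_or_query(dbSession, Target, masks, targetids, survey, program):
        # Aggregate each targeting bitmask with BIT_OR, grouped by targetid,
        # without building and eval()-ing a query string.
        return (dbSession.query(Target.targetid,
                                *[func.bit_or(getattr(Target, m)).label(m) for m in masks])
                .filter(Target.targetid.in_(list(set(targetids))))
                .filter(Target.survey == survey)
                .filter(Target.program == program)
                .group_by(Target.targetid))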
# username = desi_admin -hostname = specprod-db.desi.lbl.gov +# hostname = specprod-db.desi.lbl.gov +hostname = db2-loadbalancer.specprod.production.svc.spin.nersc.org chunksize = 50000 -maxrows = 0 [fuji] # diff --git a/py/specprodDB/load.py b/py/specprodDB/load.py index 0f0af43..1ea7684 100644 --- a/py/specprodDB/load.py +++ b/py/specprodDB/load.py @@ -268,6 +268,8 @@ def convert(cls, data, row_index=None): data_column = ((data[row_index]['RELEASE'].data.astype(np.int64) << 40) | (data[row_index]['BRICKID'].data.astype(np.int64) << 16) | (data[row_index]['BRICK_OBJID'].data.astype(np.int64))).tolist() + elif column.name == 'gaia_astrometric_params_solved' and data[column.name.upper()].dtype.kind != 'i': + data_column = data[column.name.upper()][row_index].data.astype(np.int16).tolist() elif column.name in expand_dchisq: j = expand_dchisq.index(column.name) data_column = data['DCHISQ'][row_index, j].tolist() @@ -323,17 +325,17 @@ def __repr__(self): return "Target(targetid={0.targetid:d}, tileid={0.tileid:d}, survey='{0.survey}')".format(self) @classmethod - def convert(cls, data, survey, tileid, row_index=None): + def convert(cls, data, survey=None, tileid=None, row_index=None): """Convert `data` into ORM objects ready for loading. Parameters ---------- data : :class:`~astropy.table.Table` Data table to convert. - survey : :class:`str` - Survey name. - tileid : :class:`int` - Tile ID number. + survey : :class:`str`, optional + Survey name. If not set, it will be obtained from `data`. + tileid : :class:`int`, optional + Tile ID number. If not set, it will be obtained from `data`. row_index : :class:`numpy.ndarray`, optional Only convert the rows indexed by `row_index`. If not specified, convert all rows. @@ -342,15 +344,33 @@ def convert(cls, data, survey, tileid, row_index=None): ------- :class:`list` A list of ORM objects. + + Raises + ------ + KeyError + If `survey` or `tileid` are not set and could not be obtained from `data`. """ if row_index is None: row_index = np.arange(len(data)) if len(row_index) == 0: return [] + check_columns = {'survey': survey, 'tileid': tileid} + for column in check_columns: + if check_columns[column] is None: + if column.upper() in data.colnames: + log.info("Obtaining '%s' from input data file.", column) + else: + msg = "Could not obtain '%s' from input data file." + log.critical(msg, column) + raise KeyError(msg % (column, )) data_columns = list() for column in cls.__table__.columns: if column.name == 'id': - id0 = np.array([surveyid(survey) << 32 | tileid]*len(row_index), dtype=np.int64) + if survey is None or tileid is None: + s = np.array([surveyid(s) for s in data['SURVEY'][row_index].tolist()], dtype=np.int64) + id0 = s << 32 | data['TILEID'][row_index].astype(np.int64) + else: + id0 = np.array([surveyid(survey) << 32 | tileid]*len(row_index), dtype=np.int64) data_column = [i0 << 64 | i1 for i0, i1 in zip(id0.tolist(), data['TARGETID'].tolist())] else: data_column = data[column.name.upper()][row_index].tolist() @@ -681,15 +701,15 @@ def __repr__(self): return "Fiberassign(tileid={0.tileid:d}, targetid={0.targetid:d}, location={0.location:d})".format(self) @classmethod - def convert(cls, data, tileid, row_index=None): + def convert(cls, data, tileid=None, row_index=None): """Convert `data` into ORM objects ready for loading. Parameters ---------- data : :class:`~astropy.table.Table` Data table to convert. - tileid : :class:`int` - Tile ID number. + tileid : :class:`int`, optional + Tile ID number. If not set, it will be obtained from `data`.
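The composite Target.id computed in Target.convert above packs three integers into one wide key: the upper 64 bits hold surveyid << 32 | tileid, and the lower 64 bits hold TARGETID. A minimal round-trip sketch, with the survey code, tile, and target values fabricated and a plain integer standing in for the package's own surveyid() lookup:

    def pack_target_id(survey_id, tileid, targetid):
        # (survey_id << 32 | tileid) occupies the upper 64 bits; TARGETID the lower 64.
        return (survey_id << 32 | tileid) << 64 | targetid

    def unpack_target_id(packed):
        targetid = packed & (2**64 - 1)
        id0 = packed >> 64
        return id0 >> 32, id0 & (2**32 - 1), targetid

    # Round trip with fabricated values: survey code 5, tile 1000.
    assert unpack_target_id(pack_target_id(5, 1000, 39627652591327441)) == (5, 1000, 39627652591327441)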
row_index : :class:`numpy.ndarray`, optional Only convert the rows indexed by `row_index`. If not specified, convert all rows. @@ -698,11 +718,22 @@ def convert(cls, data, tileid, row_index=None): ------- :class:`list` A list of ORM objects. + + Raises + ------ + KeyError + If `tileid` is not set and could not be obtained from `data`. """ if row_index is None: row_index = np.arange(len(data)) if len(row_index) == 0: return [] + if tileid is None: + try: + tileid = data.meta['TILEID'] + except KeyError: + log.critical("Could not obtain 'TILEID' from metadata!") + raise data_columns = list() for column in cls.__table__.columns: if column.name == 'id': @@ -710,6 +741,12 @@ def convert(cls, data, tileid, row_index=None): data_column = [(i0 << 64) | i1 for i0, i1 in zip(id0.tolist(), data['TARGETID'][row_index].tolist())] elif column.name == 'tileid': data_column = [tileid]*len(row_index) + elif column.name == 'plate_ra' and 'PLATE_RA' not in data.colnames: + # This will usually be ignored, because plate_ra is not necessarily a database column. + data_column = data['TARGET_RA'][row_index].tolist() + elif column.name == 'plate_dec' and 'PLATE_DEC' not in data.colnames: + # This will usually be ignored, because plate_dec is not necessarily a database column. + data_column = data['TARGET_DEC'][row_index].tolist() else: data_column = data[column.name.upper()][row_index].tolist() data_columns.append(data_column) @@ -738,15 +775,15 @@ def __repr__(self): return "Potential(tileid={0.tileid:d}, targetid={0.targetid:d}, location={0.location:d})".format(self) @classmethod - def convert(cls, data, tileid, row_index=None): + def convert(cls, data, tileid=None, row_index=None): """Convert `data` into ORM objects ready for loading. Parameters ---------- data : :class:`~astropy.table.Table` Data table to convert. - tileid : :class:`int` - Tile ID number. + tileid : :class:`int`, optional + Tile ID number. If not set, it will be obtained from `data`. row_index : :class:`numpy.ndarray`, optional Only convert the rows indexed by `row_index`. If not specified, convert all rows. @@ -755,11 +792,22 @@ def convert(cls, data, tileid, row_index=None): ------- :class:`list` A list of ORM objects. + + Raises + ------ + KeyError + If `tileid` is not set and could not be obtained from `data`. """ if row_index is None: row_index = np.arange(len(data)) if len(row_index) == 0: return [] + if tileid is None: + try: + tileid = data.meta['TILEID'] + except KeyError: + log.critical("Could not obtain 'TILEID' from metadata!") + raise data_columns = list() for column in cls.__table__.columns: if column.name == 'id': @@ -900,6 +948,87 @@ def __table_args__(cls): def __repr__(self): return "Zpix(targetid={0.targetid:d}, survey='{0.survey}', program='{0.program}')".format(self) + @classmethod + def convert(cls, data, survey=None, program=None, row_index=None): + """Convert `data` into ORM objects ready for loading. + + Parameters + ---------- + data : :class:`~astropy.table.Table` + Data table to convert. + survey : :class:`str`, optional + Survey name. If not set, it will be obtained from `data`. + program : :class:`str`, optional + Program name. If not set, it will be obtained from `data`. + row_index : :class:`numpy.ndarray`, optional + Only convert the rows indexed by `row_index`. If not specified, + convert all rows. + + Returns + ------- + :class:`list` + A list of ORM objects.
+ + Raises + ------ + KeyError + If `survey` or `program` are not set and the equivalent data + are not available in `data`. + """ + if row_index is None: + row_index = np.arange(len(data)) + if len(row_index) == 0: + return [] + default_columns = {'spgrp': 'healpix', + 'sv_nspec': 0, 'main_nspec': 0, 'zcat_nspec': 0, + 'sv_primary': False, 'main_primary': False, 'zcat_primary': False} + # + # Reductions like guadalupe may not have the full set of target bitmasks + # + surveys = ('', 'sv1', 'sv2', 'sv3') + programs = ('desi', 'bgs', 'mws', 'scnd') + masks = ['cmx_target'] + [('_'.join(p) if p[0] else p[1]) + '_target' + for p in itertools.product(surveys, programs)] + for mask in masks: + default_columns[mask] = 0 + check_columns = {'survey': survey, 'program': program} + for column in check_columns: + if check_columns[column] is None: + if column.upper() in data.colnames: + log.info("Obtaining '%s' from input data file.", column) + else: + msg = "Could not obtain '%s' from input data file." + log.critical(msg, column) + raise KeyError(msg % (column, )) + else: + default_columns[column] = check_columns[column] + data_columns = list() + for column in cls.__table__.columns: + if column.name == 'id': + if 'survey' in default_columns: + id0 = np.array([programid(program) << 32 | surveyid(survey)]*len(row_index), dtype=np.int64) + else: + s = np.array([surveyid(s) for s in data['SURVEY'][row_index]], dtype=np.int64) + p = np.array([programid(s) for s in data['PROGRAM'][row_index]], dtype=np.int64) + id0 = p << 32 | s + data_column = [(i0 << 64) | i1 for i0, i1 in zip(id0.tolist(), data['TARGETID'][row_index].tolist())] + elif column.name == 'desiname': + data_column = radec_to_desiname(data['TARGET_RA'][row_index], data['TARGET_DEC'][row_index]).tolist() + elif column.name == 'spgrpval': + data_column = data['HEALPIX'][row_index].tolist() + elif column.name in default_columns and column.name.upper() not in data.colnames: + data_column = [default_columns[column.name]]*len(row_index) + elif column.name.startswith('coeff_'): + coeff_index = int(column.name.split('_')[1]) + data_column = data['COEFF'][row_index, coeff_index].tolist() + else: + data_column = data[column.name.upper()][row_index].tolist() + data_columns.append(data_column) + data_rows = list(zip(*data_columns)) + return [cls(**(dict([(col.name, dat) for col, dat in zip(cls.__table__.columns, row)]))) for row in data_rows] + class Ztile(SchemaMixin, Base): """Representation of the ``ZCATALOG`` table in ztile files. @@ -1007,7 +1136,7 @@ def __repr__(self): return "Ztile(targetid={0.targetid:d}, tileid={0.tileid:d}, spgrp='{0.spgrp}', spgrpval={0.spgrpval:d})".format(self) @classmethod - def convert(cls, data, survey, program, tileid, night, + def convert(cls, data, survey=None, program=None, tileid=None, night=None, row_index=None, spgrp='cumulative'): """Convert `data` into ORM objects ready for loading. @@ -1015,14 +1144,15 @@ def convert(cls, data, survey, program, tileid, night, ---------- data : :class:`~astropy.table.Table` Data table to convert. - survey : :class:`str` - Survey name. - program : :class:`str` - Program name. - tileid : :class:`int` - Tile ID number. - night : :class:`int` + survey : :class:`str`, optional + Survey name. If not set, it will be obtained from `data`. + program : :class:`str`, optional + Program name. If not set, it will be obtained from `data`. + tileid : :class:`int`, optional + Tile ID number. If not set, it will be obtained from `data`. + night : :class:`int`, optional Night number. This is loaded into the ``firstnight`` column.
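The masks comprehension in Zpix.convert above (the same idiom appears in zpix_target and the removed _survey_program) expands to the seventeen targeting-bitmask column names. Spelled out as a self-contained snippet:

    import itertools

    surveys = ('', 'sv1', 'sv2', 'sv3')
    programs = ('desi', 'bgs', 'mws', 'scnd')
    masks = ['cmx_target'] + [('_'.join(p) if p[0] else p[1]) + '_target'
                              for p in itertools.product(surveys, programs)]
    # ['cmx_target', 'desi_target', 'bgs_target', 'mws_target', 'scnd_target',
    #  'sv1_desi_target', 'sv1_bgs_target', 'sv1_mws_target', 'sv1_scnd_target',
    #  'sv2_desi_target', ..., 'sv3_scnd_target']
    print(len(masks))  # 17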
+ If not set, it will be obtained from `data`. row_index : :class:`numpy.ndarray`, optional Only convert the rows indexed by `row_index`. If not specified, convert all rows. @@ -1034,10 +1164,16 @@ def convert(cls, data, survey, program, tileid, night, :class:`list` A list of ORM objects. + Raises + ------ + KeyError + If `survey`, `program`, `tileid` or `night` are not set and the + equivalent data are not available in `data`. + Notes ----- - * This method currently assumes that `data` comes from one and only one - tile, which is represented by `tileid`. + * If `tileid` is set, this method assumes `data` comes from one and only one + tile. * The above has a secondary assumption that, at least for cumulative tile-based spectra, the first night is the same for all spectra. * `night` becomes ``firstnight``, while ``spgrpval`` is equivalent to @@ -1047,20 +1183,36 @@ def convert(cls, data, survey, program, tileid, night, row_index = np.arange(len(data)) if len(row_index) == 0: return [] - data_columns = list() - default_columns = {'spgrp': spgrp, 'survey': survey, 'program': program, 'firstnight': night, + default_columns = {'spgrp': spgrp, 'sv_nspec': 0, 'main_nspec': 0, 'zcat_nspec': 0, 'sv_primary': False, 'main_primary': False, 'zcat_primary': False} + check_columns = {'survey': survey, 'program': program, + 'tileid': tileid, 'firstnight': night} + for column in check_columns: + if check_columns[column] is None: + if column.upper() in data.colnames: + log.info("Obtaining '%s' from input data file.", column) + else: + msg = "Could not obtain '%s' from input data file." + log.critical(msg, column) + raise KeyError(msg % (column, )) + else: + default_columns[column] = check_columns[column] + data_columns = list() for column in cls.__table__.columns: if column.name == 'id': id0 = spgrpid(spgrp) << 27 | data['SPGRPVAL'][row_index].base.astype(np.int64) data_column = [(i0 << 64) | i1 for i0, i1 in zip(id0.tolist(), data['TARGETID'][row_index].tolist())] elif column.name == 'targetphotid': - id0 = np.array([surveyid(survey) << 32 | tileid]*len(row_index), dtype=np.int64) + if 'survey' in default_columns: + id0 = np.array([surveyid(survey) << 32 | tileid]*len(row_index), dtype=np.int64) + else: + s = np.array([surveyid(s) for s in data['SURVEY'][row_index].tolist()], dtype=np.int64) + id0 = s << 32 | data['TILEID'][row_index].astype(np.int64) data_column = [(i0 << 64) | i1 for i0, i1 in zip(id0.tolist(), data['TARGETID'][row_index].tolist())] elif column.name == 'desiname': data_column = radec_to_desiname(data['TARGET_RA'][row_index], data['TARGET_DEC'][row_index]).tolist() - elif column.name in default_columns: + elif column.name in default_columns and column.name.upper() not in data.colnames: data_column = [default_columns[column.name]]*len(row_index) elif column.name.startswith('coeff_'): coeff_index = int(column.name.split('_')[1]) @@ -1072,167 +1224,7 @@ def convert(cls, data, survey, program, tileid, night, return [cls(**(dict([(col.name, dat) for col, dat in zip(cls.__table__.columns, row)]))) for row in data_rows] -def _frameid(data): - """Update the ``frameid`` column. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`astropy.table.Table` - Updated data table. - """ - frameid = 100*data['EXPID'] + np.array([cameraid(c) for c in data['CAMERA']], dtype=data['EXPID'].dtype) - data.add_column(frameid, name='FRAMEID', index=0) - return data - - -def _tileid(data): - """Update the ``tileid`` column. 
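A usage note on the relaxed convert() signatures above: callers may now omit the metadata arguments whenever the input table itself supplies SURVEY, PROGRAM, TILEID and FIRSTNIGHT. A hypothetical sketch (the file name and metadata values are illustrative only):

    from astropy.table import Table

    data = Table.read('ztile-main-dark-cumulative.fits', hdu='ZCATALOG')  # illustrative path

    # Explicit metadata, as older callers passed it:
    rows = Ztile.convert(data, survey='main', program='dark', tileid=1000, night=20210517)

    # Metadata recovered from the table itself; raises KeyError if a
    # required column is absent:
    rows = Ztile.convert(data)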
Also check for the presence of ``PLATE_RA``, ``PLATE_DEC``. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`astropy.table.Table` - Updated data table. - """ - try: - tileid = data.meta['TILEID']*np.ones(len(data), dtype=np.int32) - except KeyError: - log.error("Could not find TILEID in metadata!") - raise - data.add_column(tileid, name='TILEID', index=0) - if 'TARGET_RA' in data.colnames and 'PLATE_RA' not in data.colnames: - log.debug("Adding PLATE_RA, PLATE_DEC.") - data['PLATE_RA'] = data['TARGET_RA'] - data['PLATE_DEC'] = data['TARGET_DEC'] - id0 = data['LOCATION'].base.astype(np.int64) << 32 | data['TILEID'].base.astype(np.int64) - composite_id = np.array([id0, data['TARGETID'].base]).T - data.add_column(composite_id, name='ID', index=0) - return data - - -def _survey_program(data): - """Add ``SURVEY``, ``PROGRAM``, ``SPGRP`` columns to zpix and ztile tables. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`astropy.table.Table` - Updated data table. - - Raises - ------ - KeyError - If a necessary header could not be found. - """ - # for i, key in enumerate(('SURVEY', 'PROGRAM', 'SPGRP')): - for i, key in enumerate(('SURVEY', 'PROGRAM')): - if key in data.colnames: - log.info("Column %s is already in the table.", key) - else: - try: - val = data.meta[key] - except KeyError: - log.error("Could not find %s in metadata!", key) - raise - log.debug("Adding %s column.", key) - data.add_column(np.array([val]*len(data)), name=key, index=i+1) - # objid, brickid, release, mock, sky, gaiadr = decode_targetid(data['TARGETID']) - # data.add_column(sky, name='SKY', index=0) - if 'FIRSTNIGHT' not in data.colnames: - log.info("Adding FIRSTNIGHT column") - data.add_column(np.array([0]*len(data), dtype=np.int32), name='FIRSTNIGHT', index=data.colnames.index('PROGRAM')+1) - if 'LASTNIGHT' not in data.colnames: - log.info("Adding LASTNIGHT column") - data.add_column(np.array([0]*len(data), dtype=np.int32), name='LASTNIGHT', index=data.colnames.index('PROGRAM')+2) - if 'MAIN_NSPEC' not in data.colnames: - data.add_column(np.array([0]*len(data), dtype=np.int16), name='MAIN_NSPEC', index=data.colnames.index('SV_PRIMARY')+1) - data.add_column(np.array([False]*len(data), dtype=np.int16), name='MAIN_PRIMARY', index=data.colnames.index('MAIN_NSPEC')+1) - if 'SV_NSPEC' not in data.colnames: - data.add_column(np.array([0]*len(data), dtype=np.int16), name='SV_NSPEC', index=data.colnames.index('TSNR2_LRG')+1) - data.add_column(np.array([False]*len(data), dtype=np.int16), name='SV_PRIMARY', index=data.colnames.index('SV_NSPEC')+1) - # - # Reductions like guadalupe may not have the full set of target bitmasks - # - surveys = ('', 'sv1', 'sv2', 'sv3') - programs = ('desi', 'bgs', 'mws', 'scnd') - masks = ['cmx_target'] + [('_'.join(p) if p[0] else p[1]) + '_target' - for p in itertools.product(surveys, programs)] - mask_index = data.colnames.index('NUMOBS_INIT') + 1 - for mask in masks: - if mask.upper() not in data.colnames: - log.info("Adding %s at index %d.", mask.upper(), mask_index) - data.add_column(np.array([0]*len(data), dtype=np.int64), name=mask.upper(), index=mask_index) - mask_index += 1 - if 'TILEID' in data.colnames: - data.add_column(np.array(['cumulative']*len(data)), name='SPGRP', index=data.colnames.index('PROGRAM')+1) - data = _target_unique_id(data) - data.rename_column('ID', 'TARGETPHOTID') - s = np.array([spgrpid(s) for s 
in data['SPGRP']], dtype=np.int64) - id0 = (s << 27 | data['SPGRPVAL'].base.astype(np.int64)) << 32 | data['TILEID'].base.astype(np.int64) - else: - data.add_column(np.array(['healpix']*len(data)), name='SPGRP', index=data.colnames.index('PROGRAM')+1) - s = np.array([surveyid(s) for s in data['SURVEY']], dtype=np.int64) - p = np.array([programid(s) for s in data['PROGRAM']], dtype=np.int64) - id0 = p << 32 | s - composite_id = np.array([id0, data['TARGETID'].base]).T - data.add_column(composite_id, name='ID', index=0) - return data - - -def _target_unique_id(data): - """Add composite ``ID`` column for later conversion. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`astropy.table.Table` - Updated data table. - """ - s = np.array([surveyid(s) for s in data['SURVEY']], dtype=np.int64) - id0 = s << 32 | data['TILEID'].base.astype(np.int64) - composite_id = np.array([id0, data['TARGETID'].base]).T - data.add_column(composite_id, name='ID', index=0) - return data - - -def _add_ls_id(data): - """Add LS_ID to targetphot data. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`astropy.table.Table` - Updated data table. - """ - ls_id = ((data['RELEASE'].data.astype(np.int64) << 40) | - (data['BRICKID'].data.astype(np.int64) << 16) | - data['BRICK_OBJID'].data.astype(np.int64)) - data.add_column(ls_id, name='LS_ID', index=0) - return data - - -def _deduplicate_targetid(data): +def deduplicate_targetid(data): """Find targetphot rows that are not already loaded into the Photometry table *and* resolve any duplicate TARGETID. @@ -1269,54 +1261,7 @@ def _deduplicate_targetid(data): return load_rows -def _remove_loaded_targetid(data): - """Remove rows with TARGETID already loaded into the database. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`numpy.ndarray` - An array of rows that are safe to load. - """ - targetid = data['TARGETID'].data - good_rows = np.ones((len(targetid),), dtype=bool) - q = dbSession.query(Photometry.targetid).filter(Photometry.targetid.in_(targetid.tolist())).all() - for row in q: - good_rows[targetid == row[0]] = False - return good_rows - - -def _remove_loaded_unique_id(data): - """Remove rows with UNIQUE ID already loaded into the database. - - Parameters - ---------- - data : :class:`astropy.table.Table` - The initial data read from the file. - - Returns - ------- - :class:`numpy.ndarray` - An array of rows that are safe to load. - """ - rows = dbSession.query(Target.id).all() - loaded_id = [r[0] for r in rows] - data_id = [(int(data['ID'][k][0]) << 64) | int(data['ID'][k][1]) - for k in range(len(data))] - id_index = dict(zip(data_id, range(len(data)))) - good_rows = np.ones((len(data),), dtype=bool) - for i in loaded_id: - good_rows[id_index[i]] = False - return good_rows - - -def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, convert=None, - index=None, rowfilter=None, q3c=None, - chunksize=50000, maxrows=0): +def load_file(filepaths, tcls, hdu=1, row_filter=None, q3c=None, chunksize=50000): """Load data file into the database, assuming that column names map to database column names with no surprises. @@ -1328,22 +1273,7 @@ def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, co The table to load, represented by its class. 
hdu : :class:`int` or :class:`str`, optional Read a data table from this HDU (default 1). - preload : callable, optional - A function that takes a :class:`~astropy.table.Table` as an argument. - Use this for more complicated manipulation of the data before loading, - for example a function that depends on multiple columns. The return - value should be the updated Table. - expand : :class:`dict`, optional - If set, map FITS column names to one or more alternative column names. - insert : :class:`dict`, optional - If set, insert one or more columns, before an existing column. The - existing column will be copied into the new column(s). - convert : :class:`dict`, optional - If set, convert the data for a named (database) column using the - supplied function. - index : :class:`str`, optional - If set, add a column that just counts the number of rows. - rowfilter : callable, optional + row_filter : callable, optional If set, apply this filter to the rows to be loaded. The function should return :class:`bool`, with ``True`` meaning a good row. q3c : :class:`str`, optional @@ -1351,9 +1281,6 @@ def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, co named `q3c`. chunksize : :class:`int`, optional If set, load database `chunksize` rows at a time (default 50000). - maxrows : :class:`int`, optional - If set, stop loading after `maxrows` are loaded. Alteratively, - set `maxrows` to zero (0) to load all rows. Returns ------- @@ -1378,13 +1305,6 @@ def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, co else: log.error("Unrecognized data file, %s!", filepath) return - if maxrows == 0 or len(data) < maxrows: - mr = len(data) - else: - mr = maxrows - if preload is not None: - data = preload(data) - log.info("Preload function complete on %s.", tn) try: colnames = data.names except AttributeError: @@ -1393,10 +1313,10 @@ def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, co for col in colnames: if data[col].dtype.kind == 'f': if isinstance(data[col], MaskedColumn): - bad = np.isnan(data[col].data.data[0:mr]) + bad = ~np.isfinite(data[col].data.data) masked[col] = True else: - bad = np.isnan(data[col][0:mr]) + bad = ~np.isfinite(data[col]) if np.any(bad): if bad.ndim == 1: log.warning("%d rows of bad data detected in column " + @@ -1412,84 +1332,43 @@ def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, co # TODO: is this replacement appropriate for all columns? 
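The switch from np.isnan() to ~np.isfinite() in the integrity check above widens the net: infinities as well as NaNs are now flagged, and the -9999.0 sentinel replacement just below then overwrites them. For example:

    import numpy as np

    col = np.array([1.0, np.nan, np.inf, -np.inf])
    np.isnan(col)        # array([False,  True, False, False])
    ~np.isfinite(col)    # array([False,  True,  True,  True])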
# if col in masked: - data[col].data.data[0:mr][bad] = -9999.0 + data[col].data.data[bad] = -9999.0 else: - data[col][0:mr][bad] = -9999.0 + data[col][bad] = -9999.0 log.info("Integrity check complete on %s.", tn) - if rowfilter is None: - good_rows = np.ones((mr,), dtype=bool) + if row_filter is None: + good_rows = np.ones((len(data),), dtype=bool) else: - good_rows = rowfilter(data[0:mr]) + good_rows = row_filter(data) if good_rows.sum() == 0: log.info("Row filter removed all data rows, skipping %s.", filepath) continue log.info("Row filter applied on %s; %d rows remain.", tn, good_rows.sum()) - data_list = list() - for col in colnames: - if col in masked: - data_list.append(data[col].data.data[0:mr][good_rows].tolist()) - else: - data_list.append(data[col][0:mr][good_rows].tolist()) - data_names = [col.lower() for col in colnames] - finalrows = len(data_list[0]) - log.info("Initial column conversion complete on %s.", tn) - if expand is not None: - for col in expand: - i = data_names.index(col.lower()) - if isinstance(expand[col], str): - # - # Just rename a column. - # - log.debug("Renaming column %s (at index %d) to %s.", data_names[i], i, expand[col]) - data_names[i] = expand[col] - else: - # - # Assume this is an expansion of an array-valued column - # into individual columns. - # - del data_names[i] - del data_list[i] - for j, n in enumerate(expand[col]): - log.debug("Expanding column %d of %s (at index %d) to %s.", j, col, i, n) - data_names.insert(i + j, n) - data_list.insert(i + j, data[col][:, j].tolist()) - log.debug(data_names) - log.info("Column expansion complete on %s.", tn) + # + # Need to pass required arguments to convert, which depends on the table. + # + # if tn == 'ztile': + # convert_inputs['survey'] = 0 + # convert_inputs['program'] = 0 + # convert_inputs['tileid'] = 0 + # convert_inputs['night'] = 0 + orm_objects = tcls.convert(data, row_index=np.where(good_rows)[0]) + log.info("Converted data to ORM objects on %s.", tn) del data - if insert is not None: - for col in insert: - i = data_names.index(col) - for item in insert[col]: - data_names.insert(i, item) - data_list.insert(i, data_list[i].copy()) # Dummy values - log.info("Column insertion complete on %s.", tn) - if convert is not None: - for col in convert: - i = data_names.index(col) - data_list[i] = [convert[col](x) for x in data_list[i]] - log.info("Column conversion complete on %s.", tn) - if index is not None: - data_list.insert(0, list(range(1, finalrows+1))) - data_names.insert(0, index) - log.info("Added index column '%s'.", index) - data_rows = list(zip(*data_list)) - del data_list - log.info("Converted columns into rows on %s.", tn) + finalrows = len(orm_objects) n_chunks = finalrows//chunksize if finalrows % chunksize: n_chunks += 1 - with engine.connect() as connection: - for k in range(n_chunks): - data_chunk = [dict(zip(data_names, row)) - for row in data_rows[k*chunksize:(k+1)*chunksize]] - if len(data_chunk) > 0: - loaded_rows += len(data_chunk) - connection.execute(tcls.__table__.insert(), data_chunk) - log.info("Inserted %d rows in %s.", - min((k+1)*chunksize, finalrows), tn) - else: - log.error("Detected empty data chunk in %s!", tn) - connection.commit() + for k in range(n_chunks): + data_chunk = orm_objects[k*chunksize:(k+1)*chunksize] + if len(data_chunk) > 0: + loaded_rows += len(data_chunk) + dbSession.add_all(data_chunk) + dbSession.commit() + log.info("Inserted %d rows in %s.", + min((k+1)*chunksize, finalrows), tn) + else: + log.error("Detected empty data chunk in %s!", tn) if q3c is not None:
q3c_index(tn, ra=q3c) return loaded_rows @@ -1552,163 +1431,6 @@ def load_versions(photometry, redshift, release, specprod, tiles): return -def zpix_target(specprod): - """Replace targeting bitmasks in the redshift tables for `specprod`. - - Parameters - ---------- - specprod : :class:`str` - The spectroscopic production, normally the value of :envvar:`SPECPROD`. - """ - specprod_survey_program = {'fuji': {'cmx': ('other', ), - 'special': ('dark', ), - 'sv1': ('backup', 'bright', 'dark', 'other'), - 'sv2': ('backup', 'bright', 'dark'), - 'sv3': ('backup', 'bright', 'dark')}, - 'guadalupe': {'special': ('bright', 'dark'), - 'main': ('bright', 'dark')}, - 'iron': {'cmx': ('other', ), - 'main': ('backup', 'bright', 'dark'), - 'special': ('backup', 'bright', 'dark', 'other'), - 'sv1': ('backup', 'bright', 'dark', 'other'), - 'sv2': ('backup', 'bright', 'dark'), - 'sv3': ('backup', 'bright', 'dark')}} - target_bits = {'cmx': {'cmx': Target.cmx_target}, - 'sv1': {'desi': Target.sv1_desi_target, 'bgs': Target.sv1_bgs_target, 'mws': Target.sv1_mws_target}, - 'sv2': {'desi': Target.sv2_desi_target, 'bgs': Target.sv2_bgs_target, 'mws': Target.sv2_mws_target}, - 'sv3': {'desi': Target.sv3_desi_target, 'bgs': Target.sv3_bgs_target, 'mws': Target.sv3_mws_target}, - 'main': {'desi': Target.desi_target, 'bgs': Target.bgs_target, 'mws': Target.mws_target}, - 'special': {'desi': Target.desi_target, 'bgs': Target.bgs_target, 'mws': Target.mws_target}} - # - # Find targetid assigned to multiple tiles. - # - assigned_multiple_tiles = dict() - for survey in specprod_survey_program[specprod]: - assigned_multiple_tiles[survey] = dict() - for program in specprod_survey_program[specprod][survey]: - assigned_multiple_tiles[survey][program] = dbSession.query(Target.targetid).join(Fiberassign, - and_(Target.targetid == Fiberassign.targetid, - Target.tileid == Fiberassign.tileid)).filter(Target.survey == survey).filter(Target.program == program).group_by(Target.targetid).having(func.count(Target.tileid) > 1) - # - # From that set, find cases targetid and a targeting bit are distinct pairs. - # - distinct_target = dict() - for survey in assigned_multiple_tiles: - distinct_target[survey] = dict() - for program in assigned_multiple_tiles[survey]: - distinct_target[survey][program] = dict() - for bits in target_bits[survey]: - distinct_target[survey][program][bits] = dbSession.query(Target.targetid, target_bits[survey][bits]).filter(Target.targetid.in_(assigned_multiple_tiles[survey][program])).filter(Target.survey == survey).filter(Target.program == program).distinct().subquery() - # - # Obtain the list of targetids where a targeting bit appears more than once with different values. - # - multiple_target = dict() - for survey in distinct_target: - multiple_target[survey] = dict() - for program in distinct_target[survey]: - multiple_target[survey][program] = dict() - for bits in distinct_target[survey][program]: - if survey.startswith('sv'): - column = getattr(distinct_target[survey][program][bits].c, f"{survey}_{bits}_target") - elif survey == 'cmx': - column = distinct_target[survey][program][bits].c.cmx_target - else: - column = getattr(distinct_target[survey][program][bits].c, f"{bits}_target") - multiple_target[survey][program][bits] = [row[0] for row in dbSession.query(distinct_target[survey][program][bits].c.targetid).group_by(distinct_target[survey][program][bits].c.targetid).having(func.count(column) > 1).all()] - # - # Consolidate the list of targetids. 
- # - targetids_to_fix = dict() - for survey in multiple_target: - for program in multiple_target[survey]: - for bits in multiple_target[survey][program]: - if multiple_target[survey][program][bits]: - if survey not in targetids_to_fix: - targetids_to_fix[survey] = dict() - if program in targetids_to_fix[survey]: - log.debug("targetids_to_fix['%s']['%s'] += multiple_target['%s']['%s']['%s']", - survey, program, survey, program, bits) - targetids_to_fix[survey][program] += multiple_target[survey][program][bits] - else: - log.debug("targetids_to_fix['%s']['%s'] = multiple_target['%s']['%s']['%s']", - survey, program, survey, program, bits) - targetids_to_fix[survey][program] = multiple_target[survey][program][bits] - # - # ToO observations that had targeting bits zeroed out. - # - if specprod == 'fuji': - # - # Maybe this problem only affects fuji, but need to confirm that. - # - zero_ToO = dict() - for survey in specprod_survey_program[specprod]: - zero_ToO[survey] = dict() - for program in specprod_survey_program[specprod][survey]: - zero_ToO[survey][program] = [row[0] for row in dbSession.query(Zpix.targetid).filter((Zpix.targetid.op('&')((2**16 - 1) << 42)).op('>>')(42) == 9999).filter(Zpix.survey == survey).filter(Zpix.program == program).all()] - for survey in zero_ToO: - for program in zero_ToO[survey]: - if zero_ToO[survey][program]: - if survey not in targetids_to_fix: - targetids_to_fix[survey] = dict() - if program in targetids_to_fix[survey]: - log.debug("targetids_to_fix['%s']['%s'] += zero_ToO['%s']['%s']", - survey, program, survey, program) - targetids_to_fix[survey][program] += zero_ToO[survey][program] - else: - log.debug("targetids_to_fix['%s']['%s'] = zero_ToO['%s']['%s']", - survey, program, survey, program) - targetids_to_fix[survey][program] = zero_ToO[survey][program] - # - # Generate the query to obtain the bitwise-or of each targeting bit. 
- # - # table = 'zpix' - surveys = ('', 'sv1', 'sv2', 'sv3') - programs = ('desi', 'bgs', 'mws', 'scnd') - masks = ['cmx_target'] + [('_'.join(p) if p[0] else p[1]) + '_target' - for p in itertools.product(surveys, programs)] - bit_or_query = dict() - for survey in targetids_to_fix: - bit_or_query[survey] = dict() - for program in targetids_to_fix[survey]: - log.debug("SELECT t.targetid, " + - ', '.join([f"BIT_OR(t.{m}) AS {m}" for m in masks]) + - f" FROM {specprod}.target AS t WHERE t.targetid IN ({', '.join(map(str, set(targetids_to_fix[survey][program])))}) AND t.survey = '{survey}' AND t.program = '{program}' GROUP BY t.targetid;") - bq = ("dbSession.query(Target.targetid, " + - ', '.join([f"func.bit_or(Target.{m}).label('{m}')" for m in masks]) + - f").filter(Target.targetid.in_([{', '.join(map(str, set(targetids_to_fix[survey][program])))}])).filter(Target.survey == '{survey}').filter(Target.program == '{program}').group_by(Target.targetid)") - log.debug(bq) - bit_or_query[survey][program] = eval(bq) - # - # Apply the updates - # - # update_string = '{' + ', '.join([f"Zpix.{m}: {{0.{m}:d}}" for m in masks]) + '}' - for survey in bit_or_query: - for program in bit_or_query[survey]: - for row in bit_or_query[survey][program].all(): - zpix_match = dbSession.query(Zpix).filter(Zpix.targetid == row.targetid).filter(Zpix.survey == survey).filter(Zpix.program == program).one() - for m in masks: - log.info("%s.%s = %s", zpix_match, m, str(getattr(row, m))) - zpix_match.cmx_target = row.cmx_target - zpix_match.desi_target = row.desi_target - zpix_match.bgs_target = row.bgs_target - zpix_match.mws_target = row.mws_target - zpix_match.scnd_target = row.scnd_target - zpix_match.sv1_desi_target = row.sv1_desi_target - zpix_match.sv1_bgs_target = row.sv1_bgs_target - zpix_match.sv1_mws_target = row.sv1_mws_target - zpix_match.sv1_scnd_target = row.sv1_scnd_target - zpix_match.sv2_desi_target = row.sv2_desi_target - zpix_match.sv2_bgs_target = row.sv2_bgs_target - zpix_match.sv2_mws_target = row.sv2_mws_target - zpix_match.sv2_scnd_target = row.sv2_scnd_target - zpix_match.sv3_desi_target = row.sv3_desi_target - zpix_match.sv3_bgs_target = row.sv3_bgs_target - zpix_match.sv3_mws_target = row.sv3_mws_target - zpix_match.sv3_scnd_target = row.sv3_scnd_target - dbSession.commit() - return - - def setup_db(dbfile='specprod.db', hostname=None, username='desi_admin', schema=None, overwrite=False, public=False, verbose=False): """Initialize the database connection. @@ -1909,29 +1631,22 @@ def main(): tiles_type = 'fits' tiles_version = config[specprod]['tiles'] chunksize = config[specprod].getint('chunksize') - maxrows = config[specprod].getint('maxrows') loaders = {'exposures': [{'filepaths': os.path.join(options.datapath, 'spectro', 'redux', specprod, f'tiles-{specprod}.{tiles_type}'), 'tcls': Tile, 'hdu': 'TILE_COMPLETENESS', # Ignored for CSV files. 
'q3c': 'tilera', - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }, {'filepaths': os.path.join(options.datapath, 'spectro', 'redux', specprod, f'exposures-{specprod}.fits'), 'tcls': Exposure, 'hdu': 'EXPOSURES', - 'insert': {'mjd': ('date_obs',)}, - 'convert': {'date_obs': lambda x: Time(x, format='mjd').to_value('datetime').replace(tzinfo=utc)}, 'q3c': 'tilera', - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }, {'filepaths': os.path.join(options.datapath, 'spectro', 'redux', specprod, f'exposures-{specprod}.fits'), 'tcls': Frame, 'hdu': 'FRAMES', - 'preload': _frameid, - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }], # # The potential targets are supposed to include data for all targets. @@ -1940,12 +1655,7 @@ def main(): 'photometry': [{'filepaths': glob.glob(os.path.join(options.datapath, 'vac', release, 'lsdr9-photometry', specprod, photometry_version, 'potential-targets', 'tractorphot', 'tractorphot*.fits')), 'tcls': Photometry, 'hdu': 'TRACTORPHOT', - 'expand': {'DCHISQ': ('dchisq_psf', 'dchisq_rex', 'dchisq_dev', 'dchisq_exp', 'dchisq_ser',), - 'OBJID': 'brick_objid', - 'TYPE': 'morphtype'}, - # 'rowfilter': _remove_loaded_targetid, - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }], # # This stage loads targets, and such photometry as they have, that did not @@ -1954,69 +1664,41 @@ def main(): 'targetphot': [{'filepaths': target_files, 'tcls': Photometry, 'hdu': 'TARGETPHOT', - 'preload': _add_ls_id, - 'expand': {'DCHISQ': ('dchisq_psf', 'dchisq_rex', 'dchisq_dev', 'dchisq_exp', 'dchisq_ser',)}, - 'convert': {'gaia_astrometric_params_solved': lambda x: int(x)}, - 'rowfilter': _deduplicate_targetid, + 'row_filter': deduplicate_targetid, 'q3c': 'ra', - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }], 'target': [{'filepaths': target_files, 'tcls': Target, 'hdu': 'TARGETPHOT', - 'preload': _target_unique_id, - 'convert': {'id': lambda x: x[0] << 64 | x[1]}, - # 'rowfilter': _remove_loaded_unique_id, - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }], 'redshift': [{'filepaths': ztile_file, 'tcls': Ztile, 'hdu': 'ZCATALOG', - 'preload': _survey_program, - 'expand': {'COEFF': ('coeff_0', 'coeff_1', 'coeff_2', 'coeff_3', 'coeff_4', - 'coeff_5', 'coeff_6', 'coeff_7', 'coeff_8', 'coeff_9',)}, - 'convert': {'id': lambda x: x[0] << 64 | x[1], - 'targetphotid': lambda x: x[0] << 64 | x[1]}, - # 'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0), - 'rowfilter': no_sky, - 'chunksize': chunksize, - 'maxrows': maxrows + 'row_filter': no_sky, + 'chunksize': chunksize }], 'fiberassign': [{'filepaths': None, 'tcls': Fiberassign, 'hdu': 'FIBERASSIGN', - 'preload': _tileid, - 'convert': {'id': lambda x: x[0] << 64 | x[1]}, - # 'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0), - 'rowfilter': no_sky, + 'row_filter': no_sky, 'q3c': 'target_ra', - 'chunksize': chunksize, - 'maxrows': maxrows + 'chunksize': chunksize }, {'filepaths': None, 'tcls': Potential, 'hdu': 'POTENTIAL_ASSIGNMENTS', - 'preload': _tileid, - 'convert': {'id': lambda x: x[0] << 64 | x[1]}, - # 'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0), - 'rowfilter': no_sky, - 'chunksize': chunksize, - 'maxrows': maxrows + 'row_filter': no_sky, + 'chunksize': chunksize }]} if specprod != 'daily': loaders['redshift'].append({'filepaths': zpix_file, 'tcls': Zpix, 'hdu': 'ZCATALOG', - 'preload': _survey_program, - 'expand': 
{'COEFF': ('coeff_0', 'coeff_1', 'coeff_2', 'coeff_3', 'coeff_4', - 'coeff_5', 'coeff_6', 'coeff_7', 'coeff_8', 'coeff_9',)}, - 'convert': {'id': lambda x: x[0] << 64 | x[1]}, - # 'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0), - 'rowfilter': no_sky, - 'chunksize': chunksize, - 'maxrows': maxrows}) + 'row_filter': no_sky, + 'chunksize': chunksize + }) try: loader = loaders[options.load] except KeyError: @@ -2063,29 +1745,6 @@ def main(): log.info("Loading %s from %s.", tn, str(l['filepaths'])) load_file(**l) log.info("Finished loading %s.", tn) - if options.load == 'fiberassign' and redshift_type not in ('daily', 'patch', 'zcat'): - # - # Fiberassign table has to be loaded for this step. - # Eventually we want to eliminate this entirely. - # Daily reductions don't load Zpix anyway, so zpix_target is not needed. - # - log.info("Applying target bitmask corrections for %s to zpix table.", - specprod) - try: - zpix_target(specprod) - except ProgrammingError: - log.critical("Failed target bitmask corrections for %s!", - specprod) - return 1 - log.info("Finished target bitmask corrections for %s zpix table.", - specprod) if options.load == 'fiberassign': - # - # Automatically VACUUM. The system detects this as still inside a - # transaction block, & VACUUM can't be run in that. - # - # log.info("Issuing VACUUM command.") - # dbSession.execute("VACUUM FULL VERBOSE ANALYZE;") - # log.info("Finished with VACUUM command.") - pass + log.info("Consider running VACUUM FULL VERBOSE ANALYZE at this point.") return 0 diff --git a/py/specprodDB/util.py b/py/specprodDB/util.py index 0c190ce..44330b9 100644 --- a/py/specprodDB/util.py +++ b/py/specprodDB/util.py @@ -376,9 +376,6 @@ def common_options(description): # prsr.add_argument('-l', '--load', action='store', dest='load', # default='exposures', metavar='STAGE', # help='Load the set of files associated with STAGE (default "%(default)s").') - # prsr.add_argument('-m', '--max-rows', action='store', dest='maxrows', - # type=int, default=0, metavar='M', - # help="Load up to M rows in the tables (default is all rows).") prsr.add_argument('-o', '--overwrite', action='store_true', dest='overwrite', help='Delete any existing files or tables before loading.') prsr.add_argument('-P', '--public', action='store_true', dest='public', @@ -386,9 +383,6 @@ def common_options(description): # prsr.add_argument('-p', '--photometry-version', action='store', dest='photometry_version', # metavar='VERSION', default='v2.1', # help='Load target photometry data from VERSION (default "%(default)s").') - # prsr.add_argument('-r', '--rows', action='store', dest='chunksize', - # type=int, default=50000, metavar='N', - # help="Load N rows at a time (default %(default)s).") prsr.add_argument('-s', '--schema', action='store', dest='schema', metavar='SCHEMA', help='Set the schema name in the PostgreSQL database.')
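Tying the stages together: every entry in the loaders dictionary in main() above is a plain keyword-argument set that is forwarded unchanged via load_file(**l). A sketch of a single redshift stage under the simplified interface (the file path is illustrative only):

    stage = {'filepaths': '/path/to/spectro/redux/iron/zcatalog/ztile-main-dark-cumulative.fits',  # illustrative
             'tcls': Ztile,
             'hdu': 'ZCATALOG',
             'row_filter': no_sky,   # drop sky fibers before conversion
             'chunksize': 50000}
    loaded_rows = load_file(**stage)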