Source code for calistar.calistar

"""
Module with the ``calistar`` tool.
"""

import json
import socket
import urllib.request
import warnings

from copy import copy
from pathlib import Path

import astropy.units as u
import numpy as np
import pandas as pd
import pooch

from astropy.coordinates import SkyCoord
from astropy.table import Table
from astroquery.gaia import Gaia
from astroquery.simbad import Simbad
from astroquery.vizier import Vizier
from beartype import beartype, typing
from gaiaxpy import calibrate, plot_spectra

# from gaiaxpy.calibrator.calibrator import __create_merge as create_merge
from gaiaxpy.core.generic_functions import correlation_to_covariance
from tqdm import tqdm

from ._version import __version__, __version_tuple__


# No limit on the number of rows with a Gaia query
Gaia.ROW_LIMIT = -1


[docs] class CaliStar: """ Class for finding calibration stars based on their separation and magnitude difference with the selected ``gaia_source``. """ @beartype def __init__( self, gaia_source: typing.Union[int, str], gaia_release: typing.Optional[str] = None, ) -> None: """ Parameters ---------- gaia_source : int, str The GAIA source ID of the object for which calibration sources should be searched for. For example, set the argument to ``6843672087120107264`` or ``"6843672087120107264"`` for the star HD 206893 when the DR2 or DR3 catalog as argument of ``gaia_release``. gaia_release : str, None Data release of the Gaia catalog that will be used for the queries (``"DR2"``, ``"EDR3"``, or ``"DR3"``). The default release is DR3 when the argument is set to ``None``. Returns ------- NoneType None """ print("========\ncalistar\n========") # Check if there is a new version available print(f"\nVersion: {__version__}") calistar_version = ( f"{__version_tuple__[0]}." f"{__version_tuple__[1]}." f"{__version_tuple__[2]}" ) try: pypi_url = "https://pypi.org/pypi/calistar/json" with urllib.request.urlopen(pypi_url, timeout=1.0) as open_url: url_content = open_url.read() url_data = json.loads(url_content) pypi_version = url_data["info"]["version"] except (urllib.error.URLError, socket.timeout): pypi_version = None if pypi_version is not None: pypi_split = pypi_version.split(".") current_split = calistar_version.split(".") new_major = (pypi_split[0] == current_split[0]) & ( pypi_split[1] > current_split[1] ) new_minor = ( (pypi_split[0] == current_split[0]) & (pypi_split[1] == current_split[1]) & (pypi_split[2] > current_split[2]) ) if new_major | new_minor: print(f"\n-> calistar v{pypi_version} is available!") # Set attributes of CaliStar self.gaia_source = gaia_source # Gaia source ID if isinstance(gaia_source, str): self.gaia_source = int(self.gaia_source) # Gaia data release version self.gaia_release = gaia_release if self.gaia_release is None: self.gaia_release = "DR3" if self.gaia_release not in ["DR2", "EDR3", "DR3"]: raise ValueError( "The argument of 'gaia_release' should " "be set to 'DR2', 'EDR3', or 'DR3'." ) # Set the Gaia source catalog if self.gaia_release == "DR2": Gaia.MAIN_GAIA_TABLE = "gaiadr2.gaia_source" self.gaia_idx = 2 elif self.gaia_release == "EDR3": Gaia.MAIN_GAIA_TABLE = "gaiaedr3.gaia_source" self.gaia_idx = 3 elif self.gaia_release == "DR3": Gaia.MAIN_GAIA_TABLE = "gaiadr3.gaia_source" self.gaia_idx = 3 # Filter IDs from the SVO Filter Profile Service self.all_filters = [ f"GAIA/GAIA{self.gaia_idx}.G", "2MASS/2MASS.J", "2MASS/2MASS.H", "2MASS/2MASS.Ks", "WISE/WISE.W1", "WISE/WISE.W2", "WISE/WISE.W3", "WISE/WISE.W4", ] # Create .calistar folder in the home directory self.calistar_folder = Path.home() / ".calistar" if not self.calistar_folder.exists(): print(f"\nCreating .calistar folder in {Path.home()}...") self.calistar_folder.mkdir(parents=False, exist_ok=False) # Download The Washington Visual Double Star Catalog # https://cdsarc.cds.unistra.fr/viz-bin/cat/B/wds self.wds_file = Path.home() / ".calistar/wds_catalog.hdf5" url = "https://home.strw.leidenuniv.nl/~stolker/calistar/wds_catalog.hdf5" if not self.wds_file.exists(): pooch.retrieve( url=url, known_hash="b0a63ed95bf060cccc3ce66afb36ed669d287ad85211cee9cde3bd57d48c622b", fname="wds_catalog.hdf5", path=self.calistar_folder, progressbar=True, )
[docs] @beartype def target_star( self, write_json: bool = True, get_gaiaxp: bool = True, allwise_catalog: bool = True, print_astroph: bool = False, ) -> typing.Dict[str, typing.Union[float, str, int, typing.Tuple[float, float]]]: """ Function for retrieving the the astrometric and photometric properties of a target star of interest. The function returns a dictionary with the properties, but it also (optionally) stores the data in a JSON file in the working folder and retrieves the Gaia XP spectrum. Parameters ---------- write_json : bool Write the target properties to a JSON file (default: True). The file will be stored in the working folder and starts with ``target_``. The filename contains also the Gaia release and the Gaia source ID of the target. get_gaiaxp : bool Retrieve the Gaia XP spectrum if available (default: True). If set to ``True``, the spectrum will be written to a data file and a plot will also be created. The spectrum is not retrieved when the argument is set to ``False``. allwise_catalog : bool Select the WISE magnitudes from the ALLWISE catalog if set to ``True`` or select the magnitudes from the earlier WISE catalog if set to ``False``. print_astroph : bool Print a list with all the astrophysical parameters that will be retrieved from the Gaia catalog when the argument is set to ``True``. Returns ------- dict Dictionary with the properties of the target star. """ target_dict = {} # List all Gaia tables # for table_item in Gaia.load_tables(only_names=True): # print(table_item.get_qualified_name()) # Gaia query for selected Gaia source ID gaia_query = f""" SELECT * FROM gaia{self.gaia_release.lower()}.gaia_source WHERE source_id = {self.gaia_source} """ # Launch the Gaia job and get the results print(f"\n-> Querying GAIA {self.gaia_release}...\n") gaia_job = Gaia.launch_job_async(gaia_query, dump_to_file=False, verbose=False) gaia_result = gaia_job.get_results() # print(gaia_result.columns) if self.gaia_release == "DR2": # Gaia DR2 VEGAMAG zero points # https://www.cosmos.esa.int/web/gaia/iow_20180316 gaia_g_zp = (25.6914396869, 0.0011309370) gaia_bp_zp = (25.3488107670, 0.0004899854) gaia_rp_zp = (24.7626744847, 0.0035071711) elif self.gaia_release in ["EDR3", "DR3"]: # Gaia (E)DR3 VEGAMAG zero points # https://www.cosmos.esa.int/web/gaia/edr3-passbands gaia_g_zp = (25.6873668671, 0.0027553202) gaia_bp_zp = (25.3385422158, 0.0027901700) gaia_rp_zp = (24.7478955012, 0.0037793818) else: raise ValueError( f"The '{self.gaia_release}' data release is not supported." ) # Magnitude error, assuming Delta_f << f # Delta_m = -2.5/ln(10) Delta_f/f # Add in quadrature the uncertainty on the zero point mag_g_error = ( -2.5 / np.log(10.0) / gaia_result["phot_g_mean_flux_over_error"][0] ) mag_g_error = np.sqrt(mag_g_error**2 + gaia_g_zp[1] ** 2) mag_bp_error = ( -2.5 / np.log(10.0) / gaia_result["phot_bp_mean_flux_over_error"][0] ) mag_bp_error = np.sqrt(mag_bp_error**2 + gaia_bp_zp[1] ** 2) mag_rp_error = ( -2.5 / np.log(10.0) / gaia_result["phot_rp_mean_flux_over_error"][0] ) mag_rp_error = np.sqrt(mag_rp_error**2 + gaia_rp_zp[1] ** 2) if "SOURCE_ID" in gaia_result.columns: gaia_source_id = int(gaia_result["SOURCE_ID"][0]) elif "source_id" in gaia_result.columns: gaia_source_id = int(gaia_result["source_id"][0]) else: raise ValueError(f"Gaia source ID not found in {gaia_result}") target_dict["Gaia ID"] = gaia_source_id target_dict["Gaia release"] = self.gaia_release target_dict["Gaia epoch"] = gaia_result["ref_epoch"][0] target_dict["Gaia RA"] = ( float(gaia_result["ra"][0]), # (deg) float(gaia_result["ra_error"][0] * 1e-3 / 3600.0), # (mas) -> (deg) ) target_dict["Gaia Dec"] = ( float(gaia_result["dec"][0]), # (deg) float(gaia_result["dec_error"][0] * 1e-3 / 3600.0), # (mas) -> (deg) ) target_dict["Gaia pm RA"] = ( float(gaia_result["pmra"][0]), # (mas yr-1) float(gaia_result["pmra_error"][0]), # (mas yr-1) ) target_dict["Gaia pm Dec"] = ( float(gaia_result["pmdec"][0]), # (mas yr-1) float(gaia_result["pmdec_error"][0]), # (mas yr-1) ) target_dict["Gaia parallax"] = ( float(gaia_result["parallax"][0]), # (mas) float(gaia_result["parallax_error"][0]), # (mas) ) if "phot_g_mean_mag" in gaia_result.columns: if not np.ma.is_masked(gaia_result["phot_g_mean_mag"][0]): target_dict[f"GAIA/GAIA{self.gaia_idx}.G"] = ( float(gaia_result["phot_g_mean_mag"][0]), mag_g_error, ) print( f"\nG mag = {gaia_result['phot_g_mean_mag'][0]:.6f} +/- {mag_g_error:.6f}" ) if "phot_bp_mean_mag" in gaia_result.columns: if not np.ma.is_masked(gaia_result["phot_bp_mean_mag"][0]): target_dict[f"GAIA/GAIA{self.gaia_idx}.Gbp"] = ( float(gaia_result["phot_bp_mean_mag"][0]), mag_bp_error, ) print( f"BP mag = {gaia_result['phot_bp_mean_mag'][0]:.6f} +/- {mag_bp_error:.6f}" ) if "phot_rp_mean_mag" in gaia_result.columns: if not np.ma.is_masked(gaia_result["phot_rp_mean_mag"][0]): target_dict[f"GAIA/GAIA{self.gaia_idx}.Grp"] = ( float(gaia_result["phot_rp_mean_mag"][0]), mag_rp_error, ) print( f"RP mag = {gaia_result['phot_rp_mean_mag'][0]:.6f} +/- {mag_rp_error:.6f}" ) if "grvs_mag" in gaia_result.columns: if not np.ma.is_masked(gaia_result["grvs_mag"][0]): target_dict[f"GAIA/GAIA{self.gaia_idx}.Grvs"] = ( float(gaia_result["grvs_mag"][0]), float(gaia_result["grvs_mag_error"][0]), ) print( f"GRVS mag = {gaia_result['grvs_mag'][0]:.6f} " f"+/- {gaia_result['grvs_mag_error'][0]:.6f}" ) # Create SkyCoord object from the RA and Dec of the selected Gaia source ID gaia_coord = SkyCoord( gaia_result["ra"][0], gaia_result["dec"][0], frame="icrs", unit=(u.deg, u.deg), ) coord_str = gaia_coord.to_string( "hmsdms", alwayssign=True, precision=4, pad=True ) if self.gaia_source != gaia_source_id: raise ValueError( f"The requested source ID ({self.gaia_source}) is not " f"equal to the retrieved source ID ({gaia_source_id})." ) print(f"\nGAIA {self.gaia_release} source ID = {gaia_source_id}") print(f"Reference epoch = {gaia_result['ref_epoch'][0]}") print( f"Parallax = {gaia_result['parallax'][0]:.2f} " f"+/- {gaia_result['parallax_error'][0]:.2f} mas" ) print( f"\nRA = {gaia_result['ra'][0]:.6f} deg +/- {gaia_result['ra_error'][0]:.4f} mas" ) print( f"Dec = {gaia_result['dec'][0]:.6f} deg +/- {gaia_result['dec_error'][0]:.4f} mas" ) print(f"Coordinates = {coord_str}") print( f"\nProper motion RA = {gaia_result['pmra'][0]:.2f} " f"+/- {gaia_result['pmra_error'][0]:.2f} mas/yr" ) print( f"Proper motion Dec = {gaia_result['pmdec'][0]:.2f} " f"+/- {gaia_result['pmdec_error'][0]:.2f} mas/yr" ) if "radial_velocity" in gaia_result.columns: if not np.ma.is_masked(gaia_result["radial_velocity"]): print( f"Radial velocity = {gaia_result['radial_velocity'][0]:.2f} " f"+/- {gaia_result['radial_velocity_error'][0]:.2f} km/s" ) if ( "astrometric_chi2_al" in gaia_result.columns and "astrometric_n_good_obs_al" in gaia_result.columns ): if not np.ma.is_masked( gaia_result["astrometric_chi2_al"] ) and not np.ma.is_masked(gaia_result["astrometric_n_good_obs_al"]): uwe = np.sqrt( gaia_result["astrometric_chi2_al"][0] / (gaia_result["astrometric_n_good_obs_al"][0] - 5.0) ) print(f"\nUWE = {uwe:.2f}") if "ruwe" in gaia_result.columns: if not np.ma.is_masked(gaia_result["ruwe"]): print(f"RUWE = {gaia_result['ruwe'][0]:.2f}") print( f"\nAstrometric excess noise (mas) = {gaia_result['astrometric_excess_noise'][0]:.2f}" ) if "non_single_star" in gaia_result.columns: if not np.ma.is_masked(gaia_result["non_single_star"]): print(f"Non single star = {gaia_result['non_single_star'][0]}") if "classprob_dsc_combmod_star" in gaia_result.columns: if not np.ma.is_masked(gaia_result["classprob_dsc_combmod_star"]): print( "Single star probability from DSC-Combmod = " f"{gaia_result['classprob_dsc_combmod_star'][0]:.2f}" ) if "ipd_gof_harmonic_amplitude" in gaia_result.columns: if not np.ma.is_masked(gaia_result["ipd_gof_harmonic_amplitude"]): print( f"\nIPD variation of goodness of fit = {gaia_result['ipd_gof_harmonic_amplitude'][0]:.2f}" ) if "ipd_frac_multi_peak" in gaia_result.columns: if not np.ma.is_masked(gaia_result["ipd_frac_multi_peak"]): print( f"IPD fraction with multiple peaks = {gaia_result['ipd_frac_multi_peak'][0]:.2f}" ) if "bp_rp" in gaia_result.columns: if not np.ma.is_masked(gaia_result["bp_rp"]): print(f"\nBp - Rp color = {gaia_result['bp_rp'][0]:.2f}") if "phot_bp_rp_excess_factor" in gaia_result.columns: if not np.ma.is_masked(gaia_result["phot_bp_rp_excess_factor"]): print( f"Bp/Rp excess factor = {gaia_result['phot_bp_rp_excess_factor'][0]:.2f}" ) if "has_xp_continuous" in gaia_result.columns: print(f"\nXP continuous = {gaia_result['has_xp_continuous'][0]}") if "has_xp_sampled" in gaia_result.columns: print(f"XP sampled = {gaia_result['has_xp_sampled'][0]}") if "has_rvs" in gaia_result.columns: print(f"RVS spectrum = {gaia_result['has_rvs'][0]}") if "teff_gspphot" in gaia_result.columns: if not np.ma.is_masked(gaia_result["teff_gspphot"]): print( f"\nEffective temperature = {gaia_result['teff_gspphot'][0]:.0f} K" ) print(f"Surface gravity = {gaia_result['logg_gspphot'][0]:.2f}") print(f"Metallicity = {gaia_result['mh_gspphot'][0]:.2f}") print(f"G-band extinction = {gaia_result['ag_gspphot'][0]:.2f}") print( f"A0 (541.4 nm) extinction = {gaia_result['azero_gspphot'][0]:.2f}" ) target_dict["teff"] = float(gaia_result["teff_gspphot"][0]) target_dict["log(g)"] = float(gaia_result["logg_gspphot"][0]) target_dict["metallicity"] = float(gaia_result["mh_gspphot"][0]) target_dict["ag_ext"] = float(gaia_result["ag_gspphot"][0]) target_dict["azero_ext"] = float(gaia_result["azero_gspphot"][0]) # Gaia XP spectrum if get_gaiaxp and ( "has_xp_continuous" in gaia_result.columns and gaia_result["has_xp_continuous"][0] ): # Sampling adopted from the GaiaXPy documentation # https://gaiaxpy.readthedocs.io/en/latest/usage.html # Default GaiaXPy sampling # sampling = np.arange(336, 1021, 2) # Improved sampling at the blue end of the spectrum sampling = np.geomspace(330, 1049.9999999999, 361) df_cal, sampling = calibrate( input_object=[f"{self.gaia_source}"], sampling=sampling, truncation=False, with_correlation=True, output_path="./", # output_file=f"{self.gaia_source}_gaiaxp", output_format=None, save_file=False, username=None, password=None, ) # merge_bp = create_merge(xp='bp', sampling=sampling) # merge_rp = create_merge(xp='rp', sampling=sampling) xp_plot = f"gaiaxp_{self.gaia_source}" print(f"\nStoring Gaia XP plot: {xp_plot}_0.jpg") plot_spectra( spectra=df_cal, sampling=sampling, multi=False, show_plot=False, output_path="./", output_file=xp_plot, format=None, legend=True, ) xp_wavel = sampling * 1e-3 # (nm) -> (um) xp_flux = 1e3 * df_cal["flux"][0] # (W m-2 nm-1) -> (W m-2 um-1) xp_error = 1e3 * df_cal["flux_error"][0] xp_cov = correlation_to_covariance( correlation=df_cal["correlation"][0], error=df_cal["flux_error"][0], stdev=1.0, ) xp_cov *= 1e6 # (W m-2 nm-1)^2 -> (W m-2 um-1)^2 header = "Wavelength (um) - Flux (W m-2 um-1) - Uncertainty (W m-2 um-1)" xp_spec_file = f"gaiaxp_{self.gaia_source}_spec.dat" xp_spec = np.column_stack([xp_wavel, xp_flux, xp_error]) np.savetxt(xp_spec_file, xp_spec, header=header) print(f"Storing Gaia XP spectrum: {xp_spec_file}") header = "Covariances (W m-2 um-1)^2" xp_cov_file = f"gaiaxp_{self.gaia_source}_cov.dat" np.savetxt(xp_cov_file, xp_cov, header=header) print(f"Storing Gaia XP covariances: {xp_cov_file}") # # Gaia query for another table # # gaia_query = f""" # SELECT * # FROM gaia{self.gaia_release.lower()}.vari_spurious_signals # WHERE source_id = {self.gaia_source} # """ # # # Launch the Gaia job and get the results # # gaia_job = Gaia.launch_job_async( # gaia_query, dump_to_file=False, verbose=False # ) # gaia_result = gaia_job.get_results() # # if len(gaia_result) == 0: # print("\nTarget has no data") # # else: # for param_item in gaia_result.columns: # print(f" - {param_item} = {gaia_result.columns[param_item][0]}") # Gaia query for selected the astrophysical parameters if print_astroph: print("\n-> Querying astrophysical parameters...\n") gaia_query = f""" SELECT * FROM gaia{self.gaia_release.lower()}.astrophysical_parameters WHERE source_id = {self.gaia_source} """ # Launch the Gaia job and get the results gaia_job = Gaia.launch_job_async( gaia_query, dump_to_file=False, verbose=False ) gaia_astro = gaia_job.get_results() if len(gaia_astro) == 0: print("\nTarget has no data in the astrophysical_parameters catalog") else: print("\nAstrophysical parameters:") for param_item in gaia_astro.columns: print(f" - {param_item} = {gaia_astro.columns[param_item][0]}") # Add spectral type to the Simbad output # for item in Simbad.list_votable_fields(): # print(item) Simbad.add_votable_fields( "sp_type", "ids", "sp", "sp_qual", "sp_bibcode", "otype", "otype_txt", # "flux(J)", # "flux(H)", # "flux(K)", # "flux_error(J)", # "flux_error(H)", # "flux_error(K)", ) # Simbad query for selected Gaia source ID print("\n-> Querying Simbad...\n") simbad_result = Simbad.query_object( f"GAIA {self.gaia_release} {self.gaia_source}" ) if simbad_result is not None and len(simbad_result) > 0: simbad_result = simbad_result[0] print(f"Simbad ID = {simbad_result['main_id']}") print(f"Object type = {simbad_result['otype_txt']}") print(f"Spectral type = {simbad_result['sp_type']}") print(f"Reference = {simbad_result['sp_bibcode']}") target_dict["Simbad ID"] = simbad_result["main_id"] target_dict["Spectral Type"] = simbad_result["sp_type"] target_dict["Object Type"] = simbad_result["otype_txt"] # print( # f"\n2MASS J mag = {simbad_result['FLUX_J']:.3f} " # f"+/- {simbad_result['FLUX_ERROR_J']:.3f}" # ) # # print( # f"2MASS H mag = {simbad_result['FLUX_H']:.3f} " # f"+/- {simbad_result['FLUX_ERROR_H']:.3f}" # ) # # print( # f"2MASS Ks mag = {simbad_result['FLUX_K']:.3f} " # f"+/- {simbad_result['FLUX_ERROR_K']:.3f}" # ) # target_dict["2MASS/2MASS.J"] = ( # float(simbad_result["FLUX_J"]), # float(simbad_result["FLUX_ERROR_J"]), # ) # # target_dict["2MASS/2MASS.H"] = ( # float(simbad_result["FLUX_H"]), # float(simbad_result["FLUX_ERROR_H"]), # ) # # target_dict["2MASS/2MASS.Ks"] = ( # float(simbad_result["FLUX_K"]), # float(simbad_result["FLUX_ERROR_K"]), # ) else: print("\nTarget not found on Simbad") # VizieR query for the selected Gaia source ID # Sort the result by distance from the queried object print("\n-> Querying VizieR...\n") vizier_obj = Vizier( columns=["*", "+_r", "BTmag", "e_BTmag", "VTmag", "e_VTmag"], catalog=["I/259/tyc2", "II/246/out", "II/328/allwise", "II/311/wise"], timeout=10.0, row_limit=1, ) radius = u.Quantity(1.0 * u.arcmin) vizier_result = vizier_obj.query_object( f"GAIA {self.gaia_release} {self.gaia_source}", radius=radius ) # TYCHO data from VizieR if "I/259/tyc2" in vizier_result.keys(): vizier_tycho = vizier_result["I/259/tyc2"] else: vizier_tycho = None if vizier_tycho is not None and len(vizier_tycho) > 0: vizier_tycho = vizier_tycho[0] print( f"TYCHO source ID = {vizier_tycho['TYC1']}-" f"{vizier_tycho['TYC2']}-{vizier_tycho['TYC3']}" ) print( f"Separation between Gaia and TYCHO source = " f"{1e3*vizier_tycho['_r']:.1f} mas" ) target_dict["TYCHO separation"] = 1e3 * vizier_tycho["_r"] if 1e3 * vizier_tycho["_r"] > 10.0: warnings.warn( "The separation between the Gaia and TYCHO source " "is more than 10 mas. Please check carefully if " "these are indeed the same sources." ) if np.ma.is_masked(vizier_tycho["e_BTmag"]): if not np.ma.is_masked(vizier_tycho["BTmag"]): print(f"\nTYCHO BT mag = >{vizier_tycho['BTmag']:.3f}") else: print( f"\nTYCHO BT mag = {vizier_tycho['BTmag']:.3f} " f"+/- {vizier_tycho['e_BTmag']:.3f}" ) target_dict["TYCHO/TYCHO.B"] = ( float(vizier_tycho["BTmag"]), float(vizier_tycho["e_BTmag"]), ) if np.ma.is_masked(vizier_tycho["e_VTmag"]): if not np.ma.is_masked(vizier_tycho["VTmag"]): print(f"\nTYCHO VT mag = >{vizier_tycho['VTmag']:.3f}") else: print( f"TYCHO VT mag = {vizier_tycho['VTmag']:.3f} " f"+/- {vizier_tycho['e_VTmag']:.3f}" ) target_dict["TYCHO/TYCHO.V"] = ( float(vizier_tycho["VTmag"]), float(vizier_tycho["e_VTmag"]), ) else: print("Target not found in TYCHO catalog") # 2MASS data from VizieR if "II/246/out" in vizier_result.keys(): vizier_2mass = vizier_result["II/246/out"] else: vizier_2mass = None if vizier_2mass is not None and len(vizier_2mass) > 0: vizier_2mass = vizier_2mass[0] print(f"\n2MASS source ID = {vizier_2mass['2MASS']}") target_dict["2MASS ID"] = vizier_2mass["2MASS"] print( f"Separation between Gaia and 2MASS source = " f"{1e3*vizier_2mass['_r']:.1f} mas" ) target_dict["2MASS separation"] = 1e3 * vizier_2mass["_r"] if 1e3 * vizier_2mass["_r"] > 10.0: warnings.warn( "The separation between the Gaia and 2MASS source " "is more than 10 mas. Please check carefully if " "these are indeed the same sources." ) if np.ma.is_masked(vizier_2mass["e_Jmag"]): if not np.ma.is_masked(vizier_2mass["Jmag"]): print(f"\n2MASS J mag = >{vizier_2mass['Jmag']:.3f}") else: print( f"\n2MASS J mag = {vizier_2mass['Jmag']:.3f} " f"+/- {vizier_2mass['e_Jmag']:.3f}" ) target_dict["2MASS/2MASS.J"] = ( float(vizier_2mass["Jmag"]), float(vizier_2mass["e_Jmag"]), ) if np.ma.is_masked(vizier_2mass["e_Hmag"]): if not np.ma.is_masked(vizier_2mass["Hmag"]): print(f"2MASS H mag = >{vizier_2mass['Hmag']:.3f}") else: print( f"2MASS H mag = {vizier_2mass['Hmag']:.3f} " f"+/- {vizier_2mass['e_Hmag']:.3f}" ) target_dict["2MASS/2MASS.H"] = ( float(vizier_2mass["Hmag"]), float(vizier_2mass["e_Hmag"]), ) if np.ma.is_masked(vizier_2mass["e_Kmag"]): if not np.ma.is_masked(vizier_2mass["Kmag"]): print(f"2MASS Ks mag = >{vizier_2mass['Kmag']:.3f}") else: print( f"2MASS Ks mag = {vizier_2mass['Kmag']:.3f} " f"+/- {vizier_2mass['e_Kmag']:.3f}" ) target_dict["2MASS/2MASS.Ks"] = ( float(vizier_2mass["Kmag"]), float(vizier_2mass["e_Kmag"]), ) else: print("Target not found in 2MASS catalog") # WISE data from VizieR vizier_wise = None if allwise_catalog: if "II/328/allwise" in vizier_result.keys(): vizier_wise = vizier_result["II/328/allwise"] else: if "II/311/wise" in vizier_result.keys(): vizier_wise = vizier_result["II/311/wise"] if vizier_wise is not None and len(vizier_wise) > 0: vizier_wise = vizier_wise[0] if allwise_catalog: print(f"\nALLWISE source ID = {vizier_wise['AllWISE']}") target_dict["WISE ID"] = vizier_wise["AllWISE"] else: print(f"\nWISE source ID = {vizier_wise['WISE']}") target_dict["WISE ID"] = vizier_wise["WISE"] print( f"Separation between Gaia and WISE source = " f"{1e3*vizier_wise['_r']:.1f} mas" ) target_dict["WISE separation"] = 1e3 * vizier_wise["_r"] if 1e3 * vizier_wise["_r"] > 10.0: warnings.warn( "The separation between the Gaia and WISE source " "is more than 10 mas. Please check carefully if " "these are indeed the same sources." ) if np.ma.is_masked(vizier_wise["e_W1mag"]): if not np.ma.is_masked(vizier_wise["W1mag"]): print(f"\nWISE W1 mag = >{vizier_wise['W1mag']:.3f}") else: print( f"\nWISE W1 mag = {vizier_wise['W1mag']:.3f} " f"+/- {vizier_wise['e_W1mag']:.3f}" ) target_dict["WISE/WISE.W1"] = ( float(vizier_wise["W1mag"]), float(vizier_wise["e_W1mag"]), ) if np.ma.is_masked(vizier_wise["e_W2mag"]): if not np.ma.is_masked(vizier_wise["W2mag"]): print(f"WISE W2 mag = >{vizier_wise['W2mag']:.3f}") else: print( f"WISE W2 mag = {vizier_wise['W2mag']:.3f} " f"+/- {vizier_wise['e_W2mag']:.3f}" ) target_dict["WISE/WISE.W2"] = ( float(vizier_wise["W2mag"]), float(vizier_wise["e_W2mag"]), ) if np.ma.is_masked(vizier_wise["e_W3mag"]): if not np.ma.is_masked(vizier_wise["W3mag"]): print(f"WISE W3 mag = >{vizier_wise['W3mag']:.3f}") else: print( f"WISE W3 mag = {vizier_wise['W3mag']:.3f} " f"+/- {vizier_wise['e_W3mag']:.3f}" ) target_dict["WISE/WISE.W3"] = ( float(vizier_wise["W3mag"]), float(vizier_wise["e_W3mag"]), ) if np.ma.is_masked(vizier_wise["e_W4mag"]): if not np.ma.is_masked(vizier_wise["W4mag"]): print(f"WISE W4 mag = >{vizier_wise['W4mag']:.3f}") else: print( f"WISE W4 mag = {vizier_wise['W4mag']:.3f} " f"+/- {vizier_wise['e_W4mag']:.3f}" ) target_dict["WISE/WISE.W4"] = ( float(vizier_wise["W4mag"]), float(vizier_wise["e_W4mag"]), ) else: print("Target not found in WISE catalog") print("\n-> Querying Washington Double Star catalog...\n") found_wds = False if simbad_result is not None and len(simbad_result) > 0: simbad_ids = simbad_result["ids"].split("|") wds_id = list(filter(lambda x: x.startswith("WDS"), simbad_ids)) if len(wds_id) == 1: # There should be a single WDS identified per target on Simbad wds_id = wds_id[0] wds_table = Table.read(self.wds_file, path="wds_catalog") # print(wds_table.columns) # This will not always given the correct ID # only for regular binary system IDs id_crop = wds_id.split(" ")[-1][1:11] id_idx = np.where(id_crop == wds_table["WDS"])[0] target_dict["WDS ID"] = wds_id for wds_idx in range(len(id_idx)): if wds_idx > 0: print() wds_select = wds_table[id_idx[wds_idx]] print(f"WDS ID = {wds_select['WDS']}") if len(wds_select["Comp"]) > 0: print(f"Companion = {wds_select['Comp']}") print(f"Observation 1 = {wds_select['Obs1']}") print(f"Observation 2 = {wds_select['Obs2']}") if wds_select["sep1"] is not np.ma.masked: print(f"Separation 1 (arcsec) = {wds_select['sep1']:.2f}") else: print("Separation 1 (arcsec) = --") if wds_select["sep2"] is not np.ma.masked: print(f"Separation 2 (arcsec) = {wds_select['sep2']:.2f}") else: print("Separation 2 (arcsec) = --") if wds_select["pa1"] is not np.ma.masked: print(f"Position angle 1 (deg) = {wds_select['pa1']:.2f}") else: print("Position angle 1 (deg) = --") if wds_select["pa2"] is not np.ma.masked: print(f"Position angle 2 (deg) = {wds_select['pa2']:.2f}") else: print("Position angle 2 (deg) = --") if wds_select["mag1"] is not np.ma.masked: print(f"Magnitude 1 = {wds_select['mag1']:.2f}") else: print("Magnitude 1 = --") if wds_select["mag2"] is not np.ma.masked: print(f"Magnitude 2 = {wds_select['mag2']:.2f}") else: print("Magnitude 2 = --") found_wds = True if not found_wds: print("Target not found in WDS catalog") if write_json: json_file = f"target_{self.gaia_release.lower()}_{self.gaia_source}.json" print(f"\nStoring JSON output: {json_file}") with open(json_file, "w", encoding="utf-8") as open_file: json.dump(target_dict, open_file, indent=4) return target_dict
[docs] @beartype def find_calib( self, search_radius: float = 0.1, g_mag_range: typing.Optional[typing.Tuple[float, float]] = None, write_csv: bool = True, ) -> pd.DataFrame: """ Function for finding calibration stars. The function returns a ``DataFrame`` with the sources that are queried from the Gaia catalog, but it also (optionally) stores the data in a CSV file in the working folder. The table also contains 2MASS and WISE magnitudes, and data from The Washington Visual Double Star Catalog. It is recommended to open the CSV file in a spreadsheet editor for easy visualization. Parameters ---------- search_radius : float Radius (in degrees) of the cone that is used to query the GAIA source catalog to search for calibration sources in the vicinity of the selected ``gaia_source`` (default: 0.1). The data release of the Gaia source catalog that is used for the query can be set with the ``gaia_release`` argument of the :class:`~calistar.calistar.CaliStar` instance. g_mag_range : tuple(float, float), None Magnitude range relative to the the Gaia $G$ band of the magnitude of the selected ``gaia_source``. The magnitude range will be used for querying sources in the Gaia catalog. The argument should be specified, for example, as ``(-2.0, 5.0)`` if source are selected with a $G$ magnitude that is at most 2 mag smaller and 5 mag larger than the magnitude ``gaia_source``. A range of :math:`\\pm` 1.0 mag (i.e. ``g_mag_range=(-1.0, 1.0)``) is used if the argument of ``g_mag_range`` is set to ``None``. write_csv : bool Write the table with found source to a CSV file (default: True). The file will be stored in the working folder and starts with ``calib_find_``. The filename contains also the Gaia release and the Gaia source ID of the target. Returns ------- pandas.DataFrame A ``DataFrame`` with the table of queried sources. """ json_file = Path(f"target_{self.gaia_release.lower()}_{self.gaia_source}.json") if not json_file.exists(): self.target_star(write_json=True) print("\n-> Finding calibration stars...\n") with open(json_file, "r", encoding="utf-8") as open_file: target_dict = json.load(open_file) if g_mag_range is None: g_mag_range = (-1.0, 1.0) # Add 2MASS JHKs magnitudes to the Simbad output # print(Simbad.list_votable_fields()) Simbad.add_votable_fields( "sp_type", "ids", # "flux(J)", # "flux(H)", # "flux(K)", # "flux_error(J)", # "flux_error(H)", # "flux_error(K)", ) print(f"Radius of search cone = {search_radius} deg") mag_low = target_dict[f"GAIA/GAIA{self.gaia_idx}.G"][0] + g_mag_range[0] mag_upp = target_dict[f"GAIA/GAIA{self.gaia_idx}.G"][0] + g_mag_range[1] print(f"G mag search range = ({mag_low:.2f}, {mag_upp:.2f})") gaia_query = f""" SELECT *, DISTANCE( POINT('ICRS', ra, dec), POINT('ICRS', {target_dict['Gaia RA'][0]}, {target_dict['Gaia Dec'][0]}) ) AS ang_sep FROM gaia{self.gaia_release.lower()}.gaia_source WHERE CONTAINS( POINT('ICRS', ra, dec), CIRCLE( 'ICRS', {target_dict['Gaia RA'][0]}, {target_dict['Gaia Dec'][0]}, {search_radius} ) ) = 1 AND phot_g_mean_mag > {mag_low} AND phot_g_mean_mag < {mag_upp} AND parallax IS NOT NULL ORDER BY ang_sep ASC """ # Launch the Gaia job and get the results gaia_job = Gaia.launch_job_async(gaia_query, dump_to_file=False, verbose=False) gaia_results = gaia_job.get_results() print(f"Number of found sources: {len(gaia_results)}") columns = [ "Simbad ID", "Gaia ID", "SpT", "Separation", "Astrometric excess noise", "RUWE", "Non single star", "Single star probability", "IPD variation of goodness of fit", "IPD fraction with multiple peaks", "Bp/Rp excess factor", ] columns += self.all_filters columns += [ "WDS ID", "WDS epoch 1", "WDS epoch 2", "WDS sep 1", "WDS sep 2", "WDS PA 1", "WDS PA 2", "WDS mag 1", "WDS mag 2", ] # Initiate all values in the dataframe to NaN cal_df = pd.DataFrame(index=range(len(gaia_results)), columns=columns) drop_indices = [] warnings.filterwarnings("ignore", category=UserWarning) vizier_obj = Vizier(columns=["*", "+_r"], timeout=10.0, row_limit=1) for gaia_item in tqdm(gaia_results): if "SOURCE_ID" in gaia_item.columns: gaia_source_id = int(gaia_item["SOURCE_ID"]) elif "source_id" in gaia_item.columns: gaia_source_id = int(gaia_item["source_id"]) else: raise ValueError(f"Gaia source ID not found in {gaia_item}") cal_df.loc[gaia_item.index, "Gaia ID"] = gaia_source_id coord_target = SkyCoord( target_dict["Gaia RA"][0], target_dict["Gaia Dec"][0], frame="icrs", unit=(u.deg, u.deg), ) coord_calib = SkyCoord( gaia_item["ra"], gaia_item["dec"], frame="icrs", unit=(u.deg, u.deg), ) separation = coord_target.separation(coord_calib) cal_df.loc[gaia_item.index, "Separation"] = separation.deg cal_df.loc[gaia_item.index, f"GAIA/GAIA{self.gaia_idx}.G"] = gaia_item[ "phot_g_mean_mag" ] simbad_result = Simbad.query_object( f"GAIA {self.gaia_release} {gaia_source_id}" ) if simbad_result is not None and len(simbad_result) > 0: simbad_result = simbad_result[0] if np.ma.is_masked(simbad_result["main_id"]): cal_df.loc[gaia_item.index, "Simbad ID"] = np.nan else: cal_df.loc[gaia_item.index, "Simbad ID"] = simbad_result["main_id"] if np.ma.is_masked(simbad_result["sp_type"]): cal_df.loc[gaia_item.index, "SpT"] = np.nan else: cal_df.loc[gaia_item.index, "SpT"] = simbad_result["sp_type"] # if np.ma.is_masked(simbad_result["FLUX_J"]): # cal_df.loc[gaia_item.index, "2MASS/2MASS.J"] = np.nan # else: # cal_df.loc[gaia_item.index, "2MASS/2MASS.J"] = simbad_result[ # "FLUX_J" # ] # # if np.ma.is_masked(simbad_result["FLUX_H"]): # cal_df.loc[gaia_item.index, "2MASS/2MASS.H"] = np.nan # else: # cal_df.loc[gaia_item.index, "2MASS/2MASS.H"] = simbad_result[ # "FLUX_H" # ] # # if np.ma.is_masked(simbad_result["FLUX_K"]): # cal_df.loc[gaia_item.index, "2MASS/2MASS.Ks"] = np.nan # else: # cal_df.loc[gaia_item.index, "2MASS/2MASS.Ks"] = simbad_result[ # "FLUX_K" # ] radius = u.Quantity(1.0 * u.arcmin) vizier_result = vizier_obj.query_object( f"GAIA {self.gaia_release} {gaia_source_id}", radius=radius, catalog=["II/246/out", "II/328/allwise"], ) if len(vizier_result) == 2: # 2MASS vizier_2mass = vizier_result["II/246/out"][0] # Check if the separation between the Gaia and # the 2MASS coordinates is at most 200 mas skip_source = vizier_2mass["_r"] > 0.2 if skip_source or np.ma.is_masked(vizier_2mass["Jmag"]): cal_df.loc[gaia_item.index, "2MASS/2MASS.J"] = np.nan else: cal_df.loc[gaia_item.index, "2MASS/2MASS.J"] = vizier_2mass["Jmag"] if skip_source or np.ma.is_masked(vizier_2mass["Hmag"]): cal_df.loc[gaia_item.index, "2MASS/2MASS.H"] = np.nan else: cal_df.loc[gaia_item.index, "2MASS/2MASS.H"] = vizier_2mass["Hmag"] if skip_source or np.ma.is_masked(vizier_2mass["Kmag"]): cal_df.loc[gaia_item.index, "2MASS/2MASS.Ks"] = np.nan else: cal_df.loc[gaia_item.index, "2MASS/2MASS.Ks"] = vizier_2mass["Kmag"] # WISE vizier_wise = vizier_result["II/328/allwise"][0] # Check if the separation between the Gaia and # the ALLWISE coordinates is at most 200 mas skip_source = vizier_wise["_r"] > 0.2 if skip_source or np.ma.is_masked(vizier_wise["W1mag"]): cal_df.loc[gaia_item.index, "WISE/WISE.W1"] = np.nan else: cal_df.loc[gaia_item.index, "WISE/WISE.W1"] = vizier_wise["W1mag"] if skip_source or np.ma.is_masked(vizier_wise["W2mag"]): cal_df.loc[gaia_item.index, "WISE/WISE.W2"] = np.nan else: cal_df.loc[gaia_item.index, "WISE/WISE.W2"] = vizier_wise["W2mag"] if skip_source or np.ma.is_masked(vizier_wise["W3mag"]): cal_df.loc[gaia_item.index, "WISE/WISE.W3"] = np.nan else: cal_df.loc[gaia_item.index, "WISE/WISE.W3"] = vizier_wise["W3mag"] if skip_source or np.ma.is_masked(vizier_wise["W4mag"]): cal_df.loc[gaia_item.index, "WISE/WISE.W4"] = np.nan else: cal_df.loc[gaia_item.index, "WISE/WISE.W4"] = vizier_wise["W4mag"] # This query returns no sources? # gaia_query = f""" # SELECT * # FROM gaia{self.gaia_release.lower()}.allwise_best_neighbour # WHERE source_id = {self.gaia_source} # """ # gaia_job = Gaia.launch_job_async(gaia_query, # dump_to_file=False, verbose=False) # gaia_result = gaia_job.get_results() else: drop_indices.append(gaia_item.index) cal_df.loc[gaia_item.index, "Astrometric excess noise"] = gaia_item[ "astrometric_excess_noise" ] if "ruwe" in gaia_item.columns: cal_df.loc[gaia_item.index, "RUWE"] = gaia_item["ruwe"] if "non_single_star" in gaia_item.columns: cal_df.loc[gaia_item.index, "Non single star"] = gaia_item[ "non_single_star" ] if "classprob_dsc_combmod_star" in gaia_item.columns: cal_df.loc[gaia_item.index, "Single star probability"] = gaia_item[ "classprob_dsc_combmod_star" ] if "ipd_gof_harmonic_amplitude" in gaia_item.columns: cal_df.loc[gaia_item.index, "IPD variation of goodness of fit"] = ( gaia_item["ipd_gof_harmonic_amplitude"] ) if "ipd_frac_multi_peak" in gaia_item.columns: cal_df.loc[gaia_item.index, "IPD fraction with multiple peaks"] = ( gaia_item["ipd_frac_multi_peak"] ) if "phot_bp_rp_excess_factor" in gaia_item.columns: cal_df.loc[gaia_item.index, "Bp/Rp excess factor"] = gaia_item[ "phot_bp_rp_excess_factor" ] # Query The Washington Visual Double Star Catalog if simbad_result is not None and len(simbad_result) > 0: simbad_ids = simbad_result["ids"].split("|") wds_id = list(filter(lambda x: x.startswith("WDS"), simbad_ids)) if len(wds_id) == 1: wds_table = Table.read(self.wds_file, path="wds_catalog") id_crop = wds_id[0].split(" ")[-1][1:11] id_idx = np.where(id_crop == wds_table["WDS"])[0] if len(id_idx) == 1: wds_select = wds_table[id_idx] cal_df.loc[gaia_item.index, "WDS ID"] = wds_id[0] cal_df.loc[gaia_item.index, "WDS epoch 1"] = wds_select["Obs1"] cal_df.loc[gaia_item.index, "WDS epoch 2"] = wds_select["Obs2"] cal_df.loc[gaia_item.index, "WDS sep 1"] = wds_select["sep1"] cal_df.loc[gaia_item.index, "WDS sep 2"] = wds_select["sep2"] cal_df.loc[gaia_item.index, "WDS PA 1"] = wds_select["pa1"] cal_df.loc[gaia_item.index, "WDS PA 2"] = wds_select["pa2"] cal_df.loc[gaia_item.index, "WDS mag 1"] = wds_select["mag1"] cal_df.loc[gaia_item.index, "WDS mag 2"] = wds_select["mag2"] warnings.filterwarnings("default", category=UserWarning) cal_df = cal_df.drop(index=drop_indices) cal_df["Gaia ID"] = cal_df["Gaia ID"].astype("int") if write_csv: output_file = ( f"calib_find_{self.gaia_release.lower()}_{self.gaia_source}.csv" ) print(f"Storing output: {output_file}") cal_df.to_csv(path_or_buf=output_file, header=True, index=False) return cal_df
[docs] @beartype def select_calib( self, filter_names: typing.Optional[typing.List[str]] = None, mag_diff: typing.Union[float, typing.Dict[str, float]] = 0.1, write_csv: bool = True, ) -> pd.DataFrame: """ Function for selecting the calibration stars. The function returns a ``DataFrame`` with the selected sources, but it also (optionally) stores the data in a CSV file in the working folder. It is recommended to open the CSV file in a spreadsheet editor for easy visualization. Parameters ---------- filter_names : list(str), None List with filter names that are used in combination with ``mag_diff`` for selecting sources. Any of the 2MASS, WISE, and GAIA filter names from the `SVO Filter Profile Service <http://svo2.cab.inta-csic.es/theory/fps/>`_ can be used. (default: ``['2MASS/2MASS.J', '2MASS/2MASS.H', '2MASS/2MASS.Ks']``). mag_diff : float, dict(str, float) Allowed magnitude difference between the selected target (i.e. the argument of ``gaia_source``) and the sources there were found with :func:`~calistar.calistar.CaliStar.find_calib()` (default: 0.1). The argument can be either a float, in which case the same value is used for all filters listed in ``filter_names``, or a dictionary in which case the keys should be the filter names that are listed in ``filter_names`` and the values are the allowed magnitude differences for each filter. write_csv : bool Write the table with found source to a CSV file (default: True). The file will be stored in the working folder and starts with ``calib_select_``. The filename contains also the Gaia release and the Gaia source ID of the target. Returns ------- pandas.DataFrame The ``DataFrame`` with the selected calibration stars. """ json_file = Path(f"target_{self.gaia_release.lower()}_{self.gaia_source}.json") if not json_file.exists(): self.target_star(write_json=True) print("\n-> Selecting calibration stars...\n") with open(json_file, "r", encoding="utf-8") as open_file: target_dict = json.load(open_file) if filter_names is None: filter_names = ["2MASS/2MASS.J", "2MASS/2MASS.H", "2MASS/2MASS.Ks"] if not isinstance(mag_diff, dict): diff_val = copy(mag_diff) mag_diff = {} for filter_item in filter_names: mag_diff[filter_item] = diff_val if sorted(filter_names) != sorted(list(mag_diff.keys())): raise ValueError( "The values in the list of 'filter_names', " f"{filter_names}, is not equal to the keys in the " f"dictionary of 'mag_diff', {list(mag_diff.keys())}." ) csv_file = Path( f"calib_find_{self.gaia_release.lower()}_{self.gaia_source}.csv" ) if not csv_file.exists(): err_msg = ( "The CSV file with pre-selected calibration " "sources is not found. Please make sure to run " "the 'find_calib()' method before " "'select_calib()', and set the argument of " "'write_csv' to True." ) raise FileNotFoundError(err_msg) cal_df = pd.read_csv( filepath_or_buffer=csv_file, header=0, index_col=False, ) drop_indices = [] for row_idx in tqdm(range(len(cal_df))): for filter_item in filter_names: if np.isnan(cal_df.loc[row_idx, filter_item]) or ( np.abs( cal_df.loc[row_idx, filter_item] - target_dict[filter_item][0] ) > mag_diff[filter_item] ): if row_idx not in drop_indices: drop_indices.append(row_idx) cal_df = cal_df.drop(index=drop_indices) print(f"Number of selected sources: {len(cal_df)}") if write_csv: output_file = ( f"calib_select_{self.gaia_release.lower()}_{self.gaia_source}.csv" ) print(f"Storing output: {output_file}") cal_df.to_csv(path_or_buf=output_file, header=True, index=False) return cal_df