Source code for pysm3.models.catalog

from typing import Optional
import h5py
import healpy as hp
import numpy as np

import logging

log = logging.getLogger("pysm3")

try:
    from numpy import trapezoid
except ImportError:
    from numpy import trapz as trapezoid
from numba import njit

# from astropy import constants as const
#
from .. import units as u
from .. import utils
from .template import Model


@njit
def aggregate(index, array, values):
    """Sums values by index

    Example:
    m = np.zeros(3)
    m[2, 2] += np.ones(2)
    gives:
    m
    [0, 0, 1]
    instead
    aggregate([2,2], m, np.ones(2))
    gives
    m
    [0, 0, 2]
    """
    for i, v in zip(index, values):
        array[i] += v


@njit
def fwhm2sigma(fwhm):
    """Converts the Full Width Half Maximum of a Gaussian beam to its standard deviation"""
    return fwhm / (2.0 * np.sqrt(2.0 * np.log(2.0)))


@njit
def flux2amp(flux, fwhm):
    """Converts the total flux of a radio source to the peak amplitude of its Gaussian
    beam representation, taking into account the width of the beam as specified
    by its FWHM

    Parameters
    ----------
    flux: float
        Total flux of the radio source
    fwhm: float
        Full Width Half Maximum of the beam in radians

    Returns
    -------
    amp: float
        Peak amplitude of the Gaussian beam representation of the radio source"""
    sigma = fwhm2sigma(fwhm)
    amp = flux / (2 * np.pi * sigma ** 2)
    # sim_objects fails if amp is zero
    c = 1e-8  # clip
    amp[np.logical_and(amp < c, amp >= 0)] = c
    amp[np.logical_and(amp > -c, amp < 0)] = -c
    return amp


@njit
def evaluate_poly(coeffs, x):
    """
    Evaluate polynomial for each source at value x.
    coeffs: shape (n_coeff, n_sources) (power, index)
    x: float (log-frequency)
    Returns: shape (n_sources,)
    """
    n_coeff, n_sources = coeffs.shape
    out = np.zeros(n_sources, dtype=np.float64)
    for i_source in range(n_sources):
        val = 0.0
        for i_coeff in range(n_coeff):
            val += coeffs[i_coeff, i_source] * x ** (n_coeff - 1 - i_coeff)
        out[i_source] = max(0, val)
    return out


@njit
def evaluate_model(freqs, weights, coeffs):
    """
    Integrate log polynomial model across the bandpass for each source in the catalog.
    coeffs: shape (n_coeff, n_sources) (power, index)
    """
    n_coeff, n_sources = coeffs.shape
    logfreqs = np.log(freqs)
    out = np.zeros(n_sources, dtype=np.float64)
    assert len(freqs) == len(weights)
    if len(freqs) == 1:
        out = evaluate_poly(coeffs, logfreqs[0])
    else:
        flux = np.zeros((len(freqs), n_sources), dtype=np.float64)
        for i_freq in range(len(freqs)):
            flux[i_freq, :] = evaluate_poly(coeffs, logfreqs[i_freq])
        for i_source in range(n_sources):
            out[i_source] = trapezoid(flux[:, i_source] * weights, x=freqs)
    return out


[docs] class PointSourceCatalog(Model): """Model for a Catalog of point sources defined with their coordinates and a model of their emission based on a logpolynomial of frequency. The beam convolution is performed in map domain with `pixell`. The catalog should be in HDF5 format, with the fields: theta: colatitude in radians phi: longitude in radians logpolycoefflux and logpolycoefpolflux: polynomial coefficients in natural logaritm (`np.log`) of the frequency, typically 4th order, but accepts any order. (source_index, pol_order). Unit needs to be Jy each field should have an attribute units which is checked when loading a model. No conversion is performed. See the documentation and the unit tests for examples on how to create a catalog file with `xarray`. Parameters ---------- catalog_filename: str or Path Path to the catalog HDF5 file """ includes_smoothing = True def __init__( self, catalog_filename, nside=None, target_wcs=None, catalog_slice=None, max_nside=None, map_dist=None, ): super().__init__(nside=nside, max_nside=max_nside, map_dist=map_dist) self.catalog_filename = utils.RemoteData().get(catalog_filename) self.wcs = target_wcs if catalog_slice is None: self.catalog_slice = slice(None) else: self.catalog_slice = catalog_slice with h5py.File(self.catalog_filename) as f: assert f["theta"].attrs["units"].decode("UTF-8") == "rad" assert f["phi"].attrs["units"].decode("UTF-8") == "rad" assert f["logpolycoefflux"].attrs["units"].decode("UTF-8") == "Jy" assert f["logpolycoefpolflux"].attrs["units"].decode("UTF-8") == "Jy" assert map_dist is None, "Distributed execution not supported"
[docs] def get_fluxes(self, freqs: u.GHz, coeff="logpolycoefflux", weights=None): """Get catalog fluxes in Jy integrated over a bandpass""" freqs = utils.check_freq_input(freqs) weights = utils.normalize_weights(freqs, weights) with h5py.File(self.catalog_filename) as f: # New format: coeffs are (power, index) coeffs = np.array(f[coeff][:, self.catalog_slice]) # If catalog_slice is not a full slice, squeeze to (n_coeff, n_sources) if coeffs.ndim == 3: # e.g. (n_coeff, n_sources, 1) coeffs = np.squeeze(coeffs, axis=-1) flux = evaluate_model(freqs, weights, coeffs) return flux * u.Jy
[docs] @u.quantity_input def get_emission( self, freqs: u.Quantity[u.GHz], fwhm: Optional[u.Quantity[u.arcmin]] = None, weights=None, output_units=u.uK_RJ, car_map_resolution: Optional[u.Quantity[u.arcmin]] = None, lmax=None, output_nside=None, coord=None, return_car=False, output_car_resol=None, return_healpix=True, ): """Generate a HEALPix or CAR map of the catalog emission integrated on the bandpass and convolved with the beam Parameters ---------- freqs: np.array Array of frequencies in GHz fwhm: float or None Full Width Half Maximum of the beam in arcminutes, if None, each source is assigned to a single pixel weights: np.array Array of relative bandpass weights already normalized Same length of freqs, if None, uniform weights are assumed output_units: astropy.units Output units of the map car_map_resolution: float Resolution of the CAR map used by pixell to generate the map, if None, it is set to half of the resolution of the HEALPix map given by `self.nside`, unless Nside is above >= 8192, then it is the same resolution of the HEALPix map output_nside : int Output HEALPix nside, if None, the nside used when creating the component (self.nside) lmax: int Not used, all operations are performed in pixel domain coord: str Coordinate rotation to apply, for example ("G", "C") to rotate from Galactic to Equatorial coordinates. If None, no rotation is applied return_car: bool If True return a CAR map output_car_resol: arcmin Not implemented yet return_healpix: bool If True return a HEALPix map Returns ------ue- output_map: np.array Output HEALPix or CAR map or tuple with HEALPix and CAR maps""" convolve_beam = fwhm is not None scaling_factor = utils.bandpass_unit_conversion( freqs, weights, output_unit=output_units, input_unit=u.Jy / u.sr ) log.info( "HEALPix map resolution: %.2e arcmin, nside %d", hp.nside2resol(self.nside, arcmin=True), self.nside, ) pix_size = hp.nside2pixarea(self.nside) * u.sr if return_car: assert output_car_resol is None, "CAR resolution not implemented yet" if convolve_beam: if car_map_resolution is None: car_map_resolution = hp.nside2resol(self.nside) * u.rad if self.nside < 8192: car_map_resolution /= 2 log.info( "Rounded CAR map resolution: %s", car_map_resolution.to(u.arcmin).to_string(precision=2), ) # Make sure the resolution evenly divides the map vertically if (car_map_resolution.to_value(u.rad) % np.pi) > 1e-8: car_map_resolution = ( np.pi / np.round(np.pi / car_map_resolution.to_value(u.rad)) ) * u.rad log.info( "Rounded CAR map resolution: %s", car_map_resolution.to(u.arcmin).to_string(precision=2), ) log.info("Computing fluxes for I") fluxes_I = self.get_fluxes(freqs, weights=weights, coeff="logpolycoefflux") log.info("Fluxes for I computed for %.2f million sources", len(fluxes_I) / 1e6) if convolve_beam: from pixell import ( enmap, pointsrcs, ) shape, wcs = enmap.fullsky_geometry( car_map_resolution.to_value(u.radian), dims=(3,), variant="fejer1", ) log.info("CAR map shape %s", shape) output_map = enmap.enmap(np.zeros(shape, dtype=np.float32), wcs) r, p = pointsrcs.expand_beam(fwhm2sigma(fwhm.to_value(u.rad))) log.info("Reading pointing") with h5py.File(self.catalog_filename) as f: pointing = np.vstack( ( np.pi / 2 - np.array(f["theta"][self.catalog_slice]), np.array(f["phi"][self.catalog_slice]), ) ) log.info("Reading pointing completed") amps = flux2amp( fluxes_I.to_value(u.Jy) * scaling_factor.value, fwhm.to_value(u.rad), ) # to peak amplitude and to output units log.info("Executing sim_objects for I") # import pickle # with open( # "sim_objects_inputs.pkl", # "wb", # ) as f: # pickle.dump( # { # "shape": shape, # "wcs": wcs, # "poss": pointing, # "amps": amps, # "profile": (r, p), # }, # f, # ) # import sys # sys.exit(0) output_map[0] = pointsrcs.sim_objects( shape=shape, wcs=wcs, poss=pointing, amps=amps, profile=((r, p)), ) log.info("Execution of sim_objects for I completed") else: with h5py.File(self.catalog_filename) as f: pix = hp.ang2pix( self.nside, f["theta"][self.catalog_slice], f["phi"][self.catalog_slice], ) output_map = ( np.zeros((3, hp.nside2npix(self.nside)), dtype=np.float32) * output_units ) aggregate(pix, output_map[0], fluxes_I / pix_size * scaling_factor) del fluxes_I log.info("Computing fluxes for Q/U") fluxes_P = self.get_fluxes(freqs, weights=weights, coeff="logpolycoefpolflux") log.info( "Fluxes for Q/U computed for %.2f million sources", len(fluxes_P) / 1e6 ) # set seed so that the polarization angle is always the same for each run # could expose to the interface if useful np.random.seed(56567) psirand = np.random.uniform( low=-np.pi / 2.0, high=np.pi / 2.0, size=len(fluxes_P) ) if convolve_beam: pols = [(1, np.cos)] pols.append((2, np.sin)) for i_pol, sincos in pols: log.info("Executing sim_objects for Q/U") output_map[i_pol] = pointsrcs.sim_objects( shape, wcs, pointing, flux2amp( fluxes_P.to_value(u.Jy) * scaling_factor.value * sincos(2 * psirand), fwhm.to_value(u.rad), ), ((r, p)), ) log.info("Execution of sim_objects for Q/U completed") if return_car: assert ( coord is None ), "Coord rotation for CAR not implemented yet, open issue if you need it" if return_healpix: from pixell import reproject frames_dict = {"Q": "equ", "C": "equ", "G": "gal"} if coord is not None: coord = ",".join([frames_dict[frame] for frame in coord]) log.info("Reprojecting to HEALPix") output_nside = output_nside or self.nside output_map_healpix = ( reproject.map2healpix( output_map, output_nside, rot=coord, method="spline" ) * output_units ) log.info("Reprojecting to HEALPix completed") if return_car: output_map = (output_map_healpix, output_map) else: output_map = output_map_healpix else: assert not return_car and return_healpix, "Only HEALPix maps supported" aggregate( pix, output_map[1], fluxes_P / pix_size * scaling_factor * np.cos(2 * psirand), ) aggregate( pix, output_map[2], fluxes_P / pix_size * scaling_factor * np.sin(2 * psirand), ) log.info("Catalog emission computed") return output_map