Assemble WebSky background catalog with Dask

[2]:
import h5pickle as h5py
import numpy as np
import healpy as hp
import matplotlib.pyplot as plt
from tqdm import tqdm
[3]:
%load_ext jupyter_ai
%ai list gemini
[3]:
| Provider | Environment variable | Set? | Models |
| --- | --- | --- | --- |
| gemini | GOOGLE_API_KEY | ✅ | gemini:gemini-1.0-pro • gemini:gemini-1.0-pro-001 • gemini:gemini-1.0-pro-latest • gemini:gemini-1.0-pro-vision-latest • gemini:gemini-pro • gemini:gemini-pro-vision |

[5]:
import healpy as hp
hp.version  # check which healpy installation is in use (hp.__version__ holds the version string)
[5]:
<module 'healpy.version' from '/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/healpy/version.py'>
[6]:
%alias_magic gm ai -p "gemini:gemini-pro -f code"
Created `%gm` as an alias for `%ai gemini:gemini-pro -f code`.
Created `%%gm` as an alias for `%%ai gemini:gemini-pro -f code`.
[7]:
import os

num_threads = 128
# keep OpenMP single-threaded inside each of the many Dask workers
os.environ["OMP_NUM_THREADS"] = "1"
[8]:
from dask.distributed import Client, LocalCluster

# one single-threaded worker process per core
cluster = LocalCluster(n_workers=num_threads, threads_per_worker=1, processes=True)
client = Client(cluster)
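The write step later triggers worker-memory warnings near the default per-worker cap (total node memory split across the 128 workers, about 3.7 GiB each). A hedged variant that makes the cap explicit; memory_limit is a standard LocalCluster argument:

# Sketch: same cluster, with an explicit per-worker memory cap
cluster = LocalCluster(
    n_workers=num_threads,
    threads_per_worker=1,
    processes=True,
    memory_limit="4GB",  # per worker; the default is total memory / n_workers
)
client = Client(cluster)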
[9]:
cutoff_flux = 1e-3  # Jy; sources below 1 mJy at 100 GHz form the background catalog
[10]:
plot = False  # toggle diagnostic plots
[11]:
cd /global/cfs/cdirs/sobs/www/users/Radio_WebSky/matched_catalogs_2
/global/cfs/cdirs/sobs/www/users/Radio_WebSky/matched_catalogs_2
[12]:
cat = h5py.File("catalog_100.0.h5", "r")  # h5pickle handles survive pickling to Dask workers
[13]:
#%%ai gemini:gemini-pro -f code

# find the fields in an h5py File
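The prompt above was left commented out; a minimal way to inspect the fields by hand, using only the standard h5py API:

# Top-level datasets in the catalog file
print(list(cat.keys()))
# Walk the full hierarchy, printing each object's shape and dtype where available
cat.visititems(lambda name, obj: print(name, getattr(obj, "shape", ""), getattr(obj, "dtype", "")))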
[14]:
import dask.array as da

There is no metadata in the file, so I assume the fluxes are in Jy.

[15]:
import pandas as pd
import xarray as xr
[16]:
field = 'flux'
[17]:
chunk_size = int(1e6)  # rows per Dask chunk
[18]:
# the raw HDF5 file has no named dimensions, so the netCDF layer exposes the row dimension as "phony_dim_0"
cat_xr = xr.open_dataset("catalog_100.0.h5", chunks=chunk_size)
cat_xr = cat_xr.rename({"phony_dim_0": "index"})
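An equivalent open through the h5netcdf engine, which (assuming h5netcdf is installed) requires an explicit policy for naming the unnamed HDF5 dimensions via its phony_dims option:

# Sketch: open the same raw HDF5 file with the h5netcdf backend
cat_xr = xr.open_dataset(
    "catalog_100.0.h5", engine="h5netcdf", phony_dims="sort", chunks=chunk_size)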
[19]:
# boolean mask of background sources; compute eagerly so it can index the coefficient catalogs
cutoff_mask = (cat_xr.flux < cutoff_flux).compute()
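A quick check of how much of the full catalog survives the cut (a sketch; the mask is already in memory):

kept = float(cutoff_mask.mean())
print(f"{kept:.1%} of the sources fall below the {cutoff_flux} Jy cutoff")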
[20]:
%load_ext jupyter_ai
%ai list gemini
The jupyter_ai extension is already loaded. To reload it, use:
  %reload_ext jupyter_ai
[22]:
# log-frequency polynomial coefficients of the polarized flux, one row per power, cut to background sources
pol_coeff = xr.open_dataarray(
    "/pscratch/sd/z/zonca/websky_full_catalog_polarized flux.h5", chunks=chunk_size)[:, cutoff_mask]
[23]:
pol_coeff
[23]:
<xarray.DataArray 'flux' (power: 5, index: 281384121)>
dask.array<getitem, shape=(5, 281384121), dtype=float64, chunksize=(5, 999136), chunktype=numpy.ndarray>
Coordinates:
  * power    (power) int64 4 3 2 1 0
  * index    (index) int64 0 1 2 3 4 ... 281756372 281756373 281756374 281756375
[24]:
# the matching coefficients for total intensity
temp_coeff = xr.open_dataarray("/pscratch/sd/z/zonca/websky_full_catalog_flux.h5", chunks=chunk_size)[:, cutoff_mask]
[25]:
output_catalog = xr.Dataset({"logpolycoefpolflux": pol_coeff, "logpolycoefflux": temp_coeff})
[27]:
output_catalog
[27]:
<xarray.Dataset>
Dimensions:             (power: 5, index: 281384121)
Coordinates:
  * power               (power) int64 4 3 2 1 0
  * index               (index) int64 0 1 2 3 ... 281756373 281756374 281756375
Data variables:
    logpolycoefpolflux  (index, power) float64 dask.array<chunksize=(999136, 5), meta=np.ndarray>
    logpolycoefflux     (index, power) float64 dask.array<chunksize=(999136, 5), meta=np.ndarray>
[33]:
output_catalog = output_catalog.transpose()  # reverse the dimension order of the data variables
[34]:
output_catalog["logpolycoefflux"].shape
[34]:
(281384121, 5)
[35]:
output_catalog.logpolycoefflux.attrs["units"] = "Jy"
output_catalog.logpolycoefpolflux.attrs["units"] = "Jy"
[36]:
# carry the source positions over from the 100 GHz catalog, with the same flux cut applied
for coord in ["theta", "phi"]:
    output_catalog = output_catalog.assign_coords(
        **{coord: (("index"), cat_xr[coord][cutoff_mask].data)})
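theta and phi are colatitude and longitude in radians (the healpy convention, assumed here from the WebSky pipeline); for map-space work they convert directly to HEALPix pixel indices, e.g.:

import healpy as hp

nside = 4096  # hypothetical map resolution
# .values pulls the Dask-backed coordinate arrays into memory before the conversion
pix = hp.ang2pix(nside, output_catalog.theta.values, output_catalog.phi.values)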
[37]:
output_catalog["flux_100"] = np.polynomial.polynomial.polyval(
    np.log(100), output_catalog["logpolycoefflux"][:,::-1], tensor=False)
/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/distributed/client.py:3361: UserWarning: Sending large graph of size 1.05 GiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
  warnings.warn(
/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/numpy/polynomial/polynomial.py:756: RuntimeWarning: overflow encountered in multiply
  c0 = c[-i] + c0*x
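The model is flux(nu) = sum_k c_k (ln nu)^k with nu in GHz. A self-contained sketch of the same evaluation for a plain (5, n_sources) coefficient block whose rows follow the catalog's descending power coordinate (4..0); flux_at is a hypothetical helper, not part of the catalog code:

import numpy as np
from numpy.polynomial.polynomial import polyval

def flux_at(coeffs, freq_ghz):
    """coeffs: (5, n_sources) array, ordered power 4..0 along axis 0.
    Returns the modeled flux in Jy at freq_ghz."""
    # polyval expects ascending powers along axis 0, hence the flip
    return polyval(np.log(freq_ghz), coeffs[::-1], tensor=False)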
[38]:
output_catalog["flux_100"].max()
[38]:
<xarray.DataArray 'flux_100' ()>
array(inf)
[39]:
# brightest first at 100 GHz (sources that overflowed to inf end up at the top)
output_catalog = output_catalog.sortby("flux_100", ascending=False)
del output_catalog["flux_100"]  # drop the sort key
[40]:
output_catalog.coords["theta"].attrs["units"] = "rad"
output_catalog.coords["phi"].attrs["units"] = "rad"
[41]:
output_catalog.attrs["notes"] = \
"""Catalog of sources where the flux in Jy at any frequency is computed from a 4th-order polynomial (5 coefficients) in the logarithm of the frequency in GHz, separately for temperature and polarization.
The catalog does not contain information about the polarization angle of a source.
The catalog is sorted in descending order of the source flux at 100 GHz."""
[44]:
output_filename = "/pscratch/sd/z/zonca/websky_full_catalog_trasp.h5"
[42]:
output_catalog.to_netcdf(
    output_filename, format="NETCDF4") # requires netcdf4 package
/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/distributed/client.py:3361: UserWarning: Sending large graph of size 2.10 GiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
  warnings.warn(
2024-09-21 14:02:36,894 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 2.99 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:36,934 - distributed.worker.memory - WARNING - Worker is at 79% memory usage. Resuming worker. Process memory: 2.95 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:37,039 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker.  Process memory: 3.05 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:37,159 - distributed.worker.memory - WARNING - Worker is at 42% memory usage. Resuming worker. Process memory: 1.58 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:49,972 - distributed.worker.memory - WARNING - Worker is at 82% memory usage. Pausing worker.  Process memory: 3.08 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:50,525 - distributed.worker.memory - WARNING - Worker is at 78% memory usage. Resuming worker. Process memory: 2.92 GiB -- Worker memory limit: 3.72 GiB
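The large-graph warning comes from shipping the whole task graph, coordinates included, to the scheduler in one go. One way to at least decouple graph construction from execution (a sketch; compute=False is a standard to_netcdf option that returns a Dask delayed object):

# Build the write lazily, then trigger it explicitly on the cluster
write = output_catalog.to_netcdf(output_filename, format="NETCDF4", compute=False)
write.compute()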
[45]:
import h5py  # plain h5py is enough for read-only verification
[46]:
f = h5py.File(output_filename, "r")
[47]:
f.keys()
[47]:
<KeysViewHDF5 ['index', 'logpolycoefflux', 'logpolycoefpolflux', 'phi', 'power', 'theta']>
[50]:
f["logpolycoefflux"][0]
[50]:
array([ 5.37899652e-09, -1.29664725e-07,  1.20804354e-06, -5.22231671e-06,
        9.00274650e-06])
[2]:
!free -m
              total        used        free      shared  buff/cache   available
Mem:         515307      288278      237454        2284        3068      227028
Swap:             0           0           0
[1]:
# fresh session: re-open the written catalog to verify the round-trip
import xarray as xr
catalog = xr.open_dataset("/pscratch/sd/z/zonca/websky_full_catalog_trasp.h5")
[2]:
catalog
[2]:
<xarray.Dataset>
Dimensions:             (power: 5, index: 281384121)
Coordinates:
  * power               (power) int64 4 3 2 1 0
  * index               (index) int64 0 1 2 3 ... 281756373 281756374 281756375
    theta               (index) float64 ...
    phi                 (index) float64 ...
Data variables:
    logpolycoefpolflux  (index, power) float64 ...
    logpolycoefflux     (index, power) float64 ...
Attributes:
    notes:    Catalog of sources where the flux in Jy at any frequency is cal...
[3]:
catalog["logpolycoefflux"].shape
[3]:
(281384121, 5)
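Since the catalog is sorted by descending 100 GHz flux, the brightest sources come straight from a slice; a small usage sketch:

# First 1000 rows = 1000 brightest sources at 100 GHz
brightest = catalog.isel(index=slice(0, 1000))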