Assemble WebSky background catalog with Dask
[2]:
import h5pickle as h5py
import numpy as np
import healpy as hp
import matplotlib.pyplot as plt
from tqdm import tqdm
[3]:
%load_ext jupyter_ai
%ai list gemini
[3]:
| Provider | Environment variable | Set? | Models |
|---|---|---|---|
| gemini | GOOGLE_API_KEY | ✅ | gemini:gemini-1.0-pro, gemini:gemini-1.0-pro-001, gemini:gemini-1.0-pro-latest, gemini:gemini-1.0-pro-vision-latest, gemini:gemini-pro, gemini:gemini-pro-vision |
[4]:
#%%ai gemini:gemini-pro -f code
[5]:
import healpy as hp
hp.version
[5]:
<module 'healpy.version' from '/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/healpy/version.py'>
[6]:
%alias_magic gm ai -p "gemini:gemini-pro -f code"
Created `%gm` as an alias for `%ai gemini:gemini-pro -f code`.
Created `%%gm` as an alias for `%%ai gemini:gemini-pro -f code`.
[7]:
import os
num_threads = 128
# with many single-threaded Dask workers, keep OpenMP from oversubscribing each worker
os.environ["OMP_NUM_THREADS"] = "1"
[8]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=num_threads, threads_per_worker=1, processes=True)
client = Client(cluster)
[9]:
cutoff_flux = 1e-3
[10]:
plot = False
[11]:
cd /global/cfs/cdirs/sobs/www/users/Radio_WebSky/matched_catalogs_2
/global/cfs/cdirs/sobs/www/users/Radio_WebSky/matched_catalogs_2
[12]:
cat = h5py.File("catalog_100.0.h5", "r")
[13]:
#%%ai gemini:gemini-pro -f code
#find the fields in a h5py File
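The prompt above was left commented out; a minimal way to list the fields directly with h5py (a sketch using the already-open `cat` handle) is:

list(cat.keys())
cat["flux"].shape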
[14]:
import dask.array as da
There is no metadata in the file; I assume the fluxes are in Jy.
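A quick sanity check of the flux values (a sketch; the numbers it would print are not reported in the original notebook):

flux_da = da.from_array(cat["flux"], chunks=1_000_000)  # lazy view of the HDF5 dataset
float(flux_da.min().compute()), float(flux_da.max().compute())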
[15]:
import pandas as pd
import xarray as xr
[16]:
field = 'flux'
[17]:
chunk_size = int(1e6)
[18]:
cat_xr = xr.open_dataset("catalog_100.0.h5", chunks=chunk_size)
cat_xr = cat_xr.rename({"phony_dim_0":"index"})
[19]:
cutoff_mask = (cat_xr.flux < cutoff_flux).compute()
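As a sketch of how the selection could be checked (not in the original run), the number and fraction of sources below the 1 mJy cutoff are given by:

int(cutoff_mask.sum()), float(cutoff_mask.mean())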
[20]:
%load_ext jupyter_ai
%ai list gemini
The jupyter_ai extension is already loaded. To reload it, use:
%reload_ext jupyter_ai
[20]:
| Provider | Environment variable | Set? | Models |
|---|---|---|---|
| gemini | GOOGLE_API_KEY | ✅ | gemini:gemini-1.0-pro, gemini:gemini-1.0-pro-001, gemini:gemini-1.0-pro-latest, gemini:gemini-1.0-pro-vision-latest, gemini:gemini-pro, gemini:gemini-pro-vision |
[21]:
#%%ai gemini:gemini-pro -f code
[22]:
pol_coeff = xr.open_dataarray(
"/pscratch/sd/z/zonca/websky_full_catalog_polarized flux.h5", chunks=chunk_size)[:, cutoff_mask]
[23]:
pol_coeff
[23]:
<xarray.DataArray 'flux' (power: 5, index: 281384121)>
dask.array<getitem, shape=(5, 281384121), dtype=float64, chunksize=(5, 999136), chunktype=numpy.ndarray>
Coordinates:
  * power    (power) int64 4 3 2 1 0
  * index    (index) int64 0 1 2 3 4 ... 281756372 281756373 281756374 281756375
[24]:
temp_coeff = xr.open_dataarray("/pscratch/sd/z/zonca/websky_full_catalog_flux.h5", chunks=chunk_size)[:, cutoff_mask]
[25]:
output_catalog = xr.Dataset({"logpolycoefpolflux":pol_coeff,"logpolycoefflux":temp_coeff })
[27]:
output_catalog
[27]:
<xarray.Dataset>
Dimensions: (power: 5, index: 281384121)
Coordinates:
* power (power) int64 4 3 2 1 0
* index (index) int64 0 1 2 3 ... 281756373 281756374 281756375
Data variables:
logpolycoefpolflux (index, power) float64 dask.array<chunksize=(999136, 5), meta=np.ndarray>
    logpolycoefflux     (index, power) float64 dask.array<chunksize=(999136, 5), meta=np.ndarray>
[33]:
output_catalog = output_catalog.transpose()
[34]:
output_catalog["logpolycoefflux"].shape
[34]:
(281384121, 5)
[35]:
output_catalog.logpolycoefflux.attrs["units"] = "Jy"
output_catalog.logpolycoefpolflux.attrs["units"] = "Jy"
[36]:
for coord in ["theta", "phi"]:
output_catalog = output_catalog.assign_coords(
**{coord:(("index"), cat_xr[coord][cutoff_mask].data)})
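As a sketch of a possible downstream use (not part of the original pipeline), healpy can convert theta/phi into HEALPix pixel indices, e.g. to bin sources into a map, assuming theta/phi follow the healpy convention of colatitude/longitude in radians; the nside value and the 1000-source slice are illustrative:

nside = 512  # illustrative map resolution
pix = hp.ang2pix(nside, output_catalog.theta[:1000].values, output_catalog.phi[:1000].values)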
[37]:
output_catalog["flux_100"] = np.polynomial.polynomial.polyval(
np.log(100), output_catalog["logpolycoefflux"][:,::-1], tensor=False)
/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/distributed/client.py:3361: UserWarning: Sending large graph of size 1.05 GiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/numpy/polynomial/polynomial.py:756: RuntimeWarning: overflow encountered in multiply
c0 = c[-i] + c0*x
[38]:
output_catalog["flux_100"].max()
[38]:
<xarray.DataArray 'flux_100' ()> array(inf)
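The RuntimeWarning above means the polynomial evaluation overflows for some sources, which is why the maximum is inf; a quick check of how many sources are affected (a sketch, the count is not reported here) would be:

int(np.isinf(output_catalog["flux_100"]).sum())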
[39]:
output_catalog = output_catalog.sortby("flux_100", ascending=False)
del output_catalog["flux_100"]
[40]:
output_catalog.coords["theta"].attrs["units"] = "rad"
output_catalog.coords["phi"].attrs["units"] = "rad"
[41]:
output_catalog.attrs["notes"] = \
"""Catalog of sources where the flux in Jy at any frequency is calculated with a 5th order polynomial in the logarithm of the frequency in GHz, separately for temperature and polarization.
The catalog does not contain information about the polarization angle of a source.
The catalog sorted in descending order based on the source flux at 100 GHz"""
[44]:
output_filename = "/pscratch/sd/z/zonca/websky_full_catalog_trasp.h5"
[42]:
output_catalog.to_netcdf(
output_filename, format="NETCDF4") # requires netcdf4 package
/global/common/software/cmb/zonca/conda/pycmb/lib/python3.10/site-packages/distributed/client.py:3361: UserWarning: Sending large graph of size 2.10 GiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
2024-09-21 14:02:36,894 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 2.99 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:36,934 - distributed.worker.memory - WARNING - Worker is at 79% memory usage. Resuming worker. Process memory: 2.95 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:37,039 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.05 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:37,159 - distributed.worker.memory - WARNING - Worker is at 42% memory usage. Resuming worker. Process memory: 1.58 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:49,972 - distributed.worker.memory - WARNING - Worker is at 82% memory usage. Pausing worker. Process memory: 3.08 GiB -- Worker memory limit: 3.72 GiB
2024-09-21 14:02:50,525 - distributed.worker.memory - WARNING - Worker is at 78% memory usage. Resuming worker. Process memory: 2.92 GiB -- Worker memory limit: 3.72 GiB
[45]:
import h5py
[46]:
f = h5py.File(output_filename)
[47]:
f.keys()
[47]:
<KeysViewHDF5 ['index', 'logpolycoefflux', 'logpolycoefpolflux', 'phi', 'power', 'theta']>
[50]:
f["logpolycoefflux"][0]
[50]:
array([ 5.37899652e-09, -1.29664725e-07, 1.20804354e-06, -5.22231671e-06,
9.00274650e-06])
[1]:
!benchmark-pixell-runner
/bin/bash: benchmark-pixell-runner: command not found
[2]:
!free -m
total used free shared buff/cache available
Mem: 515307 288278 237454 2284 3068 227028
Swap: 0 0 0
[1]:
import xarray as xr
catalog = xr.open_dataset("/pscratch/sd/z/zonca/websky_full_catalog_trasp.h5")
[2]:
catalog
[2]:
<xarray.Dataset>
Dimensions: (power: 5, index: 281384121)
Coordinates:
* power (power) int64 4 3 2 1 0
* index (index) int64 0 1 2 3 ... 281756373 281756374 281756375
theta (index) float64 ...
phi (index) float64 ...
Data variables:
logpolycoefpolflux (index, power) float64 ...
logpolycoefflux (index, power) float64 ...
Attributes:
    notes:   Catalog of sources where the flux in Jy at any frequency is cal...
[3]:
catalog["logpolycoefflux"].shape
[3]:
(281384121, 5)
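As a usage sketch (not part of the original notebook), the flux of a source at an arbitrary frequency can be evaluated from the stored coefficients. This assumes, as the notes attribute states, that the polynomial in the logarithm of the frequency in GHz gives the flux in Jy directly, and that the coefficients are stored highest power first (power coordinate 4 3 2 1 0); the 145 GHz value is illustrative:

import numpy as np
freq_ghz = 145  # illustrative frequency
coeffs = catalog["logpolycoefflux"][0].values  # 5 coefficients of the first source, highest power first
flux_jy = np.polyval(coeffs, np.log(freq_ghz))  # np.polyval expects descending powers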
[ ]: