Expand source code
#######################################################################
# This file is a part of SkyTools
#
# Sky Tools
# Copyright (C) 2023 Shamik Ghosh
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# For more information about SkyTools please visit
# <https://github.com/1cosmologist/skytools> or contact Shamik Ghosh
# at shamik@lbl.gov
#
#########################################################################
import numpy as np
import healpy as hp
import multiprocessing as mp
import joblib as jl
import os
from . import hpx_utils as hu
from . import border_finder as bf
datapath = os.getenv('SKYTOOLS_DATA')
def maskdist(mask_in, dist_from_edge_in_rad):
n_cores = mp.cpu_count()
nside = hp.get_nside(mask_in)
border0_pix, border1_pix, np0, np1 = bf.get_mask_border(mask_in, need_nos=True)
del border1_pix
if np0 == 0:
raise Exception("ERROR: No masked pixels in the map! Abort!", np0)
border_slice = 2500
nparts = np0 // border_slice
if nparts == 0:
nparts = 1
npix_per_part = np.ones((nparts,)) * np0
else:
if np.mod(np0, nparts) > 0.:
nparts += 1
npix_per_part = np.zeros((nparts,))
npix_per_part[:nparts-1] = border_slice
npix_per_part[nparts-1] = np0 - np.sum(npix_per_part)
if np.sum(npix_per_part) != np0:
raise Exception("ERROR: npix_per_part sum does not match npix_0", np.sum(npix_per_part), np0)
start = int(0)
stop = int(start+npix_per_part[0])
dist_map = np.copy(mask_in) * 6*np.pi
for part in range(nparts):
# bordermap = np.zeros_like(mask_in)
# bordermap[border0_pix[start:stop]] = 1.
# bordermap = hp.smoothing(bordermap, fwhm=(np.sqrt(2.) * dist_from_edge_in_rad),use_weights=True, datapath=datapath)
# valid_pix = np.where((mask_in == 1) & (bordermap > 5e-4))[0]
# del bordermap
vec_bdr = np.array(hp.pix2vec(nside, border0_pix[start:stop]))
neighbour_list = jl.Parallel(n_jobs=n_cores)(jl.delayed(hp.query_disc)(nside, vec_bdr[:, ivec], 1.05* dist_from_edge_in_rad, inclusive=True) for ivec in np.arange(stop-start, dtype=int))
valid_pix = np.unique(np.hstack(np.array(neighbour_list, dtype=object)).astype(np.int64))
valid_pix = valid_pix[valid_pix > 0.]
valid_pix = valid_pix[mask_in[valid_pix] > 0.]
# print(len(valid_pix), npix_per_part[part])
dist4valid = np.amin(hu.angdist(nside, valid_pix, nside, border0_pix[start:stop]), axis=1)
pix2save = np.where(dist4valid < dist_map[valid_pix])[0]
dist_map[valid_pix[pix2save]] = dist4valid[pix2save]
if part < nparts-1:
start = int(stop)
stop = int(start+npix_per_part[part+1])
dist_map[dist_map > 2*np.pi] = hp.UNSEEN
return dist_map
def apodize_mask(mask_in, aposize_in_deg, apotype="c2", tune=None):
angdist = maskdist(mask_in, np.deg2rad(aposize_in_deg))
x = np.ones_like(angdist)
x[angdist == hp.UNSEEN] = 1.
if apotype.lower() in ["c1", "sin","c2","cos"]:
# following definition from NaMaster C1/C2 window definition, differing from Grain et al. 2009 definition
x[angdist != hp.UNSEEN] = np.sqrt((1 - np.cos(angdist[angdist != hp.UNSEEN]))/(1 - np.cos(np.deg2rad(aposize_in_deg))))
else:
x[angdist != hp.UNSEEN] = angdist[angdist != hp.UNSEEN]/ np.deg2rad(aposize_in_deg)
apo_mask = np.zeros(mask_in.shape)
if apotype.lower() in ["c1", "sin"]:
# following definition from NaMaster C1 window definition, differing from Grain et al. 2009 definition
apo_mask[x >= 1.] = 1.
apo_mask[x < 1.] = x[x < 1.] - (np.sin(2 * np.pi * x[x < 1.]) / 2. / np.pi)
if apotype.lower() in ["c2","cos"]:
# following definition from NaMaster C2 window definition, differing from Grain et al. 2009 definition
apo_mask[x >= 1.] = 1.
apo_mask[x < 1.] = 0.5 * (1. - np.cos(np.pi * x[x < 1.]))
if apotype.lower() in ["mbh"]:
# following Modified Barlett-Hanning window defined in Gautam, Kumar and Saxena 1996 (IEEE)
# When tune is 0. matches the Grain et al. C2 window definition.
# As tune goes closer to 1. the transition becomes steeper like a stright line. The tune parameter sharpens the transition.
if tune == None:
tune = 0.15
if (tune < 0.) or (tune > 1.):
raise Exception("ERROR: Unphysical tune for MBH apodization. Choose 0<= tune <= 1. Aborting!")
apo_mask[x > 1.] = 1.
apo_mask[x <= 1.] = 1. - tune*(1. - x[x <= 1.]) - (1. - tune)*(0.5 + 0.5*np.cos(np.pi*x[x <= 1.]))
if apotype.lower() in ["cn", "cosn"]:
# largely based on Nuttall 1981.
if tune == None:
tune = 4.
if tune < 2:
raise Exception("ERROR: Unphysical tune for cosine^n apodization. Choose tune>=2. Aborting!")
apo_mask[x > 1.] = 1.
apo_mask[x <= 1.] = 1. - np.cos(0.5*np.pi*x[x <= 1.])**tune
if apotype.lower() in ["nut", "nuttall"]:
# Based on Nuttall 1981. This the minimum 4-term window.
a = np.array([0.35875, 0.48829, 0.14128, 0.01168])
apo_mask[x >= 1.] = 1.
apo_mask[x < 1.] = 1. - a[0] - a[1] * np.cos(np.pi*x[x < 1.]) - a[2] * np.cos(2*np.pi*x[x < 1.]) - a[3] * np.cos(3*np.pi*x[x < 1.])
apo_mask *= mask_in
return apo_mask