Module skytools.border_finder

Expand source code
#######################################################################
# This file is a part of SkyTools
#
# Sky Tools
# Copyright (C) 2023  Shamik Ghosh
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# For more information about SkyTools please visit 
# <https://github.com/1cosmologist/skytools> or contact Shamik Ghosh 
# at shamik@lbl.gov
#
#########################################################################

import numpy as np 
import healpy as hp
import os

datapath = os.getenv('SKYTOOLS_DATA')

def nwhere(array, cond, val, ma=False):
    if (cond == 'gt') or (cond == 'GT'):
        if ma:
            where_arr = np.array(np.ma.where(array > val))[0]
        else:
            where_arr = np.array(np.where(array > val))[0]
    elif (cond == 'lt') or (cond == 'LT'):
        if ma:
            where_arr = np.array(np.ma.where(array < val))[0]
        else:
            where_arr = np.array(np.where(array < val))[0]
    elif (cond == 'eq') or (cond == 'EQ'):
        if ma:
            where_arr = np.array(np.ma.where(array == val))[0]
        else:
            where_arr = np.array(np.where(array == val))[0]

    nwh = len(where_arr)

    return where_arr, nwh
    

def get_mask_border(mask_in, need_nos=False):
    npix = hp.get_map_size(mask_in)
    nside = hp.get_nside(mask_in)

    whmask, nmasked = nwhere(mask_in, 'eq', 1.)

    # All masked case
    if nmasked == 0:
        p0 = np.array([])
        p1 = np.array([])
        np0 = 0
        np1 = 0

    # For low number of holes:

    elif np.sum(1. - mask_in) / len(mask_in) <= 5e-2:

        whmask, nwhmask = nwhere(mask_in, 'EQ', 0.) 

        nei = hp.get_all_neighbours(nside, whmask)

        ip = nei.flatten()
        ip = ip[np.where(ip>=0)]

        ip_mask = np.ones(hp.nside2npix(nside), dtype='bool')
        ip_mask[ip] = False
        
        whp1, nwhp1 = nwhere(np.ma.masked_array(mask_in, mask=ip_mask), 'eq', 1, ma=True)

        if nwhp1 == 0:
            print('Something wrong here! Low number of holes case.')

        p1 = np.unique(whp1)

        whp0 = np.sum(np.isin(nei,p1), axis=0)
        
        p0 = whmask[np.where(whp0 >= 1)[0]]

        np0 = len(p0)
        np1 = len(p1)

    # For low number of valid pixels:

    elif np.sum(mask_in) / len(mask_in) <= 5e-2:

        whnomask, nwhnomask = nwhere(mask_in, 'EQ', 1.) 

        nei = hp.get_all_neighbours(nside, whnomask)

        ip = nei.flatten()
        ip = ip[np.where(ip>=0)]

        ip_mask = np.ones(hp.nside2npix(nside), dtype='bool')
        ip_mask[ip] = False
        
        whp0, nwhp0 = nwhere(np.ma.masked_array(mask_in, mask=ip_mask), 'eq', 0, ma=True)

        if nwhp0 == 0:
            print('Something wrong here! Low number of valid pixels case.')

        p0 = np.unique(whp0)

        whp1 = np.sum(np.isin(nei,p0), axis=0)
        
        p1 = whmask[np.where(whp1 >= 1)[0]]

        np0 = len(p0)
        np1 = len(p1)


    # For intermediate cases

    else:
        pixsize = hp.nside2resol(nside)
        mask_lm = hp.map2alm(mask_in, pol=False, use_weights=True, datapath=datapath)
        mask_lm_sm = hp.smoothalm(mask_lm, fwhm=(np.sqrt(2.) * pixsize), pol=False)
        smooth_mask = hp.alm2map(mask_lm_sm, nside, pol=False)

        del mask_lm, mask_lm_sm

        p0 = np.where((mask_in == 0) & (smooth_mask >= 0.15))[0]
        nwh0 = len(p0)

        p1 = np.where((mask_in == 1) & (smooth_mask <= 0.85))[0]
        nwh1 = len(p1)

        if nwh0 != 0 :
            m = np.zeros_like(mask_in)
            m[p1] = 1

            nei = np.unique(hp.get_all_neighbours(nside, p0).flatten())

            m[nei[nei>=0]] = 1

            m *= mask_in

            p1, np1 = nwhere(m, 'eq', 1)

        if nwh1 != 0 :
            m = np.zeros_like(mask_in)
            m[p0] = 1

            nei = np.unique(hp.get_all_neighbours(nside, p1).flatten())

            m[nei[nei>=0]] = 1

            m *= (1 - mask_in)

            p0, np0 = nwhere(m, 'eq', 1)


    p0 = np.unique(p0)
    p1 = np.unique(p1)

    np0 = min(np0, len(p0))
    np1 = min(np1, len(p1))

    if need_nos:
        return p0, p1, np0, np1
    else:
        return p0, p1 

Functions

def get_mask_border(mask_in, need_nos=False)
Expand source code
def get_mask_border(mask_in, need_nos=False):
    npix = hp.get_map_size(mask_in)
    nside = hp.get_nside(mask_in)

    whmask, nmasked = nwhere(mask_in, 'eq', 1.)

    # All masked case
    if nmasked == 0:
        p0 = np.array([])
        p1 = np.array([])
        np0 = 0
        np1 = 0

    # For low number of holes:

    elif np.sum(1. - mask_in) / len(mask_in) <= 5e-2:

        whmask, nwhmask = nwhere(mask_in, 'EQ', 0.) 

        nei = hp.get_all_neighbours(nside, whmask)

        ip = nei.flatten()
        ip = ip[np.where(ip>=0)]

        ip_mask = np.ones(hp.nside2npix(nside), dtype='bool')
        ip_mask[ip] = False
        
        whp1, nwhp1 = nwhere(np.ma.masked_array(mask_in, mask=ip_mask), 'eq', 1, ma=True)

        if nwhp1 == 0:
            print('Something wrong here! Low number of holes case.')

        p1 = np.unique(whp1)

        whp0 = np.sum(np.isin(nei,p1), axis=0)
        
        p0 = whmask[np.where(whp0 >= 1)[0]]

        np0 = len(p0)
        np1 = len(p1)

    # For low number of valid pixels:

    elif np.sum(mask_in) / len(mask_in) <= 5e-2:

        whnomask, nwhnomask = nwhere(mask_in, 'EQ', 1.) 

        nei = hp.get_all_neighbours(nside, whnomask)

        ip = nei.flatten()
        ip = ip[np.where(ip>=0)]

        ip_mask = np.ones(hp.nside2npix(nside), dtype='bool')
        ip_mask[ip] = False
        
        whp0, nwhp0 = nwhere(np.ma.masked_array(mask_in, mask=ip_mask), 'eq', 0, ma=True)

        if nwhp0 == 0:
            print('Something wrong here! Low number of valid pixels case.')

        p0 = np.unique(whp0)

        whp1 = np.sum(np.isin(nei,p0), axis=0)
        
        p1 = whmask[np.where(whp1 >= 1)[0]]

        np0 = len(p0)
        np1 = len(p1)


    # For intermediate cases

    else:
        pixsize = hp.nside2resol(nside)
        mask_lm = hp.map2alm(mask_in, pol=False, use_weights=True, datapath=datapath)
        mask_lm_sm = hp.smoothalm(mask_lm, fwhm=(np.sqrt(2.) * pixsize), pol=False)
        smooth_mask = hp.alm2map(mask_lm_sm, nside, pol=False)

        del mask_lm, mask_lm_sm

        p0 = np.where((mask_in == 0) & (smooth_mask >= 0.15))[0]
        nwh0 = len(p0)

        p1 = np.where((mask_in == 1) & (smooth_mask <= 0.85))[0]
        nwh1 = len(p1)

        if nwh0 != 0 :
            m = np.zeros_like(mask_in)
            m[p1] = 1

            nei = np.unique(hp.get_all_neighbours(nside, p0).flatten())

            m[nei[nei>=0]] = 1

            m *= mask_in

            p1, np1 = nwhere(m, 'eq', 1)

        if nwh1 != 0 :
            m = np.zeros_like(mask_in)
            m[p0] = 1

            nei = np.unique(hp.get_all_neighbours(nside, p1).flatten())

            m[nei[nei>=0]] = 1

            m *= (1 - mask_in)

            p0, np0 = nwhere(m, 'eq', 1)


    p0 = np.unique(p0)
    p1 = np.unique(p1)

    np0 = min(np0, len(p0))
    np1 = min(np1, len(p1))

    if need_nos:
        return p0, p1, np0, np1
    else:
        return p0, p1 
def nwhere(array, cond, val, ma=False)
Expand source code
def nwhere(array, cond, val, ma=False):
    if (cond == 'gt') or (cond == 'GT'):
        if ma:
            where_arr = np.array(np.ma.where(array > val))[0]
        else:
            where_arr = np.array(np.where(array > val))[0]
    elif (cond == 'lt') or (cond == 'LT'):
        if ma:
            where_arr = np.array(np.ma.where(array < val))[0]
        else:
            where_arr = np.array(np.where(array < val))[0]
    elif (cond == 'eq') or (cond == 'EQ'):
        if ma:
            where_arr = np.array(np.ma.where(array == val))[0]
        else:
            where_arr = np.array(np.where(array == val))[0]

    nwh = len(where_arr)

    return where_arr, nwh