Source code for screens.fields

# Licensed under the GPLv3 - see LICENSE
import numpy as np
from astropy import units as u, constants as const


__all__ = ['phasor', 'dynamic_field', 'theta_grid', 'theta_theta_indices']


def expand(*arrays, n=2):
    """Add n unity axes to the end of all arrays."""
    result = [np.reshape(array, np.shape(array)+(1,)*n)
              for array in arrays]
    return result if len(arrays) > 1 else result[0]


[docs] def phasor(indep, transform, linear_axis=None): """Calculate phase part of a Fourier transform like operation. Simply calculates ``exp(j*indep*transform)``, where generally the two inputs will be on different dimensions, so that they broadcast against each other. If the independent variable spans a linearly spaced range, one can use ``linear_axis`` to speed up the calculation by only calculating ``exp(j*indep[0]*transform)`` and ``exp(j*(indep[1]-indep[0])*tranform)`` and filling the array by cumulative multiplication. Parameters ---------- indep : array_like Independent variable. transform : array_like Transformed variable. If an `~astropy.units.Quantity`, it must have the inverse units of ``indep``. It should *not* include a factor 2pi. linear_axis : int, "transform", or None, optional Possible axis along which ``indep`` changes by linear steps, and for which the calculation can be sped up using cumulative multiplication. This will lead to inaccuracies at the 1e-9 level, which should not matter for most purposes. If "transform", then take ``transform`` to be linearly spaced (along a single axis, e.g., shape ``(n, 1, 1)``). """ if linear_axis is None: phasor = 1j * (indep * transform * u.cycle).to_value(u.rad) return np.exp(phasor, out=phasor) along_transform = (linear_axis == "transform") if along_transform: linear_axis = transform.shape.index(transform.size) - transform.ndim elif linear_axis >= 0: linear_axis -= indep.ndim extra_slice = (slice(None),) * (-1-linear_axis) ph0_index = (Ellipsis, slice(0, 1)) + extra_slice ph01_index = (Ellipsis, slice(0, 2)) + extra_slice dph0_index = (Ellipsis, slice(1, None)) + extra_slice if along_transform: ph0 = (indep * transform[ph0_index] * u.cycle).to_value(u.rad) dph = (np.diff(transform[ph01_index], axis=linear_axis) * indep * u.cycle).to_value(u.rad) else: ph0 = (indep[ph0_index] * transform * u.cycle).to_value(u.rad) dph = (np.diff(indep[ph01_index], axis=linear_axis) * transform * u.cycle).to_value(u.rad) phasor = np.empty(np.broadcast(indep, transform).shape, complex) phasor[ph0_index] = np.exp(1j * ph0) phasor[dph0_index] = np.exp(1j * dph) return np.cumprod(phasor, out=phasor, axis=linear_axis)
[docs] def dynamic_field(theta_par, theta_perp, realization, d_eff, mu_eff, f, t, fast=True): """Given a set of scattering points, construct the dynamic wave field. Parameters ---------- theta_par : ~astropy.units.Quantity Angles of the scattering point in the direction parallel to ``mu_eff`` theta_perp : ~astropy.units.Quantity Angles perpendicular to ``mu_eff``. realization : array-like Complex amplitudes of the scattering points. Set to ``1.`` to avoid using it. d_eff : ~astropy.units.Quantity Effective distance. Should be constant; if different for different points, no screen-to-screen scattering is taken into account. mu_eff : ~astropy.units.Quantity Effective proper motion (``v_eff / d_eff``), i.e., parallel to ``theta_par``. t : ~astropy.units.Quantity Times for which the dynamic wave spectrum should be calculated. Should broadcast with ``f`` to give the dynamic spectrum shape. f : ~astropy.units.frequency Frequencies for which the spectrum should be calculated. Should broadcast with ``t`` to give the dynamic spectrum shape. fast : bool Calculates the field faster by iteratively applying a phasor for each the frequency step along the frequency axis. Assumes the frequencies are a linear sequence. Will lead to inaccuracies at the 1e-9 level, which should be negligible for most purposes. Returns ------- dynwave : array Delayed wave field array, with time and frequency axes as given by ``t`` and ``f``, and earlier axes as given by the other parameters. """ ds_ndim = np.broadcast(f, t).ndim theta_par, theta_perp, realization, d_eff, mu_eff = expand( theta_par, theta_perp, realization, d_eff, mu_eff, n=ds_ndim) th_par = theta_par + mu_eff * t tau_t = (((d_eff / (2*const.c)) * (th_par**2 + theta_perp**2)) .to(u.s, u.dimensionless_angles())) result = phasor(f, tau_t, linear_axis=(f.shape.index(f.size)-f.ndim if fast else None)) if np.any(realization != 1.): result *= realization return result
[docs] def theta_theta_indices(theta, lower=-0.25, upper=0.75): """Indices to pairs of angles within bounds. Select pairs theta0, theta1 for which theta1 is constrained to lie around theta0 to within (lower*theta0, upper*theta0). Here, ``lower=-1, upper=1`` would select all non-duplicate pairs that are not on the diagonals with theta1 = ±theta0 (i.e., like ``np.triu_indices(theta.size, k=1)``, but also excluding the cross-diagonal; ``upper=1+epsilon`` would be identical). But using all off-diagonal points gives too many poor constraints. The defaults instead select points nearer the top of the inverted arclets, extending further on the outside than on the inside, since on the inside all arclets crowd together. Parameters ---------- theta : `~astropy.units.Quantity` Grid of angles. lower, upper : float Range of angles for which indices should be returned. """ indgrid = np.indices((len(theta), len(theta))) sel = (np.abs(theta[:, np.newaxis] + (upper+lower)/2*theta) < (upper-lower)/2 * np.abs(theta)) return indgrid[1][sel], indgrid[0][sel]
[docs] def theta_grid(d_eff, mu_eff, fobs, dtau, tau_max, dfd, fd_max): """Make a grid of theta that sample the parabola in a particular way. The idea would be to impose the constraint that near tau_max the spacing is roughly the spacing allowed by the frequencies, and near the origin that allowed by the times. In practice, one needs to oversample in both directions, by about a factor 1.3 in tau (which makes some sense from wanting to sample resolution elements with about 3 points, not just 2 for a FFT), but by about a factor 1.6 in f_d, which is less clear. Parameters ---------- d_eff : `~astropy.units.Quantity` Effective distance. Should be constant; if different for different points, no screen-to-screen scattering is taken into account. mu_eff : `~astropy.units.Quantity` Effective proper motion (``v_eff / d_eff``), parallel to ``theta_par``. fobs : `~astropy.units.Quantity` Mean frequency for which the doppler factor should be calculated. dtau : `~astropy.units.Quantity` Requested spacing in delay (typically should somewhat oversample the spacing in the secondary spectrum). tau_max : `~astropy.units.Quantity` Maximum delay to consider. Can be up to the value implied by the the frequency resolution (i.e., ``1/(f[2]-f[0])``). dfd : `~astropy.units.Quantity` Requested spacing in doppler factor (typically should oversample the spacing in the secondary spectrum). fd_max : `~astropy.units.Quantity` Maximum doppler factor to consider. Can be up to the value implied by the time resolution (i.e., ``1/(t[2]-t[0])``). """ tau_factor = d_eff/(2.*const.c) fd_factor = d_eff*mu_eff*fobs/const.c # Curvature in physical units. a = tau_factor / fd_factor**2 # Consider relevant maximum. fd_max = min(fd_max, np.sqrt(tau_max/a).to( fd_max.unit, u.dimensionless_angles())) # Curvature in sample units. a_pix = (a * dfd**2 / dtau).to_value( 1, equivalencies=u.dimensionless_angles()) x_max = (fd_max/dfd).to_value(1) x = sample_parabola(x_max, a_pix) th_r = (x*dfd/fd_factor).to(u.mas, u.dimensionless_angles()) return th_r
def sample_parabola(x_max, a=1.): """Solve for the x that evenly sample a parabola. The points will have spacing of 1 along the parabola (in units of x). Parameters ---------- x_max : float Maximum x value to extend to. """ s_max = np.round(path_length(x_max, a)) # Corresponding path length around a parabola s = np.arange(1, s_max+1) # Initial guesses for x. x = np.linspace(1, x_max, s.size) d_s = s - path_length(x, a) it = 0 while np.any(np.abs(d_s) > 1e-6) and it < 100: dsdx = path_length(x, a, derivative=True) x = x + d_s / dsdx d_s = s - path_length(x, a) it += 1 return np.hstack([-x[::-1], 0, x[:]]) def path_length(x, a=1, derivative=False): r"""Path length along a parabola, measured from the origin. For a parabola :math:`y=ax^2`:: .. math:: \int ds &= \int \sqrt{dx^2+dy^2} = \int \sqrt{1+(2ax)^2} dx \\ &= \frac{1}{4a}\left(\asinh(2ax) + x\sqrt{1+(2ax)^2}\right) Parameters ---------- x : array-like X position to evaluate the path length for. a : float, optional Curvature (y = a*x**2). Default: 1. derivative : bool If true, return ds/dx rather than s. """ x = 2 * a * x sq = np.sqrt(1+x**2) if derivative: return sq else: return (np.arcsinh(x) + x * sq) / (4*a)