Source code for screens.conjspec

# Licensed under the GPLv3 - see LICENSE
import numpy as np
from astropy import units as u, constants as const
from astropy.table import QTable
from scipy.linalg import eigh

from .fields import theta_grid, theta_theta_indices, phasor, expand
from .dynspec import DynamicSpectrum


__all__ = ['ConjugateSpectrum']


def power(z):
    return z.real**2 + z.imag**2


[docs] class ConjugateSpectrum: """Conjugate spectrum and methods to fit it. The code is meant to be agnostic to which axes are which, but some may assume a shape of ``(..., doppler_axis, delay_axis)``. Parameters ---------- conjspec : `~numpy.ndarray` Fourier transform of a dynamic spectrum. fd : `~astropy.units.Quantity` Doppler factors of the conjugate spectrum. Normally time conjugate but can be arbitrary (e.g., conjugate of ``f*t``). Should have the the proper shape to broadcast with ``conjspec``. tau : `~astropy.units.Quantity` Delays of the conjugate spectrum. Should have the proper shape to broadcast with ``dynspec``. noise : float The uncertainty in the real and imaginary components of the conjugate spectrum. d_eff : `~astropy.units.Quantity` Assumed effective distance. This is used throughout and not fit, but can be treated as a scaling parameters. mu_eff : `~astropy.units.Quantity`, optional Initial guess for the effective proper motion, ``v_eff/d_eff``. theta : `~astropy.units.Quantity`, optional Grid of theta angles to use for modelling the dynamic spectrum. Probably more usefully calculated later using ``theta_grid``. magnification : `~astropy.units.Quantity`, optional Magnifications at each ``theta``. More typically inferred by fitting the secondary spectrum. """ def __init__(self, conjspec, tau, fd, noise=None, d_eff=None, mu_eff=None, theta=None, magnification=None): self.conjspec = conjspec self.tau = tau self.fd = fd self.noise = noise self.d_eff = d_eff self.mu_eff = mu_eff self.theta = theta self.magnification = magnification @property def secspec(self): """Secondary spectrum, i.e., the power of the conjugate spectrum.""" return power(self.conjspec)
[docs] @classmethod def from_dynamic_spectrum(cls, dynspec, normalization='mean', **kwargs): """Create a conjugate spectrum from a dynamic one. Easiest if the input is a `~screens.dynspec.DynamicSpectrum` instance. By passing in an explicit time axis using ``t``, one can get a different delay factor conjugate. Particularly useful with ``t=f*t``, which takes into account the frequency dependence of the time variation of scintles. Note that the dynamic spectrum is assumed to have shape ``(..., time_axis, frequency_axis)``. Parameters ---------- dynspec : array_like or `~screens.dynspec.DynamicSpectrum` Input dynamic spectrum for which the fourier transform will be calculated. If it has attributes ``f``, ``t``, ``d_eff``, ``theta``, ``magnification``, and ``noise``, those will be used as default inputs. TODO: ``noise`` is likely wrong! normalization : 'mean' or None Normalize such that the 0, 0 element equals the mean of the dynamic spectrum. **kwargs Other arguments to initialize the conjugate spectrum. """ for key in ('f', 't', 'd_eff', 'mu_eff', 'theta', 'magnification', 'noise'): val = getattr(dynspec, key, None) if val is not None: kwargs.setdefault(key, val) if isinstance(dynspec, DynamicSpectrum): # TODO: give DynamicSpectrum an __array__ method. dynspec = dynspec.dynspec f = kwargs.pop('f') t = kwargs.pop('t') fd = kwargs.pop('fd', None) if t.size in t.shape and fd is None: # fast FFT possible. conj = np.fft.fftshift(np.fft.fft2(dynspec)) fd = np.fft.fftshift(np.fft.fftfreq(t.size, t[1]-t[0]).to(u.mHz) .reshape(t.shape)) else: # Time axis has slow FT or explicit fd given. # Time is assumed to be along axis -2. # TODO: relax this assumption. if fd is None: t_step = np.abs(np.diff(t, axis=-2)).min() n_t = round((t.ptp()/t_step).to_value(1).item()) + 1 fd = np.fft.fftshift(np.fft.fftfreq(n_t, t_step).to(u.mHz)) fd = expand(fd, n=dynspec.ndim) linear_axis = "transform" else: if fd.ndim == 1: fd = expand(fd, n=dynspec.ndim) linear_axis = None # Check for linear spacing to speed up the calculation. Here, # the first check is whether fd is a linearly spaced array # along a single axis, and the second whether the last axis of # time (generally along frequency) is linearly spaced. if (fd.size in fd.shape and np.allclose( dfd := np.diff( fd, axis=(axis := fd.shape.index(fd.size))), dfd.take(0, axis=axis))): linear_axis = "transform" elif (t.shape[-1] > 1 and np.allclose(dt := np.diff(t, axis=-1), dt[..., :1])): linear_axis = -1 factor = (phasor(t, fd, linear_axis=linear_axis).conj() * dynspec) step1 = factor.sum(-2, keepdims=True).swapaxes(0, -2).squeeze(0) conj = np.fft.fftshift(np.fft.fft(step1, axis=-1), axes=-1) fd.shape = conj.shape[-2], 1 if normalization == 'mean': normalization = conj[conj.shape[-2] // 2, conj.shape[-1] // 2] conj /= normalization tau = np.fft.fftshift(np.fft.fftfreq(f.size, f[1]-f[0]).to(u.us)) tau.shape = f.shape self = cls(conj, tau, fd, **kwargs) self.f = f self.t = t self.normalization = normalization return self
[docs] def theta_grid(self, oversample_tau=2, oversample_fd=4, **kwargs): """Calculate a grid of theta for modelling the secondary spectrum. Wraps `screens.fields.theta_grid` with defaults from the class. See that function for details. Oversampling is set relatively high here, since covariances between the theta are not as important as for fitting a dynamic spectrum. Note that this does *not* set the angles on the class, so typical usage is ``ds.theta = ds.theta_grid()``. """ kwargs.setdefault('d_eff', self.d_eff) kwargs.setdefault('mu_eff', self.mu_eff) kwargs.setdefault('fobs', self.f.mean()) kwargs.setdefault('dtau', (self.tau[1]-self.tau[0]).squeeze() / oversample_tau) kwargs.setdefault('dfd', (self.fd[1]-self.fd[0]).squeeze() / oversample_fd) kwargs.setdefault('tau_max', self.tau.max() * 2/3) kwargs.setdefault('fd_max', self.fd.max() * 2/3) return theta_grid(**kwargs)
[docs] def theta_theta(self, mu_eff=None, conserve=False, theta_grid=True, **kwargs): """Project the secondary spectrum into theta-theta space. For a given grid in ``theta`` (possibly calculated) and a set of pairs found using `screens.fields.theta_theta_indices`, interpolate in the secondary spectrum to find the theta-theta arrays. Parameters ---------- mu_eff : `~astropy.units.Quantity`, optional Effective proper motion to use. Defaults to that stored on the instance. Will update the instance if given. conserve : bool Whether to conserve flux per surface area. Doing so reduces sensitivity to points near the axes, but means one cannot directly use any eigenvectors directly in constructing dynamic spectra. theta_grid : bool, optional Whether to calculate a new theta grid, or use the one stored on the instance. By default, calculate it only if ``mu_eff`` is passed in. If `True`, this will update the grid stored on the instance. **kwargs Any further arguments are passed on to `~screens.dynspec.DynamicSpectrum.theta_grid` """ if mu_eff is None: mu_eff = self.mu_eff if theta_grid: self.theta = self.theta_grid(mu_eff=mu_eff, **kwargs) conj = self.conjspec i0, i1 = theta_theta_indices(self.theta) i1 = i1.ravel() fobs = self.f.mean() tau_factor = self.d_eff/(2.*const.c) fd_factor = self.d_eff*mu_eff*fobs/const.c tau_th = (tau_factor*self.theta**2).to(u.us, u.dimensionless_angles()) fd_th = (fd_factor*self.theta).to(u.mHz, u.dimensionless_angles()) dtau = tau_th[i0] - tau_th[i1] dfd = fd_th[i0] - fd_th[i1] idtau = np.round(((dtau-self.tau[0]) / (self.tau[1]-self.tau[0])).to_value(1)).astype(int) idfd = np.round(((dfd-self.fd[0]) / (self.fd[1]-self.fd[0])).to_value(1)).astype(int) ok = ((idtau >= 0) & (idtau < conj.shape[-1]) & (idfd >= 0) & (idfd < conj.shape[-2])) theta_theta = np.zeros(self.theta.shape*2, conj.dtype) amplitude = conj[idfd[ok], idtau[ok]] if conserve: # Area conversion factor: # abs(Δtau[i0]*Δfd[i1]-Δtau[i1]*Δfd[i0])/(Δth[i0]*Δth[i1]) # Δtau = dtau/dth * Δth = tau_factor * 2 * theta * Δtheta # Δfd = dfd/dth * Δth = fd_factor * Δth # -> conversion factor # abs(dtau/dth[i0]*dfd/dth[i1] - dtau/dth[i1]*dfd/dth[i0]) # = abs(tau_factor*2*(theta[i0]-theta[i1])*fd_factor) # = abs(tau_factor*2*dfd) [see above]. area = np.abs(tau_factor * 2. * dfd).to_value( u.us * u.mHz / u.mas**2, u.dimensionless_angles()) amplitude[ok] *= area theta_theta[i0[ok], i1[ok]] = amplitude theta_theta[i1[ok], i0[ok]] = amplitude return theta_theta
[docs] def model(self, magnification=None, mu_eff=None, conserve=False): """Calculate the expected conjugate spectrum for given parameters.""" if magnification is None: magnification = self.magnification if mu_eff is None: mu_eff = self.mu_eff fobs = self.f.mean() tau_factor = self.d_eff/(2.*const.c) fd_factor = self.d_eff*mu_eff*fobs/const.c ifd, itau = np.indices(self.conjspec.shape, sparse=True) fd = self.fd[ifd, 0] tau = self.tau[itau] # On purpose, keep the sign. th_tau2 = (tau/tau_factor).to(u.mas**2, u.dimensionless_angles()) th_fd = (fd/fd_factor).to(u.mas, u.dimensionless_angles()) # th_fd = th1-th2 # th_tau = np.sqrt(th1**2-th2**2) = np.sqrt((th1-th2)(th1+th2)) # -> th1+th2 = th_tau / th_fd # -> th1 = (th_tau**2 / th_fd + th_fd) / 2 # -> th2 = (th_tau**2 / th_fd - th_fd) / 2 with np.errstate(all='ignore'): ths = np.stack([(th_tau2 / th_fd + sign * th_fd) / 2. for sign in (-1, +1)], axis=0) << self.theta.unit ith = np.searchsorted(self.theta, ths) # Only keep pairs which are inside the grid and not on the axes ok = ((fd != 0) & (tau != 0) & np.all((ith > 0) & (ith < self.theta.size-1), axis=0)) ths = ths[:, ok] ith = ith[:, ok] goupone = self.theta[ith+1] - ths < ths - self.theta[ith] ith += goupone model = np.zeros_like(self.conjspec, magnification.dtype) amplitude = magnification[ith[1]] * magnification[ith[0]].conj() if conserve: area = (np.abs(tau_factor * 2. * fd_factor * (ths[1] - ths[0])) .to_value(u.us * u.mHz / u.mas**2, u.dimensionless_angles())) model[ok] = amplitude / area else: model[ok] = amplitude return model
[docs] def locate_mu_eff(self, mu_eff_trials=None, power=True, verbose=False): """Try reproducing the secondary spectrum for a range of proper motion. For each proper motion, construct a theta-theta array, calculte the largest eigenvalue and use the corresponding eigenvector as the model one-dimensional screen. Parameters ---------- mu_eff_trials : `~astropy.units.Quantity` Proper motions to try. power : bool Whether to just decompose the powers or the full complex secondary spectrum. The former is faster and varies less quickly with proper motion, so can be used to find the global minimum before re-running on the full complex values. verbose : bool Whether or not to give summary statistics for each trial. Returns ------- curvature : `~astropy.table.QTable` Table with the following columns: - ``mu_eff`` : Input proper motions. - ``theta`` : grid in theta used. - ``w`` : Largest eigenvalue. - ``recovered`` : corresponding eigenvector, i.e., magnifications. - ``th_ms`` : Mean-square residual in theta-theta space. - ``ndof`` : degrees of freedom ``n_dynspec - n_theta - 2``. - ``redchi2`` : reduced chi2 ``((dynspec - model)/noise)**2/ndof``. Notes ----- The resulting table is also stored on the instance, as ``curvature``. Note that the noise in the secondary spectrum is currently ignored. """ if mu_eff_trials is None: mu_eff_trials = np.linspace(self.mu_eff * 0.5, self.mu_eff * 2, 201) r = QTable([mu_eff_trials], names=['mu_eff']) r['theta'] = np.zeros(len(r), object) r['w'] = 0. r['recovered'] = np.zeros(len(r), object) r['th_ms'] = 0. r['redchi2'] = 0. for i, mu_eff in enumerate(r['mu_eff']): th_th = self.theta_theta(mu_eff) if power: th_th = power(th_th) w, v = eigh(th_th, eigvals=(self.theta.size-1,)*2) recovered = v[:, -1] * np.sqrt(w[-1]) if power: th_ms = ((th_th - (recovered[:, np.newaxis] * recovered))**2).mean() else: recovered0 = recovered[self.theta == 0] recovered *= recovered0.conj()/np.abs(recovered0) th_ms = (np.abs((th_th - (recovered[:, np.newaxis] * recovered.conj())))**2).mean() spec_r = self.model(recovered, mu_eff=mu_eff) if power: redchi2 = ((self.secspec - spec_r)**2).mean() else: redchi2 = power(self.conjspec-spec_r).mean() r['theta'][i] = self.theta r['w'][i] = w[-1] r['recovered'][i] = recovered r['th_ms'][i] = th_ms r['redchi2'][i] = redchi2 if verbose: print(f'{mu_eff} {w[-1]} {th_ms} {redchi2}') r.meta['theta'] = self.theta r.meta['d_eff'] = self.d_eff r.meta['mu_eff'] = r['mu_eff'][r['redchi2'].argmin()] self.curvature = r return r