Source code for screens.dynspec

# Licensed under the GPLv3 - see LICENSE
import numpy as np
from astropy import units as u, constants as const
from astropy.table import QTable
from scipy.optimize import curve_fit
from numpy.linalg import eigh

from .fields import dynamic_field, theta_grid, theta_theta_indices

__all__ = ['DynamicSpectrum']

[docs] class DynamicSpectrum: """Dynamic spectrum and methods to fit it. The code is meant to be agnostic to which axes are which, but some may assume a shape of ``(..., time_axis, frequency_axis)``. Parameters ---------- dynspec : `~numpy.ndarray` Intensities as a function of time and frequency. t : `~astropy.units.Quantity` Times of the dynamic spectrum. Should have the proper shape to broadcast with ``dynspec``. f : `~astropy.units.Quantity` Frequencies of the dynamic spectrum. Should have the proper shape to broadcast with ``dynspec``. noise : float The uncertainty in the intensities in the dynamic spectrum. d_eff : `~astropy.units.Quantity` Assumed effective distance. This is used throughout and not fit, but can be treated as a scaling parameters. mu_eff : `~astropy.units.Quantity`, optional Initial guess for the effective proper motion, ``v_eff/d_eff``. theta : `~astropy.units.Quantity`, optional Grid of theta angles to use for modelling the dynamic spectrum. Probably more usefully calculated later using ``theta_grid``. magnification : `~astropy.units.Quantity`, optional Magnifications at each ``theta``. More typically inferred by fitting the dynamic spectrum. """ def __init__(self, dynspec, f, t, noise=None, d_eff=None, mu_eff=None, theta=None, magnification=None): self.dynspec = dynspec self.f = f self.t = t self.noise = noise self.d_eff = d_eff self.mu_eff = mu_eff self.theta = theta self.magnification = magnification
[docs] @classmethod def fromfile(cls, filename, d_eff=None, mu_eff=None, noise=None): """Read a dynamic spectrum from an HDF5 file. This includes its time and frequency axes. Note: this needs the baseband-tasks package for HDF5 file access. """ from import hdf5 with as fh: dynspec = f = fh.frequency t = (np.arange(-fh.shape[0] // 2, fh.shape[0] // 2) / fh.sample_rate).to(u.minute)[:, np.newaxis] if noise is None: noise = fh.fh_raw.attrs['noise'] self = cls(dynspec, f, t, noise, d_eff, mu_eff) self.filename = filename try: self.curvature =, path='curvature') self.theta = self.curvature.meta['theta'] self.d_eff = self.curvature.meta['d_eff'] self.mu_eff = self.curvature.meta['mu_eff'] except Exception: pass return self
[docs] def theta_grid(self, oversample_tau=1.3, oversample_fd=1.69, **kwargs): """Calculate a grid of theta for modelling the dynamic spectrum. Wraps `screens.fields.theta_grid` with defaults from the class. See that function for details. Note that this does *not* set the angles on the class, so typical usage is ``ds.theta = ds.theta_grid()``. """ kwargs.setdefault('d_eff', self.d_eff) kwargs.setdefault('mu_eff', self.mu_eff) kwargs.setdefault('fobs', self.f.mean()) kwargs.setdefault('dtau', (1./self.f.ptp()).to( / oversample_tau) kwargs.setdefault('dfd', (1./self.t.ptp()).to(u.mHz) / oversample_fd) kwargs.setdefault('tau_max', 1/np.abs(self.f[2]-self.f[0]).min()) kwargs.setdefault('fd_max', 1/np.abs(self.t[2]-self.t[0]).min()) return theta_grid(**kwargs)
[docs] def dynamic_bases(self, mu_eff=None): """Calculate the amplitude infererence patterns for current fit. The power of the sum of these amplitudes is a dynamic spectrum. Explicitly assumes that time and frequency are the last two axes of the dynamic spectrum. (TODO: lift this restriction.) Parameters ---------- mu_eff : `~astropy.units.Quantity`, optional Effective proper motion to use. Defaults to that stored on the instance. Will *not* update the instance. Notes ----- The calculated dynamic bases are cached for a given ``mu_eff``. """ if mu_eff is None: mu_eff = self.mu_eff if mu_eff != getattr(self, '_mu_eff_old', None): self._dyn_wave = dynamic_field( self.theta, 0., 1., self.d_eff, mu_eff, self.f, self.t).reshape( (1,)*(self.dynspec.ndim-2)+(-1,)+self.dynspec.shape[-2:]) self._mu_eff_old = mu_eff return self._dyn_wave
[docs] def theta_theta(self, mu_eff=None, theta_grid=None, **kwargs): """Create a theta-theta array from the dynamic spectrum. For a given grid in ``theta`` (possibly calculated) and a set of pairs found using `screens.fields.theta_theta_indices`, this brute-force estimates the amplitudes at each pair by cross-multiplying their expected signature in the dynamic spectrum. Parameters ---------- mu_eff : `~astropy.units.Quantity`, optional Effective proper motion to use. Defaults to that stored on the instance. Will update the instance if given. theta_grid : bool, optional Whether to calculate a new theta grid, or use the one stored on the instance. By default, calculate it only if ``mu_eff`` is passed in. If `True`, this will update the grid stored on the instance. **kwargs Any further arguments are passed on to `~screens.dynspec.DynamicSpectrum.theta_grid` """ if mu_eff is not None: self.mu_eff = mu_eff if theta_grid or (theta_grid is None and mu_eff is not None): self.theta = self.theta_grid(**kwargs) self._mu_eff_old = None dynwave = self.dynamic_bases() # Get intensities by brute-force mapping: # dynspec * dynwave[j] * dynwave[i].conj() / sqrt(2) for all j > i # Do first product ahead of time to speed up calculation # (remove constant parts of input spectrum to eliminate edge effects) ddyn = dynwave * np.expand_dims( self.dynspec - self.dynspec.mean(), -3) # Explicit loop is faster than just broadcasting or using indices # for advanced indexing, since it avoids creating a large array. result = np.zeros(self.dynspec.shape[:-2] + self.theta.shape * 2, ddyn.dtype) indices = theta_theta_indices(self.theta) for i, j in zip(*indices): amplitude = ((ddyn[..., j, :, :] * dynwave[..., i, :, :].conj()).mean((-2, -1)) / np.sqrt(2.)) result[..., i, j] = amplitude result[..., j, i] = amplitude.conj() return result
[docs] def locate_mu_eff(self, mu_eff_trials=None, verbose=True, use=True, **kwargs): """Try reproducing the dynamic spectrum for a range of proper motion. For each proper motion, construct a theta-theta array, calculte the largest eigenvalue and use the corresponding eigenvector as the model one-dimensional screen. Parameters ---------- mu_eff_trials : `~astropy.units.Quantity` Proper motions to try. verbose : bool Whether or not to give summary statistics for each trial. use : bool Whether to store the best-fit proper motion and corresponding theta grid and magnifications on the instance. **kwargs Further parameters to use in calculating the theta grid for each proper motion. Returns ------- curvature : `~astropy.table.QTable` Table with the following columns: - ``mu_eff`` : Input proper motions. - ``theta`` : grid in theta used. - ``w`` : Largest eigenvalue. - ``recovered`` : corresponding eigenvector, i.e., magnifications. - ``th_ms`` : Mean-square residual in theta-theta space. - ``ndof`` : degrees of freedom ``n_dynspec - n_theta - 2``. - ``redchi2`` : reduced chi2 ``((dynspec - model)/noise)**2/ndof``. Notes ----- The resulting table is also stored on the instance, as ``curvature``. """ if mu_eff_trials is None: mu_eff_trials = np.linspace(self.mu_eff * 0.8, self.mu_eff * 1.2, 21) r = QTable([mu_eff_trials], names=['mu_eff']) shape = (len(mu_eff_trials),) + self.dynspec.shape[:-2] r['theta'] = np.zeros(len(mu_eff_trials), object) r['w'] = np.zeros(shape) r['recovered'] = np.zeros(len(mu_eff_trials), object) r['th_ms'] = np.zeros(shape) r['ndof'] = 0 r['redchi2'] = np.zeros(shape) for i, mu_eff in enumerate(r['mu_eff']): th_th = self.theta_theta(mu_eff, **kwargs) w, v = eigh(th_th) recovered = v[..., -1] recovered0 = recovered[..., self.theta == 0] recovered *= recovered0.conj()/np.abs(recovered0) th_ms = (np.abs( th_th - (recovered[..., :, np.newaxis] * recovered[..., np.newaxis, :].conj()))**2).mean() dynspec_r = self.model(recovered, mu_eff=mu_eff) # Mean of dynamic spectra should equal sum of all recovered powers. # Since we normalize that to (close to) 1, just rescale similarly. dynspec_r *= (self.dynspec.mean((-2, -1), keepdims=True) / dynspec_r.mean((-2, -1), keepdims=True)) ndof = (self.dynspec.shape[-1] * self.dynspec.shape[-2] - self.theta.size - 2) redchi2 = (((self.dynspec-dynspec_r)**2).sum((-2, -1)) / self.noise**2) / ndof r['theta'][i] = self.theta r['w'][i] = w[..., -1] r['recovered'][i] = recovered r['th_ms'][i] = th_ms r['ndof'][i] = ndof r['redchi2'][i] = redchi2 if verbose: print(f'{mu_eff} {w[..., -1]} {ndof} {redchi2}') self.curvature = r if use: ibest = r['redchi2'].argmin(0) self.theta = r['theta'][ibest] self.magnification = np.array(r['recovered'][ibest]) if self.dynspec.ndim > 2: assert self.dynspec.ndim == 3, 'not implemented yet' self.magnification = np.array( [self.magnification[i][i] for i in range(self.dynspec.shape[0])], dtype=object) self.mu_eff = r['mu_eff'][ibest] return r
[docs] def model(self, magnification=None, mu_eff=None): """Calculate a model dynamic spectrum. Uses parameters on the instance if not otherwise specified. Parameters ---------- magnification : array-like, optional Complex magnification for each theta value. mu_eff : `~astropy.units.Quantity`, optional Effective proper motion. """ if magnification is None: magnification = self.magnification dyn_wave_sum = (self.dynamic_bases(mu_eff) * magnification[..., np.newaxis, np.newaxis]).sum(-3) dynspec_r = (dyn_wave_sum.view('2f8')**2).sum(-1) return dynspec_r
[docs] def residuals(self, magnification=None, mu_eff=None): """Residuals compared to the model. Parameters as for :meth:`~screens.dynspec.DynamicSpectrum.model`: """ model = self.model(magnification, mu_eff) return (self.dynspec - model) / self.noise
[docs] def jacobian(self, magnification, mu_eff): r"""Derivatives of the model dynamic spectrum to all parameters. The model dynamic spectrum is .. math:: w(f, t) &= \sum_k \mu_k \exp[j\phi(\theta_i, \mu_{\rm eff}, f, t)] &\equiv \sum_k \mu_k \Phi_k \\ DS(f,t) &= |w(f, t)|^2 = w \bar{w} It is easiest to use Wirtinger derivatives: .. math:: \frac{\partial}{\partial x} &= \left(\frac{\partial}{\partial z} + \frac{\partial}{\partial\bar{z}}\right) \\ \frac{\partial}{\partial y} &= j \left(\frac{\partial}{\partial z} - \frac{\partial}{\partial\bar{z}}\right) Using this for the magnifications: .. math:: \frac{\partial DS}{\partial\mu} &= \bar{w}\frac{\partial w}{\partial\mu} = \bar{w}\Phi \\ \frac{\partial DS}{\partial\bar{\mu}} &= w \frac{\partial\bar{w}}{\partial\bar{\mu}} = w \bar{\Phi} \\ \frac{\partial DS}{\partial x} &= \left(\bar{w} \Phi + w \bar{\Phi}\right) = 2\Re(w\bar{\Phi}) \\ \frac{\partial DS}{\partial y} &= j\left(\bar{w} \Phi - w \bar{\Phi}\right) = 2\Im(w\bar{\Phi}) And for :math:`\mu_{\rm eff}`: .. math:: \frac{\partial w}{\partial\mu_{\rm eff}} &= \sum_k\mu_k\Phi_k \frac{\partial j\phi_k}{\partial\mu_{\rm eff}} \\ \frac{\partial\bar{w}}{\partial\mu_{\rm eff}} &= \sum_k\bar{\mu}_k\bar{\Phi}_k \frac{\partial -j\phi_k}{\partial\mu_{\rm eff}} \\ \frac{\partial DS}{\partial\mu_{\rm eff}} &= \bar{w} \frac{\partial w}{\partial\mu_{\rm eff}} + w \frac{\partial\bar{w}}{\partial\mu_{\rm eff}} = 2j\sum_k\mu_k \Phi_k \frac{\partial\phi}{\partial\mu_{\rm eff}} """ dyn_wave = self.dynamic_bases(mu_eff) magdw = dyn_wave * magnification[:, np.newaxis, np.newaxis] magdw_sum = magdw.sum(0) jac_mag = ((2. * dyn_wave.conj() * magdw_sum) .transpose(1, 2, 0).reshape(self.dynspec.size, -1) .view([('mag_real', 'f8'), ('mag_imag', 'f8')])) if mu_eff is not None: dphidmu = (1j * self.d_eff/const.c * u.cycle * self.f * self.t * self.theta[:, np.newaxis, np.newaxis]).to( 1./self.mu_eff.unit, u.dimensionless_angles()) ddyn_wave_sum_dmu = (magdw * dphidmu).sum(0) dmu_eff = 2.*(magdw_sum.view('2f8') * ddyn_wave_sum_dmu.view('2f8')).sum(-1) jac_mu = dmu_eff.reshape(self.dynspec.size, 1) else: jac_mu = None return jac_mag, jac_mu
def _combine_pars(self, magnification, mu_eff, mu_eff_scale): """Turn complex parameters into the real ones used in the fits.""" msh = magnification.shape[:-1] magnification = magnification.view('2f8') theta0 = self.theta == 0 mag0real = magnification[..., theta0, 0].reshape(msh+(-1,)) others = magnification[..., ~theta0, :].reshape(msh+(-1,)) if mu_eff is not None: mu_eff_par = mu_eff.to_value(mu_eff_scale).reshape(msh+(1,)) pars = np.concatenate([others, mag0real, mu_eff_par], axis=-1) else: pars = np.concatenate([others, mag0real], axis=-1) return pars def _separate_pars(self, pars, mu_eff_scale=None): """Turn real parameters used in the fits into physical ones.""" pars = np.asanyarray(pars) if len(pars) > 2*len(self.theta)-1: if mu_eff_scale is None: mu_eff_scale = self.mu_eff_guess mu_eff = pars[-1] * mu_eff_scale pars = pars[:-1] else: mu_eff = None amp0 = pars[-1] others = pars[:-1].view(complex) magnification = np.zeros(self.theta.shape, complex) theta0 = self.theta == 0 magnification[theta0] = amp0 magnification[~theta0] = others return magnification, mu_eff def _separate_covar(self, pcovar, mu_eff_scale=None): """Complex covariance and pseudo-covariance. Of magnitudes with themselves and magnitudes with mu_eff Note: not quite sure this is done correctly (or useful)! """ if len(pcovar) > 2*len(self.theta)-1: if mu_eff_scale is None: mu_eff_scale = self.mu_eff_guess mu_eff_cov, mu_eff_var = self._separate_pars( pcovar[-1], mu_eff_scale**2) pcovar = pcovar[:-1, :-1] * mu_eff_scale else: mu_eff_cov = mu_eff_var = None r0 = pcovar[-1:, :-1] r0 = np.concatenate([r0, np.zeros_like(r0)], axis=0) c0 = pcovar[:-1, -1:] c0 = np.concatenate([c0, np.zeros_like(c0)], axis=1) others = pcovar[:-1, :-1] var0 = np.array([[pcovar[-1, -1], 0.], [0., 0.]]) i0 = np.argmax(self.theta == 0) * 2 mag_cov_reim = np.block( [[others[:i0, :i0], c0[:i0], others[:i0, i0:]], [r0[:, :i0], var0, r0[:, i0:]], [others[i0:, :i0], c0[i0:], others[i0:, i0:]]]) mag_cov_xx = mag_cov_reim[0::2, 0::2] mag_cov_yx = mag_cov_reim[1::2, 0::2] mag_cov_xy = mag_cov_reim[0::2, 1::2] mag_cov_yy = mag_cov_reim[1::2, 1::2] mag_covar = (mag_cov_xx + mag_cov_yy + 1j*(mag_cov_yx - mag_cov_xy)) mag_pseudo = (mag_cov_xx - mag_cov_yy + 1j*(mag_cov_yx + mag_cov_xy)) return mag_covar, mag_pseudo, mu_eff_cov, mu_eff_var def _model(self, unused_x_data, *pars): """Calculate model for the fit routine.""" magnification, mu_eff = self._separate_pars(pars) model = self.model(magnification, mu_eff) if self._prt: print(mu_eff if mu_eff is not None else self.mu_eff, ((self.dynspec-model)**2).sum()/self.noise**2) return model.ravel() def _jacobian(self, unused_x_data, *pars): """Calculate jacobian for the fit routine.""" magnification, mu_eff = self._separate_pars(pars) jac_mag, jac_mu_eff = self.jacobian(magnification, mu_eff) return self._combine_pars(jac_mag, jac_mu_eff, 1/self.mu_eff_guess)
[docs] def fit(self, guess=None, verbose=2, **kwargs): """Fit the dynamic spectrum directly. This needs good guesses, such as can be gotten from :meth:`screens.dynspec.DynamicSpectrum.locate_mu_eff`. Parameters ---------- guess : initial guesses, optional If `None`, use the current ``magnification`` and ``mu_eff`` on the instance. If ``curvature``, select the best value from the ``curvature`` table on the instance (created by ``locate_mu_eff``). If a tuple, guesses for ``magnification`` and ``mu_eff``. If a single number, treat it as a guess for ``mu_eff`` and determine initial guesses for the magnification using eigen-value decomposition. verbose : int How much information to print during fitting. A value of ``3`` forces the class itself to print a reduced chi2 any time ``mu_eff`` is updated. **kwargs Any further keyword arguments to pass on to :func:`scipy.optimize.curve_fit`. Notes ----- This uses the ``model`` and ``jacobian`` methods on the instance. """ if guess is None: mu_eff_guess = self.mu_eff mag_guess = self.magnification elif guess == 'curvature': ibest = self.curvature['redchi2'].argmin() mu_eff_guess = self.curvature['mu_eff'][ibest] mag_guess = self.curvature['recovered'][ibest] self.theta = self.curvature['theta'][ibest] elif isinstance(guess, tuple): mag_guess, mu_eff_guess = guess else: # Assume mu_eff; note this defines new theta grid. mu_eff_guess = guess th_th = self.theta_theta(mu_eff_guess) _, v = eigh(th_th) mag_guess = v[..., -1] m0 = mag_guess[..., self.theta == 0] mag_guess *= m0.conj()/np.abs(m0) self._prt = verbose > 2 self.mag_guess = mag_guess self.mu_eff_guess = mu_eff_guess guesses = self._combine_pars(mag_guess, mu_eff_guess, self.mu_eff_guess) # Default method of 'lm' does not give reliable error estimates. kwargs.setdefault('method', 'trf') if kwargs['method'] != 'lm': kwargs['verbose'] = min(verbose, 2) kwargs['x_scale'] = 'jac' # Note: typically, run-time is dominated by svd decompositions # inside fitting routines. self.popt, self.pcovar = curve_fit( self._model, xdata=None, ydata=self.dynspec.ravel(), p0=guesses, sigma=np.broadcast_to(self.noise, self.dynspec.size), jac=self._jacobian, **kwargs) self.raw_mag_fit, self.raw_mu_eff_fit = self._separate_pars(self.popt) perr = np.sqrt(np.diag(self.pcovar)) self.raw_mag_err, self.raw_mu_eff_err = self._separate_pars(perr) return (self.raw_mag_fit, self.raw_mag_err, self.raw_mu_eff_fit, self.raw_mu_eff_err)
[docs] def cleanup_fit(self, max_abs_err=0.1, max_rel_err=1000., verbose=True): """Remove highly degenerate configurations from the fit. Works by doing an singular value decomposition on the fit and removing very small values, those that are either smaller than ``max_abs_err`` or ``max_rel_err`` times the largest value. """ jm = self._jacobian(None, *self.popt) # Get bases for jacobian using SVD _, jms, jmvt = np.linalg.svd(jm, full_matrices=False) # Project optimal parameters on SVD bases pproj = jmvt.conj() @ self.popt # Remove basis which carry very little information. # For this purpose, magnifications cannot physically be # more than 1, and mu_eff is also scaled to be around 1, # so remove all that have inferred error = 1/jms > max_err. jms /= self.noise max_err = np.minimum(max_abs_err, max_rel_err/jms[0]) ok = jms > 1./max_err self.copt = jmvt[ok].T @ pproj[ok] self.ccovar = (jmvt[ok].T / jms[ok]**2) @ jmvt[ok] self.cln_mag_fit, self.cln_mu_eff_fit = self._separate_pars(self.copt) cerr = np.sqrt(np.diag(self.ccovar)) self.cln_mag_err, self.cln_mu_eff_err = self._separate_pars(cerr) if verbose: msg = "{} correction, mu_eff={:+10.4f}±{:6.4f} mas/yr, chi2={}" print(msg.format("Before", self.raw_mu_eff_fit.to_value(u.mas/u.yr), self.raw_mu_eff_err.to_value(u.mas/u.yr), (self.residuals(self.raw_mag_fit, self.raw_mu_eff_fit)**2).sum())) print(msg.format("After", self.cln_mu_eff_fit.to_value(u.mas/u.yr), self.cln_mu_eff_err.to_value(u.mas/u.yr), (self.residuals(self.cln_mag_fit, self.cln_mu_eff_fit)**2).sum())) return (self.cln_mag_fit, self.cln_mag_err, self.cln_mu_eff_fit, self.cln_mu_eff_err)