Source code for screens.conjspec

# Licensed under the GPLv3 - see LICENSE
import numpy as np
import astropy.units as u

from .fields import phasor, expand
from .dynspec import DynamicSpectrum


__all__ = ['ConjugateSpectrum']


def power(z):
    return z.real**2 + z.imag**2


[docs] class ConjugateSpectrum: """Conjugate spectrum and methods to fit it. The code is meant to be agnostic to which axes are which, but some may assume a shape of ``(..., doppler_axis, delay_axis)``. Parameters ---------- conjspec : `~numpy.ndarray` Fourier transform of a dynamic spectrum. fd : `~astropy.units.Quantity` Doppler factors of the conjugate spectrum. Normally time conjugate but can be arbitrary (e.g., conjugate of ``f*t``). Should have the the proper shape to broadcast with ``conjspec``. tau : `~astropy.units.Quantity` Delays of the conjugate spectrum. Should have the proper shape to broadcast with ``dynspec``. noise : float The uncertainty in the real and imaginary components of the conjugate spectrum. """ def __init__(self, conjspec, tau, fd, noise=None): self.conjspec = conjspec self.tau = tau self.fd = fd self.noise = noise @property def secspec(self): """Secondary spectrum, i.e., the power of the conjugate spectrum.""" return power(self.conjspec)
[docs] @classmethod def from_dynamic_spectrum(cls, dynspec, normalization='mean', **kwargs): """Create a conjugate spectrum from a dynamic one. Easiest if the input is a `~screens.dynspec.DynamicSpectrum` instance. By passing in an explicit time axis using ``t``, one can get a different delay factor conjugate. Particularly useful with ``t=f*t``, which takes into account the frequency dependence of the time variation of scintles. Note that the dynamic spectrum is assumed to have shape ``(..., time_axis, frequency_axis)``. Parameters ---------- dynspec : array_like or `~screens.dynspec.DynamicSpectrum` Input dynamic spectrum for which the fourier transform will be calculated. If it has attributes ``f``, ``t``, ``d_eff``, ``theta``, ``magnification``, and ``noise``, those will be used as default inputs. TODO: ``noise`` is likely wrong! normalization : 'mean' or None Normalize dynamic spectrum by its mean and subtract 1 before transforming and ensure the resulting conjugate spectrum is normalized as well, with the 0, 0 element equal unity. **kwargs Other arguments to initialize the conjugate spectrum. """ for key in ('f', 't', 'noise'): val = getattr(dynspec, key, None) if val is not None: kwargs.setdefault(key, val) if isinstance(dynspec, DynamicSpectrum): # TODO: give DynamicSpectrum an __array__ method. dynspec = dynspec.dynspec if normalization == 'mean': dynspec = dynspec / dynspec.mean() - 1. # Not in place!! f = kwargs.pop('f') t = kwargs.pop('t') fd = kwargs.pop('fd', None) if t.size in t.shape and fd is None: # fast FFT possible. conj = np.fft.fftshift(np.fft.fft2(dynspec)) fd = np.fft.fftshift(np.fft.fftfreq(t.size, t[1]-t[0]) .reshape(t.shape)) else: # Time axis has slow FT or explicit fd given. # Time is assumed to be along axis -2. # TODO: relax this assumption. if fd is None: t_step = np.abs(np.diff(t, axis=-2)).min() n_t = round((np.ptp(t)/t_step).to_value(1).item()) + 1 fd = np.fft.fftshift(np.fft.fftfreq(n_t, t_step).to(u.mHz)) fd = expand(fd, n=dynspec.ndim) linear_axis = "transform" else: if fd.ndim == 1: fd = expand(fd, n=dynspec.ndim) linear_axis = None # Check for linear spacing to speed up the calculation. Here, # the first check is whether fd is a linearly spaced array # along a single axis, and the second whether the last axis of # time (generally along frequency) is linearly spaced. if (fd.size in fd.shape and np.allclose( dfd := np.diff( fd, axis=(axis := fd.shape.index(fd.size))), dfd.take(0, axis=axis))): linear_axis = "transform" elif (t.shape[-1] > 1 and np.allclose(dt := np.diff(t, axis=-1), dt[..., :1])): linear_axis = -1 factor = (phasor(t, fd, linear_axis=linear_axis).conj() * dynspec) step1 = factor.sum(-2, keepdims=True).swapaxes(0, -2).squeeze(0) conj = np.fft.fftshift(np.fft.fft(step1, axis=-1), axes=-1) fd.shape = conj.shape[-2], 1 if normalization == 'mean': conj /= dynspec.size conj[conj.shape[-2] // 2, conj.shape[-1] // 2] = 1. tau = np.fft.fftshift(np.fft.fftfreq(f.size, f[1]-f[0])) tau.shape = f.shape self = cls(conj, tau, fd, **kwargs) self.f = f self.t = t self.normalization = normalization return self