import numpy as np
import astropy.units as u
from astropy.constants import m_p, G, sigma_sb
from specutils import Spectrum1D
from tqdm.auto import trange
from .twostream import BB
from .tp import pressure_grid, temperature_grid
from .opacity import binned_opacity
from .phoenix import get_binned_phoenix_spectrum
from .plot import dashboard
from .twostream import emit, absorb
__all__ = [
'dask_client',
'Grid',
'Planet',
'effective_temperature'
]
[docs]def dask_client(cluster_kwargs=dict(), client_kwargs=dict()):
"""
Launch a local cluster, return the client.
"""
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
**cluster_kwargs
)
client = Client(cluster, **client_kwargs)
return client
def wavelength_grid(min_micron=0.5, max_micron=10, n_bins=500, lam=None):
"""
Compute a log-spaced wavelength grid.
"""
if lam is None:
lam = np.logspace(np.log10(min_micron), np.log10(max_micron), n_bins) * u.um
wl_bins = np.concatenate([
[(lam.min() - (lam[1] - lam[0])).to(u.um).value],
lam.to(u.um).value]
) + (lam[1] - lam[0]).to(u.um).value / 2
R = float(lam[lam.shape[0]//2] / (lam[lam.shape[0]//2 + 1] - lam[lam.shape[0]//2]))
return lam, wl_bins, R
def F_TOA(lam, T_star=5800*u.K, f=2/3, a_rstar=float(0.03 * u.AU / u.R_sun)):
"""
Compute the flux at the top of the atmosphere of the planet.
"""
return (f * a_rstar ** -2 *
1 / (2 * np.pi) *
(np.pi * B_star(T_star, lam))
).to(u.erg/u.s/u.cm**3, u.spectral_density(lam))
def B_star(T_star, lam):
"""
Compute the blackbody spectrum of the star
"""
return BB(T_star)(lam)
[docs]class Planet(object):
"""Container for planetary system information"""
@u.quantity_input(
m_bar=u.g, g=u.m/u.s**2, T_star=u.K
)
def __init__(self, a_rstar, m_bar, g, T_star, alpha):
"""
Parameters
----------
a_rstar : float
Ratio of the semimajor axis of the orbit to the stellar radius
m_bar : ~astropy.units.Quantity
Mean molecular weight
g : ~astropy.units.Quantity
Surface gravity
T_star : ~astropy.units.Quantity
Stellar effective temperature
alpha : float
Number of scale heights in a mixing length
"""
self.a_rstar = a_rstar
self.m_bar = m_bar
self.g = g
self.T_star = T_star
self.alpha = alpha
[docs] @classmethod
def from_hot_jupiter(cls):
"""
Initialize a hot-Jupiter system with standard parameters:
:math:`M=M_J`, :math:`R=R_J`, :math:`\\bar{m}=2.4`, :math:`g=g_J`,
:math:`T_\\mathrm{eff}=5800` K.
"""
g_jup = 1 * G * u.M_jup / u.R_jup**2
return cls(
a_rstar=float(0.03 * u.AU / u.R_sun),
m_bar=2.4*m_p,
g=g_jup,
T_star=5800 * u.K,
alpha=1
)
[docs]class Grid(object):
"""
Grid over temperatures, pressures and wavelengths.
"""
@u.quantity_input(
lam_min=u.um, lam_max=u.um, P_toa=u.bar, P_boa=u.bar,
T_ref=u.K, P_ref=u.bar, lam=u.cm, pressures=u.bar, init_temperatures=u.K
)
def __init__(
self, planet,
lam=None, pressures=None, init_temperatures=None,
# Wavelength grid:
lam_min=0.5 * u.um, lam_max=10 * u.um, n_wl_bins=500,
# Pressure grid:
P_toa=1e-6 * u.bar, P_boa=200 * u.bar, n_layers=30,
# Initial temperature grid:
T_ref=2300 * u.K, P_ref=0.1 * u.bar, alpha=0.1
):
"""
If ``lam``, ``pressures``, or ``init_temperatures`` are None,
frei use the remaining keyword arguments to produce each grid.
Parameters
----------
planet : ~frei.Planet
Planet object associated with this grid.
lam : ~astropy.units.Quantity or None (optional)
Wavelength grid
pressures : ~astropy.units.Quantity or None (optional)
Pressure grid
init_temperatures : ~astropy.units.Quantity or None (optioonal)
Initial temperature grid at each pressure
lam_min : ~astropy.units.Quantity
Minimum wavelength.
lam_max : ~astropy.units.Quantity
Maximum wavelength
n_wl_bins : int
Number of log-spaced bins in wavelength
P_toa : ~astropy.units.Quantity
Pressure at the top of the atmosphere
P_boa : ~astropy.units.Quantity
Pressure at the bottom of the atmosphere
n_layers : int
Number of log-spaced bins in pressure
T_ref : ~astropy.units.Quantity
Reference temperature at reference pressure
P_ref : ~astropy.units.Quantity
Reference pressure
alpha : float (default = 0.1)
Power law index of initial guess T-p profile
"""
self.planet = planet
if lam is None:
self.lam, self.wl_bins, self.R = wavelength_grid(
min_micron=lam_min.to(u.um).value,
max_micron=lam_max.to(u.um).value,
n_bins=n_wl_bins
)
else:
self.lam, self.wl_bins, self.R = wavelength_grid(
lam=lam
)
if pressures is None:
self.pressures = pressure_grid(
n_layers=n_layers, P_toa=np.log10(P_toa.to(u.bar).value),
P_boa=np.log10(P_boa.to(u.bar).value)
)
else:
self.pressures = pressures
if init_temperatures is None:
self.init_temperatures = temperature_grid(
self.pressures, T_ref, P_ref, alpha
)
else:
self.init_temperatures = init_temperatures
self.opacities = None
def __repr__(self):
return (
f"<Grid in T=[{self.init_temperatures[0]:.0f}" +
f"...{self.init_temperatures[-1]:.0f}], " +
f"p=[{self.pressures[0]:.2g}...{self.pressures[-1]:.2g}], " +
f"lam=[{self.lam[0]}...{self.lam[-1]}]>"
)
[docs] def load_opacities(
self, species=None, path=None, opacities=None, client=None, force_reload=False
):
"""
Load opacity tables from path.
Parameters
----------
species : list or None (optional)
List of species to load. If None, load all available.
path : str
Path passed to ~glob.glob to find opacity netCDF files.
opacities : None or dict (optional)
If opacities are already computed, simply pass them into the Grid
object with this keyword argument.
client : None or ~dask.distributed.client.Client
Client for distributed dask computation on opacity tables
Returns
-------
opacities : dict
Opacity dictionary of xarray.DataArray's
"""
if (self.opacities is None and opacities is None) or force_reload:
self.opacities = binned_opacity(
self.init_temperatures,
self.pressures, self.wl_bins, self.lam, client,
species
)
else:
self.opacities = opacities
return self.opacities
[docs] def emission_spectrum(self, n_timesteps=1, n_zero_crossings=2, convergence_dT=3*u.K):
"""
Compute the emission spectrum for this grid.
Parameters
----------
n_timesteps : int
Maximum number of timesteps to take towards radiative equilibrium
convergence_dT : ~astropy.units.Quantity
Alternatively, set the maximum change in temperature before
considered converged
Returns
-------
spec : specutils.Spectrum1D
Emission spectrum
final_temps : astropy.units.Quantity
Final temperature grid
temperature_history : astropy.units.Quantity
Grid of temperatures with dimensions (n_layers, n_timesteps)
dtaus : numpy.ndarray
Change in optical depth in final iteration
n_zero_crossings : int
Number of changes in sign of ∆T in each layer before
considered converged
"""
if self.opacities is None:
raise ValueError("Must load opacities before computing emission spectrum.")
flux_unit = u.erg / u.s / u.cm**3
F_toa = F_TOA(self.lam, T_star=self.planet.T_star, a_rstar=self.planet.a_rstar)
final_temps = self.init_temperatures.copy()
n_layers, n_wavelengths = len(self.pressures), len(self.lam)
fluxes_down = np.zeros((n_layers, n_wavelengths)) * flux_unit
fluxes_up = np.zeros((n_layers, n_wavelengths)) * flux_unit
temp_hists = []
max_dTs = []
timestep_iterator = (
trange(n_timesteps) if n_timesteps > 1 else range(n_timesteps)
)
for i in timestep_iterator:
fluxes_up, fluxes_down, final_temps, temperature_history_emit, _, dT = emit(
opacities=self.opacities,
temperatures=final_temps,
pressures=self.pressures,
lam=self.lam,
F_TOA=F_toa,
g=self.planet.g,
m_bar=self.planet.m_bar,
n_timesteps=1,
alpha=self.planet.alpha,
fluxes_up=fluxes_up, fluxes_down=fluxes_down
)
fluxes_up, fluxes_down, final_temps, temperature_history_absorb, _, dT = absorb(
opacities=self.opacities,
temperatures=final_temps,
pressures=self.pressures,
lam=self.lam,
F_TOA=F_toa,
g=self.planet.g,
m_bar=self.planet.m_bar,
n_timesteps=1,
alpha=self.planet.alpha,
fluxes_up=fluxes_up, fluxes_down=fluxes_down
)
max_dT = np.abs(dT).max()
# Check for convergence with 6 sign flips in temperature history diff
temp_hists.append(temperature_history_absorb)
max_dTs.append(max_dT)
temp_hist = np.hstack(temp_hists)
temp_hist = temp_hist.T[temp_hist[0] != 0].T
diffs = np.diff(temp_hist.value.T, axis=0)
conv = (np.count_nonzero(
np.sign(diffs[1:]) != np.sign(diffs[:-1]), axis=0
) > n_zero_crossings) | (np.abs(dT) < convergence_dT)
if n_timesteps > 1:
timestep_iterator.set_description(
f"max|∆T|={max_dT:.1f}; conv = {np.count_nonzero(conv)}/{n_layers}"
)
if np.all(conv):
break
temp_hist = np.hstack(temp_hists)
temp_hist = temp_hist.T[temp_hist[0] != 0].T
fluxes_up, fluxes_down, final_temps, _, dtaus, dT = emit(
opacities=self.opacities,
temperatures=final_temps,
pressures=self.pressures,
lam=self.lam,
F_TOA=F_toa,
g=self.planet.g,
m_bar=self.planet.m_bar,
n_timesteps=1,
fluxes_up=fluxes_up, fluxes_down=fluxes_down
)
return (
Spectrum1D(flux=fluxes_up[-1], spectral_axis=self.lam),
final_temps, temp_hist, dtaus
)
[docs] def emission_dashboard(self, spec, final_temps, temperature_history, dtaus,
T_eff=None, plot_phoenix=True, cache=False):
"""
Produce the "dashboard" plot with the outputs from ``emission_spectrum``.
Parameters
----------
spec : ~specutils.Spectrum1D
Emission spectrum
final_temps : ~astropy.units.Quantity
Final temperature grid
temperature_history : ~astropy.units.Quantity
Grid of temperatures with dimensions (n_layers, n_timesteps)
dtaus : ~np.ndarray
Change in optical depth in final iteration
T_eff : ~astropy.units.Quantity or None
If not None, give the effective temperature of the PHOENIX model
to plot in comparison, otherwise compute it on the fly.
plot_phoenix : bool
If True, plot the corresponding PHOENIX model
cache : bool
Cache the PHOENIX model spectrum if ``plot_phoenix`` is True.
Returns
-------
fig, ax
Matplotlib figure and axis objects.
"""
if plot_phoenix:
if T_eff is None:
T_eff = effective_temperature(self, spec, dtaus, final_temps)
phoenix_lowres_padded = get_binned_phoenix_spectrum(
T_eff, self.planet.g, self.wl_bins, self.lam, cache=cache
)
else:
flux_unit = u.erg/u.cm**3/u.s
phoenix_lowres_padded = np.zeros(len(self.lam)) * flux_unit
fig, ax = dashboard(
self.lam, spec.flux, phoenix_lowres_padded, dtaus,
self.pressures, final_temps, temperature_history, self.opacities
)
return fig, ax
def effective_temperature_milne(grid, spec, dtaus, final_temps):
"""
Estimate photosphere temperature from Milne's solution (tau ~ 2/3).
"""
pressure_milne = np.ones_like(grid.lam.value)
for i in range(dtaus.shape[1]):
pressure_milne[i] = np.interp(
2/3, np.exp(-dtaus[:, i]), grid.pressures
).to(u.bar).value
temperature_milne = np.interp(
np.average(
pressure_milne,
weights=spec.flux.to(u.erg/u.s/u.cm**2,
u.spectral_density(grid.lam)).value
),
grid.pressures[::-1].to(u.bar).value, final_temps[::-1]
)
return temperature_milne
def effective_temperature_planck(grid, spec):
"""
Use the Stefan-Boltzmann law to invert the emitted flux for the
effective temperature.
"""
bol_flux = np.trapz(spec.flux, grid.lam)
return ((bol_flux / sigma_sb) ** (1/4)).decompose()
[docs]def effective_temperature(grid, spec, dtaus, final_temps):
"""
Compute effective temperature of an atmosphere given the outputs
from ``emit``.
This is the mean of the effective temperatures computed from
Milne's solution and the Stefan-Boltzmann law.
Parameters
----------
grid : ~frei.Grid
Wavelength, pressure and temperature grid
spec : ~specutils.Spectrum1D
Emission spectrum
dtaus : ~numpy.ndarray
Change in optical depth in final iteration
final_temps : ~astropy.units.Quantity
Temperature grid in the final iteration
"""
return u.Quantity([
effective_temperature_milne(grid, spec, dtaus, final_temps),
effective_temperature_planck(grid, spec)
]).mean()