Source code for frei.opacity

import os
import tarfile
import shutil
from glob import glob
from tqdm import tqdm
import numpy as np
import astropy.units as u
from astropy.constants import m_p
import xarray as xr

from .chemistry import chemistry

__all__ = [
    'binned_opacity',
    'kappa',
    'load_example_opacity',
    'download_molecule', 
    'download_atom'
]

n_ref_H2 = 2.68678e19 * u.cm**-3
n_ref_He = 2.546899e19 * u.cm**-3
K_lambda = 1

interp_kwargs = dict(
    method='nearest', 
    kwargs=dict(fill_value="extrapolate")
)


def mapfunc_exact(
        group, temperature=2500, pressure=1e-08, interp_kwargs=interp_kwargs
):
    wl = group.wavelength
    op = group.interp(
        dict(temperature=temperature, pressure=pressure), **interp_kwargs
    ).opacity
    
    Delta_x = wl.max() - wl.min()
    return op.integrate('wavelength').expand_dims(dict(wavelength=[wl.mean()])) / Delta_x


def delayed_map_exact_concat(grouped, temperatures, pressures, lam, client):

    r = client.submit(
        grouped.map, mapfunc_exact,
        temperature=temperatures.value,
        pressure=pressures.to(u.bar).value
    )

    r = client.compute(r)
    results = client.gather(r)
    # Concatenate the results from each delayed task into a big dask array
    return xr.concat(
        results, dim='wavelength'
        # Also interpolate to span all grid wavelength grid points that weren't covered by 
        # the xarray groupings (currently xarray doesn't return empty bins)
    ).interp(
        dict(wavelength=lam.to(u.um).value),
        method='linear', kwargs=dict(fill_value='extrapolate')
    )


[docs]def binned_opacity( temperatures, pressures, wl_bins, lam, client, species=None, path=None ): """ Compute opacity for all available species, binned to wavelengths lam. Parameters ---------- path : str Path passed to ~glob.glob to find opacity netCDF files. temperatures : ~astropy.units.Quantity Temperature grid pressures : ~astropy.units.Quantity Pressure grid wl_bins : ~astropy.units.Quantity Wavelength bin edges lam : ~astropy.units.Quantity Wavelength bin centers client : None or ~dask.distributed.client.Client Client for distributed dask computation on opacity tables path : str (optional) Path to opacity files. species : list or None (optional) List of species to load. If None, load all available. Returns ------- op : dict Opacity tables for each species """ from .chemistry import iso_to_species if path is None: path = os.path.join(os.path.expanduser('~'), '.frei', '*.nc') paths = glob(path) if species is None: # Get all species names species = [ iso_to_species(path.split('/')[-1].split('_')[0]) for path in paths ] fetch_paths = [ path for path in paths if iso_to_species(path.split('/')[-1].split('_')[0]) in species ] fetch_species = [ iso_to_species(path.split('/')[-1].split('_')[0]) for path in fetch_paths ] results = dict() xr_kwargs = dict( # chunks=dict(wavelength=10_000) ) pbar = tqdm(zip(fetch_species, fetch_paths), total=len(fetch_paths)) for species_name, species_path in pbar: isotopologue = species_path.split('/')[-1].split('_')[0] pbar.set_description(f'Opacities for {isotopologue} (opening)') species_ds = xr.open_dataset(species_path, **xr_kwargs) pbar.set_description(f'Opacities for {isotopologue} (binning)') species_grouped = species_ds.groupby_bins("wavelength", wl_bins) pbar.set_description(f'Opacities for {isotopologue} (integrating)') species_binned = species_grouped.map( mapfunc_exact, temperature=temperatures.value, pressure=pressures.to(u.bar).value ) pbar.set_description(f'Opacities for {isotopologue} (interp)') results[isotopologue] = species_binned.interp( dict(wavelength=lam.to(u.um).value), method='linear', kwargs=dict(fill_value='extrapolate') ) del species_ds, species_grouped, species_binned return results
def n_lambda_H2(wavelength): # Malik 2017 Eqn 17 return 13.58e-5 * ( 1 + (7.52e-11 * u.cm**2) * wavelength**-2 ) + 1 def n_lambda_He(wavelength): # Deitrick 2020 Eqn C3 return 1e-8 * (2283 + (1.8102e13 / (1.5342e10 - (wavelength / (1 * u.um))**-2)) ) + 1 def rayleigh_H2(wavelength, m_bar=2.4*m_p): # Malik 2017 Eqn 16 return ((24 * np.pi**3 / n_ref_H2**2 / wavelength**4 * ((n_lambda_H2(wavelength)**2 - 1) / (n_lambda_H2(wavelength)**2 + 2))**2 * K_lambda ) / m_bar).decompose() def rayleigh_He(wavelength, m_bar=2.4*m_p): # Malik 2017 Eqn 16 return ((24 * np.pi**3 / n_ref_He**2 / wavelength**4 * ((n_lambda_He(wavelength)**2 - 1) / (n_lambda_He(wavelength)**2 + 2))**2 * K_lambda ) / m_bar).decompose()
[docs]def kappa( opacities, temperature, pressure, lam, m_bar=2.4*m_p ): """ Return the opacity at a given temperature and pressure. Parameters ---------- opacities : dict Opacity dictionary of xarray.DataArray's temperature : ~astropy.units.Quantity Temperature value pressure : ~astropy.units.Quantity Pressure value lam : ~astropy.units.Quantity Wavelength bin centers m_bar : ~astropy.units.Quantity Mean molecular weight Returns ------- k : ~astropy.units.Quantity Sum of opacities over all species sigma_scattering : ~astropy.units.Quantity Scattering cross section """ sigma_scattering = rayleigh_H2(lam, m_bar) + rayleigh_He(lam, m_bar) if pressure.isscalar and temperature.isscalar: pressure = u.Quantity([pressure]) temperature = u.Quantity([temperature]) ops = [] interp_kwargs = dict( method='linear', kwargs=dict(fill_value=0) ) fastchem_mmr = chemistry( temperature, pressure, opacities.keys(), m_bar=m_bar ) for species in opacities: interp_point = dict( pressure=xr.DataArray(pressure.to(u.bar).value, dims='z'), ) if len(np.unique(opacities[species].temperature)) > 1: interp_point['temperature'] = xr.DataArray( temperature.value, dims='z' ) opacity = fastchem_mmr[species][:, None] * opacities[species].interp( **interp_point, **interp_kwargs ) ops.append(opacity) if len(ops) == 1: ops = ops[0].values.flatten() elif len(ops) > 1: ops = xr.concat(ops, 'opacities').sum('opacities').values.flatten() return ops * u.cm**2 / u.g + sigma_scattering, sigma_scattering
[docs]def load_example_opacity(grid, seed=42, scale_factor=20): """ Load "example" opacity xarray. This file function returns something compatible with the output of ``binned_opacity``, so fake data can be substituted for the real opacities during testing and in the documentaiton. Parameters ---------- grid : ~frei.Grid Grid object seed : int Random seed for numpy scale_factor : float Scale up/down synthetic opacities by this factor Returns ------- op : dict Opacity tables for each species """ np.random.seed(seed) simple_opacities = np.zeros( (grid.pressures.shape[0], grid.init_temperatures.shape[0], grid.lam.shape[0]) ) so = ( # Broad infrared opacity np.exp(-0.5 * (grid.lam - 6 * u.um)**2 / (2 * u.um)**2) + # Broad optical opacity 0.8 * np.exp(-0.5 * (grid.lam - 0.3 * u.um)**2 / (0.5 * u.um)**2) ) # Add a bunch of random absorption bands in the optical for amp, wl_micron in zip( np.random.uniform(low=0.1, high=0.2, size=15), np.random.uniform(low=0.5, high=1, size=15) ): so += amp * np.exp( -0.5 * (grid.lam - wl_micron * u.um)**2 / (0.005 * u.um)**2 ) # Add a few water-like absorption bands in the NIR for amp, wl_micron in zip( [0.22, 0.2, 0.18], np.logspace(np.log10(1.4), np.log10(2.7), 3) ): so += amp * np.exp( -0.5 * (grid.lam - wl_micron * u.um)**2 / (0.13 * u.um)**2 ) simple_opacities[:] += 5 * 10**(2.5 * (so.value - 0.4)) simple_opacities *= scale_factor # Save this fake opacity grid to the water key in the opacity dictionary op = { "1H2-16O": xr.DataArray( simple_opacities, dims=['pressure', 'temperature', 'wavelength'], coords=dict( pressure=grid.pressures.to(u.bar).value, temperature=grid.init_temperatures, wavelength=grid.lam.to(u.um).value ) ).drop_duplicates('temperature') } return op
def dace_download_molecule( isotopologue='48Ti-16O', linelist='Toto', temperature_range=[500, 5000], pressure_range=[-6, 1.5], version=1 ): from dace.opacity import Molecule os.makedirs('tmp', exist_ok=True) archive_name = isotopologue + '__' + linelist + '.tar.gz' Molecule.download( isotopologue, linelist, float(version), temperature_range, pressure_range, output_directory='tmp', output_filename=archive_name ) return os.path.join('tmp', archive_name) def dace_download_atom( element='Na', charge=0, linelist='Kurucz', temperature_range=[500, 5000], pressure_range=[-8, 1.5], version=1 ): from dace.opacity import Atom os.makedirs('tmp', exist_ok=True) archive_name = element + '__' + linelist + '.tar.gz' Atom.download( element, charge, linelist, float(version), temperature_range, pressure_range, output_directory='tmp', output_filename=archive_name ) return os.path.join('tmp', archive_name) def untar_bin_files(archive_name): def bin_files(members): for tarinfo in members: if os.path.splitext(tarinfo.name)[1] == ".bin": yield tarinfo with tarfile.open(archive_name, 'r:gz') as tar: tar.extractall(path='tmp/.', members=bin_files(tar)) def get_opacity_dir_path_molecule(archive_name, isotopologue, linelist): return glob(os.path.join('tmp', isotopologue + '__' + linelist + "*e2b"))[0] def get_opacity_dir_path_atom(linelist): return glob(os.path.join('tmp', linelist + "*e2b"))[0] def opacity_dir_to_netcdf(opacity_dir, outpath): import xarray as xr temperature_grid = [] pressure_grid = [] for dirpath, dirnames, filenames in os.walk(opacity_dir): for filename in filenames: # Wavenumber points from range given in the file names temperature = int(filename.split('_')[3]) sign = 1 if filename.split('_')[4][0] == 'p' else -1 pressure = 10 ** (sign * float(filename.split('_')[4][1:].split('.')[0]) / 100) wl_start = int(filename.split('_')[1]) wl_end = int(filename.split('_')[2]) wlen = np.arange(wl_start, wl_end, 0.01) # Convert to micron wavelength = 1 / wlen / 1e-4 unique_wavelengths = wavelength[1:][::-1] temperature_grid.append(temperature) pressure_grid.append(pressure) tgrid = np.sort(list(set(temperature_grid))) pgrid = np.sort(list(set(pressure_grid))) if len(pgrid) == 1: extrapolate_pgrid = True pgrid = np.concatenate([pgrid, 10**(-1*np.log10(pgrid))]) else: extrapolate_pgrid = False opacity_grid = np.zeros( (len(tgrid), len(pgrid), len(unique_wavelengths)), dtype='float32' ) for dirpath, dirnames, filenames in os.walk(opacity_dir): for filename in filenames: opacity = np.fromfile( os.path.join(dirpath, filename), dtype=np.float32 )[1:][::-1] # Wavenumber points from range given in the file names temperature = int(filename.split('_')[3]) sign = 1 if filename.split('_')[4][0] == 'p' else -1 pressure = 10 ** (sign * float(filename.split('_')[4][1:].split('.')[0]) / 100) temperature_ind = np.argmin(np.abs(tgrid - temperature)) pressure_ind = np.argmin(np.abs(pgrid - pressure)) opacity_grid[temperature_ind, pressure_ind, :] = opacity if extrapolate_pgrid: for dirpath, dirnames, filenames in os.walk(opacity_dir): for filename in filenames: opacity = np.fromfile( os.path.join(dirpath, filename), dtype=np.float32 )[1:][::-1] # Wavenumber points from range given in the file names temperature = int(filename.split('_')[3]) # *Flip the sign for the extrapolated grid point in pressure* sign = -1 if filename.split('_')[4][0] == 'p' else 1 pressure = 10 ** (sign * float(filename.split('_')[4][1:].split('.')[0]) / 100) temperature_ind = np.argmin(np.abs(tgrid - temperature)) pressure_ind = np.argmin(np.abs(pgrid - pressure)) opacity_grid[temperature_ind, pressure_ind, :] = opacity ds = xr.Dataset( data_vars=dict( opacity=(["temperature", "pressure", "wavelength"], opacity_grid) ), coords=dict( temperature=(["temperature"], tgrid), pressure=(["pressure"], pgrid), wavelength=unique_wavelengths ) ) if not os.path.exists(os.path.dirname(outpath)): os.makedirs(os.path.dirname(outpath), exist_ok=True) ds.to_netcdf(outpath if outpath.endswith(".nc") else outpath + '.nc', encoding={'opacity': {'dtype': 'float32', "zlib": True}}) def clean_up(bin_dir, archive_name): os.remove(archive_name) shutil.rmtree(bin_dir)
[docs]def download_molecule(isotopologue, linelist): """ Download molecular opacity data from DACE. .. warning:: This generates *very* large files. Only run this method if you have ~6 GB available per molecule. Parameters ---------- isotopologue : str For example, "1H2-16O" for water. linelist : str For example, "POKAZATEL" for water. """ archive_name = dace_download_molecule(isotopologue, linelist) untar_bin_files(archive_name) bin_dir = get_opacity_dir_path_molecule( archive_name, isotopologue, linelist ) nc_path = os.path.join( os.path.expanduser('~'), '.frei', isotopologue + '__' + linelist + '.nc' ) opacity_dir_to_netcdf(bin_dir, nc_path) clean_up(bin_dir, archive_name)
[docs]def download_atom(atom, charge, linelist): """ Download atomic opacity data from DACE. .. warning:: This generates *very* large files. Only run this method if you have ~6 GB available per molecule. Parameters ---------- atom : str For example, "Na" for sodium. charge : int For example, 0 for neutral. linelist : str For example, "Kurucz". """ archive_name = dace_download_atom(atom, charge, linelist) untar_bin_files(archive_name) bin_dir = get_opacity_dir_path_atom(linelist) nc_path = os.path.join( os.path.expanduser('~'), '.frei', atom + '_' + str(int(charge)) + '__' + linelist + '.nc' ) opacity_dir_to_netcdf(bin_dir, nc_path) clean_up(bin_dir, archive_name)