Source code for exo_skryer.read_obs

"""
read_obs.py
===========
"""

from pathlib import Path
from typing import Any, Optional
import numpy as np

__all__ = ['resolve_obs_path', 'read_obs_data']


def _is_missing_obs_path(value: Any) -> bool:
    """Return whether an observation-path config entry should be treated as unset."""
    if value is None:
        return True
    return str(value).strip().lower() in {"", "none", "null", "~"}


[docs] def resolve_obs_path(cfg: Any) -> Any: """Resolve the observational data path from configuration. Parameters ---------- cfg : config object Parsed YAML configuration object with `cfg.data.obs` attribute. Returns ------- Any Observational data spec. Either a single path string (``data.obs``) or a dict ``{"east": ..., "west": ...}`` for separate limb spectra. Raises ------ ValueError If no observational data path is present in the configuration. """ data_cfg = getattr(cfg, "data", None) rel_obs_path: Optional[str] = None if data_cfg is not None: rel_obs_path = getattr(data_cfg, "obs", None) obs_east = getattr(data_cfg, "obs_east", None) if data_cfg is not None else None obs_west = getattr(data_cfg, "obs_west", None) if data_cfg is not None else None has_east = not _is_missing_obs_path(obs_east) has_west = not _is_missing_obs_path(obs_west) if has_east or has_west: if not has_east or not has_west: raise ValueError( "When using separate limb spectra, both cfg.data.obs_east and " "cfg.data.obs_west must be set." ) return {"east": obs_east, "west": obs_west} if _is_missing_obs_path(rel_obs_path): raise ValueError( "No observational data path found. Set cfg.data.obs in the YAML config." ) return rel_obs_path
def _load_columns(path: str) -> np.ndarray: ''' Input: path to observational data file Output: raw data from the data file ''' raw = np.genfromtxt(path, dtype=str, comments="#", autostrip=True) if raw.ndim == 1: raw = raw[None, :] return raw def _resolve_path(path, base_dir=None) -> Path: path_obj = Path(path) if not path_obj.is_absolute(): if base_dir is not None: path_obj = (Path(base_dir) / path_obj).resolve() else: path_obj = path_obj.resolve() return path_obj def _read_single_obs_data(path, base_dir=None): path_obj = _resolve_path(path, base_dir=base_dir) print(f"[info] Reading observational data from: {path_obj}") raw = _load_columns(path_obj) if raw.shape[1] < 4: raise ValueError(f"[error] Observational file '{path}' must have at least four columns (wl,dwl,y,dy).") max_numeric = min(raw.shape[1], 5) floats = None for ncols in range(max_numeric, 3, -1): try: floats = raw[:, :ncols].astype(float) numeric_cols = ncols break except ValueError: continue if floats is None: raise ValueError(f"[error] Could not parse numeric columns from '{path}'.") wl = floats[:, 0] dwl = floats[:, 1] y = floats[:, 2] dy_plus = floats[:, 3] if numeric_cols >= 5: dy_minus = floats[:, 4] dy_sym = np.maximum(dy_plus, dy_minus) else: dy_minus = dy_plus.copy() dy_sym = dy_plus.copy() if raw.shape[1] > numeric_cols: response_mode = raw[:, numeric_cols] print('[info] Using custom wavelength convolution functions for each band') else: response_mode = np.full(wl.shape, "boxcar", dtype="<U16") print('[info] All bands have been defaulted to boxcar convolution') offset_group_col = numeric_cols + 1 if raw.shape[1] > offset_group_col: offset_group = raw[:, offset_group_col].astype("<U32") unique_groups = np.unique(offset_group) print(f'[info] Found {len(unique_groups)} offset groups: {unique_groups.tolist()}') for grp in unique_groups: mask = offset_group == grp wl_min, wl_max = wl[mask].min(), wl[mask].max() n_pts = mask.sum() print(f'[info] -> {grp}: {wl_min:.4f} - {wl_max:.4f} um ({n_pts} points)') print('[info] Define offset_<group> parameters in YAML to fit instrument offsets') else: offset_group = np.full(wl.shape, "__no_offset__", dtype="<U32") unique_groups, offset_group_idx = np.unique(offset_group, return_inverse=True) return { "wl": wl, "dwl": dwl, "y": y, "dy": dy_sym, "response_mode": response_mode, "offset_group": offset_group, "offset_group_idx": offset_group_idx, "offset_group_names": unique_groups, } def _combine_limb_obs(east_obs: dict, west_obs: dict) -> dict: y = np.concatenate([east_obs["y"], west_obs["y"]]) dy = np.concatenate([east_obs["dy"], west_obs["dy"]]) wl = np.concatenate([east_obs["wl"], west_obs["wl"]]) dwl = np.concatenate([east_obs["dwl"], west_obs["dwl"]]) response_mode = np.concatenate([east_obs["response_mode"], west_obs["response_mode"]]) offset_group = np.concatenate([east_obs["offset_group"], west_obs["offset_group"]]).astype("<U32") unique_groups, offset_group_idx = np.unique(offset_group, return_inverse=True) n_east = east_obs["y"].shape[0] n_west = west_obs["y"].shape[0] return { "wl": wl, "dwl": dwl, "y": y, "dy": dy, "response_mode": response_mode, "offset_group": offset_group, "offset_group_idx": offset_group_idx, "offset_group_names": unique_groups, "has_limb_observations": True, "wl_east": east_obs["wl"], "dwl_east": east_obs["dwl"], "y_east": east_obs["y"], "dy_east": east_obs["dy"], "response_mode_east": east_obs["response_mode"], "offset_group_east": east_obs["offset_group"], "wl_west": west_obs["wl"], "dwl_west": west_obs["dwl"], "y_west": west_obs["y"], "dy_west": west_obs["dy"], "response_mode_west": west_obs["response_mode"], "offset_group_west": west_obs["offset_group"], "east_slice": slice(0, n_east), "west_slice": slice(n_east, n_east + n_west), }
[docs] def read_obs_data(path, base_dir=None): ''' Input: path to observational data file Output: Dictionary containing observational data ''' if isinstance(path, dict): if set(path.keys()) >= {"east", "west"}: east_obs = _read_single_obs_data(path["east"], base_dir=base_dir) west_obs = _read_single_obs_data(path["west"], base_dir=base_dir) print( f"[info] Using separate limb observations: east={len(east_obs['y'])} bins, " f"west={len(west_obs['y'])} bins" ) return _combine_limb_obs(east_obs, west_obs) raise ValueError("Observational dict specs must contain 'east' and 'west' keys.") obs_dict = _read_single_obs_data(path, base_dir=base_dir) obs_dict["has_limb_observations"] = False return obs_dict