data_access

Data access: STAC COG loading, local GeoTIFF loading, terrain derivation.

Provide unified functions for fetching raster source data from multiple backends (Planetary Computer STAC, local GeoTIFFs, remote VRTs via GDAL vsicurl) and for deriving terrain variables (slope, aspect) from elevation grids using pure numpy.

The module supports three of the five processing strategies defined in the pipeline architecture:

  • stac_cog -- :func:fetch_stac_cog via Planetary Computer or USGS GDP STAC
  • local_tiff -- :func:fetch_local_tiff for local files or HTTP(S) COGs
  • climr_cat -- :func:build_climr_cat_dict for ClimateR-Catalog OPeNDAP

The remaining two strategies (nhgf_stac static and temporal) are handled directly in :mod:hydro_param.processing via gdptools NHGFStacTiffData and NHGFStacData classes.

See Also

design.md : Sections 6.12 (local data preference) and 11.4 (access strategies).
hydro_param.processing : Zonal statistics and temporal aggregation.

References

.. [1] design.md section 6.12 -- Local data over remote services.
.. [2] design.md section 11.4 -- Data access strategy details.

derive_slope

derive_slope(
    elevation: DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray

Compute terrain slope in degrees from an elevation DataArray.

Derive slope from a DEM raster using the Horn (1981) finite-difference method implemented via numpy.gradient. The result is used by pywatershed derivation step 3 (topography) for parameters such as hru_slope.

Elevation units do not matter for slope computation as long as they are consistent with the cell size (both converted to meters internally via :func:_cell_sizes_meters).

PARAMETER DESCRIPTION
elevation

2-D elevation raster with spatial coordinates. May be in any CRS; geographic CRS coordinates (degrees) are automatically converted to meters for gradient computation.

TYPE: DataArray

method

Derivation method. Only "horn" is currently supported.

TYPE: str DEFAULT: 'horn'

x_coord

Name of the x coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'x'

y_coord

Name of the y coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'y'

RETURNS DESCRIPTION
DataArray

Slope in degrees [0, 90], with the same shape, coordinates, and CRS as the input elevation. Output attributes include units="degrees" and long_name="Terrain slope".

RAISES DESCRIPTION
ValueError

If method is not "horn".

Notes

The Horn method computes slope as arctan(sqrt((dz/dx)² + (dz/dy)²)), where the partial derivatives are estimated by numpy.gradient using second-order central differences.

References

.. [1] Horn, B.K.P. (1981). "Hill shading and the reflectance map." Proceedings of the IEEE, 69(1), 14-47.

See Also

derive_aspect : Compute terrain aspect from the same elevation input.
_cell_sizes_meters : Convert coordinate spacing to meters.

Source code in src/hydro_param/data_access.py
def derive_slope(
    elevation: xr.DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray:
    """Compute terrain slope in degrees from an elevation DataArray.

    Derive slope from a DEM raster using the Horn (1981) finite-difference
    method implemented via ``numpy.gradient``. The result is used by
    pywatershed derivation step 3 (topography) for parameters such as
    ``hru_slope``.

    Elevation units do not matter for slope computation as long as they
    are consistent with the cell size (both converted to meters internally
    via :func:`_cell_sizes_meters`).

    Parameters
    ----------
    elevation : xr.DataArray
        2-D elevation raster with spatial coordinates. May be in any CRS;
        geographic CRS coordinates (degrees) are automatically converted
        to meters for gradient computation.
    method : str
        Derivation method. Only ``"horn"`` is currently supported.
    x_coord : str
        Name of the x coordinate dimension in the DataArray.
    y_coord : str
        Name of the y coordinate dimension in the DataArray.

    Returns
    -------
    xr.DataArray
        Slope in degrees [0, 90], with the same shape, coordinates, and
        CRS as the input elevation. Output attributes include
        ``units="degrees"`` and ``long_name="Terrain slope"``.

    Raises
    ------
    ValueError
        If ``method`` is not ``"horn"``.

    Notes
    -----
    The Horn method computes slope as ``arctan(sqrt((dz/dx)² + (dz/dy)²))``,
    where the partial derivatives are estimated by ``numpy.gradient``
    using second-order central differences.

    References
    ----------
    .. [1] Horn, B.K.P. (1981). "Hill shading and the reflectance map."
       Proceedings of the IEEE, 69(1), 14-47.

    See Also
    --------
    derive_aspect : Compute terrain aspect from the same elevation input.
    _cell_sizes_meters : Convert coordinate spacing to meters.
    """
    if method != "horn":
        raise ValueError(f"Unsupported slope method: {method}")

    elev = elevation.values.astype(np.float64)
    y_coords = elevation.coords[y_coord].values
    x_coords = elevation.coords[x_coord].values

    has_crs = hasattr(elevation, "rio") and elevation.rio.crs
    is_geographic = has_crs and elevation.rio.crs.is_geographic
    dy_m, dx_m = _cell_sizes_meters(y_coords, x_coords, is_geographic)

    dz_dy, dz_dx = np.gradient(elev, dy_m, dx_m)
    slope_rad = np.arctan(np.sqrt(dz_dx**2 + dz_dy**2))
    slope_deg = np.degrees(slope_rad)

    result = elevation.copy(data=slope_deg)
    result.name = "slope"
    result.attrs = {"units": "degrees", "long_name": "Terrain slope"}
    return result
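The gradient-and-arctan core can be checked on a synthetic tilted plane. This sketch uses plain numpy (no xarray wrapper) and assumes projected coordinates already in meters, so the `_cell_sizes_meters` conversion is skipped:

```python
import numpy as np

# Synthetic DEM: a plane rising 1 m per meter eastward -> a 1:1 grade.
x = np.arange(0.0, 100.0, 10.0)          # projected x, meters
y = np.arange(0.0, 100.0, 10.0)          # projected y, meters
elev = np.tile(x, (y.size, 1))           # z = x, independent of y

# Core of derive_slope: second-order central differences via np.gradient
# (axis 0 = y, axis 1 = x), then arctan(sqrt((dz/dx)^2 + (dz/dy)^2)).
dz_dy, dz_dx = np.gradient(elev, y, x)
slope_deg = np.degrees(np.arctan(np.hypot(dz_dx, dz_dy)))

print(slope_deg[5, 5])                   # ~45 degrees for a 1:1 grade
```

Because the surface is linear, the central differences are exact and every interior and edge cell reports the same slope.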

derive_aspect

derive_aspect(
    elevation: DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray

Compute terrain aspect in degrees clockwise from north.

Derive aspect (downslope direction) from a DEM raster using the Horn (1981) finite-difference method. The result is used by pywatershed derivation step 3 (topography) for parameters such as hru_aspect.

Aspect is measured as a compass bearing: 0 degrees = north, 90 = east, 180 = south, 270 = west. Flat areas where both partial derivatives are zero will have an aspect of 0 degrees (north by convention).

PARAMETER DESCRIPTION
elevation

2-D elevation raster with spatial coordinates. May be in any CRS; geographic CRS coordinates (degrees) are automatically converted to meters for gradient computation.

TYPE: DataArray

method

Derivation method. Only "horn" is currently supported.

TYPE: str DEFAULT: 'horn'

x_coord

Name of the x coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'x'

y_coord

Name of the y coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'y'

RETURNS DESCRIPTION
DataArray

Aspect in degrees [0, 360), with the same shape, coordinates, and CRS as the input elevation. Output attributes include units="degrees" and long_name="Terrain aspect (clockwise from north)".

RAISES DESCRIPTION
ValueError

If method is not "horn".

Notes

Aspect is computed as atan2(-dz/dx, -dz/dy) (negated gradients give the descent direction), then converted from mathematical angle to compass bearing by adding 360 and taking modulo 360.

References

.. [1] Horn, B.K.P. (1981). "Hill shading and the reflectance map." Proceedings of the IEEE, 69(1), 14-47.

See Also

derive_slope : Compute terrain slope from the same elevation input.
_cell_sizes_meters : Convert coordinate spacing to meters.

Source code in src/hydro_param/data_access.py
def derive_aspect(
    elevation: xr.DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray:
    """Compute terrain aspect in degrees clockwise from north.

    Derive aspect (downslope direction) from a DEM raster using the Horn
    (1981) finite-difference method. The result is used by pywatershed
    derivation step 3 (topography) for parameters such as ``hru_aspect``.

    Aspect is measured as a compass bearing: 0 degrees = north, 90 = east,
    180 = south, 270 = west. Flat areas where both partial derivatives are
    zero will have an aspect of 0 degrees (north by convention).

    Parameters
    ----------
    elevation : xr.DataArray
        2-D elevation raster with spatial coordinates. May be in any CRS;
        geographic CRS coordinates (degrees) are automatically converted
        to meters for gradient computation.
    method : str
        Derivation method. Only ``"horn"`` is currently supported.
    x_coord : str
        Name of the x coordinate dimension in the DataArray.
    y_coord : str
        Name of the y coordinate dimension in the DataArray.

    Returns
    -------
    xr.DataArray
        Aspect in degrees [0, 360), with the same shape, coordinates, and
        CRS as the input elevation. Output attributes include
        ``units="degrees"`` and ``long_name="Terrain aspect (clockwise
        from north)"``.

    Raises
    ------
    ValueError
        If ``method`` is not ``"horn"``.

    Notes
    -----
    Aspect is computed as ``atan2(-dz/dx, -dz/dy)`` (negated gradients
    give the descent direction), then converted from mathematical angle
    to compass bearing by adding 360 and taking modulo 360.

    References
    ----------
    .. [1] Horn, B.K.P. (1981). "Hill shading and the reflectance map."
       Proceedings of the IEEE, 69(1), 14-47.

    See Also
    --------
    derive_slope : Compute terrain slope from the same elevation input.
    _cell_sizes_meters : Convert coordinate spacing to meters.
    """
    if method != "horn":
        raise ValueError(f"Unsupported aspect method: {method}")

    elev = elevation.values.astype(np.float64)
    y_coords = elevation.coords[y_coord].values
    x_coords = elevation.coords[x_coord].values

    has_crs = hasattr(elevation, "rio") and elevation.rio.crs
    is_geographic = has_crs and elevation.rio.crs.is_geographic
    dy_m, dx_m = _cell_sizes_meters(y_coords, x_coords, is_geographic)

    dz_dy, dz_dx = np.gradient(elev, dy_m, dx_m)
    # Compass bearing: atan2(east_descent, north_descent)
    # Descent direction is opposite to gradient: (-dz_dx, -dz_dy)
    aspect_rad = np.arctan2(-dz_dx, -dz_dy)
    aspect_deg = np.degrees(aspect_rad)
    aspect_deg = (aspect_deg + 360) % 360

    result = elevation.copy(data=aspect_deg)
    result.name = "aspect"
    result.attrs = {"units": "degrees", "long_name": "Terrain aspect (clockwise from north)"}
    return result
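The compass-bearing convention can likewise be verified on a plane with a known downslope direction, again with plain numpy and meter coordinates assumed:

```python
import numpy as np

# Plane descending toward the east: z = -x, so the downslope (descent)
# direction is due east and the expected aspect is 90 degrees.
x = np.arange(0.0, 50.0, 10.0)
y = np.arange(0.0, 50.0, 10.0)
elev = np.tile(-x, (y.size, 1))

dz_dy, dz_dx = np.gradient(elev, y, x)
# Compass bearing of the descent direction, as in derive_aspect:
# atan2(-dz/dx, -dz/dy), shifted into [0, 360).
aspect_deg = (np.degrees(np.arctan2(-dz_dx, -dz_dy)) + 360) % 360

print(aspect_deg[2, 2])   # ~90, due east
```

A flat cell (both gradients zero) falls out of `atan2(0, 0) = 0`, which is the documented north-by-convention behavior.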

derive_sin_aspect

derive_sin_aspect(
    elevation: DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray

Compute sine of terrain aspect for circular mean aggregation.

Derive per-pixel sin(aspect) from a DEM raster. Arithmetic zonal mean of sine values is one component of the circular mean: the HRU-level aspect is recovered as atan2(mean_sin, mean_cos).

PARAMETER DESCRIPTION
elevation

2-D elevation raster with spatial coordinates.

TYPE: DataArray

method

Derivation method. Only "horn" is currently supported.

TYPE: str DEFAULT: 'horn'

x_coord

Name of the x coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'x'

y_coord

Name of the y coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'y'

RETURNS DESCRIPTION
DataArray

Sine of aspect, unitless [-1, 1], with the same shape, coordinates, and CRS as the input elevation.

See Also

derive_cos_aspect : Cosine component for circular mean.
derive_aspect : Raw aspect in degrees.

Source code in src/hydro_param/data_access.py
def derive_sin_aspect(
    elevation: xr.DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray:
    """Compute sine of terrain aspect for circular mean aggregation.

    Derive per-pixel ``sin(aspect)`` from a DEM raster.  Arithmetic zonal
    mean of sine values is one component of the circular mean: the HRU-level
    aspect is recovered as ``atan2(mean_sin, mean_cos)``.

    Parameters
    ----------
    elevation : xr.DataArray
        2-D elevation raster with spatial coordinates.
    method : str
        Derivation method. Only ``"horn"`` is currently supported.
    x_coord : str
        Name of the x coordinate dimension in the DataArray.
    y_coord : str
        Name of the y coordinate dimension in the DataArray.

    Returns
    -------
    xr.DataArray
        Sine of aspect, unitless [-1, 1], with the same shape,
        coordinates, and CRS as the input elevation.

    See Also
    --------
    derive_cos_aspect : Cosine component for circular mean.
    derive_aspect : Raw aspect in degrees.
    """
    aspect = derive_aspect(elevation, method, x_coord=x_coord, y_coord=y_coord)
    result = aspect.copy(data=np.sin(np.radians(aspect.values)))
    result.name = "sin_aspect"
    result.attrs = {"units": "unitless", "long_name": "Sine of terrain aspect"}
    return result

derive_cos_aspect

derive_cos_aspect(
    elevation: DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray

Compute cosine of terrain aspect for circular mean aggregation.

Derive per-pixel cos(aspect) from a DEM raster. Arithmetic zonal mean of cosine values is one component of the circular mean: the HRU-level aspect is recovered as atan2(mean_sin, mean_cos).

PARAMETER DESCRIPTION
elevation

2-D elevation raster with spatial coordinates.

TYPE: DataArray

method

Derivation method. Only "horn" is currently supported.

TYPE: str DEFAULT: 'horn'

x_coord

Name of the x coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'x'

y_coord

Name of the y coordinate dimension in the DataArray.

TYPE: str DEFAULT: 'y'

RETURNS DESCRIPTION
DataArray

Cosine of aspect, unitless [-1, 1], with the same shape, coordinates, and CRS as the input elevation.

See Also

derive_sin_aspect : Sine component for circular mean.
derive_aspect : Raw aspect in degrees.

Source code in src/hydro_param/data_access.py
def derive_cos_aspect(
    elevation: xr.DataArray,
    method: str = "horn",
    *,
    x_coord: str = "x",
    y_coord: str = "y",
) -> xr.DataArray:
    """Compute cosine of terrain aspect for circular mean aggregation.

    Derive per-pixel ``cos(aspect)`` from a DEM raster.  Arithmetic zonal
    mean of cosine values is one component of the circular mean: the HRU-level
    aspect is recovered as ``atan2(mean_sin, mean_cos)``.

    Parameters
    ----------
    elevation : xr.DataArray
        2-D elevation raster with spatial coordinates.
    method : str
        Derivation method. Only ``"horn"`` is currently supported.
    x_coord : str
        Name of the x coordinate dimension in the DataArray.
    y_coord : str
        Name of the y coordinate dimension in the DataArray.

    Returns
    -------
    xr.DataArray
        Cosine of aspect, unitless [-1, 1], with the same shape,
        coordinates, and CRS as the input elevation.

    See Also
    --------
    derive_sin_aspect : Sine component for circular mean.
    derive_aspect : Raw aspect in degrees.
    """
    aspect = derive_aspect(elevation, method, x_coord=x_coord, y_coord=y_coord)
    result = aspect.copy(data=np.cos(np.radians(aspect.values)))
    result.name = "cos_aspect"
    result.attrs = {"units": "unitless", "long_name": "Cosine of terrain aspect"}
    return result
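Why the sine/cosine decomposition matters: an arithmetic zonal mean of raw aspect values is wrong for bearings that straddle north. This small numpy sketch shows the circular mean recovered via `atan2(mean_sin, mean_cos)`, as the docstrings describe:

```python
import numpy as np

# Two pixels facing just west and just east of north. The arithmetic
# mean of 350 and 10 degrees is 180 (due south) -- clearly wrong.
aspects = np.radians([350.0, 10.0])

mean_sin = np.sin(aspects).mean()
mean_cos = np.cos(aspects).mean()

# Circular mean: recover the HRU-level bearing from the two components.
hru_aspect = (np.degrees(np.arctan2(mean_sin, mean_cos)) + 360) % 360
print(hru_aspect)   # effectively north (0 modulo 360), not 180
```

In the pipeline, `mean_sin` and `mean_cos` come from arithmetic zonal means of the `derive_sin_aspect` and `derive_cos_aspect` rasters over each HRU.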

classify_usda_texture_raster

classify_usda_texture_raster(
    sand: DataArray, silt: DataArray, clay: DataArray
) -> xr.DataArray

Classify sand/silt/clay percentage rasters into USDA texture classes.

Wrapper around classify_usda_texture() that normalizes inputs and handles xarray DataArray I/O. Returns a float64 raster with class codes (1--12) suitable for categorical zonal statistics.

Before classification, three data-cleaning steps are applied:

  1. Negative clamping — ML-derived products (e.g. POLARIS) can produce negative regression artifacts. Negative values are clamped to zero with a WARNING log.
  2. Near-zero guard — Pixels where sand+silt+clay < 0.1% are treated as no-data (set to NaN) since they cannot be meaningfully normalized.
  3. Sum normalization — Remaining pixels are rescaled so that sand+silt+clay = 100%. This is necessary because POLARIS (and similar products) estimate each fraction independently, so pixel-level sums can deviate significantly from 100%. Normalization preserves the relative proportions — which correctly indicate the texture triangle region — while ensuring each pixel represents a valid soil composition.
PARAMETER DESCRIPTION
sand

Sand content as percentage (0--100), 2-D raster.

TYPE: DataArray

silt

Silt content as percentage (0--100), 2-D raster.

TYPE: DataArray

clay

Clay content as percentage (0--100), 2-D raster.

TYPE: DataArray

RETURNS DESCRIPTION
DataArray

Float64 raster with USDA texture class codes (1--12). Elements where any input is NaN remain NaN.

Notes

The normalization tolerance (0.01%) is tighter than the downstream classify_usda_texture() warning threshold (5%), so all pixels reaching the classifier will have sums within floating-point precision of 100%.

See Also

hydro_param.classification.classify_usda_texture : Core classifier.
hydro_param.classification.USDA_TEXTURE_CLASSES : Code-to-name mapping.

Source code in src/hydro_param/data_access.py
def classify_usda_texture_raster(
    sand: xr.DataArray,
    silt: xr.DataArray,
    clay: xr.DataArray,
) -> xr.DataArray:
    """Classify sand/silt/clay percentage rasters into USDA texture classes.

    Wrapper around ``classify_usda_texture()`` that normalizes inputs
    and handles xarray DataArray I/O.  Returns a float64 raster with
    class codes (1--12) suitable for categorical zonal statistics.

    Before classification, three data-cleaning steps are applied:

    1. **Negative clamping** — ML-derived products (e.g. POLARIS) can
       produce negative regression artifacts.  Negative values are
       clamped to zero with a WARNING log.
    2. **Near-zero guard** — Pixels where sand+silt+clay < 0.1% are
       treated as no-data (set to NaN) since they cannot be
       meaningfully normalized.
    3. **Sum normalization** — Remaining pixels are rescaled so that
       sand+silt+clay = 100%.  This is necessary because POLARIS (and
       similar products) estimate each fraction independently, so
       pixel-level sums can deviate significantly from 100%.
       Normalization preserves the relative proportions — which
       correctly indicate the texture triangle region — while ensuring
       each pixel represents a valid soil composition.

    Parameters
    ----------
    sand : xr.DataArray
        Sand content as percentage (0--100), 2-D raster.
    silt : xr.DataArray
        Silt content as percentage (0--100), 2-D raster.
    clay : xr.DataArray
        Clay content as percentage (0--100), 2-D raster.

    Returns
    -------
    xr.DataArray
        Float64 raster with USDA texture class codes (1--12).
        Elements where any input is NaN remain NaN.

    Notes
    -----
    The normalization tolerance (0.01%) is tighter than the downstream
    ``classify_usda_texture()`` warning threshold (5%), so all pixels
    reaching the classifier will have sums within floating-point
    precision of 100%.

    See Also
    --------
    hydro_param.classification.classify_usda_texture : Core classifier.
    hydro_param.classification.USDA_TEXTURE_CLASSES : Code-to-name mapping.
    """
    s = sand.values.astype(np.float64).ravel()
    si = silt.values.astype(np.float64).ravel()
    c = clay.values.astype(np.float64).ravel()

    # Step 1: Clamp negative values (ML regression artifacts) to zero.
    for arr, name in [(s, "sand"), (si, "silt"), (c, "clay")]:
        neg_mask = (arr < 0) & ~np.isnan(arr)
        n_neg = int(np.sum(neg_mask))
        if n_neg > 0:
            logger.warning(
                "classify_usda_texture_raster: %d pixel(s) have negative "
                "%s values (min=%.2f%%) — clamping to 0",
                n_neg,
                name,
                float(np.nanmin(arr)),
            )
            arr[neg_mask] = 0.0

    # Step 2: Mark zero/near-zero totals as no-data (cannot normalize).
    total = s + si + c
    valid = ~(np.isnan(s) | np.isnan(si) | np.isnan(c))
    zero_total = valid & (total < 0.1)
    n_zero = int(np.sum(zero_total))
    if n_zero > 0:
        logger.warning(
            "classify_usda_texture_raster: %d pixel(s) have "
            "sand+silt+clay sum < 0.1%% — treating as no-data",
            n_zero,
        )
        s[zero_total] = np.nan
        si[zero_total] = np.nan
        c[zero_total] = np.nan
        valid = valid & ~zero_total

    # Step 3: Normalize to sum=100 (see docstring for rationale).
    # Tolerance 0.01% avoids floating-point noise; tighter than the
    # downstream classify_usda_texture() 5% warning threshold.
    need_norm = valid & (np.abs(total - 100.0) > 0.01)
    n_normalized = int(np.sum(need_norm))
    if n_normalized > 0:
        deviations = np.abs(total[need_norm] - 100.0)
        mean_dev = float(np.mean(deviations))
        max_dev = float(np.max(deviations))
        msg = (
            "classify_usda_texture_raster: normalized %d/%d pixel(s) "
            "to sum=100%% (mean deviation: %.1f%%, max: %.1f%%)"
        )
        args = (n_normalized, int(np.sum(valid)), mean_dev, max_dev)
        if max_dev > 20.0:
            logger.warning(msg, *args)
        else:
            logger.info(msg, *args)
        scale = 100.0 / total[need_norm]
        s[need_norm] *= scale
        si[need_norm] *= scale
        c[need_norm] *= scale

    codes = classify_usda_texture(s, si, c)
    out = sand.copy(data=codes.reshape(sand.shape))
    out.name = "soil_texture"
    out.attrs = {"units": "class", "long_name": "USDA soil texture classification"}
    return out
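The three cleaning steps can be sketched on flattened fraction arrays with plain numpy. The values below are hypothetical, and the sketch omits logging and the 0.01% normalization tolerance; it is not the real function, which operates on DataArrays:

```python
import numpy as np

# Hypothetical POLARIS-style fractions: pixel 1 has a negative artifact
# and an over-100 sum, pixel 2 has a near-zero total, pixel 3 sums to 90.
s  = np.array([40.0, -2.0, 0.02, 30.0])   # sand %
si = np.array([40.0, 50.0, 0.03, 30.0])   # silt %
c  = np.array([20.0, 60.0, 0.01, 30.0])   # clay %

# Step 1: clamp negative regression artifacts to zero.
s, si, c = (np.clip(a, 0.0, None) for a in (s, si, c))

# Step 2: near-zero guard -- totals under 0.1% become no-data.
total = s + si + c
bad = total < 0.1
s[bad] = si[bad] = c[bad] = np.nan

# Step 3: rescale the remaining pixels so fractions sum to 100%.
scale = np.where(bad, np.nan, 100.0 / total)
s, si, c = s * scale, si * scale, c * scale

print(s + si + c)   # ~[100., 100., nan, 100.]
```

Because normalization only rescales, the relative sand/silt/clay proportions (and hence the texture-triangle region) are preserved.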

align_rasters

align_rasters(
    sources: list[DataArray],
    template: DataArray,
    method: str = "nearest",
) -> list[xr.DataArray]

Align source rasters to a template grid via reprojection.

Non-template sources are reprojected to match the template's CRS, resolution, and extent using rioxarray.reproject_match(). The template itself is passed through unchanged.

PARAMETER DESCRIPTION
sources

Source rasters to align.

TYPE: list[DataArray]

template

Reference raster whose grid defines the target CRS, resolution, and extent.

TYPE: DataArray

method

Rasterio resampling method name (default "nearest").

TYPE: str DEFAULT: 'nearest'

RETURNS DESCRIPTION
list[DataArray]

Aligned rasters in the same order as sources, all sharing the template's grid.

Source code in src/hydro_param/data_access.py
def align_rasters(
    sources: list[xr.DataArray],
    template: xr.DataArray,
    method: str = "nearest",
) -> list[xr.DataArray]:
    """Align source rasters to a template grid via reprojection.

    Non-template sources are reprojected to match the template's CRS,
    resolution, and extent using ``rioxarray.reproject_match()``.  The
    template itself is passed through unchanged.

    Parameters
    ----------
    sources : list[xr.DataArray]
        Source rasters to align.
    template : xr.DataArray
        Reference raster whose grid defines the target CRS,
        resolution, and extent.
    method : str
        Rasterio resampling method name (default ``"nearest"``).

    Returns
    -------
    list[xr.DataArray]
        Aligned rasters in the same order as *sources*, all
        sharing the template's grid.
    """
    from rasterio.enums import Resampling  # type: ignore[import-untyped]

    resampling = Resampling[method]
    aligned: list[xr.DataArray] = []
    for da in sources:
        if da is template:
            aligned.append(da)
        else:
            aligned.append(da.rio.reproject_match(template, resampling=resampling))
    return aligned

apply_raster_operation

apply_raster_operation(
    sources: list[DataArray], operation: str
) -> xr.DataArray

Apply an arithmetic operation across source rasters via left-to-right fold.

PARAMETER DESCRIPTION
sources

Two or more rasters to combine. Must be on the same grid (use align_rasters first if resolutions differ).

TYPE: list[DataArray]

operation

Arithmetic operation to apply.

TYPE: ('multiply', 'divide', 'add', 'subtract')

RETURNS DESCRIPTION
DataArray

Result of reduce(op, sources).

RAISES DESCRIPTION
ValueError

If operation is not one of the supported operations, or if sources contains fewer than 2 rasters.

Source code in src/hydro_param/data_access.py
def apply_raster_operation(
    sources: list[xr.DataArray],
    operation: str,
) -> xr.DataArray:
    """Apply an arithmetic operation across source rasters via left-to-right fold.

    Parameters
    ----------
    sources : list[xr.DataArray]
        Two or more rasters to combine.  Must be on the same grid
        (use ``align_rasters`` first if resolutions differ).
    operation : {"multiply", "divide", "add", "subtract"}
        Arithmetic operation to apply.

    Returns
    -------
    xr.DataArray
        Result of ``reduce(op, sources)``.

    Raises
    ------
    ValueError
        If *operation* is not one of the supported operations,
        or if *sources* contains fewer than 2 rasters.
    """
    if len(sources) < 2:
        msg = f"apply_raster_operation requires at least 2 sources, got {len(sources)}"
        raise ValueError(msg)
    op_fn = _RASTER_OPS.get(operation)
    if op_fn is None:
        msg = f"Unsupported raster operation '{operation}'. Choose from {list(_RASTER_OPS)}"
        raise ValueError(msg)
    return functools.reduce(op_fn, sources)
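The left-to-right fold is easiest to see with a non-commutative operation. This sketch assumes `_RASTER_OPS` maps `"subtract"` to `operator.sub` (an assumption; the table is not shown here) and uses plain numpy arrays in place of DataArrays:

```python
import functools
import operator

import numpy as np

a = np.array([[10.0, 20.0]])
b = np.array([[1.0, 2.0]])
c = np.array([[3.0, 4.0]])

# reduce folds left-to-right: (a - b) - c, element-wise.
result = functools.reduce(operator.sub, [a, b, c])
print(result)   # [[ 6. 14.]]
```

Order therefore matters for `"subtract"` and `"divide"`; sources must be listed with the minuend (or dividend) first.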

query_stac_items

query_stac_items(
    entry: DatasetEntry, bbox: list[float]
) -> list[Any]

Query a STAC catalog and return matching items for a bounding box.

Handle client creation, optional Planetary Computer signing, spatial search, and GSD (ground sample distance) filtering. The returned items can be passed to fetch_stac_cog(..., items=...) to avoid repeated queries when multiple variables share the same STAC collection within a single processing batch.

Enables STAC query reuse across variables, reducing redundant network calls during batch processing.

PARAMETER DESCRIPTION
entry

Registry entry with strategy="stac_cog". Must have non-None catalog_url and collection fields.

TYPE: DatasetEntry

bbox

Bounding box as [west, south, east, north] in EPSG:4326 decimal degrees.

TYPE: list[float]

RETURNS DESCRIPTION
list[Any]

Matching STAC items (signed if entry.sign is set). Items are pystac.Item objects suitable for asset access.

RAISES DESCRIPTION
ValueError

If entry.catalog_url or entry.collection is None.

RuntimeError

If no STAC items match the given collection and bounding box.

Notes

When entry.gsd is set, items are filtered to those matching the target ground sample distance (e.g., 10 m for 3DEP 1/3 arc-second). If no items match the GSD filter, all unfiltered items are returned with a warning.

See Also

fetch_stac_cog : Load and mosaic COG data from the returned items.

Source code in src/hydro_param/data_access.py
def query_stac_items(
    entry: DatasetEntry,
    bbox: list[float],
) -> list[Any]:
    """Query a STAC catalog and return matching items for a bounding box.

    Handle client creation, optional Planetary Computer signing, spatial
    search, and GSD (ground sample distance) filtering. The returned items
    can be passed to ``fetch_stac_cog(..., items=...)`` to avoid repeated
    queries when multiple variables share the same STAC collection within
    a single processing batch.

    Enables STAC query reuse across variables, reducing redundant network
    calls during batch processing.

    Parameters
    ----------
    entry : DatasetEntry
        Registry entry with ``strategy="stac_cog"``. Must have non-None
        ``catalog_url`` and ``collection`` fields.
    bbox : list[float]
        Bounding box as ``[west, south, east, north]`` in EPSG:4326
        decimal degrees.

    Returns
    -------
    list[Any]
        Matching STAC items (signed if ``entry.sign`` is set). Items are
        ``pystac.Item`` objects suitable for asset access.

    Raises
    ------
    ValueError
        If ``entry.catalog_url`` or ``entry.collection`` is None.
    RuntimeError
        If no STAC items match the given collection and bounding box.

    Notes
    -----
    When ``entry.gsd`` is set, items are filtered to those matching the
    target ground sample distance (e.g., 10 m for 3DEP 1/3 arc-second).
    If no items match the GSD filter, all unfiltered items are returned
    with a warning.

    See Also
    --------
    fetch_stac_cog : Load and mosaic COG data from the returned items.
    """
    import pystac_client

    if entry.catalog_url is None:
        raise ValueError("stac_cog strategy requires 'catalog_url' on the dataset entry")
    if entry.collection is None:
        raise ValueError("stac_cog strategy requires 'collection' on the dataset entry")

    logger.info(
        "Querying STAC: catalog=%s collection=%s bbox=%s",
        entry.catalog_url,
        entry.collection,
        bbox,
    )

    modifier = None
    if entry.sign == "planetary_computer":
        import planetary_computer

        modifier = planetary_computer.sign_inplace

    client = pystac_client.Client.open(entry.catalog_url, modifier=modifier)

    search = client.search(
        collections=[entry.collection],
        bbox=bbox,
    )
    items = list(search.item_collection())
    if not items:
        raise RuntimeError(f"No STAC items found for collection='{entry.collection}' bbox={bbox}")

    # Filter by GSD if specified
    if entry.gsd is not None:
        filtered = [i for i in items if i.properties.get("gsd") == entry.gsd]
        if filtered:
            items = filtered
        else:
            logger.warning(
                "No items with gsd=%d; using %d unfiltered items",
                entry.gsd,
                len(items),
            )

    logger.info("Found %d STAC items for bbox", len(items))
    return items

fetch_stac_cog

fetch_stac_cog(
    entry: DatasetEntry,
    bbox: list[float],
    *,
    asset_key: str | None = None,
    items: list[Any] | None = None,
) -> xr.DataArray

Load Cloud Optimized GeoTIFF(s) from a STAC catalog, clipped to bbox.

Fetch one or more COG tiles from a STAC collection (e.g., 3DEP on Planetary Computer, gNATSGO rasters), clip each to the bounding box, and mosaic them into a single DataArray. This is the primary data loader for the stac_cog processing strategy.

Multi-tile mosaicing is handled automatically when a bounding box spans multiple STAC items (common at DEM tile boundaries).

PARAMETER DESCRIPTION
entry

Registry entry with strategy="stac_cog". Must have non-None catalog_url and collection fields.

TYPE: DatasetEntry

bbox

Bounding box as [west, south, east, north] in EPSG:4326 decimal degrees. Typically comes from :func:hydro_param.pipeline._buffered_bbox.

TYPE: list[float]

asset_key

Per-variable STAC asset key override. When not None, this is used instead of entry.asset_key. Necessary for collections like gnatsgo-rasters where each variable is a separate named asset (e.g., "sandtotal_r") rather than a single "data" asset.

TYPE: str or None DEFAULT: None

items

Pre-fetched STAC items from :func:query_stac_items. When provided, the STAC query is skipped entirely, saving redundant network calls when processing multiple variables from the same collection within a batch.

TYPE: list[Any] or None DEFAULT: None

RETURNS DESCRIPTION
DataArray

2-D raster data clipped to the bounding box, in the source CRS (typically EPSG:4269 for 3DEP, EPSG:5070 for gNATSGO). The band dimension is squeezed out.

RAISES DESCRIPTION
KeyError

If asset_key (or entry.asset_key) is not found in any STAC item. The error message lists available data assets.

RuntimeError

If no data remains after clipping all tiles to the bounding box.

Notes

Tiles that do not overlap the bounding box after precise clipping are silently skipped (logged at DEBUG level). This is expected when STAC search returns items whose coarse footprints overlap but whose actual data does not.

See Also

query_stac_items : Pre-query STAC items for reuse across variables. fetch_local_tiff : Load data from local GeoTIFF files.
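
The clip-skip-mosaic loop at the heart of this function can be illustrated with a minimal stand-alone sketch. This uses plain numpy arrays with 1-D coordinates in place of STAC tiles and rioxarray rasters; `clip_and_mosaic` and the tile tuples are hypothetical, not part of the module API.

```python
import numpy as np

def clip_and_mosaic(tiles, west, east):
    """Clip each (x_coords, values) tile to [west, east], skip empties, mosaic."""
    clipped = []
    for x, v in tiles:
        mask = (x >= west) & (x <= east)
        if not mask.any():      # tile does not overlap the bbox: skip it
            continue
        clipped.append((x[mask], v[mask]))
    if not clipped:             # mirrors the RuntimeError in fetch_stac_cog
        raise RuntimeError(f"No data after clipping to [{west}, {east}]")
    xs = np.concatenate([x for x, _ in clipped])
    vs = np.concatenate([v for _, v in clipped])
    order = np.argsort(xs)      # merge clipped tiles into one ordered grid
    return xs[order], vs[order]

tile_a = (np.array([0.0, 1.0, 2.0]), np.array([10.0, 11.0, 12.0]))
tile_b = (np.array([3.0, 4.0, 5.0]), np.array([13.0, 14.0, 15.0]))
tile_c = (np.array([9.0, 10.0]), np.array([99.0, 100.0]))  # entirely outside
x, v = clip_and_mosaic([tile_a, tile_b, tile_c], west=1.0, east=4.0)
```

In the real function the same roles are played by `rio.clip_box` (clipping), the `NoDataInBounds` handler (skipping), and `rioxarray.merge.merge_arrays` (mosaicking).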

Source code in src/hydro_param/data_access.py
def fetch_stac_cog(
    entry: DatasetEntry,
    bbox: list[float],
    *,
    asset_key: str | None = None,
    items: list[Any] | None = None,
) -> xr.DataArray:
    """Load Cloud Optimized GeoTIFF(s) from a STAC catalog, clipped to bbox.

    Fetch one or more COG tiles from a STAC collection (e.g., 3DEP on
    Planetary Computer, gNATSGO rasters), clip each to the bounding box,
    and mosaic them into a single DataArray. This is the primary data
    loader for the ``stac_cog`` processing strategy.

    Multi-tile mosaicking is handled automatically when a bounding box
    spans multiple STAC items (common at DEM tile boundaries).

    Parameters
    ----------
    entry : DatasetEntry
        Registry entry with ``strategy="stac_cog"``. Must have non-None
        ``catalog_url`` and ``collection`` fields.
    bbox : list[float]
        Bounding box as ``[west, south, east, north]`` in EPSG:4326
        decimal degrees. Typically comes from
        :func:`hydro_param.pipeline._buffered_bbox`.
    asset_key : str or None
        Per-variable STAC asset key override. When not ``None``, this is
        used instead of ``entry.asset_key``. Necessary for collections
        like ``gnatsgo-rasters`` where each variable is a separate named
        asset (e.g., ``"sandtotal_r"``) rather than a single ``"data"``
        asset.
    items : list[Any] or None
        Pre-fetched STAC items from :func:`query_stac_items`. When
        provided, the STAC query is skipped entirely, saving redundant
        network calls when processing multiple variables from the same
        collection within a batch.

    Returns
    -------
    xr.DataArray
        2-D raster data clipped to the bounding box, in the source CRS
        (typically EPSG:4269 for 3DEP, EPSG:5070 for gNATSGO). The band
        dimension is squeezed out.

    Raises
    ------
    KeyError
        If ``asset_key`` (or ``entry.asset_key``) is not found in any
        STAC item. The error message lists available data assets.
    RuntimeError
        If no data remains after clipping all tiles to the bounding box.

    Notes
    -----
    Tiles that do not overlap the bounding box after precise clipping are
    skipped and logged at DEBUG level. This is expected when STAC
    search returns items whose coarse footprints overlap but whose actual
    data does not.

    See Also
    --------
    query_stac_items : Pre-query STAC items for reuse across variables.
    fetch_local_tiff : Load data from local GeoTIFF files.
    """
    import rioxarray  # noqa: F401

    if items is None:
        items = query_stac_items(entry, bbox)

    # Load and mosaic tiles
    resolved_key = asset_key if asset_key is not None else entry.asset_key
    logger.debug(
        "Using asset key '%s' (override=%s, default=%s)",
        resolved_key,
        asset_key,
        entry.asset_key,
    )
    arrays = []
    for item in items:
        try:
            asset = item.assets[resolved_key]
        except KeyError:
            available = sorted(k for k, a in item.assets.items() if a.roles and "data" in a.roles)
            raise KeyError(
                f"Asset key '{resolved_key}' not found in STAC item '{item.id}' "
                f"(collection='{entry.collection}'). "
                f"Available data assets: {available}. "
                f"Check the 'asset_key' field in your dataset registry."
            ) from None
        da = cast(xr.DataArray, rioxarray.open_rasterio(asset.href, masked=True))
        da = da.squeeze("band", drop=True)
        try:
            da = da.rio.clip_box(
                minx=bbox[0], miny=bbox[1], maxx=bbox[2], maxy=bbox[3], crs="EPSG:4326"
            )
        except (rioxarray.exceptions.NoDataInBounds, ValueError):
            # Item may not overlap after precise clipping
            logger.debug("Tile %s has no data in bbox, skipping", item.id)
            continue
        if da.size > 0:
            arrays.append(da)

    if not arrays:
        raise RuntimeError(f"No data after clipping to bbox={bbox}")

    if len(arrays) == 1:
        result = arrays[0]
    else:
        from rioxarray.merge import merge_arrays

        result = merge_arrays(arrays)
        logger.info("Mosaicked %d tiles", len(arrays))

    logger.info("Loaded raster: shape=%s, crs=%s", result.shape, result.rio.crs)
    return result

fetch_local_tiff

fetch_local_tiff(
    entry: DatasetEntry,
    bbox: list[float],
    *,
    dataset_name: str = "unknown",
    variable_source: str | None = None,
) -> xr.DataArray

Load a local GeoTIFF or remote COG/VRT clipped to a bounding box.

Read a raster file referenced by variable_source (per-variable override) or entry.source (dataset-level default), clip it to the bounding box, and return a 2-D DataArray. This is the primary data loader for the local_tiff processing strategy.

Despite the name, this function supports both local file paths and remote HTTP(S) URLs (opened via GDAL vsicurl). The function name matches the strategy="local_tiff" enum value in the dataset registry schema.

Source resolution order:

  1. variable_source (per-variable override, e.g., for POLARIS where each soil property is a separate file)
  2. entry.source (dataset-level path from pipeline config)
  3. Raise ValueError, including download instructions when they are available
PARAMETER DESCRIPTION
entry

Registry entry with strategy="local_tiff".

TYPE: DatasetEntry

bbox

Bounding box as [west, south, east, north] in EPSG:4326 decimal degrees.

TYPE: list[float]

dataset_name

Dataset name for use in error messages and download instructions.

TYPE: str DEFAULT: 'unknown'

variable_source

Per-variable source path or URL. Takes precedence over entry.source when set. Common for datasets where each variable is stored in a separate file (e.g., POLARIS soil properties).

TYPE: str or None DEFAULT: None

RETURNS DESCRIPTION
DataArray

2-D raster data clipped to the bounding box, in the source CRS. The band dimension is squeezed out.

RAISES DESCRIPTION
ValueError

If no source is available (neither variable_source nor entry.source). The error message includes download instructions when entry.download metadata is available.

FileNotFoundError

If the source is a local path that does not exist on disk.

RuntimeError

If no data remains after clipping to the bounding box, or if a remote URL fails to open.

See Also

fetch_stac_cog : Load data from STAC-cataloged COGs. save_to_geotiff : Write a DataArray back to GeoTIFF format.

Source code in src/hydro_param/data_access.py
def fetch_local_tiff(
    entry: DatasetEntry,
    bbox: list[float],
    *,
    dataset_name: str = "unknown",
    variable_source: str | None = None,
) -> xr.DataArray:
    """Load a local GeoTIFF or remote COG/VRT clipped to a bounding box.

    Read a raster file referenced by ``variable_source`` (per-variable
    override) or ``entry.source`` (dataset-level default), clip it to the
    bounding box, and return a 2-D DataArray. This is the primary data
    loader for the ``local_tiff`` processing strategy.

    Despite the name, this function supports both local file paths and
    remote HTTP(S) URLs (opened via GDAL vsicurl). The function name
    matches the ``strategy="local_tiff"`` enum value in the dataset
    registry schema.

    Source resolution order:

    1. ``variable_source`` (per-variable override, e.g., for POLARIS where
       each soil property is a separate file)
    2. ``entry.source`` (dataset-level path from pipeline config)
    3. Raise ``ValueError``, including download instructions when they
       are available

    Parameters
    ----------
    entry : DatasetEntry
        Registry entry with ``strategy="local_tiff"``.
    bbox : list[float]
        Bounding box as ``[west, south, east, north]`` in EPSG:4326
        decimal degrees.
    dataset_name : str
        Dataset name for use in error messages and download instructions.
    variable_source : str or None
        Per-variable source path or URL. Takes precedence over
        ``entry.source`` when set. Common for datasets where each
        variable is stored in a separate file (e.g., POLARIS soil
        properties).

    Returns
    -------
    xr.DataArray
        2-D raster data clipped to the bounding box, in the source CRS.
        The band dimension is squeezed out.

    Raises
    ------
    ValueError
        If no source is available (neither ``variable_source`` nor
        ``entry.source``). The error message includes download
        instructions when ``entry.download`` metadata is available.
    FileNotFoundError
        If the source is a local path that does not exist on disk.
    RuntimeError
        If no data remains after clipping to the bounding box, or if
        a remote URL fails to open.

    See Also
    --------
    fetch_stac_cog : Load data from STAC-cataloged COGs.
    save_to_geotiff : Write a DataArray back to GeoTIFF format.
    """
    import rioxarray  # noqa: F401
    from rioxarray.exceptions import NoDataInBounds

    # Resolve source: per-variable override > dataset-level
    source = variable_source if variable_source is not None else entry.source

    if source is None:
        msg = (
            f"Dataset '{dataset_name}' requires a source path or URL "
            f"(strategy: local_tiff) but neither variable_source nor entry.source is set."
        )
        if entry.download:
            if entry.download.files:
                msg += (
                    f"\n\nThis dataset has {len(entry.download.files)} "
                    f"downloadable files. Run:\n"
                    f"  hydro-param datasets info {dataset_name}"
                )
            elif entry.download.url_template:
                start, end = entry.download.year_range
                n_vars = len(entry.download.variables_available)
                msg += (
                    f"\n\nThis dataset has templated downloads "
                    f"({end - start + 1} years x {n_vars} variables). Run:\n"
                    f"  hydro-param datasets info {dataset_name}"
                )
            elif entry.download.url:
                msg += f"\n\nDownload from: {entry.download.url}"
                if entry.download.size_gb:
                    msg += f"\nExpected size: ~{entry.download.size_gb} GB"
                if entry.download.notes:
                    msg += f"\n{entry.download.notes.strip()}"
            msg += (
                f"\n\nSet 'source' in your pipeline config:\n"
                f"  datasets:\n"
                f"    - name: {dataset_name}\n"
                f"      source: /path/to/downloaded/file.tif"
            )
        raise ValueError(msg)

    if _is_remote_url(source):
        logger.info("Loading remote raster: %s bbox=%s", source, bbox)
    else:
        source_path = Path(source)
        if not source_path.exists():
            raise FileNotFoundError(f"GeoTIFF not found: {source_path}")
        logger.info("Loading local GeoTIFF: %s bbox=%s", source_path, bbox)

    try:
        da = cast(xr.DataArray, rioxarray.open_rasterio(source, masked=True))
    except Exception as exc:
        if _is_remote_url(source):
            raise RuntimeError(
                f"Failed to open remote raster for dataset '{dataset_name}': {source}\n"
                f"Original error: {exc}"
            ) from exc
        raise
    da = da.squeeze("band", drop=True)

    try:
        da = da.rio.clip_box(
            minx=bbox[0],
            miny=bbox[1],
            maxx=bbox[2],
            maxy=bbox[3],
            crs="EPSG:4326",
        )
    except (NoDataInBounds, ValueError) as exc:
        raise RuntimeError(f"No data in bbox={bbox} for {source}") from exc

    if da.size == 0:
        raise RuntimeError(f"Empty raster after clipping to bbox={bbox}")

    logger.info("Loaded raster: shape=%s, crs=%s", da.shape, da.rio.crs)
    return da

save_to_geotiff

save_to_geotiff(da: DataArray, path: Path) -> Path

Write an xarray DataArray to a GeoTIFF file.

Save raster data as a single-band GeoTIFF via rioxarray. This is used by the pipeline to cache intermediate raster results (e.g., clipped source data for a batch) before passing them to gdptools ZonalGen.

The function temporarily removes _FillValue from attributes and encoding to avoid conflicts with rioxarray's internal encoding, then restores them afterward. This avoids a full .copy() of the DataArray, which was identified as a memory bottleneck in PR #72.

PARAMETER DESCRIPTION
da

Raster data with CRS information (via rioxarray accessor).

TYPE: DataArray

path

Output file path. Parent directory must exist.

TYPE: Path

RETURNS DESCRIPTION
Path

The output path (same as input), for convenience in chaining.

Notes

Uses a sentinel-based pop/restore pattern instead of da.copy() to avoid doubling memory usage for large rasters (e.g., gNATSGO tiles at ~1.25 GB each). See PR #72 for the memory optimization rationale.

See Also

fetch_local_tiff : Load a GeoTIFF back into a DataArray. fetch_stac_cog : Load COG data from STAC.
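
The sentinel-based pop/restore pattern is easiest to see stripped of the raster machinery. A minimal sketch, using a plain dict in place of `DataArray.attrs`; `write_without_fillvalue` and the writer callback are hypothetical:

```python
_SENTINEL = object()  # distinguishes "key absent" from a stored None

def write_without_fillvalue(attrs: dict, writer) -> None:
    # Pop _FillValue (if present) so the writer never sees it...
    fill = attrs.pop("_FillValue", _SENTINEL)
    try:
        writer(attrs)
    finally:
        # ...and restore it afterward, even if the writer raised.
        if fill is not _SENTINEL:
            attrs["_FillValue"] = fill

attrs = {"_FillValue": -9999, "units": "m"}
seen = []
write_without_fillvalue(attrs, lambda a: seen.append(dict(a)))
```

Mutating the attrs in place and restoring them is what lets the real function avoid `da.copy()`, and the sentinel is needed because `None` could be a legitimate stored `_FillValue`.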

Source code in src/hydro_param/data_access.py
def save_to_geotiff(da: xr.DataArray, path: Path) -> Path:
    """Write an xarray DataArray to a GeoTIFF file.

    Save raster data as a single-band GeoTIFF via rioxarray. This is used
    by the pipeline to cache intermediate raster results (e.g., clipped
    source data for a batch) before passing them to gdptools ZonalGen.

    The function temporarily removes ``_FillValue`` from attributes and
    encoding to avoid conflicts with rioxarray's internal encoding, then
    restores them afterward. This avoids a full ``.copy()`` of the
    DataArray, which was identified as a memory bottleneck in PR #72.

    Parameters
    ----------
    da : xr.DataArray
        Raster data with CRS information (via rioxarray accessor).
    path : Path
        Output file path. Parent directory must exist.

    Returns
    -------
    Path
        The output path (same as input), for convenience in chaining.

    Notes
    -----
    Uses a sentinel-based pop/restore pattern instead of ``da.copy()``
    to avoid doubling memory usage for large rasters (e.g., gNATSGO
    tiles at ~1.25 GB each). See PR #72 for the memory optimization
    rationale.

    See Also
    --------
    fetch_local_tiff : Load a GeoTIFF back into a DataArray.
    fetch_stac_cog : Load COG data from STAC.
    """
    import rioxarray  # noqa: F401

    # Temporarily remove _FillValue to avoid conflict with encoding,
    # then restore — avoids a full .copy() of the DataArray.
    fill_in_attrs = da.attrs.pop("_FillValue", _SENTINEL)
    fill_in_encoding = da.encoding.pop("_FillValue", _SENTINEL)
    try:
        da.rio.to_raster(path)
    finally:
        if fill_in_attrs is not _SENTINEL:
            da.attrs["_FillValue"] = fill_in_attrs
        if fill_in_encoding is not _SENTINEL:
            da.encoding["_FillValue"] = fill_in_encoding
    logger.debug("Saved GeoTIFF: %s (%s)", path, da.shape)
    return path

load_climr_catalog

load_climr_catalog(
    catalog_url: str = CLIMR_CATALOG_URL,
) -> pd.DataFrame

Load the ClimateR-Catalog as a pandas DataFrame.

Fetch and parse the ClimateR-Catalog parquet file, which indexes 1,700+ climate and environmental datasets with OPeNDAP endpoints. The catalog is used by the climr_cat processing strategy to resolve variable-level metadata for gdptools ClimRCatData.

PARAMETER DESCRIPTION
catalog_url

URL to the catalog parquet file. Defaults to the June 2024 release from the mikejohnson51/climateR-catalogs repository.

TYPE: str DEFAULT: CLIMR_CATALOG_URL

RETURNS DESCRIPTION
DataFrame

The full ClimateR catalog with columns including id, variable, URL, units, and spatial metadata.

References

.. [1] Johnson, J.M. climateR-catalogs. https://github.com/mikejohnson51/climateR-catalogs

See Also

build_climr_cat_dict : Build per-variable dicts from the loaded catalog.

Source code in src/hydro_param/data_access.py
def load_climr_catalog(
    catalog_url: str = CLIMR_CATALOG_URL,
) -> pd.DataFrame:
    """Load the ClimateR-Catalog as a pandas DataFrame.

    Fetch and parse the ClimateR-Catalog parquet file, which indexes
    1,700+ climate and environmental datasets with OPeNDAP endpoints.
    The catalog is used by the ``climr_cat`` processing strategy to
    resolve variable-level metadata for gdptools ``ClimRCatData``.

    Parameters
    ----------
    catalog_url : str
        URL to the catalog parquet file. Defaults to the June 2024
        release from the mikejohnson51/climateR-catalogs repository.

    Returns
    -------
    pd.DataFrame
        The full ClimateR catalog with columns including ``id``,
        ``variable``, ``URL``, ``units``, and spatial metadata.

    References
    ----------
    .. [1] Johnson, J.M. climateR-catalogs.
       https://github.com/mikejohnson51/climateR-catalogs

    See Also
    --------
    build_climr_cat_dict : Build per-variable dicts from the loaded catalog.
    """
    logger.info("Loading ClimateR catalog from %s", catalog_url)
    return pd.read_parquet(catalog_url)

build_climr_cat_dict

build_climr_cat_dict(
    catalog: DataFrame,
    catalog_id: str,
    variable_names: list[str],
) -> dict[str, dict[str, Any]]

Build a source_cat_dict for gdptools ClimRCatData.

Extract per-variable metadata rows from the ClimateR-Catalog and format them as the dictionary structure expected by the gdptools ClimRCatData constructor. Each entry contains the OPeNDAP URL, variable name, CRS, and spatial extent needed for data access.

PARAMETER DESCRIPTION
catalog

Full ClimateR catalog loaded by :func:load_climr_catalog.

TYPE: DataFrame

catalog_id

Dataset identifier in the catalog (e.g., "gridmet" for gridMET via OPeNDAP, "daymet" for Daymet).

TYPE: str

variable_names

Variables to extract (e.g., ["pr", "tmmx"] for gridMET precipitation and max temperature).

TYPE: list[str]

RETURNS DESCRIPTION
dict[str, dict[str, Any]]

Mapping of variable name to catalog row as a dictionary. Each value contains all columns from the catalog for that variable, ready for ClimRCatData(source_cat_dict=...).

RAISES DESCRIPTION
ValueError

If catalog_id is not found in the catalog, or if any variable in variable_names is not available for the given catalog ID. The error message lists available options.

See Also

load_climr_catalog : Load the catalog DataFrame. TemporalProcessor.process_climr_cat : Use the dict for temporal processing.
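
The lookup logic can be sketched against a miniature catalog. This sketch uses a list of row dicts standing in for the parquet-backed pandas DataFrame that :func:`load_climr_catalog` returns; `build_cat_dict`, the row contents, and the URLs are all illustrative.

```python
# Hypothetical miniature catalog; real rows carry many more columns
# (CRS, spatial extent, units, ...), all of which are passed through.
CATALOG = [
    {"id": "gridmet", "variable": "pr", "URL": "https://example.org/gridmet_pr"},
    {"id": "gridmet", "variable": "tmmx", "URL": "https://example.org/gridmet_tmmx"},
    {"id": "daymet", "variable": "prcp", "URL": "https://example.org/daymet_prcp"},
]

def build_cat_dict(catalog, catalog_id, variable_names):
    ids = {row["id"] for row in catalog}
    if catalog_id not in ids:
        raise ValueError(
            f"Catalog ID '{catalog_id}' not found. Available: {sorted(ids)}"
        )
    out = {}
    for var in variable_names:
        match = next(
            (r for r in catalog if r["id"] == catalog_id and r["variable"] == var),
            None,
        )
        if match is None:
            avail = sorted(r["variable"] for r in catalog if r["id"] == catalog_id)
            raise ValueError(
                f"Variable '{var}' not found for '{catalog_id}'. Available: {avail}"
            )
        out[var] = dict(match)  # first matching row, as in the real function
    return out

cat_dict = build_cat_dict(CATALOG, "gridmet", ["pr", "tmmx"])
```

The real implementation does the same filtering with a boolean mask on the DataFrame and takes `matches.iloc[0].to_dict()` for each variable.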

Source code in src/hydro_param/data_access.py
def build_climr_cat_dict(
    catalog: pd.DataFrame,
    catalog_id: str,
    variable_names: list[str],
) -> dict[str, dict[str, Any]]:
    """Build a ``source_cat_dict`` for gdptools ``ClimRCatData``.

    Extract per-variable metadata rows from the ClimateR-Catalog and
    format them as the dictionary structure expected by the gdptools
    ``ClimRCatData`` constructor. Each entry contains the OPeNDAP URL,
    variable name, CRS, and spatial extent needed for data access.

    Parameters
    ----------
    catalog : pd.DataFrame
        Full ClimateR catalog loaded by :func:`load_climr_catalog`.
    catalog_id : str
        Dataset identifier in the catalog (e.g., ``"gridmet"`` for
        gridMET via OPeNDAP, ``"daymet"`` for Daymet).
    variable_names : list[str]
        Variables to extract (e.g., ``["pr", "tmmx"]`` for gridMET
        precipitation and max temperature).

    Returns
    -------
    dict[str, dict[str, Any]]
        Mapping of variable name to catalog row as a dictionary. Each
        value contains all columns from the catalog for that variable,
        ready for ``ClimRCatData(source_cat_dict=...)``.

    Raises
    ------
    ValueError
        If ``catalog_id`` is not found in the catalog, or if any
        variable in ``variable_names`` is not available for the given
        catalog ID. The error message lists available options.

    See Also
    --------
    load_climr_catalog : Load the catalog DataFrame.
    TemporalProcessor.process_climr_cat : Use the dict for temporal processing.
    """
    if catalog_id not in catalog["id"].values:
        available_ids = sorted(catalog["id"].unique())
        raise ValueError(
            f"Catalog ID '{catalog_id}' not found in ClimateR catalog. Available: {available_ids}"
        )

    source_cat_dict: dict[str, dict[str, Any]] = {}
    for var_name in variable_names:
        matches = catalog[(catalog["id"] == catalog_id) & (catalog["variable"] == var_name)]
        if matches.empty:
            available = sorted(catalog.loc[catalog["id"] == catalog_id, "variable"].unique())
            raise ValueError(
                f"Variable '{var_name}' not found in ClimateR catalog for '{catalog_id}'. "
                f"Available: {available}"
            )
        source_cat_dict[var_name] = matches.iloc[0].to_dict()
    return source_cat_dict