pipeline

Pipeline orchestrator: execute the config-driven parameterization stages 1--5.

Implement the 5-stage pipeline from design.md section 4:

  1. Resolve Target Fabric -- load the polygon mesh and apply optional domain filtering (bbox clip).
  2. Resolve Source Datasets -- match dataset names in the pipeline config to registry entries and resolve variable specifications.
  3. Compute/Load Weights -- handled internally by gdptools (no explicit stage function; gdptools ZonalGen computes coverage weights on the fly).
  4. Process Datasets -- spatial batching loop for static datasets (ZonalGen) and full-fabric temporal processing (WeightGen + AggGen). Per-variable and temporal output files are written incrementally as each dataset completes.
  5. Normalize SIR -- canonical variable naming, unit conversion, and validation. Produces the Standardized Internal Representation consumed by model plugins.

A lazy PipelineResult.load_sir method assembles a combined xr.Dataset on demand for downstream consumers. Phase 2 model plugins (e.g., pywatershed) instead read SIR files from disk via SIRAccessor and do not call PipelineResult.load_sir.

This module is model-agnostic by design -- all model-specific logic (unit conversions, variable renaming, derived math, output formatting) lives in model plugins under derivations/ and formatters/.
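The data flow between the stage functions documented on this page can be sketched as follows. This is a minimal illustration, not the module's own orchestrator: `run_stages` and the stub helpers are hypothetical, the stage callables are injected so the example runs without hydro_param installed, and the position of the batching step relative to stage 2 is an assumption (the page only states that stage 4 expects a fabric that already has a batch_id column).

```python
def run_stages(config, registry, *, stage1, stage2, add_batches, stage4, stage5):
    """Chain the stage callables in the order their signatures imply.

    Stage 3 (weights) has no explicit function; it is handled inside gdptools.
    """
    fabric = stage1(config)                  # stage 1: load and optionally clip the fabric
    resolved = stage2(config, registry)      # stage 2: registry entries + variable specs
    fabric = add_batches(fabric)             # assumed step: add the batch_id column
    raw = stage4(fabric, resolved, config)   # stage 4: raw per-variable/temporal files
    return stage5(raw, resolved, config)     # stage 5: normalized SIR outputs


# Hypothetical stubs that only record the call order.
calls = []


def _record(name, result):
    def stub(*args):
        calls.append(name)
        return result
    return stub


outcome = run_stages(
    config={},
    registry={},
    stage1=_record("stage1", "fabric"),
    stage2=_record("stage2", "resolved"),
    add_batches=_record("batch", "fabric_with_batch_id"),
    stage4=_record("stage4", "raw"),
    stage5=_record("stage5", "sir"),
)
```

In real use the injected callables would be stage1_resolve_fabric, stage2_resolve_datasets, a batching function, stage4_process, and stage5_normalize_sir.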

See Also

design.md : Full architecture document (section 4: pipeline stages, section 11: MVP implementation).
hydro_param.config : Pydantic config schema consumed by every stage.
hydro_param.sir : SIR normalization and validation utilities.
hydro_param.sir_accessor : Lazy SIR loader used by Phase 2 plugins.

USER_REGISTRY_DIR module-attribute

USER_REGISTRY_DIR = home() / '.hydro-param' / 'datasets'

User-local registry overlay directory (~/.hydro-param/datasets/).

YAML files in this directory extend the bundled dataset registry. Overlay entries replace bundled entries on name collision.
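The overlay rule can be illustrated with plain dictionaries. This is a sketch under assumptions: real registry entries are loaded from YAML into richer objects, and the dataset names, field values, and the `merge_registries` helper below are hypothetical.

```python
from pathlib import Path

# Hypothetical bundled entries, keyed by dataset name.
bundled = {
    "dataset_a": {"strategy": "local_tiff", "category": "topography"},
    "dataset_b": {"strategy": "local_tiff", "category": "land_cover"},
}

# Hypothetical user overlay: redefines dataset_b and adds dataset_c.
overlay = {
    "dataset_b": {
        "strategy": "local_tiff",
        "category": "land_cover",
        "source": "/data/dataset_b.tif",
    },
    "dataset_c": {"strategy": "local_tiff", "category": "soils"},
}


def merge_registries(bundled, overlay):
    """Return a new mapping in which overlay entries win on name collision."""
    merged = dict(bundled)   # copy so the bundled mapping is not mutated
    merged.update(overlay)   # whole-entry replacement, not a field-level merge
    return merged


merged = merge_registries(bundled, overlay)

# Same location as USER_REGISTRY_DIR above.
user_dir = Path.home() / ".hydro-param" / "datasets"
```

Note that the colliding entry is replaced as a whole: a field present only in the bundled entry would not survive into the merged entry.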

Stage4Results dataclass

Stage4Results(
    static_files: dict[str, Path] = dict(),
    temporal_files: dict[str, Path] = dict(),
    categories: dict[str, str] = dict(),
)

Collect file paths and category metadata from stage 4 processing.

Stage 4 writes per-variable CSV files (static datasets) and per-year NetCDF/Parquet files (temporal datasets) incrementally. This dataclass aggregates the paths so that stage 5 can locate and normalize them.

ATTRIBUTE DESCRIPTION
static_files

Mapping of result key (e.g., "elevation" or "land_cover_2021") to the CSV file path written during stage 4.

TYPE: dict[str, Path]

temporal_files

Mapping of result key (e.g., "gridmet_2020") to the NetCDF or Parquet file path written during stage 4.

TYPE: dict[str, Path]

categories

Mapping of result key to its dataset category (e.g., "topography"), used for organizing output into subdirectories.

TYPE: dict[str, str]

PipelineResult dataclass

PipelineResult(
    output_dir: Path,
    static_files: dict[str, Path] = dict(),
    temporal_files: dict[str, Path] = dict(),
    categories: dict[str, str] = dict(),
    fabric: GeoDataFrame | None = None,
    sir_files: dict[str, Path] = dict(),
    sir_schema: list[SIRVariableSchema] = list(),
    sir_warnings: list[SIRValidationWarning] = list(),
)

Encapsulate all pipeline outputs with lazy SIR loading.

Per-variable and temporal files are written incrementally during stage 4. Stage 5 normalizes them into SIR files. Use load_sir to assemble a combined xr.Dataset on demand rather than holding all data in memory.

ATTRIBUTE DESCRIPTION
output_dir

Root output directory (same as config.output.path).

TYPE: Path

static_files

Raw (pre-normalization) per-variable CSV paths from stage 4.

TYPE: dict[str, Path]

temporal_files

Raw temporal NetCDF/Parquet paths from stage 4.

TYPE: dict[str, Path]

categories

Result key to dataset category mapping.

TYPE: dict[str, str]

fabric

Target fabric with batch_id column, retained for downstream consumers (e.g., model plugins that need geometry or topology).

TYPE: GeoDataFrame or None

sir_files

Normalized SIR file paths from stage 5.

TYPE: dict[str, Path]

sir_schema

Schema entries describing each SIR variable (canonical name, units, source dataset, statistic).

TYPE: list[SIRVariableSchema]

sir_warnings

Validation warnings from stage 5 SIR validation.

TYPE: list[SIRValidationWarning]

load_sir

load_sir() -> xr.Dataset

Load normalized SIR files into a combined xr.Dataset.

Assemble all per-variable SIR CSV files into a single xr.Dataset with the fabric id_field as the dimension.

RETURNS DESCRIPTION
Dataset

Combined dataset with one data variable per SIR variable. Returns an empty dataset if no SIR files are available.

Source code in src/hydro_param/pipeline.py
def load_sir(self) -> xr.Dataset:
    """Load normalized SIR files into a combined xr.Dataset.

    Assemble all per-variable SIR CSV files into a single
    ``xr.Dataset`` with the fabric ``id_field`` as the dimension.

    Returns
    -------
    xr.Dataset
        Combined dataset with one data variable per SIR variable.
        Returns an empty dataset if no SIR files are available.
    """
    if not self.sir_files:
        logger.warning("No SIR files available — returning empty dataset")
        return xr.Dataset()
    dfs = [pd.read_csv(p, index_col=0) for p in self.sir_files.values()]
    combined = pd.concat(dfs, axis=1)
    return xr.Dataset.from_dataframe(combined)
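The column-wise concatenation used by load_sir can be tried in isolation. The sketch below uses two in-memory CSVs in place of SIR files; the column names (`hru_id`, `elevation_m`, `slope_deg`) are hypothetical. It shows one consequence worth knowing: `pd.concat(..., axis=1)` aligns on the index, so a feature missing from one file ends up as NaN in that column and is not dropped.

```python
import io

import pandas as pd

# Two hypothetical per-variable files, each indexed by the fabric id_field.
elevation_csv = io.StringIO("hru_id,elevation_m\n1,120.5\n2,340.0\n3,98.2\n")
slope_csv = io.StringIO("hru_id,slope_deg\n1,2.1\n2,7.4\n")  # feature 3 is missing

# Same pattern as load_sir: first column becomes the index, then join column-wise.
dfs = [pd.read_csv(f, index_col=0) for f in (elevation_csv, slope_csv)]
combined = pd.concat(dfs, axis=1)
```

Passing `combined` to `xr.Dataset.from_dataframe`, as load_sir does, would then turn the index into the dataset dimension and each column into a data variable.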

load_raw_sir

load_raw_sir() -> xr.Dataset

Load raw (pre-normalization) static files into a combined xr.Dataset.

Unlike load_sir, this always uses the stage 4 raw CSV files, bypassing SIR normalization. Useful for debugging or inspecting source-native variable names and units.

RETURNS DESCRIPTION
Dataset

Combined dataset from raw static files, or an empty dataset if no static files exist.

Source code in src/hydro_param/pipeline.py
def load_raw_sir(self) -> xr.Dataset:
    """Load raw (pre-normalization) static files into a combined xr.Dataset.

    Unlike :meth:`load_sir`, this always uses the stage 4 raw CSV files,
    bypassing SIR normalization.  Useful for debugging or inspecting
    source-native variable names and units.

    Returns
    -------
    xr.Dataset
        Combined dataset from raw static files, or an empty dataset if
        no static files exist.
    """
    if not self.static_files:
        return xr.Dataset()
    dfs = [pd.read_csv(p, index_col=0) for p in self.static_files.values()]
    combined = pd.concat(dfs, axis=1)
    return xr.Dataset.from_dataframe(combined)

resolve_bbox

resolve_bbox(config: PipelineConfig) -> list[float]

Extract the domain bounding box from the pipeline config.

PARAMETER DESCRIPTION
config

Pipeline configuration containing an optional domain section.

TYPE: PipelineConfig

RETURNS DESCRIPTION
list[float]

Bounding box as [west, south, east, north] in EPSG:4326 (decimal degrees).

RAISES DESCRIPTION
ValueError

If no domain is configured (the caller should use the fabric bounding box directly instead).

NotImplementedError

If the domain type is not "bbox" (HUC/gage extraction is planned but not yet implemented).

Source code in src/hydro_param/pipeline.py
def resolve_bbox(config: PipelineConfig) -> list[float]:
    """Extract the domain bounding box from the pipeline config.

    Parameters
    ----------
    config : PipelineConfig
        Pipeline configuration containing an optional ``domain`` section.

    Returns
    -------
    list[float]
        Bounding box as ``[west, south, east, north]`` in EPSG:4326
        (decimal degrees).

    Raises
    ------
    ValueError
        If no domain is configured (the caller should use the fabric
        bounding box directly instead).
    NotImplementedError
        If the domain type is not ``"bbox"`` (HUC/gage extraction is
        planned but not yet implemented).
    """
    if config.domain is None:
        raise ValueError(
            "No domain configured. When domain is omitted, the pipeline uses "
            "the fabric bounding box automatically."
        )
    if config.domain.type == "bbox":
        return config.domain.bbox  # type: ignore[return-value]
    raise NotImplementedError(
        f"Domain type '{config.domain.type}' is not yet supported. Use type='bbox'."
    )
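Because resolve_bbox raises ValueError when no domain is configured, a caller that wants the documented fallback to the fabric bounds has to handle that case itself. The following is a minimal sketch of that caller-side pattern. `Domain`, `Config`, `resolve_bbox_sketch`, and `bbox_or_fabric_bounds` are hypothetical stand-ins written so the example runs on its own; they are not part of hydro_param.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Domain:  # hypothetical stand-in for the config's domain section
    type: str
    bbox: Optional[list] = None


@dataclass
class Config:  # hypothetical stand-in for PipelineConfig
    domain: Optional[Domain] = None


def resolve_bbox_sketch(config):
    """Mirror the three branches of resolve_bbox shown above."""
    if config.domain is None:
        raise ValueError("No domain configured.")
    if config.domain.type == "bbox":
        return config.domain.bbox
    raise NotImplementedError(
        f"Domain type '{config.domain.type}' is not yet supported."
    )


def bbox_or_fabric_bounds(config, fabric_bounds):
    """Caller-side fallback: use the fabric bounds when no domain is set."""
    try:
        return resolve_bbox_sketch(config)
    except ValueError:
        return list(fabric_bounds)


explicit = bbox_or_fabric_bounds(
    Config(Domain("bbox", [-105.5, 39.5, -104.5, 40.5])), (0.0, 0.0, 1.0, 1.0)
)
fallback = bbox_or_fabric_bounds(Config(), (-106.0, 39.0, -104.0, 41.0))
```

An unsupported domain type is deliberately not caught here, so the NotImplementedError still reaches the user.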

stage1_resolve_fabric

stage1_resolve_fabric(
    config: PipelineConfig,
) -> gpd.GeoDataFrame

Stage 1: Load the target fabric and apply optional domain filtering.

Read the geospatial file specified by config.target_fabric.path, validate that the id_field column exists, and optionally clip the fabric to a bounding box domain. The domain bbox is assumed to be in EPSG:4326 and is reprojected to the fabric CRS if they differ.

PARAMETER DESCRIPTION
config

Pipeline configuration with target_fabric and optional domain sections.

TYPE: PipelineConfig

RETURNS DESCRIPTION
GeoDataFrame

Target fabric, potentially spatially subsetted by the domain bbox.

RAISES DESCRIPTION
FileNotFoundError

If the fabric file does not exist on disk.

ValueError

If the id_field is not found in the fabric columns, or if domain filtering produces an empty result.

Source code in src/hydro_param/pipeline.py
def stage1_resolve_fabric(config: PipelineConfig) -> gpd.GeoDataFrame:
    """Stage 1: Load the target fabric and apply optional domain filtering.

    Read the geospatial file specified by ``config.target_fabric.path``,
    validate that the ``id_field`` column exists, and optionally clip the
    fabric to a bounding box domain.  The domain bbox is assumed to be in
    EPSG:4326 and is reprojected to the fabric CRS if they differ.

    Parameters
    ----------
    config : PipelineConfig
        Pipeline configuration with ``target_fabric`` and optional
        ``domain`` sections.

    Returns
    -------
    gpd.GeoDataFrame
        Target fabric, potentially spatially subsetted by the domain bbox.

    Raises
    ------
    FileNotFoundError
        If the fabric file does not exist on disk.
    ValueError
        If the ``id_field`` is not found in the fabric columns, or if
        domain filtering produces an empty result.
    """
    logger.info("Stage 1: Loading target fabric from %s", config.target_fabric.path)
    fabric_path = config.target_fabric.path
    if not fabric_path.exists():
        raise FileNotFoundError(
            f"Target fabric not found: {fabric_path}\n"
            f"Download or copy the fabric file before running the pipeline. "
            f"See 'hydro-param init --help' for project setup."
        )
    fabric = gpd.read_file(fabric_path)

    if config.target_fabric.id_field not in fabric.columns:
        raise ValueError(
            f"ID field '{config.target_fabric.id_field}' not found in fabric. "
            f"Available columns: {list(fabric.columns)}"
        )

    logger.info(
        "Loaded %d features, id_field='%s', CRS=%s",
        len(fabric),
        config.target_fabric.id_field,
        fabric.crs,
    )

    # Apply domain filter to spatially subset fabric (optional)
    if (
        config.domain is not None
        and config.domain.type == "bbox"
        and config.domain.bbox is not None
    ):
        from shapely.geometry import box

        # Config bbox is assumed EPSG:4326; reproject if fabric CRS differs.
        bbox_geom = box(*config.domain.bbox)
        bbox_gdf = gpd.GeoDataFrame(index=[0], geometry=[bbox_geom], crs="EPSG:4326")
        if fabric.crs is not None and fabric.crs != bbox_gdf.crs:
            bbox_gdf = bbox_gdf.to_crs(fabric.crs)
        fabric = gpd.clip(fabric, bbox_gdf)
        logger.info("Domain bbox filter: %d features within bbox", len(fabric))
        if fabric.empty:
            raise ValueError("No features found within the specified domain bbox.")

    return fabric

stage2_resolve_datasets

stage2_resolve_datasets(
    config: PipelineConfig, registry: DatasetRegistry
) -> list[
    tuple[
        DatasetEntry, DatasetRequest, list[AnyVariableSpec]
    ]
]

Stage 2: Resolve dataset names to registry entries and variable specs.

For each hydro_param.config.DatasetRequest in the pipeline config, look up the corresponding hydro_param.dataset_registry.DatasetEntry in the registry. Apply source overrides from the pipeline config, validate strategy-specific requirements (e.g., local_tiff needs a source path, temporal datasets need a time_period), and resolve each requested variable name to its full specification.

PARAMETER DESCRIPTION
config

Pipeline configuration containing the datasets dict (keyed by category).

TYPE: PipelineConfig

registry

Dataset registry mapping names to entries and variable specs.

TYPE: DatasetRegistry

RETURNS DESCRIPTION
list[tuple[DatasetEntry, DatasetRequest, list[...]]]

One tuple per dataset: the registry entry, the pipeline request, and the resolved variable specifications (VariableSpec, DerivedVariableSpec, DerivedCategoricalSpec, or DerivedContinuousSpec).

RAISES DESCRIPTION
ValueError

If a dataset name appears under more than one category key, if a local_tiff dataset has no source path (dataset-level or per-variable), if a temporal dataset has no time_period, or if the requested time_period or year falls outside the dataset's year_range.

KeyError

If a dataset name is not found in the registry (raised by registry.get()).
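The datasets dict is keyed by category, and the source listing for this function rejects a dataset name that appears under more than one key. A minimal sketch of that lookup, using plain strings in place of DatasetRequest objects (the `build_category_lookup` helper and the dataset names are hypothetical):

```python
def build_category_lookup(datasets):
    """Map each dataset name to its category key, rejecting duplicates.

    `datasets` maps a category key to a list of dataset names.
    """
    lookup = {}
    for category, names in datasets.items():
        for name in names:
            if name in lookup:
                raise ValueError(
                    f"Dataset '{name}' appears in multiple categories: "
                    f"'{lookup[name]}' and '{category}'."
                )
            lookup[name] = category
    return lookup


lookup = build_category_lookup(
    {"topography": ["dataset_a"], "land_cover": ["dataset_b", "dataset_c"]}
)

# The same name under two category keys is treated as a config error.
try:
    build_category_lookup({"topography": ["dataset_a"], "soils": ["dataset_a"]})
    duplicate_error = None
except ValueError as exc:
    duplicate_error = str(exc)
```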

Source code in src/hydro_param/pipeline.py
def stage2_resolve_datasets(
    config: PipelineConfig,
    registry: DatasetRegistry,
) -> list[
    tuple[
        DatasetEntry,
        DatasetRequest,
        list[AnyVariableSpec],
    ]
]:
    """Stage 2: Resolve dataset names to registry entries and variable specs.

    For each :class:`~hydro_param.config.DatasetRequest` in the pipeline
    config, look up the corresponding :class:`~hydro_param.dataset_registry.DatasetEntry`
    in the registry.  Apply source overrides from the pipeline config,
    validate strategy-specific requirements (e.g., ``local_tiff`` needs a
    source path, temporal datasets need a ``time_period``), and resolve
    each requested variable name to its full specification.

    Parameters
    ----------
    config : PipelineConfig
        Pipeline configuration containing the ``datasets`` dict (keyed by category).
    registry : DatasetRegistry
        Dataset registry mapping names to entries and variable specs.

    Returns
    -------
    list[tuple[DatasetEntry, DatasetRequest, list[...]]]
        One tuple per dataset: the registry entry, the pipeline request,
        and the resolved variable specifications (VariableSpec,
        DerivedVariableSpec, DerivedCategoricalSpec, or DerivedContinuousSpec).

    Raises
    ------
    ValueError
        If a dataset name appears under more than one category key, if a
        ``local_tiff`` dataset has no source path (dataset-level or
        per-variable), if a temporal dataset has no ``time_period``, or if
        the requested ``time_period`` or ``year`` falls outside the
        dataset's ``year_range``.
    KeyError
        If a dataset name is not found in the registry (raised by
        ``registry.get()``).
    """
    flat_datasets = config.flatten_datasets()
    logger.info("Stage 2: Resolving %d datasets from registry", len(flat_datasets))

    # Build dataset-name → config-category lookup for cross-validation
    ds_category_map: dict[str, str] = {}
    for category_key, ds_list in config.datasets.items():
        for _ds_req in ds_list:
            if _ds_req.name in ds_category_map:
                raise ValueError(
                    f"Dataset '{_ds_req.name}' appears in multiple categories: "
                    f"'{ds_category_map[_ds_req.name]}' and '{category_key}'. "
                    f"Each dataset should appear in exactly one category."
                )
            ds_category_map[_ds_req.name] = category_key

    resolved = []
    for ds_req in flat_datasets:
        entry = registry.get(ds_req.name)

        # Cross-validate config category vs registry category
        config_cat = ds_category_map[ds_req.name]
        if not entry.category:
            logger.debug(
                "Skipping category cross-validation for '%s': no category set in registry",
                ds_req.name,
            )
        elif entry.category != config_cat:
            logger.warning(
                "Category mismatch for dataset '%s': config key is '%s' "
                "but registry category is '%s'",
                ds_req.name,
                config_cat,
                entry.category,
            )

        # Apply pipeline config source override
        if ds_req.source is not None:
            entry = entry.model_copy(update={"source": str(ds_req.source)})

        # Validate: local_tiff datasets must have a source (dataset-level or per-variable)
        if entry.strategy == "local_tiff" and entry.source is None:
            # Check if all requested variables have per-variable source overrides
            requested_var_specs = [
                registry.resolve_variable(ds_req.name, v) for v in ds_req.variables
            ]
            all_vars_have_source = all(
                isinstance(vs, DerivedCategoricalSpec | DerivedContinuousSpec)
                or (isinstance(vs, VariableSpec) and vs.source_override is not None)
                for vs in requested_var_specs
            )
            if not all_vars_have_source:
                msg = (
                    f"Dataset '{ds_req.name}' requires a local file "
                    f"(strategy: local_tiff) but no 'source' path is set."
                )
                if entry.download:
                    if entry.download.files:
                        msg += (
                            f"\n\nThis dataset has {len(entry.download.files)} "
                            f"downloadable files. Run:\n"
                            f"  hydro-param datasets info {ds_req.name}"
                        )
                    elif entry.download.url_template:
                        start, end = entry.download.year_range
                        n_vars = len(entry.download.variables_available)
                        msg += (
                            f"\n\nThis dataset has templated downloads "
                            f"({end - start + 1} years x {n_vars} variables). Run:\n"
                            f"  hydro-param datasets info {ds_req.name}"
                        )
                    elif entry.download.url:
                        msg += f"\n\nDownload from: {entry.download.url}"
                        if entry.download.size_gb:
                            msg += f"\nExpected size: ~{entry.download.size_gb} GB"
                        if entry.download.format:
                            msg += f"\nFormat: {entry.download.format}"
                        if entry.download.notes:
                            msg += f"\n{entry.download.notes.strip()}"
                msg += (
                    f"\n\nThen set 'source' in your pipeline config:\n"
                    f"  datasets:\n"
                    f"    {config_cat}:\n"
                    f"      - name: {ds_req.name}\n"
                    f"        source: /path/to/downloaded/file.tif"
                )
                raise ValueError(msg)

        # Validate: temporal datasets require time_period
        if entry.temporal and ds_req.time_period is None:
            raise ValueError(
                f"Dataset '{ds_req.name}' is temporal but no 'time_period' specified. "
                f"Add time_period: ['YYYY-MM-DD', 'YYYY-MM-DD'] to your pipeline config."
            )

        # Validate time range against dataset availability
        _validate_time_range(ds_req, entry)

        # Auto-include source variables needed by derived categorical/continuous specs
        requested = set(ds_req.variables)
        extra_sources: list[str] = []
        for vname in ds_req.variables:
            spec = registry.resolve_variable(ds_req.name, vname)
            if isinstance(spec, DerivedCategoricalSpec | DerivedContinuousSpec):
                for src in spec.sources:
                    if src not in requested and src not in extra_sources:
                        extra_sources.append(src)
        if extra_sources:
            logger.info(
                "  Auto-including source variables for derived specs: %s",
                extra_sources,
            )

        all_var_names = extra_sources + list(ds_req.variables)
        var_specs = [registry.resolve_variable(ds_req.name, v) for v in all_var_names]
        resolved.append((entry, ds_req, var_specs))
        logger.info(
            "  %s (%s): %d variables — %s",
            ds_req.name,
            entry.strategy,
            len(var_specs),
            [v.name for v in var_specs],
        )
    return resolved
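The auto-include step near the end of the listing can be isolated as a small function. In this sketch a plain dict stands in for the registry: `derived_sources` maps a derived variable name to the source variables it needs, which is an assumption about shape made for illustration only. The variable names are hypothetical.

```python
def expand_with_sources(requested, derived_sources):
    """Return source variables needed by derived specs, followed by the
    requested variables in their original order.

    A source is added only if it was not requested explicitly and has not
    already been added for another derived variable.
    """
    requested_set = set(requested)
    extra = []
    for name in requested:
        for src in derived_sources.get(name, []):
            if src not in requested_set and src not in extra:
                extra.append(src)
    return extra + list(requested)


# "veg_fraction" is derived from "land_cover" (already requested) and "canopy".
ordered = expand_with_sources(
    ["veg_fraction", "land_cover"],
    {"veg_fraction": ["land_cover", "canopy"]},
)
```

Auto-included sources are placed ahead of the requested variables, matching `extra_sources + list(ds_req.variables)` in the listing, so they are resolved before the derived specs that depend on them.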

stage4_process

stage4_process(
    fabric: GeoDataFrame,
    resolved: list[
        tuple[
            DatasetEntry,
            DatasetRequest,
            list[AnyVariableSpec],
        ]
    ],
    config: PipelineConfig,
) -> Stage4Results

Stage 4: Process all datasets with spatial batching and incremental writes.

Iterate over each resolved dataset. Static datasets are processed through the spatial batch loop (KD-tree batches, per-batch GeoTIFF fetch, gdptools ZonalGen). Temporal datasets skip batching and are processed full-fabric via WeightGen + AggGen.

Per-variable CSV files (static) and per-year NetCDF/Parquet files (temporal) are written incrementally as each dataset completes, reducing peak memory usage compared to accumulating all results.

Resume support is provided via a manifest that records dataset fingerprints and output paths. When config.processing.resume is True, datasets whose outputs are already current are skipped.
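The skip decision can be pictured as a fingerprint comparison plus a check that the recorded outputs still exist. The sketch below is an assumption-laden illustration: the real manifest module's hashing scheme and on-disk format are not shown on this page, and `fingerprint`, `is_current`, and the dict-based manifest are hypothetical.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def fingerprint(settings):
    """Hash a JSON-serializable settings dict into a short, stable digest."""
    payload = json.dumps(settings, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


def is_current(manifest, name, fp, output_dir):
    """A dataset is current if its fingerprint matches and all outputs exist."""
    entry = manifest.get(name)
    if entry is None or entry["fingerprint"] != fp:
        return False
    # Output paths are stored relative to the output directory.
    return all((output_dir / rel).exists() for rel in entry["files"])


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    (out / "elevation.csv").write_text("hru_id,elevation\n1,100\n")

    fp = fingerprint({"name": "dem", "variables": ["elevation"]})
    manifest = {"dem": {"fingerprint": fp, "files": ["elevation.csv"]}}

    unchanged = is_current(manifest, "dem", fp, out)

    # Requesting an extra variable changes the fingerprint, forcing a rerun.
    new_fp = fingerprint({"name": "dem", "variables": ["elevation", "slope"]})
    after_edit = is_current(manifest, "dem", new_fp, out)

    # A deleted output file also invalidates the cached entry.
    (out / "elevation.csv").unlink()
    after_delete = is_current(manifest, "dem", fp, out)
```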

PARAMETER DESCRIPTION
fabric

Target fabric with a batch_id column added by hydro_param.batching.spatial_batch.

TYPE: GeoDataFrame

resolved

Resolved dataset entries from stage2_resolve_datasets.

TYPE: list[tuple[DatasetEntry, DatasetRequest, list[...]]]

config

Pipeline configuration (output path, batch size, resume flag, etc.).

TYPE: PipelineConfig

RETURNS DESCRIPTION
Stage4Results

Aggregated file paths and category metadata for all processed datasets.

RAISES DESCRIPTION
ValueError

If duplicate result keys are detected across datasets or years (indicates a config collision).

Notes

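Static datasets with a year set (a single year or a list such as year: [2020, 2021]) produce year-suffixed result keys (e.g., "land_cover_2020"); static datasets without a year use the bare variable name. Temporal datasets are split into per-calendar-year chunks via _split_time_period_by_year, and each chunk is keyed by the dataset name plus the year (e.g., "gridmet_2020").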
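The key naming for static datasets follows directly from the year-expansion logic in the source listing. A minimal standalone sketch (the helper names `expand_years` and `static_result_keys` are hypothetical; the branching mirrors the listing):

```python
def expand_years(year):
    """Normalize the year setting: list -> list, bare int -> [int], None -> [None]."""
    if isinstance(year, list):
        return list(year)
    if year is not None:
        return [year]
    return [None]


def static_result_keys(var_names, year):
    """Build result keys for static variables, suffixing the year when one is set."""
    keys = []
    for y in expand_years(year):
        for name in var_names:
            keys.append(f"{name}_{y}" if y is not None else name)
    return keys


multi_year = static_result_keys(["land_cover"], [2020, 2021])
single_year = static_result_keys(["land_cover"], 2021)
no_year = static_result_keys(["elevation"], None)
```

A single bare year is suffixed as well, so only datasets with no year setting produce unsuffixed keys.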

Source code in src/hydro_param/pipeline.py
def stage4_process(
    fabric: gpd.GeoDataFrame,
    resolved: list[
        tuple[
            DatasetEntry,
            DatasetRequest,
            list[AnyVariableSpec],
        ]
    ],
    config: PipelineConfig,
) -> Stage4Results:
    """Stage 4: Process all datasets with spatial batching and incremental writes.

    Iterate over each resolved dataset.  Static datasets are processed
    through the spatial batch loop (KD-tree batches, per-batch GeoTIFF
    fetch, gdptools ``ZonalGen``).  Temporal datasets skip batching and
    are processed full-fabric via ``WeightGen`` + ``AggGen``.

    Per-variable CSV files (static) and per-year NetCDF/Parquet files
    (temporal) are written incrementally as each dataset completes,
    reducing peak memory usage compared to accumulating all results.

    Resume support is provided via a manifest that records dataset
    fingerprints and output paths.  When ``config.processing.resume`` is
    ``True``, datasets whose outputs are already current are skipped.

    Parameters
    ----------
    fabric : gpd.GeoDataFrame
        Target fabric with a ``batch_id`` column added by
        :func:`~hydro_param.batching.spatial_batch`.
    resolved : list[tuple[DatasetEntry, DatasetRequest, list[...]]]
        Resolved dataset entries from :func:`stage2_resolve_datasets`.
    config : PipelineConfig
        Pipeline configuration (output path, batch size, resume
        flag, etc.).

    Returns
    -------
    Stage4Results
        Aggregated file paths and category metadata for all processed
        datasets.

    Raises
    ------
    ValueError
        If duplicate result keys are detected across datasets or years
        (indicates a config collision).

    Notes
    -----
    Static datasets with a ``year`` set (a single year or a list such as
    ``year: [2020, 2021]``) produce year-suffixed result keys (e.g.,
    ``"land_cover_2020"``).  Temporal datasets are split into
    per-calendar-year chunks via :func:`_split_time_period_by_year`.
    """
    batch_ids = sorted(fabric["batch_id"].unique())
    logger.info("Stage 4: Processing %d datasets across %d batches", len(resolved), len(batch_ids))

    id_field = config.target_fabric.id_field
    feature_ids = fabric[id_field].values

    # Ensure output directory exists for incremental writes
    config.output.path.mkdir(parents=True, exist_ok=True)

    # Always create manifest for resume support.
    # The resume flag controls whether completed datasets are *skipped*,
    # not whether the manifest is *written*.
    fab_fp = _manifest_mod.fabric_fingerprint(config)
    manifest = _manifest_mod.PipelineManifest(fabric_fingerprint=fab_fp)

    if config.processing.resume:
        existing = _manifest_mod.load_manifest(config.output.path)
        if existing is not None and existing.is_fabric_current(fab_fp):
            manifest = existing  # Preserve entries for skip checks
        elif existing is not None:
            logger.warning(
                "Fabric fingerprint changed — reprocessing all datasets (old=%s, new=%s)",
                existing.fabric_fingerprint,
                fab_fp,
            )

    static_files: dict[str, Path] = {}
    temporal_files: dict[str, Path] = {}
    categories: dict[str, str] = {}

    for ds_idx, (entry, ds_req, var_specs) in enumerate(resolved, 1):
        category = entry.category or "uncategorized"
        var_names = [v.name for v in var_specs]

        ds_fp = _manifest_mod.dataset_fingerprint(ds_req, entry, var_specs, config.processing)

        # Resume: check if this dataset can be skipped
        if config.processing.resume:
            if manifest.is_dataset_current(ds_req.name, ds_fp, config.output.path):
                cached = manifest.entries[ds_req.name]
                for k, rel in cached.static_files.items():
                    static_files[k] = config.output.path / rel
                    categories[k] = category
                for k, rel in cached.temporal_files.items():
                    temporal_files[k] = config.output.path / rel
                    categories[k] = category
                logger.info(
                    "Dataset %d/%d: %s — skipped (outputs current)",
                    ds_idx,
                    len(resolved),
                    ds_req.name,
                )
                continue

        if entry.temporal:
            # Split temporal processing by year to keep files manageable
            year_chunks = _split_time_period_by_year(cast(list[str], ds_req.time_period))
            logger.info(
                "Dataset %d/%d: %s [%s, temporal] vars=%s period=%s (%d year chunks)",
                ds_idx,
                len(resolved),
                ds_req.name,
                entry.strategy,
                var_names,
                ds_req.time_period,
                len(year_chunks),
            )
            t_ds = time.perf_counter()

            # Track this dataset's temporal files explicitly
            ds_temporal_files: dict[str, Path] = {}

            for chunk_period in year_chunks:
                chunk_year = chunk_period[0][:4]
                t_chunk = time.perf_counter()
                chunk_req = ds_req.model_copy(update={"time_period": chunk_period})
                ds = _process_temporal(fabric, entry, chunk_req, var_specs, config)
                result_key = f"{ds_req.name}_{chunk_year}"
                categories[result_key] = category
                logger.info(
                    "  %s year %s: %d vars, %d time steps (%.1fs)",
                    ds_req.name,
                    chunk_year,
                    len(ds.data_vars),
                    ds.sizes.get("time", 0),
                    time.perf_counter() - t_chunk,
                )
                # Write temporal file immediately after each year chunk
                temporal_files[result_key] = _write_temporal_file(result_key, ds, category, config)
                ds_temporal_files[result_key] = temporal_files[result_key]

            logger.info("  %s complete (%.1fs)", ds_req.name, time.perf_counter() - t_ds)

            # Update manifest after temporal dataset completes
            _save_manifest(manifest, ds_req.name, ds_fp, {}, ds_temporal_files, config.output.path)

            continue

        # Expand years: list → iterate, bare int → [int], None → [None]
        if isinstance(ds_req.year, list):
            years: list[int | None] = list(ds_req.year)
        elif ds_req.year is not None:
            years = [ds_req.year]
        else:
            years = [None]

        year_label = years if len(years) > 1 else (years[0] if years[0] is not None else "none")
        logger.info(
            "Dataset %d/%d: %s [%s, static] vars=%s year=%s",
            ds_idx,
            len(resolved),
            ds_req.name,
            entry.strategy,
            var_names,
            year_label,
        )
        t_ds = time.perf_counter()

        # Collect batch results for this dataset only
        ds_batch_results: dict[str, list[pd.DataFrame]] = {}

        for year in years:
            # Create single-year request for _process_batch
            year_req = ds_req.model_copy(update={"year": year})

            for batch_id in batch_ids:
                batch = fabric[fabric["batch_id"] == batch_id]
                t_batch = time.perf_counter()

                with tempfile.TemporaryDirectory(prefix="hydro_param_") as tmp:
                    work_dir = Path(tmp)
                    batch_results = _process_batch(
                        batch, entry, year_req, var_specs, config, work_dir
                    )

                logger.info(
                    "  Batch %d/%d: %d features, year=%s (%.1fs)",
                    batch_id + 1,
                    len(batch_ids),
                    len(batch),
                    year,
                    time.perf_counter() - t_batch,
                )

                for var_name, df in batch_results.items():
                    # Year-suffix result keys whenever a year is set (single year or list)
                    result_key = f"{var_name}_{year}" if year is not None else var_name
                    ds_batch_results.setdefault(result_key, []).append(df)

            # Track categories with year-suffixed keys
            for var_spec in var_specs:
                result_key = f"{var_spec.name}_{year}" if year is not None else var_spec.name
                categories[result_key] = category

        # Merge and write per-variable files immediately after this dataset completes
        for var_key, dfs in ds_batch_results.items():
            if var_key in static_files:
                raise ValueError(
                    f"Duplicate static result key '{var_key}' from dataset "
                    f"'{ds_req.name}'. Overlapping variable names across "
                    f"datasets or years; adjust your configuration to avoid collisions."
                )
            merged_df = pd.concat(dfs)
            category = categories.get(var_key, "uncategorized")
            static_files[var_key] = _write_variable_file(
                var_key, merged_df, category, config, feature_ids
            )
            logger.info("  Merged %s: %d total features", var_key, len(merged_df))

        logger.info("  %s complete (%.1fs)", ds_req.name, time.perf_counter() - t_ds)

        # Update manifest after static dataset completes
        ds_static: dict[str, Path] = {k: static_files[k] for k in ds_batch_results}
        _save_manifest(manifest, ds_req.name, ds_fp, ds_static, {}, config.output.path)

    # Safety save: ensures manifest is on disk even if a per-dataset save was interrupted
    _save_manifest_to_disk(manifest, config.output.path)

    return Stage4Results(
        static_files=static_files, temporal_files=temporal_files, categories=categories
    )
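
The year-suffixed keys and the duplicate-key check in the listing above can be illustrated in isolation. The following is a minimal sketch that uses plain lists in place of DataFrames; `result_key`, `collect_results`, and the sample data are hypothetical names introduced for illustration, not part of hydro_param.

```python
from typing import Optional


def result_key(var_name: str, year: Optional[int]) -> str:
    """Suffix the variable name with the year when one is given."""
    return f"{var_name}_{year}" if year is not None else var_name


def collect_results(batches, already_written):
    """Group per-batch values by result key, then reject duplicate keys.

    `batches` is a hypothetical stand-in for the (year, batch_results)
    pairs produced by the batching loop; `already_written` plays the
    role of the keys already present in `static_files`.
    """
    grouped = {}
    for year, batch_results in batches:
        for var_name, values in batch_results.items():
            grouped.setdefault(result_key(var_name, year), []).append(values)

    merged = {}
    for key, chunks in grouped.items():
        if key in already_written:
            raise ValueError(f"Duplicate static result key '{key}'")
        # Stand-in for pd.concat: flatten the per-batch chunks in order.
        merged[key] = [value for chunk in chunks for value in chunk]
    return merged


# Two batches for 2019 and one for 2021, all for the same variable.
batches = [
    (2019, {"lulc": [1.0, 2.0]}),
    (2019, {"lulc": [3.0]}),
    (2021, {"lulc": [4.0, 5.0, 6.0]}),
]
merged = collect_results(batches, already_written=set())
```

Because the year is part of the key, the same variable requested for several years produces one output per year rather than colliding, while a key that was already written by an earlier dataset is reported as a configuration error.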

stage5_normalize_sir

stage5_normalize_sir(
    stage4: Stage4Results,
    resolved: list[
        tuple[
            DatasetEntry,
            DatasetRequest,
            list[AnyVariableSpec],
        ]
    ],
    config: PipelineConfig,
) -> tuple[
    dict[str, Path],
    list[SIRVariableSchema],
    list[SIRValidationWarning],
    _manifest_mod.SIRManifestEntry,
]

Stage 5: Normalize raw stage 4 output to canonical SIR format.

Build a SIR schema from the resolved datasets, then normalize each raw per-variable CSV into a canonical SIR file with standardized variable names and units. Temporal files are also normalized. Finally, validate the SIR files against the schema.

The SIR (Standardized Internal Representation) is the contract between the generic pipeline and model plugins -- plugins consume SIR files with predictable names and units, never raw source output.

PARAMETER DESCRIPTION
stage4

Stage 4 results containing raw per-variable file paths.

TYPE: Stage4Results

resolved

Resolved dataset entries from stage2_resolve_datasets, used to build the SIR schema.

TYPE: list[tuple[DatasetEntry, DatasetRequest, list[...]]]

config

Pipeline configuration (output path, id_field, validation mode).

TYPE: PipelineConfig

RETURNS DESCRIPTION
tuple[dict[str, Path], list[SIRVariableSchema], list[SIRValidationWarning], SIRManifestEntry]
  • Normalized SIR file paths (sir/ subdirectory)
  • Schema entries describing each SIR variable
  • Validation warnings (empty if all checks pass)
  • SIR manifest entry for persisting to the pipeline manifest
RAISES DESCRIPTION
SIRValidationError

If config.processing.sir_validation == "strict" and any validation warnings are found.
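
The difference between strict and non-strict validation can be sketched as follows. This is an illustration only: the checks that `validate_sir` actually performs are not shown in this section, so the NaN check, `ValidationWarning`, `ValidationError`, and `validate` below are all hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass
class ValidationWarning:
    """Hypothetical stand-in for SIRValidationWarning."""

    variable: str
    check_type: str
    message: str


class ValidationError(Exception):
    """Hypothetical stand-in for SIRValidationError."""


def validate(values, strict):
    """Flag variables containing NaN; raise instead of returning when strict."""
    warnings = []
    for name, vals in values.items():
        # NaN is the only float that is not equal to itself.
        n_nan = sum(1 for v in vals if v != v)
        if n_nan:
            warnings.append(ValidationWarning(name, "nan", f"{n_nan} NaN value(s)"))
    if strict and warnings:
        raise ValidationError(f"{len(warnings)} validation warning(s)")
    return warnings


data = {"elevation": [1.0, float("nan")], "slope": [0.1]}
lenient = validate(data, strict=False)  # returns the warnings to the caller
```

In the non-strict case the caller receives the warnings and decides what to do with them; in the strict case the same findings abort the run.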

See Also

hydro_param.sir.normalize_sir : Per-file normalization logic.
hydro_param.sir.validate_sir : SIR validation checks.

Source code in src/hydro_param/pipeline.py
def stage5_normalize_sir(
    stage4: Stage4Results,
    resolved: list[
        tuple[
            DatasetEntry,
            DatasetRequest,
            list[AnyVariableSpec],
        ]
    ],
    config: PipelineConfig,
) -> tuple[
    dict[str, Path],
    list[SIRVariableSchema],
    list[SIRValidationWarning],
    _manifest_mod.SIRManifestEntry,
]:
    """Stage 5: Normalize raw stage 4 output to canonical SIR format.

    Build a SIR schema from the resolved datasets, then normalize each
    raw per-variable CSV into a canonical SIR file with standardized
    variable names and units.  Temporal files are also normalized.
    Finally, validate the SIR files against the schema.

    The SIR (Standardized Internal Representation) is the contract
    between the generic pipeline and model plugins -- plugins consume
    SIR files with predictable names and units, never raw source output.

    Parameters
    ----------
    stage4 : Stage4Results
        Stage 4 results containing raw per-variable file paths.
    resolved : list[tuple[DatasetEntry, DatasetRequest, list[...]]]
        Resolved dataset entries from :func:`stage2_resolve_datasets`,
        used to build the SIR schema.
    config : PipelineConfig
        Pipeline configuration (output path, id_field, validation mode).

    Returns
    -------
    tuple[dict[str, Path], list[SIRVariableSchema], list[SIRValidationWarning], SIRManifestEntry]
        - Normalized SIR file paths (``sir/`` subdirectory)
        - Schema entries describing each SIR variable
        - Validation warnings (empty if all checks pass)
        - SIR manifest entry for persisting to the pipeline manifest

    Raises
    ------
    SIRValidationError
        If ``config.processing.sir_validation == "strict"`` and any
        validation warnings are found.

    See Also
    --------
    hydro_param.sir.normalize_sir : Per-file normalization logic.
    hydro_param.sir.validate_sir : SIR validation checks.
    """
    logger.info("Stage 5: SIR normalization")
    schema = build_sir_schema(resolved)
    logger.info("  SIR schema: %d variables", len(schema))

    sir_dir = config.output.path / "sir"
    sir_files = normalize_sir(
        raw_files=stage4.static_files,
        schema=schema,
        output_dir=sir_dir,
        id_field=config.target_fabric.id_field,
    )
    logger.info("  Normalized %d SIR files → %s", len(sir_files), sir_dir)

    # Normalize temporal files
    if stage4.temporal_files:
        temporal_sir = normalize_sir_temporal(
            temporal_files=stage4.temporal_files,
            schema=schema,
            resolved=resolved,
            output_dir=sir_dir,
        )
        sir_files.update(temporal_sir)
        logger.info("  Normalized %d temporal SIR files", len(temporal_sir))

    strict = config.processing.sir_validation == "strict"
    warnings = validate_sir(sir_files, schema, strict=strict)
    if warnings:
        logger.warning("  SIR validation: %d warnings", len(warnings))
        for w in warnings:
            logger.warning("    [%s] %s: %s", w.check_type, w.variable, w.message)
    else:
        logger.info("  SIR validation: passed")

    # Build SIR manifest entry for Phase 2 discovery
    output_path = config.output.path
    sir_manifest = _manifest_mod.SIRManifestEntry(
        static_files={
            k: str(v.relative_to(output_path))
            for k, v in sir_files.items()
            if str(v).endswith(".csv")
        },
        temporal_files={
            k: str(v.relative_to(output_path))
            for k, v in sir_files.items()
            if str(v).endswith(".nc")
        },
        sir_schema=[
            _manifest_mod.SIRSchemaEntry(
                name=s.canonical_name,
                units=s.canonical_units,
                statistic="categorical" if s.categorical else "continuous",
                source_dataset=s.dataset_name,
            )
            for s in schema
        ],
        completed_at=datetime.now(timezone.utc),
    )

    return sir_files, schema, warnings, sir_manifest
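
The manifest entry built at the end of the listing above splits the normalized files by extension and stores paths relative to the output directory. A minimal sketch of that partitioning, assuming hypothetical file names and a hypothetical helper `partition_sir_files` (it compares `Path.suffix` where the listing uses `str(v).endswith(...)`, which gives the same result for these names):

```python
from pathlib import PurePosixPath


def partition_sir_files(sir_files, output_path):
    """Split SIR files into static (.csv) and temporal (.nc) relative paths."""
    static, temporal = {}, {}
    for key, path in sir_files.items():
        relative = str(path.relative_to(output_path))
        if path.suffix == ".csv":
            static[key] = relative
        elif path.suffix == ".nc":
            temporal[key] = relative
    return static, temporal


# Hypothetical output directory and file names, for illustration only.
out = PurePosixPath("/tmp/run")
files = {
    "elevation_m_mean": out / "sir" / "elevation_m_mean.csv",
    "gridmet_2020": out / "sir" / "gridmet_2020.nc",
}
static, temporal = partition_sir_files(files, out)
```

Storing relative paths keeps the manifest valid if the whole output directory is moved or copied.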

run_pipeline_from_config

run_pipeline_from_config(
    config: PipelineConfig, registry: DatasetRegistry
) -> PipelineResult

Execute the full 5-stage pipeline from pre-loaded config and registry.

Orchestrate all pipeline stages in sequence: resolve fabric (stage 1), resolve datasets (stage 2), process datasets with spatial batching (stage 4), and normalize to SIR (stage 5). Stage 3 (weights) is handled internally by gdptools during stage 4.

GDAL HTTP timeout environment variables are set from config.processing.network_timeout for the duration of the pipeline and restored afterward.
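
The set-and-restore behavior described above can be sketched as a context manager. This is not how the module structures it (the source uses an explicit try/finally); `gdal_http_timeout` is a hypothetical helper shown only to make the pattern concrete.

```python
import os
from contextlib import contextmanager

_GDAL_TIMEOUT_VARS = ("GDAL_HTTP_TIMEOUT", "GDAL_HTTP_CONNECTTIMEOUT")


@contextmanager
def gdal_http_timeout(seconds):
    """Temporarily set both GDAL HTTP timeout variables (hypothetical helper)."""
    # Remember the previous values; None means the variable was unset.
    previous = {name: os.environ.get(name) for name in _GDAL_TIMEOUT_VARS}
    for name in _GDAL_TIMEOUT_VARS:
        os.environ[name] = str(seconds)
    try:
        yield
    finally:
        # Restore on success and on error alike.
        for name, value in previous.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
```

Usage would look like `with gdal_http_timeout(120): ...`; any variable that was unset before the block is removed again afterward rather than left as an empty string.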

PARAMETER DESCRIPTION
config

Validated pipeline configuration.

TYPE: PipelineConfig

registry

Dataset registry for resolving dataset names to entries.

TYPE: DatasetRegistry

RETURNS DESCRIPTION
PipelineResult

Complete pipeline output including file paths for static and temporal outputs, the target fabric, normalized SIR files, schema, and validation warnings. Use PipelineResult.load_sir to assemble a combined xr.Dataset on demand.

See Also

run_pipeline : Convenience wrapper that loads config and registry from paths.

Source code in src/hydro_param/pipeline.py
def run_pipeline_from_config(
    config: PipelineConfig,
    registry: DatasetRegistry,
) -> PipelineResult:
    """Execute the full 5-stage pipeline from pre-loaded config and registry.

    Orchestrate all pipeline stages in sequence: resolve fabric (stage 1),
    resolve datasets (stage 2), process datasets with spatial batching
    (stage 4), and normalize to SIR (stage 5).  Stage 3 (weights) is
    handled internally by gdptools during stage 4.

    GDAL HTTP timeout environment variables are set from
    ``config.processing.network_timeout`` for the duration of the pipeline
    and restored afterward.

    Parameters
    ----------
    config : PipelineConfig
        Validated pipeline configuration.
    registry : DatasetRegistry
        Dataset registry for resolving dataset names to entries.

    Returns
    -------
    PipelineResult
        Complete pipeline output including file paths for static and
        temporal outputs, the target fabric, normalized SIR files,
        schema, and validation warnings.  Use :meth:`PipelineResult.load_sir`
        to assemble a combined ``xr.Dataset`` on demand.

    See Also
    --------
    run_pipeline : Convenience wrapper that loads config and registry from paths.
    """
    t0 = time.perf_counter()

    logger.info("=" * 60)
    logger.info("hydro-param pipeline: %s", config.output.sir_name)
    logger.info(
        "  Fabric: %s (id_field=%s)", config.target_fabric.path, config.target_fabric.id_field
    )
    logger.info(
        "  Datasets: %d, Batch size: %d",
        len(config.flatten_datasets()),
        config.processing.batch_size,
    )
    logger.info("  Output: %s (%s)", config.output.path, config.output.format)
    logger.info("=" * 60)

    # Apply network timeout to GDAL HTTP operations (COG/vsicurl access)
    _timeout_s = str(config.processing.network_timeout)
    _prev_timeout = os.environ.get("GDAL_HTTP_TIMEOUT")
    _prev_connect = os.environ.get("GDAL_HTTP_CONNECTTIMEOUT")
    os.environ["GDAL_HTTP_TIMEOUT"] = _timeout_s
    os.environ["GDAL_HTTP_CONNECTTIMEOUT"] = _timeout_s
    logger.info("  Network timeout: %ss", _timeout_s)

    try:
        # Stage 1: Resolve target fabric (applies domain filter if configured)
        t1 = time.perf_counter()
        fabric = stage1_resolve_fabric(config)

        # Spatial batching
        fabric = spatial_batch(fabric, batch_size=config.processing.batch_size)
        batch_ids = sorted(fabric["batch_id"].unique())
        batch_sizes = fabric.groupby("batch_id").size()
        logger.info(
            "Spatial batching: %d features → %d batches (min=%d, max=%d, mean=%d) (%.1fs)",
            len(fabric),
            len(batch_ids),
            batch_sizes.min(),
            batch_sizes.max(),
            batch_sizes.mean(),
            time.perf_counter() - t1,
        )

        # Stage 2: Resolve source datasets
        t2 = time.perf_counter()
        resolved = stage2_resolve_datasets(config, registry)
        logger.info("Stage 2 complete (%.1fs)", time.perf_counter() - t2)

        # Stage 3: Weights (handled internally by gdptools ZonalGen)
        logger.info("Stage 3: Weights computed internally by gdptools")

        # Stage 4: Process datasets + incremental writes
        t4 = time.perf_counter()
        results = stage4_process(fabric, resolved, config)
        logger.info(
            "Stage 4 complete: %d static vars, %d temporal datasets (%.1fs)",
            len(results.static_files),
            len(results.temporal_files),
            time.perf_counter() - t4,
        )

        # Stage 5: Normalize SIR
        t5 = time.perf_counter()
        sir_files, sir_schema, sir_warnings, sir_manifest_entry = stage5_normalize_sir(
            results, resolved, config
        )
        # Load manifest written by stage4 and append SIR section
        manifest = _manifest_mod.load_manifest(config.output.path)
        if manifest is None:
            manifest = _manifest_mod.PipelineManifest()
        manifest.sir = sir_manifest_entry
        _save_manifest_to_disk(manifest, config.output.path)
        logger.info("Stage 5 complete (%.1fs)", time.perf_counter() - t5)

        elapsed = time.perf_counter() - t0
        logger.info("=" * 60)
        logger.info(
            "Pipeline complete: %d static vars + %d temporal datasets, %d features, %.1f seconds",
            len(results.static_files),
            len(results.temporal_files),
            len(fabric),
            elapsed,
        )
        logger.info("=" * 60)

        return PipelineResult(
            output_dir=config.output.path,
            static_files=results.static_files,
            temporal_files=results.temporal_files,
            categories=results.categories,
            fabric=fabric,
            sir_files=sir_files,
            sir_schema=sir_schema,
            sir_warnings=sir_warnings,
        )
    finally:
        # Restore previous GDAL timeout settings
        if _prev_timeout is None:
            os.environ.pop("GDAL_HTTP_TIMEOUT", None)
        else:
            os.environ["GDAL_HTTP_TIMEOUT"] = _prev_timeout
        if _prev_connect is None:
            os.environ.pop("GDAL_HTTP_CONNECTTIMEOUT", None)
        else:
            os.environ["GDAL_HTTP_CONNECTTIMEOUT"] = _prev_connect

run_pipeline

run_pipeline(
    config_path: str | Path,
    registry_path: str | Path | None = None,
) -> PipelineResult

Execute the full parameterization pipeline from file paths.

Convenience wrapper that loads the pipeline config and dataset registry from disk, then delegates to run_pipeline_from_config.

PARAMETER DESCRIPTION
config_path

Path to the pipeline YAML config file.

TYPE: str or Path

registry_path

Path to a dataset registry YAML file or directory of YAML files. Defaults to the built-in registry bundled with the package.

TYPE: str or Path or None DEFAULT: None

RETURNS DESCRIPTION
PipelineResult

Complete pipeline output. Use PipelineResult.load_sir to assemble a combined xr.Dataset on demand.

See Also

run_pipeline_from_config : Core pipeline execution with pre-loaded objects.
hydro_param.config.load_config : YAML config loader.
hydro_param.dataset_registry.load_registry : Registry loader.

Source code in src/hydro_param/pipeline.py
def run_pipeline(
    config_path: str | Path,
    registry_path: str | Path | None = None,
) -> PipelineResult:
    """Execute the full parameterization pipeline from file paths.

    Convenience wrapper that loads the pipeline config and dataset registry
    from disk, then delegates to :func:`run_pipeline_from_config`.

    Parameters
    ----------
    config_path : str or Path
        Path to the pipeline YAML config file.
    registry_path : str or Path or None
        Path to a dataset registry YAML file or directory of YAML files.
        Defaults to the built-in registry bundled with the package.

    Returns
    -------
    PipelineResult
        Complete pipeline output.  Use :meth:`PipelineResult.load_sir`
        to assemble a combined ``xr.Dataset`` on demand.

    See Also
    --------
    run_pipeline_from_config : Core pipeline execution with pre-loaded objects.
    hydro_param.config.load_config : YAML config loader.
    hydro_param.dataset_registry.load_registry : Registry loader.
    """
    config = load_config(config_path)
    if registry_path is None:
        registry_path = DEFAULT_REGISTRY
    registry = load_registry(registry_path, overlay_dirs=[USER_REGISTRY_DIR])

    return run_pipeline_from_config(config, registry)

main

main() -> int

Run the pipeline from the command line via python -m hydro_param.pipeline.

Parse sys.argv for a config path and optional registry path, configure logging, and execute the pipeline. This is a minimal entry point for debugging; the primary CLI is hydro_param.cli.

RETURNS DESCRIPTION
int

Exit code: 0 on success, 1 on failure.

Source code in src/hydro_param/pipeline.py
def main() -> int:
    """Run the pipeline from the command line via ``python -m hydro_param.pipeline``.

    Parse ``sys.argv`` for a config path and optional registry path, configure
    logging, and execute the pipeline.  This is a minimal entry point for
    debugging; the primary CLI is :mod:`hydro_param.cli`.

    Returns
    -------
    int
        Exit code: 0 on success, 1 on failure.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        datefmt="%H:%M:%S",
    )

    if len(sys.argv) < 2:
        logger.error("Usage: python -m hydro_param.pipeline <config.yml> [registry.yml]")
        return 1

    config_path = sys.argv[1]
    registry_path = sys.argv[2] if len(sys.argv) > 2 else None

    try:
        run_pipeline(config_path, registry_path)
    except Exception:
        logger.exception("Pipeline failed.")
        return 1

    return 0
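
The exit-code contract of the entry point above (0 on success, 1 on a missing argument or any exception) can be exercised without touching `sys.argv` if the argument list and the pipeline runner are passed in. The sketch below assumes a hypothetical variant, `main_with_args`, that is not part of the module; logging is omitted for brevity.

```python
def main_with_args(argv, run):
    """Return 0 on success, 1 on usage error or pipeline failure.

    `argv` mirrors sys.argv (program name first); `run` is any callable
    taking (config_path, registry_path), standing in for run_pipeline.
    """
    if len(argv) < 2:
        return 1  # no config path given
    config_path = argv[1]
    registry_path = argv[2] if len(argv) > 2 else None
    try:
        run(config_path, registry_path)
    except Exception:
        return 1
    return 0


calls = []


def fake_run(config_path, registry_path):
    """Record the arguments instead of running a pipeline."""
    calls.append((config_path, registry_path))


def failing_run(config_path, registry_path):
    """Simulate a pipeline failure."""
    raise RuntimeError("pipeline failed")


ok = main_with_args(["prog", "config.yml"], fake_run)
usage_error = main_with_args(["prog"], fake_run)
failure = main_with_args(["prog", "config.yml", "registry.yml"], failing_run)
```

Injecting the runner keeps the argument handling testable on its own, separate from the cost of an actual pipeline run.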