Source code for priority_flow.cal_stream_order

"""
Calculate Stream Order functions for PriorityFlow.

This module provides functions for calculating Strahler stream orders using
stream segments delineated using the CalcSubbasins function.
"""

import numpy as np
from typing import Dict, List, Optional, Tuple, Union


[docs] def calc_stream_order( basin_id: np.ndarray, ds_id: np.ndarray, segments: Optional[np.ndarray] = None ) -> Dict[str, Union[np.ndarray, np.ndarray]]: """ Calculate Strahler stream orders using stream segments delineated using the CalcSubbasins function. Parameters ---------- basin_id : np.ndarray An array of basin IDs (can be obtained from the CalcSubbasins function Summary output column 1 "Basin_ID") ds_id : np.ndarray Downstream basin IDs, the downstream ID of each basin, should be corresponding to the basin ID (can be obtained from the CalcSubbasins function Summary output Column 6 "Downstream_Basin_ID") segments : np.ndarray, optional A channel mask with segments assigned with basin IDs (can use the CalcSubbasins 'segments' output for this) Returns ------- Dict[str, np.ndarray] A dictionary containing: - 'summary': A summary table with a row for every basin with three columns "Basin_ID", "Downstream_ID", "StreamOrder_number" - 'order_mask': (optional) Only available if segments is an input, this will output a spatial raster with the channel orders Notes ----- This function implements the Strahler stream ordering system where: - First-order streams are those without any tributaries - When two streams of the same order join, the resulting stream is of the next higher order - When streams of different orders join, the resulting stream maintains the higher order """ # Initialize order numbers array order_no = np.zeros((len(basin_id), 1)) # Find 1st order streams - streams without any basin draining into this basin ds_all = np.unique(ds_id) headwater = basin_id[~np.isin(basin_id, ds_all)] for i in range(len(headwater)): blist = np.where(basin_id == headwater[i])[0] order_no[blist] = 1 # Process each headwater basin to calculate stream orders for i in range(len(headwater)): active = True btemp = headwater[i] while active: # Find downstream basin blist = np.where(basin_id == btemp)[0] dstemp = ds_id[blist][0] # Get the downstream basin ID dlist = np.where(basin_id == dstemp)[0] if dstemp == 0: # Stop when the basin drains outside the domain active = False # Find all basins draining to this downstream basin ulist = np.where(ds_id == dstemp)[0] if len(ulist) != 1: # If more than one basin urest = ulist[~np.isin(ulist, blist)] # Remove the basin in process ordertemp = order_no[urest].flatten() ordertemp[np.isnan(ordertemp)] = 0 if np.prod(ordertemp) != 0: # Check if there is any upstream not been processed umax = np.max(ordertemp) if umax == order_no[blist][0]: # Compare the orders of all tributaries order_no[dlist] = umax + 1 else: order_no[dlist] = max(umax, order_no[blist][0]) btemp = dstemp else: active = False else: # If only one is draining, assign the same order order_no[dlist] = order_no[blist] btemp = dstemp # Create summary table summary = np.column_stack([basin_id, ds_id, order_no]) # TODO NOTE: this should be a dataframe. Check other places where it is called. # Prepare output if segments is None: output_dict = {"summary": summary} # TODO NOTE: or perhaps the arrays should go to the dict with keys? else: segments2 = segments.copy() for i in range(len(basin_id)): btemp = basin_id[i] blist2 = np.where(segments2 == btemp) segments[blist2] = order_no[i, 0] # Scalar value for order output_dict = {"summary": summary, "order_mask": segments} return output_dict