"""
Define subbasins and stream segments from flow direction and drainage area.
Line-by-line translation of Define_Subbasins.R (CalcSubbasins) from the R
PriorityFlow package. R uses 1-based indexing; we use 0-based for array
indexing; summary coordinate columns are 0-based.
"""
import numpy as np
from typing import Dict, Optional, Tuple
####################################################################
# PriorityFlow - Topographic Processing Toolkit for Hydrologic Models
# Copyright (C) 2018 Laura Condon (lecondon@email.arizona.edu)
# Contributors - Reed Maxwell (rmaxwell@mines.edu)
####################################################################
[docs]
def calc_subbasins(
direction: np.ndarray,
area: np.ndarray,
mask: Optional[np.ndarray] = None,
d4: Tuple[int, int, int, int] = (1, 2, 3, 4),
riv_th: int = 50,
printflag: bool = False,
merge_th: int = 0,
) -> Dict[str, np.ndarray]:
"""
Define subbasins and stream segments from flow direction and drainage area.
This function is a port of the R function ``CalcSubbasins`` from the
PriorityFlow package. It divides the domain into subbasins and individual
stream segments using a flow-direction grid and a drainage-area grid.
Parameters
----------
direction : np.ndarray
2D array of D4 flow directions for each cell. Direction codes follow
the ``d4`` numbering scheme.
area : np.ndarray
2D array of drainage area values for every cell (typically in number of
contributing cells or an equivalent measure).
mask : np.ndarray, optional
2D mask with ones for cells to be processed and zeros for everything
else. If ``None``, a mask of all ones with the same shape as
``direction`` is used.
d4 : tuple of int, optional
Direction numbering system for the D4 neighbors, given as
``(down, left, up, right)``. Defaults to ``(1, 2, 3, 4)`` to match the
original R implementation.
riv_th : int, optional
Drainage-area threshold used to classify river cells. Cells with
``area >= riv_th`` are treated as part of the river network. Defaults
to ``50``.
printflag : bool, optional
If ``True``, print basic progress information while growing subbasins
upstream from the river network.
merge_th : int, optional
After all subbasins have been defined, subbasins with total area
smaller than ``merge_th`` are merged into their downstream neighbors.
A value of ``0`` (the default) disables merging.
Returns
-------
Dict[str, np.ndarray]
Dictionary containing:
- ``\"segments\"``: 2D array with a unique ID for each stream segment.
- ``\"subbasins\"``: 2D array with a unique ID for each subbasin
(segment plus its contributing area).
- ``\"RiverMask\"``: 2D mask of river cells derived from ``area`` and
``riv_th``.
- ``\"summary\"``: 2D array where each row summarizes a subbasin/segment
(segment ID, representative coordinates, downstream segment ID and
cell count). Coordinates are stored using 0-based indexing.
"""
# HydroFrame layout -> internal R-style layout
direction = direction.T.copy()
area = area.T.copy()
if mask is not None:
mask = mask.T.copy()
# R: nx=nrow(direction)
# R: ny=ncol(direction)
nx = direction.shape[0]
ny = direction.shape[1]
# R: if(missing(mask)){mask=matrix(1, nrow=nx, ncol=ny)}
if mask is None:
mask = np.ones((nx, ny))
# Setup the border
# R: border=matrix(1, nrow=nx, ncol=ny)
border = np.ones((nx, ny))
# R: border[2:(nx-1), 2:(ny-1)]= mask[1:(nx-2), 2:(ny-1)] + mask[3:nx, 2:(ny-1)] + ...
border[1 : (nx - 1), 1 : (ny - 1)] = (
mask[0 : (nx - 2), 1 : (ny - 1)]
+ mask[2:nx, 1 : (ny - 1)]
+ mask[1 : (nx - 1), 0 : (ny - 2)]
+ mask[1 : (nx - 1), 2:ny]
)
# R: border=border*mask
border = border * mask
# R: border[which(border<4 & border!=0)]=1
border[(border < 4) & (border != 0)] = 1
# R: border[border==4]=0
border[border == 4] = 0
# initilize drinage area matrix
# R: subbasin=matrix(0, nrow=nx, ncol=ny)
subbasin = np.zeros((nx, ny))
# R: marked=matrix(0, nrow=nx, ncol=ny)
marked = np.zeros((nx, ny))
# D4 neighbors
# R: kd=matrix(0, nrow=4, ncol=2) #ordered down, left top right
kd = np.zeros((4, 2))
# R: kd[,1]=c(0,-1,0,1)
kd[:, 0] = [0, -1, 0, 1]
# R: kd[,2]=c(-1,0,1,0)
kd[:, 1] = [-1, 0, 1, 0]
# make a river mask based on the drainage area threshold
# R: rivers=area
rivers = area.copy()
# R: rivers[area<riv_th]=0
rivers[area < riv_th] = 0
# R: rivers[area>=riv_th]=1
rivers[area >= riv_th] = 1
# R: end=F
end = False
# R: if(sum(rivers)==0) { print(...); end=T }
if np.sum(rivers) == 0:
print(
"Area Threshold too high. No river cells found. Please select a lower riv_th value"
)
end = True
if end == False:
# make masks of which cells drain down, up, left right
# R: down=up=left=right=matrix(0, nrow=nx, ncol=ny)
down = np.zeros((nx, ny))
up = np.zeros((nx, ny))
left = np.zeros((nx, ny))
right = np.zeros((nx, ny))
# R: down[which(direction==d4[1])]=1
down[direction == d4[0]] = 1
# R: left[which(direction==d4[2])]=1
left[direction == d4[1]] = 1
# R: up[which(direction==d4[3])]=1
up[direction == d4[2]] = 1
# R: right[which(direction==d4[4])]=1
right[direction == d4[3]] = 1
# calculate the number of river cells draining to any cell
# R: draincount=matrix(0, nrow=nx, ncol=ny)
draincount = np.zeros((nx, ny))
# R: draincount[,1:(ny-1)]=draincount[,1:(ny-1)]+down[,2:ny]*rivers[,2:ny]
draincount[:, 0 : (ny - 1)] = (
draincount[:, 0 : (ny - 1)] + down[:, 1:ny] * rivers[:, 1:ny]
)
# R: draincount[,2:ny]=draincount[,2:ny]+up[,1:(ny-1)]*rivers[,1:(ny-1)]
draincount[:, 1:ny] = (
draincount[:, 1:ny] + up[:, 0 : (ny - 1)] * rivers[:, 0 : (ny - 1)]
)
# R: draincount[1:(nx-1),]=draincount[1:(nx-1),]+left[2:nx,]*rivers[2:nx,]
draincount[0 : (nx - 1), :] = (
draincount[0 : (nx - 1), :] + left[1:nx, :] * rivers[1:nx, :]
)
# R: draincount[2:nx, ]=draincount[2:nx,]+right[1:(nx-1),]*rivers[1:(nx-1),]
draincount[1:nx, :] = (
draincount[1:nx, :] + right[0 : (nx - 1), :] * rivers[0 : (nx - 1), :]
)
# Identify all the headwater cells
# R: headwater=matrix(0, nrow=nx, ncol=ny)
headwater = np.zeros((nx, ny))
# R: headwater[which(draincount==0 & rivers==1)]=1
headwater[(draincount == 0) & (rivers == 1)] = 1
# give values outside the mask and on the border a negative count so they aren't processed
# R: marked[which(mask==0)]=1
marked[mask == 0] = 1
# start with all the headwater cells (i.e. cells with zero upstream neigbors)
# R: blist=cbind(which(headwater==1), which(headwater==1, arr.ind=T))
# R returns indices in column-major order; argwhere is row-major, so sort to match R
blist = np.argwhere(headwater == 1)
blist = blist[np.lexsort((blist[:, 0], blist[:, 1]))] # sort by col then row (column-major)
# R: nheadwater=nrow(blist)
nheadwater = blist.shape[0]
# R: ends=rivers
ends = rivers.copy()
# R: ends[blist[,1]]=2 -> set headwater cells to 2
ends[headwater == 1] = 2
# Get just the river areas to use for this
# R: rivarea=area*rivers
rivarea = area * rivers
# R: index=0
index = 0
# R: subbasin=matrix(0, nrow=nx, ncol=ny)
subbasin = np.zeros((nx, ny))
# R: marked=matrix(0, nrow=nx, ncol=ny)
marked = np.zeros((nx, ny))
# R: first=T
first = True
###1. walk down from every headwater marking stream segments
# R: for(i in 1:nheadwater){
for i in range(nheadwater):
# R: xtemp=blist[i,2] ytemp=blist[i,3] (R 1-based row,col)
xtemp = int(blist[i, 0])
ytemp = int(blist[i, 1])
# R: active=T
active = True
# R: index=index+1
index = index + 1
# R: subbasin[xtemp,ytemp]=index
subbasin[xtemp, ytemp] = index
# R: marked[xtemp,ytemp]=1
marked[xtemp, ytemp] = 1
# R: summarytemp=c(index, xtemp, ytemp, rep(0,4))
summarytemp = [index, xtemp, ytemp, 0, 0, 0, 0]
# R: while(active==T){
while active:
# get the direction and find downstream cell
# R: dirtemp=which(d4==direction[xtemp,ytemp])
dirtemp = int(np.where(np.array(d4) == direction[xtemp, ytemp])[0][0])
# R: xds=xtemp+kd[dirtemp,1]
xds = xtemp + int(kd[dirtemp, 0])
# R: yds=ytemp+kd[dirtemp,2]
yds = ytemp + int(kd[dirtemp, 1])
# if the downstream neigbor hasn't already been procesed and its in the domain
# R: if(xds*yds>0 & xds<=nx & yds<=ny){
if xds >= 0 and yds >= 0 and xds < nx and yds < ny:
# R: if(marked[xds,yds]==0 & mask[xds,yds]==1){
if marked[xds, yds] == 0 and mask[xds, yds] == 1:
# Check the area difference
# R: accum=area[xds,yds]-area[xtemp, ytemp]
accum = area[xds, yds] - area[xtemp, ytemp]
# if there is a tributary coming in then start a new segment
# R: if(accum>riv_th){
if accum > riv_th:
# R: summarytemp[4:5]=c(xtemp,ytemp)
summarytemp[3] = xtemp
summarytemp[4] = ytemp
# R: summarytemp[6]=index+1
summarytemp[5] = index + 1
# R: index=index+1
index = index + 1
# R: ends[xtemp,ytemp]=3
ends[xtemp, ytemp] = 3
# R: ends[xds,yds]=2
ends[xds, yds] = 2
# R: if(first==T){ summary=summarytemp; first=F }else{ summary=rbind(summary, summarytemp) }
if first:
summary = np.array([summarytemp], dtype=np.float64)
first = False
else:
summary = np.vstack([summary, summarytemp])
# R: summarytemp=c(index, xds, yds, rep(0,4))
summarytemp = [index, xds, yds, 0, 0, 0, 0]
# assign subbasin number to the downstream cell and mark it off
# R: subbasin[xds,yds]=index
subbasin[xds, yds] = index
# R: marked[xds,yds]=1
marked[xds, yds] = 1
# R: xtemp=xds ytemp=yds
xtemp = xds
ytemp = yds
else:
# if the downstream neighbor has been processed then move on to the next headwater cell
# R: active=FALSE
active = False
# R: ends[xtemp,ytemp]=3
ends[xtemp, ytemp] = 3
# R: summarytemp[4:5]=c(xtemp,ytemp)
summarytemp[3] = xtemp
summarytemp[4] = ytemp
# R: summarytemp[6]=subbasin[xds,yds]
summarytemp[5] = subbasin[xds, yds]
else:
# R: active=FALSE ends[xtemp,ytemp]=3 summarytemp[4:5]=c(xtemp,ytemp) summarytemp[6]=-1
active = False
ends[xtemp, ytemp] = 3
summarytemp[3] = xtemp
summarytemp[4] = ytemp
summarytemp[5] = -1
# R: if(first==T){ summary=matrix(summarytemp, ncol=7, byrow=T); first=F }else{ summary=rbind(summary, summarytemp) }
if first:
summary = np.array([summarytemp], dtype=np.float64)
first = False
else:
summary = np.vstack([summary, summarytemp])
# R: rownames(summary)=NULL colnames(summary)=c(...) (metadata only)
###2. Get the drainage basins for every segement
# R: subbasinA=subbasin
subbasinA = subbasin.copy()
# start a queue with all the cells in the river
# R: queue=which(subbasin>0, arr.ind=T)
queue = np.argwhere(subbasin > 0)
# R: nqueue=nrow(queue)
nqueue = queue.shape[0]
# R: ii=1
ii = 1
# R: while(nqueue>0){
while nqueue > 0:
# R: if(printflag){print(paste("lap", ii, "ncell", nqueue))}
if printflag:
print(f"lap {ii} ncell {nqueue}")
# R: queue2=NULL
queue2 = []
# loop through the queue
# R: for(i in 1:nqueue){
for i in range(nqueue):
# R: xtemp=queue[i,1] ytemp=queue[i,2]
xtemp = int(queue[i, 0])
ytemp = int(queue[i, 1])
# add one to the subbasin area for the summary
# R: sbtemp=subbasinA[xtemp,ytemp]
sbtemp = int(subbasinA[xtemp, ytemp])
# R: summary[sbtemp,7]=summary[sbtemp,7]+1 (R: row index = basin ID)
row_idx = np.where(summary[:, 0] == sbtemp)[0][0]
summary[row_idx, 6] = summary[row_idx, 6] + 1
# look for cells that drain to this cell
# R: for(d in 1:4){
for d in range(4):
# R: xus=xtemp-kd[d,1] yus=ytemp-kd[d,2]
xus = xtemp - int(kd[d, 0])
yus = ytemp - int(kd[d, 1])
# R: if(xus*yus>0 & xus<=nx & yus<=ny){
if xus >= 0 and yus >= 0 and xus < nx and yus < ny:
# R: if(mask[xus,yus]==1 & subbasinA[xus,yus]==0){
if mask[xus, yus] == 1 and subbasinA[xus, yus] == 0:
# R: if(direction[xus,yus]==d4[d]){
if direction[xus, yus] == d4[d]:
# R: subbasinA[xus,yus]=subbasinA[xtemp,ytemp]
subbasinA[xus, yus] = subbasinA[xtemp, ytemp]
# R: queue2=rbind(queue2, c(xus,yus))
queue2.append([xus, yus])
# R: if(length(queue2)>=2){ queue=queue2; nqueue=nrow(queue); ii=ii+1 } else{nqueue=0}
# In R, length(queue2) is rows * cols; a single-row (1x2) matrix has length 2.
# The condition length(queue2) >= 2 therefore means "at least one row".
# In Python, len(queue2) is the number of rows, so we must check >= 1.
if len(queue2) >= 1:
queue = np.array(queue2)
nqueue = queue.shape[0]
ii = ii + 1
else:
nqueue = 0
###3. if merge_th >0 look for basins with areas less than the merge threshold...
# R: delete=NULL
delete = []
# R: if(merge_th>0){
if merge_th > 0:
print(
"WARNING: non-zero merge thresholds are not compatible with the RiverSmooth function"
)
# R: nsb=nrow(summary)
nsb = summary.shape[0]
# R: for(i in 1:nsb){
for i in range(nsb):
# check if area is less than the threshold & it does not drain externally
# R: if(summary[i,7]<merge_th & summary[i,6]>0){
if summary[i, 6] < merge_th and summary[i, 5] > 0:
# R: delete=c(delete,i)
delete.append(i)
# R: bas1=summary[i,1] bas2=summary[i,6]
bas1 = int(summary[i, 0])
bas2 = int(summary[i, 5])
# replace numbers in the subbasin matrix
# R: ilist=which(subbasin==bas1) subbasin[ilist]=bas2
subbasin[subbasin == bas1] = bas2
# replace numbers in the subbasin area matrix
# R: ilistA=which(subbasinA==bas1) subbasinA[ilistA]=bas2
subbasinA[subbasinA == bas1] = bas2
# adjust the summary matrix for the downstream basin
# R: summary[which(summary[,1]==bas2),7]=summary[which(summary[,1]==bas2),7] + summary[i,7]
ds_rows = np.where(summary[:, 0] == bas2)[0]
summary[ds_rows[0], 6] = summary[ds_rows[0], 6] + summary[i, 6]
# R: summary[which(summary[,1]==bas2),2]=summary[i,2] summary[...,3]=summary[i,3]
summary[ds_rows[0], 1] = summary[i, 1]
summary[ds_rows[0], 2] = summary[i, 2]
# Change the downstream basin number for any upstream basins to downstream basin
# R: uplist=which(summary[,6]==bas1) summary[uplist,6]=bas2
uplist = np.where(summary[:, 5] == bas1)[0]
summary[uplist, 5] = bas2
# R: if(is.null(delete)==F){ summary=summary[-delete,] }
if len(delete) > 0:
summary = np.delete(summary, delete, axis=0)
# R: output_list=list("segments"=subbasin, "subbasins"=subbasinA, "RiverMask"=rivers, "summary"=summary)
# When end==True we never set subbasin, subbasinA, summary; return empty/default
if end:
subbasin = np.zeros((nx, ny))
subbasinA = np.zeros((nx, ny))
summary = np.zeros((0, 7))
# Internal (row, col) pairs in summary columns 1–2 and 3–4 -> HydroFrame
if not end and summary.size > 0:
summary_out = summary.copy()
t = summary_out[:, 1].copy()
summary_out[:, 1] = summary_out[:, 2]
summary_out[:, 2] = t
t = summary_out[:, 3].copy()
summary_out[:, 3] = summary_out[:, 4]
summary_out[:, 4] = t
else:
summary_out = summary
# Internal layout -> HydroFrame layout
output_list = {
"segments": subbasin.T,
"subbasins": subbasinA.T,
"RiverMask": rivers.T,
"summary": summary_out,
}
# R: return(output_list)
return output_list