Source code for priority_flow.stream_traverse

"""
Stream Traverse functions for PriorityFlow.

Line-by-line translation of `Stream_Traverse.R` (StreamTraverse) from the
PriorityFlow R package.

This processes stream networks by walking upstream on D4 neighbors in a river
mask. Where no D4 neighbors exist, it looks for D8 neighbors and creates D4
bridges to these diagonal cells.
"""

from __future__ import annotations

import numpy as np
from typing import Dict, Optional, Tuple


[docs] def stream_traverse( dem: np.ndarray, mask: np.ndarray, queue: np.ndarray, marked: np.ndarray, step: Optional[np.ndarray] = None, direction: Optional[np.ndarray] = None, basins: Optional[np.ndarray] = None, d4: Tuple[int, int, int, int] = (1, 2, 3, 4), printstep: bool = False, epsilon: float = 0.0, ) -> Dict[str, np.ndarray]: """ DEM processing of stream networks. Function to process stream networks walking upstream on d4 neigbors in a river mask. Where no D4 neigbhors exist it looks for d8 neigbors and creates d4 bridges to these diagonal cells. Parameters ---------- dem : np.ndarray Digital Elevation Model matrix (nx x ny). mask : np.ndarray Mask with zeros for non-river cells and 1 for river cells. queue : np.ndarray Priority queue to start from; columns: x (row), y (col), elevation. marked : np.ndarray Matrix of which cells have been marked already. step : np.ndarray, optional Matrix of the step number for processed cells. Defaults to zeros. direction : np.ndarray, optional Matrix of flow directions for processed cells. Defaults to NaN. basins : np.ndarray, optional Matrix of basin numbers (e.g., from InitQueue). If provided, every newly added cell inherits the basin of the cell that adds it. d4 : tuple of int, optional D4 direction numbering (down, left, up, right). Default (1,2,3,4). printstep : bool, optional If True, print step number and queue size. epsilon : float, optional Amount to add to filled areas to avoid creating flats. Returns ------- Dict[str, np.ndarray] Dictionary with keys: "dem", "mask", "marked", "step", "direction", and "basins". """ # HydroFrame layout -> internal R-style layout (same convention as init_queue / d4_traverse_b) dem = dem.T.copy() mask = mask.T.copy() marked = marked.T.copy() queue = np.column_stack((queue[:, 1], queue[:, 0], queue[:, 2])).copy() if step is not None: step = step.T.copy() if direction is not None: direction = direction.T.copy() if basins is not None: basins = basins.T.copy() # R: nx=dim(dem)[1] ny=dim(dem)[2] nx, ny = dem.shape # R: demnew=dem demnew = dem.copy() # setup matrices for anything that wasn't input # R: if(missing(step)){step=matrix(0, nrow=nx, ncol=ny)} if step is None: step = np.zeros((nx, ny)) # R: if(missing(direction)){direction=matrix(NA,nrow=nx, ncol=ny)} if direction is None: direction = np.full((nx, ny), np.nan) # R: if(missing(basins)){basins=matrix(0,nrow=nx, ncol=ny)} if basins is None: basins = np.zeros((nx, ny)) # D4 neighbors (ordered down, left, top, right; walking upstream so # direction points opposite) # R: kd=matrix(0, nrow=4, ncol=3) # R: kd[,1]=c(0,-1,0,1) # R: kd[,2]=c(-1,0,1,0) # R: kd[,3]=c(d4[3], d4[4], d4[1], d4[2]) kd = np.zeros((4, 3), dtype=float) kd[:, 0] = [0, -1, 0, 1] kd[:, 1] = [-1, 0, 1, 0] kd[:, 2] = [d4[2], d4[3], d4[0], d4[1]] # D8 neighbors # R: kd8=matrix(0, nrow=4, ncol=4) # R: kd8[,1]=c(-1,-1,1,1) # R: kd8[,2]=c(-1,1,1,-1) # R: kd8[,3]= c(d4[3], d4[1], d4[1], d4[3]) # R: kd8[,4]= c(d4[4], d4[4], d4[2], d4[2]) kd8 = np.zeros((4, 4), dtype=float) kd8[:, 0] = [-1, -1, 1, 1] kd8[:, 1] = [-1, 1, 1, -1] kd8[:, 2] = [d4[2], d4[0], d4[0], d4[2]] kd8[:, 3] = [d4[3], d4[3], d4[1], d4[1]] # R: nqueue=nrow(queue) nstep=0 # Ensure queue is 2D if queue.ndim == 1: queue = queue.reshape(1, -1) nqueue = queue.shape[0] nstep = 0 # Main traversal loop # R: while(nqueue>0){ while nqueue > 0: # ############# # pick the lowest DEM cell on the queue # R: pick=which.min(queue[,3]) pick = int(np.argmin(queue[:, 2])) # R: xC=queue[pick,1] yC=queue[pick,2] demC=queue[pick,3] xC = int(queue[pick, 0]) yC = int(queue[pick, 1]) demC = float(queue[pick, 2]) # ############# # Look for D4 neighbor cells that are on the mask and add to queue # R: count=0 count = 0 # R: for(k in 1:4){ for k in range(4): # R: xk=xC+kd[k,1] yk=yC+kd[k,2] xk = xC + int(kd[k, 0]) yk = yC + int(kd[k, 1]) # check that the neighbor is inside the domain and on the mask # R: if(yk>=1 & yk<=ny & xk>=1 & xk<=nx ){ if 0 <= xk < nx and 0 <= yk < ny: # R: if (mask[xk, yk]==1 & marked[xk,yk]==0){ if mask[xk, yk] == 1 and marked[xk, yk] == 0: # R: demtemp=max((demC+epsilon), dem[xk, yk]) demtemp = max(demC + epsilon, dem[xk, yk]) # R: demnew[xk, yk]=demtemp demnew[xk, yk] = demtemp # R: queue=rbind(queue, c(xk, yk, demtemp)) queue = np.vstack([queue, [xk, yk, demtemp]]) # R: marked[xk, yk]=1 marked[xk, yk] = 1 # R: step[xk, yk]=step[xC, yC]+1 step[xk, yk] = step[xC, yC] + 1 # R: direction[xk,yk]=kd[k,3] direction[xk, yk] = kd[k, 2] # R: basins[xk,yk]=basins[xC,yC] basins[xk, yk] = basins[xC, yC] # R: count=count+1 nqueue=nqueue+1 count += 1 nqueue += 1 # ############# # If you don't find any D4 neighbors, look for D8 neighbors and choose # the least-cost D4 option to reach that D8 cell if count == 0: # R: n4=matrix(NA,ncol=4, nrow=2) for k in range(4): # R: xk=xC+kd8[k,1] yk=yC+kd8[k,2] xk = xC + int(kd8[k, 0]) yk = yC + int(kd8[k, 1]) # R: count4=0 n4=matrix(NA,ncol=4, nrow=2) count4 = 0 n4 = np.full((2, 4), np.nan) # check that the neighbor is inside the domain and on the mask # R: if(yk>=1 & yk<=ny & xk>=1 & xk<=nx ){ if 0 <= xk < nx and 0 <= yk < ny: # R: if(marked[xk,yk]<1 & mask[xk,yk]==1){ if marked[xk, yk] < 1 and mask[xk, yk] == 1: # look for available D4 neighbors to add instead # first neighbor: (xC, yk) # R: if(marked[xC,yk]<1){ n4[1,]=c(xC, yk, dem[xC, yk], kd8[k,3]) } if marked[xC, yk] < 1: n4[0, :] = [xC, yk, dem[xC, yk], kd8[k, 2]] count4 += 1 # second neighbor: (xk, yC) # R: if(marked[xk,yC]<1){ n4[2,]=c(xk, yC, dem[xk, yC], kd8[k,4]) } if marked[xk, yC] < 1: n4[1, :] = [xk, yC, dem[xk, yC], kd8[k, 3]] count4 += 1 # choose the neighbor which is the lowest without going under # and if not, fill the highest if count4 > 0: valid = ~np.isnan(n4[:, 2]) if np.any(valid): vals = n4[valid, 2] idxs = np.where(valid)[0] if np.min(vals) >= demC: # R: npick=which.min(n4[,3]) npick = int(idxs[np.argmin(vals)]) else: # R: npick=which.max(n4[,3]) npick = int(idxs[np.argmax(vals)]) else: npick = 0 rx = int(n4[npick, 0]) ry = int(n4[npick, 1]) # R: demtemp=max((demC+epsilon), dem[n4[npick,1], n4[npick,2]]) demtemp = max(demC + epsilon, dem[rx, ry]) # R: demnew[n4[npick,1], n4[npick,2]]=demtemp demnew[rx, ry] = demtemp # R: queue=rbind(queue, c(n4[npick,1], n4[npick,2], demtemp)) queue = np.vstack([queue, [rx, ry, demtemp]]) # R: marked[n4[npick,1], n4[npick,2]]=1 marked[rx, ry] = 1 # R: direction[n4[npick,1], n4[npick,2]]=n4[npick,4] direction[rx, ry] = n4[npick, 3] # R: step[n4[npick,1], n4[npick,2]]=step[xC, yC]+1 step[rx, ry] = step[xC, yC] + 1 # R: basins[n4[npick,1], n4[npick,2]]=basins[xC, yC] basins[rx, ry] = basins[xC, yC] # R: count=count+1 count += 1 # ############# # Remove from the queue and move on # R: nqueue=length(queue)/3 nqueue = queue.shape[0] # R: if(nqueue>1){ queue=queue[-pick,]; nqueue=length(queue)/3 } else { nqueue=0 } if nqueue > 1: queue = np.delete(queue, pick, axis=0) nqueue = queue.shape[0] else: nqueue = 0 # if there is only one row, force 2D structure # R: if(nqueue==1){ queue=matrix(queue, ncol=3,byrow=T)} if nqueue == 1 and queue.ndim == 1: queue = queue.reshape(1, 3) # R: nstep=nstep+1 nstep += 1 # R: if(printstep){print(paste("Step:", nstep, "NQueue:", nqueue))} if printstep: print(f"Step: {nstep} NQueue: {nqueue}") # Internal layout -> HydroFrame layout (transpose 2D fields) # R: output_list=list("dem"=demnew, "mask"=mask, "marked"=marked, "step"= step, "direction"=direction, "basins"=basins) output_list = { "dem": demnew.T, "mask": mask.T, "marked": marked.T, "step": step.T, "direction": direction.T, "basins": basins.T, } # R: return(output_list) return output_list