"""
Walk upstream from a point ensuring DEM is increasing by a minimum epsilon.
Line-by-line translation of Fix_Drainage.R (FixDrainage) from the R PriorityFlow
package. R uses 1-based indexing; we use 0-based. startpoint is (row, col) 0-based.
"""
import numpy as np
from typing import Dict, Tuple, Union
####################################################################
# PriorityFlow - Topographic Processing Toolkit for Hydrologic Models
# Copyright (C) 2018 Laura Condon (lecondon@email.arizona.edu)
# Contributors - Reed Maxwell (rmaxwell@mines.edu)
####################################################################
[docs]
def fix_drainage(
dem: np.ndarray,
direction: np.ndarray,
mask: np.ndarray,
bank_epsilon: float,
startpoint: Union[Tuple[int, int], list, np.ndarray],
d4: Tuple[int, int, int, int] = (1, 2, 3, 4),
) -> Dict[str, np.ndarray]:
"""
Walk upstream from a point ensuring DEM is increasing by a minum epsilon.
This walks upstream from a point following a flow direction file and checks that the elevation upstream
is greater than or equal to the elvation at the point + epsilon
Once the function reaches a point where upstream cells pass it stops
NOTE: this is only processing the immediate neigborhood and does not recurse over the entire domain,
For example if you did the entire overal prioirty flow processing with an espilon of 0 then ran this
functon starting at a river point with an epsilon of 0.1 this function will traverse upstream from the
river bottom checking that every cell conneting to the river cell is higher by at least this ammout but
once it reaches a point where every connected cell passes this test it will stop. Therefore there could
still be locations higher up on the hillslope with the original epsilon of zero still. This is on purpose
and this script is not intended to gobally ensure a given epsilon.
Parameters
----------
dem : np.ndarray
2D array of elevations for the domain (nx, ny).
direction : np.ndarray
2D array of D4 flow directions for each cell. Direction codes
follow the ``d4`` numbering scheme.
mask : np.ndarray
2D mask with ones for cells that are allowed to be processed and
zeros elsewhere. Only neighbors within the mask can be adjusted.
bank_epsilon : float
Minimum elevation difference that upstream cells must have
relative to the current cell. If an upstream neighbor is lower
than ``current + bank_epsilon``, its elevation is raised to
``current + bank_epsilon``.
startpoint : tuple[int, int] or list or np.ndarray
Starting grid cell given as ``(row, col)`` indices (0-based)
from which to begin walking upstream.
d4 : tuple of int, optional
Direction numbering system for the D4 neighbors, given as
``(down, left, up, right)``. Defaults to ``(1, 2, 3, 4)`` to
match the original R implementation.
Returns
-------
Dict[str, np.ndarray]
Dictionary containing:
- ``\"dem.adj\"``: 2D elevation array after local adjustments.
- ``\"processed\"``: 2D array marking cells that were adjusted
(1) or visited during processing, and 0 elsewhere.
"""
# D4 neighbors - Rows: down, left top right
# Columns: (1)deltax, (2)deltay, direction number if you are walking upstream
# R: ku=matrix(0, nrow=4, ncol=3)
ku = np.zeros((4, 3))
# R: ku[,1]=c(0,1,0,-1)
ku[:, 0] = [0, 1, 0, -1]
# R: ku[,2]=c(1,0,-1,0)
ku[:, 1] = [1, 0, -1, 0]
# R: ku[,3]=c(1, 2, 3, 4)
ku[:, 2] = [1, 2, 3, 4]
# renumber the directions to 1=down, 2=left, 3=up, 4=right if a different numbering scheme was used
# R: dir2=direction
dir2 = direction.copy()
# R: nx=dim(direction)[1] ny=dim(direction)[2]
nx = direction.shape[0]
ny = direction.shape[1]
# R: if(d4[1]!=1){dir2[which(direction==d4[1])]=1} etc.
if d4[0] != 1:
dir2[direction == d4[0]] = 1
if d4[1] != 2:
dir2[direction == d4[1]] = 2
if d4[2] != 3:
dir2[direction == d4[2]] = 3
if d4[3] != 4:
dir2[direction == d4[3]] = 4
# initializing
# R: marked=matrix(0, nrow=nx, ncol=ny)
marked = np.zeros((nx, ny))
# R: queue=cbind(startpoint[1],startpoint[2])
queue = np.array([[startpoint[0], startpoint[1]]])
# R: active=TRUE
active = True
# R: dem2=dem
dem2 = dem.copy()
# R: while(active==T){
while active:
# R: indx=queue[1,1] indy=queue[1,2]
indx = int(queue[0, 0])
indy = int(queue[0, 1])
# R: queuetemp=NULL
queuetemp = None
# Loop over four directions check for non-stream neighbors pointing to this cell
# R: for(d in 1:4){
for d in range(4):
# R: tempx=indx+ku[d,1] tempy=indy+ku[d,2]
tempx = indx + int(ku[d, 0])
tempy = indy + int(ku[d, 1])
# if it points to the cell, is within the mask of cells to be processed, and has epsilon < the threshold
# R: if(tempx*tempy>0 & tempx<nx & tempy<ny){
# R 1-based: valid 1..nx, 1..ny; R has tempx<nx so 1..nx-1 - translating to 0-based: 0..nx-1, 0..ny-1
if tempx >= 0 and tempy >= 0 and tempx < nx and tempy < ny:
# R: if((d-dir2[tempx,tempy])==0 & mask[tempx,tempy]==1){
# R d is 1..4; in Python d is 0..3 so direction at neighbor should be d+1
if (d + 1 - dir2[tempx, tempy]) == 0 and mask[tempx, tempy] == 1:
# R: if((dem2[tempx,tempy]-dem2[indx,indy])<bank.epsilon){
if (dem2[tempx, tempy] - dem2[indx, indy]) < bank_epsilon:
# R: dem2[tempx,tempy]=dem2[indx,indy]+bank.epsilon
dem2[tempx, tempy] = dem2[indx, indy] + bank_epsilon
# R: marked[tempx,tempy]=1
marked[tempx, tempy] = 1
# R: queuetemp=rbind(c(tempx,tempy),queuetemp)
if queuetemp is None:
queuetemp = np.array([[tempx, tempy]])
else:
queuetemp = np.vstack(
[np.array([[tempx, tempy]]), queuetemp]
)
# if cells were adjusted then add to the top of the queue replacing the cell that was just done
# R: if(length(queuetemp>0)){ (typo in R: should be length(queuetemp)>0)
if queuetemp is not None and queuetemp.size > 0:
# R: queue=rbind(queuetemp,queue[-1,])
queue = np.vstack([queuetemp, queue[1:]])
else:
# if no cells were adjusted remove this cell from the queue and if its the last one you are done
# R: if(nrow(queue)>1){
if queue.shape[0] > 1:
# R: queue=queue[-1,]
queue = queue[1:]
# R: if(length(queue)==2){ queue=matrix(queue, ncol=2, byrow=T) }
if queue.size == 2:
queue = queue.reshape(1, 2)
else:
# R: active=F
active = False
# R: output_list=list("dem.adj"=dem2, "processed"=marked)
output_list = {"dem.adj": dem2, "processed": marked}
return output_list