Source code for priority_flow.downstream_extract
"""
Walk downstream from a point and extract values from a matrix.
Line-by-line translation of Downstream_Extract.R (PathExtract) from the R
PriorityFlow package. R uses 1-based indexing; we use 0-based. startpoint
is (row, col) 0-based for array indexing.
"""
import numpy as np
from typing import Dict, Optional, Tuple, Union
####################################################################
# PriorityFlow - Topographic Processing Toolkit for Hydrologic Models
# Copyright (C) 2018 Laura Condon (lecondon@email.arizona.edu)
# Contributors - Reed Maxwell (rmaxwell@mines.edu)
####################################################################
[docs]
def path_extract(
input: np.ndarray,
direction: np.ndarray,
mask: Optional[np.ndarray] = None,
startpoint: Optional[Union[Tuple[int, int], list, np.ndarray]] = None,
d4: Tuple[int, int, int, int] = (1, 2, 3, 4),
) -> Dict[str, np.ndarray]:
"""
Walk downstream from a starting point and extract values from a matrix.
This is a port of the R function ``PathExtract`` from the PriorityFlow
package. Starting from a given grid cell, it follows the D4 flow-direction
grid ``direction`` downstream, collecting values from ``input`` until the
path leaves the domain or exits the processing mask.
Parameters
----------
input : np.ndarray
2D array of values from which to extract the stream-path sequence
(for example, elevation, drainage area, or any other field defined
on the same grid as ``direction``).
direction : np.ndarray
2D array of D4 flow directions for each cell. Direction codes follow
the ``d4`` numbering scheme.
mask : np.ndarray, optional
2D mask with ones for cells that are allowed to be part of the path
and zeros elsewhere. If ``None``, a mask of all ones with the same
shape as ``direction`` is used.
startpoint : tuple[int, int] or list or np.ndarray, optional
Starting grid cell given as ``(row, col)`` indices (0-based). If an
array-like object is provided, only the first two elements are used.
d4 : tuple of int, optional
Direction numbering system for the D4 neighbors, given as
``(down, left, up, right)``. Defaults to ``(1, 2, 3, 4)`` to match the
original R implementation.
Returns
-------
Dict[str, np.ndarray]
Dictionary containing:
- ``\"data\"``: 1D array of values extracted from ``input`` along the
downstream path, in order.
- ``\"path_mask\"``: 2D array with non-zero entries indicating the
step number at each cell visited along the path.
- ``\"path_list\"``: 2D array of ``(row, col)`` indices for the cells
visited along the path, in traversal order.
"""
startpoint = np.asarray(startpoint)
if startpoint.ndim >= 2:
row_hf = int(startpoint.flat[0])
col_hf = int(startpoint.flat[1])
else:
row_hf = int(startpoint[0])
col_hf = int(startpoint[1])
# HF (row, col) -> internal first index = col, second = row (after input.T)
indx = col_hf
indy = row_hf
# HydroFrame layout -> internal R-style layout
input = input.T.copy()
direction = direction.T.copy()
if mask is not None:
mask = mask.T.copy()
# R: nx=dim(direction)[1] ny=dim(direction)[2]
nx = direction.shape[0]
ny = direction.shape[1]
# R: if(missing(mask)){mask=matrix(1, nrow=nx, ncol=ny)}
if mask is None:
mask = np.ones((nx, ny))
# R: path.mask=matrix(0, nrow=nx, ncol=ny)
path_mask = np.zeros((nx, ny))
# R: path=NULL (list of cells on the path in order)
path = []
# D4 neighbors - Rows: down, left top right. Columns (1)deltax, (2)deltay
# R: kd=matrix(0, nrow=4, ncol=4) kd[,1]=c(0,-1,0,1) kd[,2]=c(-1,0,1,0)
kd = np.zeros((4, 2))
kd[:, 0] = [0, -1, 0, 1]
kd[:, 1] = [-1, 0, 1, 0]
# renumber the directions to 1=down, 2=left, 3=up, 4=right if a different numbering scheme was used
# R: dir2=direction
dir2 = direction.copy()
# R: if(d4[1]!=1){dir2[which(direction==d4[1])]=1} etc.
if d4[0] != 1:
dir2[direction == d4[0]] = 1
if d4[1] != 2:
dir2[direction == d4[1]] = 2
if d4[2] != 3:
dir2[direction == d4[2]] = 3
if d4[3] != 4:
dir2[direction == d4[3]] = 4
# initializing things (indx, indy already set from HydroFrame startpoint)
# R: step=1
step = 1
# R: active=T
active = True
# R: output=NULL
output = []
# walking downstream
# R: while(active==T){
while active:
# R: output=c(output,input[indx,indy])
output.append(input[indx, indy])
# R: path.mask[indx,indy]=step
path_mask[indx, indy] = step
# R: path=rbind(path, c(indx,indy))
path.append([indx, indy])
# look downstream
# R: dirtemp=dir2[indx,indy]
dirtemp = int(dir2[indx, indy])
# R: downindx=indx+kd[dirtemp,1] downindy=indy+kd[dirtemp,2]
# R uses 1-based direction (1..4); kd rows are 1..4 so in Python kd[dirtemp-1, :]
downindx = indx + int(kd[dirtemp - 1, 0])
downindy = indy + int(kd[dirtemp - 1, 1])
# If you have made it out of the domain then stop
# R: if(downindx<1 | downindx>nx | downindy<1 | downindy>ny){ active=F }
# R 1-based: valid 1..nx, 1..ny. Python 0-based: valid 0..nx-1, 0..ny-1
if downindx < 0 or downindx >= nx or downindy < 0 or downindy >= ny:
active = False
else:
# R: if(mask[downindx,downindy]==0){ active=F }
if mask[downindx, downindy] == 0:
active = False
# R: indx=downindx indy=downindy step=step+1
indx = downindx
indy = downindy
step = step + 1
# Internal (row, col) -> HydroFrame (row, col): swap columns
path_list = np.array(path, dtype=np.int64) if path else np.zeros((0, 2), dtype=np.int64)
if path_list.size > 0:
path_list_out = path_list.copy()
t = path_list_out[:, 0].copy()
path_list_out[:, 0] = path_list_out[:, 1]
path_list_out[:, 1] = t
else:
path_list_out = path_list
# Internal layout -> HydroFrame layout
# R: output_list=list("data"=output, "path.mask"=path.mask, "path.list"=path)
output_list = {
"data": np.array(output, dtype=input.dtype),
"path_mask": path_mask.T,
"path_list": path_list_out,
}
return output_list