Source code for priority_flow.slope_calc_standard

"""
Calculate slopes from a DEM.

Line-by-line translation of Slope_Calc_Standard.R (SlopeCalStan) from the R
PriorityFlow package. R uses 1-based indexing; we use 0-based.
"""

import numpy as np
from typing import Dict, Optional, Tuple

####################################################################
# PriorityFlow - Topographic Processing Toolkit for Hydrologic Models
# Copyright (C) 2018  Laura Condon (lecondon@email.arizona.edu)
# Contributors - Reed Maxwell (rmaxwell@mines.edu)
####################################################################


[docs] def slope_calc_standard( dem: np.ndarray, direction: np.ndarray, dx: float, dy: float, mask: Optional[np.ndarray] = None, d4: Tuple[int, int, int, int] = (1, 2, 3, 4), minslope: float = 0, maxslope: float = -1, secondary_th: float = -1, printflag: bool = False, ) -> Dict[str, np.ndarray]: """ Calculate the slopes from a DEM. This function will calculate slopes using standard or upwinding options and apply a range of smoothing options. Parameters ---------- dem : np.ndarray 2D array of elevations in **HydroFrame** layout. direction : np.ndarray 2D array of D4 flow directions for each cell, using the convention encoded in ``d4``. dx, dy : float Lateral grid cell resolutions in the x and y directions, respectively. mask : np.ndarray, optional 2D mask defining the domain extent to be considered. Cells with value 0 are excluded from slope calculations. If ``None``, the full rectangular domain of ``dem`` is used. d4 : tuple of int, optional Direction numbering system for the D4 neighbors, given as ``(down, left, up, right)``. Defaults to ``(1, 2, 3, 4)`` to match the original R implementation. minslope : float, optional Minimum absolute slope to enforce on primary-direction slopes. If ``minslope >= 0``, primary slopes with magnitude less than this value are adjusted up to ``minslope`` (with sign preserved). Defaults to ``0``. maxslope : float, optional Maximum absolute slope to enforce. If ``maxslope >= 0``, slopes with magnitude greater than this value are limited to ``±maxslope``. If set to ``-1``, no maximum is applied. secondary_th : float, optional Secondary threshold - maximum ratio of |secondary|/|primary| to be enforced. NOTE - this scaling occurs after any max threholds are applied. Currently this is only working for two options: (1) If this is set to -1 no scaling will be applied, (2) If this is set to zero all seconeary slopes will be zero printflag : bool, optional If True, print progress information and details about slope limiting and secondary-slope handling. Notes ----- **River methods**: - ``0``: default value, no special treatment for river cells. - ``1``: scale secondary slopes along the river (requires a river mask and a river-specific secondary threshold). - ``2``: apply watershed mean slope to each river reach (requires river mask and subbasins). - ``3``: apply the stream mean slope to each reach (requires river mask and subbasins). NOTE: The river mask can differ from the rivers used to create the subbasins (for example, using a higher threshold to define subbasins and a lower threshold to define river cells). Returns ------- Dict[str, np.ndarray] Dictionary containing: - ``\"slopex\"``: 2D array of adjusted x-direction face-centered slopes. - ``\"slopey\"``: 2D array of adjusted y-direction face-centered slopes. - ``\"direction\"``: the (possibly renumbered) flow-direction array used when constructing primary/secondary masks. - ``\"Sinks\"``: 1D array of linear indices (flattened) marking locations where slope signs were flipped to remove sinks. """ # HydroFrame layout -> internal R-style layout dem = dem.T.copy() direction = direction.T.copy() if mask is not None: mask = mask.T.copy() dx, dy = dy, dx # R: ny=ncol(dem) nx=nrow(dem) nx = dem.shape[0] ny = dem.shape[1] # If no mask is provided default to the rectangular domain # R: if(missing(mask)){ print(...); mask=matrix(1,...); borders=matrix(0,...) } if mask is None: if printflag: print("No mask provided, initializing mask for complete domain") mask = np.ones((nx, ny)) borders = np.zeros((nx, ny)) else: # R: Identify the border cells # R: borders=matrix(1, nrow=nx, ncol=ny) borders = np.ones((nx, ny)) # R: borders[2:(nx-1), 2:(ny-1)]= mask[1:(nx-2), 2:(ny-1)] + mask[3:nx, 2:(ny-1)] + ... borders[1 : (nx - 1), 1 : (ny - 1)] = ( mask[0 : (nx - 2), 1 : (ny - 1)] + mask[2:nx, 1 : (ny - 1)] + mask[1 : (nx - 1), 0 : (ny - 2)] + mask[1 : (nx - 1), 2:ny] ) # R: borders=borders*mask borders[which(borders<4 & borders!=0)]=1 borders[borders==4]=0 borders = borders * mask borders[(borders < 4) & (borders != 0)] = 1 borders[borders == 4] = 0 # R: bordi=borders bordi[bordi>1]=1 bordlist=which(borders>0) bordi = borders.copy() bordi[bordi > 1] = 1 bordlist = np.where(borders > 0) # R: assign NA values to the DEM outside the mask # R: inmask=which(mask==1) demMask=dem demMask[-inmask]=NA demMask = dem.copy().astype(float) demMask[mask != 1] = np.nan # R: First pass calculate the x and y slopes as (i+1)-(i) # R: slopex1[1:(nx-1),]=(demMask[2:nx,]-demMask[1:(nx-1),])/dx slopex1 = np.full((nx, ny), np.nan) slopey1 = np.full((nx, ny), np.nan) slopex1[0 : (nx - 1), :] = (demMask[1:nx, :] - demMask[0 : (nx - 1), :]) / dx # R: slopey1[,1:(ny-1)]=(demMask[,2:ny]-demMask[,1:(ny-1)])/dy slopey1[:, 0 : (ny - 1)] = (demMask[:, 1:ny] - demMask[:, 0 : (ny - 1)]) / dy # R: slopex2=slopex1 slopey2=slopey1 slopex2 = slopex1.copy() slopey2 = slopey1.copy() ### Right # R: borderR=slopeR=matrix(0, nrow=nx, ncol=ny) borderR = np.zeros((nx, ny)) slopeR = np.zeros((nx, ny)) # R: borderR[1:(nx-1),]=(mask[1:(nx-1),]+mask[2:nx,])*mask[1:(nx-1),] borderR[0 : (nx - 1), :] = ( (mask[0 : (nx - 1), :] + mask[1:nx, :]) * mask[0 : (nx - 1), :] ) # R: borderR[nx,]=1 borderR[nx - 1, :] = 1 # R: borderR=borderR*bordi Rlist=which(borderR==1) slopex2[Rlist]=0 borderR = borderR * bordi Rlist = np.where(borderR == 1) slopex2[Rlist] = 0 # R: borderR2=matrix(0,...) borderR2[nx,]=1 borderR2 = np.zeros((nx, ny)) borderR2[nx - 1, :] = 1 # R: borderR2[2:(nx-1),]=(mask[2:(nx-1),]+mask[3:nx,])*mask[1:(nx-2),] borderR2[1 : (nx - 1), :] = ( (mask[1 : (nx - 1), :] + mask[2:nx, :]) * mask[0 : (nx - 2), :] ) borderR2 = borderR2 * bordi borderR2[borderR2 > 1] = 0 # R: slopeR[2:nx,]=slopex2[1:(nx-1),]*borderR2[2:nx,] slopeR[1:nx, :] = slopex2[0 : (nx - 1), :] * borderR2[1:nx, :] slopeR[np.isnan(slopeR)] = 0 slopex2 = slopex2 + slopeR ### Top # R: borderT=slopeT=matrix(0,...) borderT = np.zeros((nx, ny)) slopeT = np.zeros((nx, ny)) # R: borderT[,1:(ny-1)]=(mask[,1:(ny-1)]+mask[,2:ny])*mask[,1:(ny-1)] borderT[:, 0 : (ny - 1)] = ( (mask[:, 0 : (ny - 1)] + mask[:, 1:ny]) * mask[:, 0 : (ny - 1)] ) # R: borderT[,ny]=1 borderT[:, ny - 1] = 1 borderT = borderT * bordi Tlist = np.where(borderT == 1) slopey2[Tlist] = 0 # R: borderT2[,ny]=1 borderT2[,2:(ny-1)]=(mask[,2:(ny-1)]+mask[,3:ny])*mask[,1:(ny-2)] borderT2 = np.zeros((nx, ny)) borderT2[:, ny - 1] = 1 borderT2[:, 1 : (ny - 1)] = ( (mask[:, 1 : (ny - 1)] + mask[:, 2:ny]) * mask[:, 0 : (ny - 2)] ) borderT2 = borderT2 * bordi borderT2[borderT2 > 1] = 0 # R: slopeT[,2:ny]=slopey2[,1:(ny-1)]*borderT2[,2:ny] slopeT[:, 1:ny] = slopey2[:, 0 : (ny - 1)] * borderT2[:, 1:ny] slopeT[np.isnan(slopeT)] = 0 slopey2 = slopey2 + slopeT ###################################### # R: Make masks of the primary flow directions # R: downlist=which(direction==d4[1]) etc. downlist = np.where(direction == d4[0]) leftlist = np.where(direction == d4[1]) uplist = np.where(direction == d4[2]) rightlist = np.where(direction == d4[3]) # R: downlist.arr=which(direction==d4[1], arr.ind=T) etc. downlist_arr = np.argwhere(direction == d4[0]) leftlist_arr = np.argwhere(direction == d4[1]) uplist_arr = np.argwhere(direction == d4[2]) rightlist_arr = np.argwhere(direction == d4[3]) # R: xmask=ymask=matrix(0,...) ymask[uplist]=-1 xmask[rightlist]=-1 xmask = np.zeros((nx, ny)) ymask = np.zeros((nx, ny)) ymask[uplist] = -1 xmask[rightlist] = -1 # R: if(length(leftlist.arr>0)){ for(ii in 1:nrow(leftlist.arr)){ ... } } if leftlist_arr.shape[0] > 0: for ii in range(leftlist_arr.shape[0]): # R: xindex=max((leftlist.arr[ii,1]-1),1) xindex = max(leftlist_arr[ii, 0] - 1, 0) # R: if(mask[xindex, leftlist.arr[ii,2]]==0){xindex=xindex+1} if mask[xindex, leftlist_arr[ii, 1]] == 0: xindex = xindex + 1 # R: xmask[xindex, leftlist.arr[ii,2]]=1 xmask[xindex, leftlist_arr[ii, 1]] = 1 if downlist_arr.shape[0] > 0: for ii in range(downlist_arr.shape[0]): # R: yindex=max((downlist.arr[ii,2]-1),1) yindex = max(downlist_arr[ii, 1] - 1, 0) # R: if(mask[downlist.arr[ii,1],yindex]==0){yindex=yindex+1} if mask[downlist_arr[ii, 0], yindex] == 0: yindex = yindex + 1 # R: ymask[downlist.arr[ii,1], yindex]=1 ymask[downlist_arr[ii, 0], yindex] = 1 # R: ylist=which(ymask!=0) xlist=which(xmask!=0) ylist = np.where(ymask != 0) xlist = np.where(xmask != 0) ################################### # R: fixPx=which(sign(slopex2)==-1 & xmask==1) slopex2[fixPx]=abs(slopex2[fixPx]) fixPx = np.where((np.sign(slopex2) == -1) & (xmask == 1)) slopex2[fixPx] = np.abs(slopex2[fixPx]) fixNx = np.where((np.sign(slopex2) == 1) & (xmask == -1)) slopex2[fixNx] = -np.abs(slopex2[fixNx]) fixPy = np.where((np.sign(slopey2) == -1) & (ymask == 1)) slopey2[fixPy] = np.abs(slopey2[fixPy]) fixNy = np.where((np.sign(slopey2) == 1) & (ymask == -1)) slopey2[fixNy] = -np.abs(slopey2[fixNy]) # R: Sinklist=c(fixPx, fixNx, fixPy, fixNy) (linear indices) # Map internal (row_R, col_R) to HydroFrame (row_HF, col_HF) before flattening. shape_hf = (dem.shape[1], dem.shape[0]) fixPx_hf = (fixPx[1], fixPx[0]) fixNx_hf = (fixNx[1], fixNx[0]) fixPy_hf = (fixPy[1], fixPy[0]) fixNy_hf = (fixNy[1], fixNy[0]) fixPx_flat = np.ravel_multi_index(fixPx_hf, shape_hf) fixNx_flat = np.ravel_multi_index(fixNx_hf, shape_hf) fixPy_flat = np.ravel_multi_index(fixPy_hf, shape_hf) fixNy_flat = np.ravel_multi_index(fixNy_hf, shape_hf) Sinklist = np.concatenate([fixPx_flat, fixNx_flat, fixPy_flat, fixNy_flat]) ################################### # R: if(minslope>=0){ ... } if minslope >= 0: if printflag: print(f"Limiting slopes to minimum {minslope}") xclipP = np.where((np.abs(slopex2) < minslope) & (xmask == 1)) slopex2[xclipP] = minslope xclipN = np.where((np.abs(slopex2) < minslope) & (xmask == -1)) slopex2[xclipN] = -minslope yclipP = np.where((np.abs(slopey2) < minslope) & (ymask == 1)) slopey2[yclipP] = minslope yclipN = np.where((np.abs(slopey2) < minslope) & (ymask == -1)) slopey2[yclipN] = -minslope ################################### # R: if(maxslope>=0){ ... } if maxslope >= 0: if printflag: print(f"Limiting slopes to maximum absolute value of {maxslope}") xclipP = np.where(slopex2 > maxslope) slopex2[xclipP] = maxslope xclipN = np.where(slopex2 < -maxslope) slopex2[xclipN] = -maxslope yclipP = np.where(slopey2 > maxslope) slopey2[yclipP] = maxslope yclipN = np.where(slopey2 < -maxslope) slopey2[yclipN] = -maxslope ################################### # R: if(secondaryTH>=0){ if(secondaryTH==0){ slopex2[-xlist]=0 slopey2[-ylist]=0 } ... } if secondary_th >= 0: if secondary_th == 0: if printflag: print(f"Limiting the ratio of secondary to primary slopes {secondary_th}") # R: slopex2[-xlist]=0 (set non-primary x slopes to 0) slopex2[xmask == 0] = 0 # R: slopey2[-ylist]=0 slopey2[ymask == 0] = 0 else: if printflag: print( "Options for nonzero secondary scaling not currently available please set secondaryTH to -1 or 0" ) ################################### # R: flattest=abs(slopex2[2:nx,2:ny])+abs(slopex2[1:(nx-1),2:ny])+abs(slopey2[2:nx,2:ny])+abs(slopey2[2:nx,1:(ny-1)]) flattest = ( np.abs(slopex2[1:nx, 1:ny]) + np.abs(slopex2[0 : (nx - 1), 1:ny]) + np.abs(slopey2[1:nx, 1:ny]) + np.abs(slopey2[1:nx, 0 : (ny - 1)]) ) # R: nflat=length(which(flattest==0 & mask[2:nx, 2:ny])) nflat = np.sum((flattest == 0) & (mask[1:nx, 1:ny] == 1)) # R: flats=which(flattest==0, arr.ind=T)+1 (R +1 for 1-based; we keep 0-based) flats = np.argwhere(flattest == 0) if nflat > 0 and printflag: print(f"WARNING: {nflat} Flat cells found") ########################### # R: nax=which(is.na(slopex2==T)) slopex2[nax]=0 etc. slopex2[np.isnan(slopex2)] = 0 slopey2[np.isnan(slopey2)] = 0 # Internal layout -> HydroFrame layout (transpose 2D arrays) # R: output_list=list("slopex"=slopex2, "slopey"=slopey2, "direction"=direction, "Sinks"=Sinklist) output_list = { "slopex": slopex2.T, "slopey": slopey2.T, "direction": direction.T, "Sinks": Sinklist, } return output_list