Source code for priority_flow.river_smoothing

"""
Apply smoothing to a DEM along a pre-defined stream network.

Line-by-line translation of River_Smoothing.R (RiverSmooth) from the R
PriorityFlow package. R uses 1-based indexing; we use 0-based. Summary
coordinates in river_summary are 0-based (row, col) for array indexing.
"""

import numpy as np
from typing import Dict, Optional, Tuple

from .fix_drainage import fix_drainage

####################################################################
# PriorityFlow - Topographic Processing Toolkit for Hydrologic Models
# Copyright (C) 2018  Laura Condon (lecondon@email.arizona.edu)
# Contributors - Reed Maxwell (rmaxwell@mines.edu)
####################################################################


[docs] def river_smooth( dem: np.ndarray, direction: np.ndarray, mask: Optional[np.ndarray] = None, river_summary: Optional[np.ndarray] = None, river_segments: Optional[np.ndarray] = None, bank_epsilon: float = 0.01, river_epsilon: float = 0.0, d4: Tuple[int, int, int, int] = (1, 2, 3, 4), printflag: bool = False, ) -> Dict[str, np.ndarray]: """ Smooth a DEM along a predefined stream network and enforce bank slopes. This function will smooth a DEM along a stream network. It requires pre-defined stream segments and subbasins which are obtained using the CalcSubbasins function. Parameters ---------- dem : np.ndarray 2D array of elevations for the domain (nx, ny). direction : np.ndarray 2D array of D4 flow directions for each cell, using the convention encoded in ``d4``. mask : np.ndarray, optional 2D mask defining the domain extent to be considered. Cells with value 0 are excluded from processing. If ``None``, the entire DEM is used. river_summary : np.ndarray, optional Summary table of stream segments with one row per segment and seven columns: 1. Subbasin (segment) ID number. 2. Row index of the upstream end of the segment. 3. Column index of the upstream end of the segment. 4. Row index of the downstream end of the segment. 5. Column index of the downstream end of the segment. 6. Subbasin ID of the downstream basin (``-1`` indicates that the segment drains out of the domain). 7. Drainage area of the subbasin. Indices are 0-based (row, col) for array indexing in this Python implementation. river_segments : np.ndarray, optional 2D array with the same shape as ``dem`` indicating the subbasin (segment) ID for each cell on the river network. Cells not on the river network should be zero. bank_epsilon : float, optional Minimum elevation difference between river cells and adjacent hillslope cells when walking up the banks from the river network. Passed through to ``fix_drainage``. river_epsilon : float, optional Minimum elevation difference per cell along the river segments. Used to enforce a monotonic drop in elevation from the upstream to downstream end of each segment. d4 : tuple of int, optional Direction numbering system for the D4 neighbors, given as ``(down, left, up, right)``. Defaults to ``(1, 2, 3, 4)`` to match the original R implementation. printflag : bool, optional If ``True``, print progress and diagnostic information while smoothing each river segment. Returns ------- Dict[str, np.ndarray] Dictionary containing: - ``\"dem.adj\"``: 2D array of DEM elevations after river smoothing and bank adjustments. - ``\"processed\"``: 2D mask indicating cells that were processed by this routine (1 = processed, 0 = not processed), which should match the river network mask when all segments are successfully handled. - ``\"summary\"``: 2D array summarizing each river segment with one row per segment and the following columns: 1. River segment ID number. 2. Row index of the top of the segment. 3. Column index of the top of the segment. 4. Row index of the bottom of the segment. 5. Column index of the bottom of the segment. 6. Length of the segment (number of cells). 7. Elevation at the top of the segment. 8. Elevation at the bottom of the segment. 9. Delta applied along the segment (i.e. (top-bottom)/length). """ # HF (row, col) endpoints -> internal (col, row) after dem.T t = river_summary[:, 1].copy() river_summary[:, 1] = river_summary[:, 2] river_summary[:, 2] = t t = river_summary[:, 3].copy() river_summary[:, 3] = river_summary[:, 4] river_summary[:, 4] = t dem = dem.T.copy() direction = direction.T.copy() if mask is not None: mask = mask.T.copy() river_segments = river_segments.T.copy() # R: nx=dim(direction)[1] ny=dim(direction)[2] (set after dir2) nx = direction.shape[0] ny = direction.shape[1] # D4 neighbors - Rows: down, left top right. Columns: (1)deltax, (2)deltay # R: kd=matrix(0, nrow=4, ncol=4) kd[,1]=c(0,-1,0,1) kd[,2]=c(-1,0,1,0) kd = np.zeros((4, 2)) kd[:, 0] = [0, -1, 0, 1] kd[:, 1] = [-1, 0, 1, 0] # R: if(missing(mask)){mask=matrix(1, nrow=nx, ncol=ny)} if mask is None: mask = np.ones((nx, ny)) # renumber the directions to 1=down, 2=left, 3=up, 4=right if a different numbering scheme was used # R: dir2=direction dir2 = direction.copy() # R: if(d4[1]!=1){dir2[which(direction==d4[1])]=1} etc. if d4[0] != 1: dir2[direction == d4[0]] = 1 if d4[1] != 2: dir2[direction == d4[1]] = 2 if d4[2] != 3: dir2[direction == d4[2]] = 3 if d4[3] != 4: dir2[direction == d4[3]] = 4 # setup a river list # R: nriver=nrow(river.summary) nriver = river_summary.shape[0] # R: marked.segments=rep(0,nriver) marked_segments = np.zeros(nriver) # R: marked.matrix=matrix(0, ncol=ny, nrow=nx) marked_matrix = np.zeros((nx, ny)) # Setup a smoothing summary # R: riversmooth.summary=matrix(0, nrow=nriver, ncol=9) riversmooth_summary = np.zeros((nriver, 9)) # R: riversmooth.summary[,1:5]=river.summary[,1:5] riversmooth_summary[:, 0:5] = river_summary[:, 0:5] # make a mask of the hillslope cells # R: hillmask=mask hillmask[which(river.segments>0)]=0 hillmask = mask.copy() hillmask[river_segments > 0] = 0 # First make a list of all the terminal river reaches # R: queue=which(river.summary[,6]<=(0)) queue = np.where(river_summary[:, 5] <= 0)[0] # R: if(length(queue)>0){active=TRUE}else{print("No terminal river segments...")} if queue.size > 0: active = True else: print("No terminal river segments provided, not adjusting DEM") active = False # R: dem2=dem dem2 = dem.copy() # get the length of every river segment # R: river.length=rep(0,max(river.summary[,1])) max_basin_id = int(np.max(river_summary[:, 0])) river_length = np.zeros(max_basin_id + 1) # R: for(i in 1:nx){ for(j in 1:ny){ rtemp=river.segments[i,j]; river.length[rtemp]=river.length[rtemp]+1 }} for i in range(nx): for j in range(ny): rtemp = int(river_segments[i, j]) if rtemp > 0: river_length[rtemp] = river_length[rtemp] + 1 # Loop over the river segments working upstream while active: # R: indr=queue[1] indr = int(queue[0]) # R: r=river.summary[indr,1] r = int(river_summary[indr, 0]) # R: rdown=river.summary[indr,6] rdown = river_summary[indr, 5] # R: length=river.length[r] seg_length = int(river_length[r]) # R: riversmooth.summary[indr,6]=river.length[r] riversmooth_summary[indr, 5] = river_length[r] # find the top and bottom elevations of the current river segment # R: top=dem2[river.summary[indr,2], river.summary[indr,3]] top = dem2[ int(river_summary[indr, 1]), int(river_summary[indr, 2]), ] # R: if(rdown<=0){ bottom=... length=length-1 } else{ bdir=... bottom=... } if rdown <= 0: # R: bottom=dem2[river.summary[indr,4], river.summary[indr,5]] bottom = dem2[ int(river_summary[indr, 3]), int(river_summary[indr, 4]), ] seg_length = seg_length - 1 else: # R: bdir=dir2[river.summary[indr,4], river.summary[indr,5]] bdir = int( dir2[ int(river_summary[indr, 3]), int(river_summary[indr, 4]), ] ) # R: bottom=dem2[(river.summary[indr,4]+kd[bdir,1]), (river.summary[indr,5]+kd[bdir,2])] # R uses 1-based dir (1..4); kd rows are 1..4 in R, so kd[bdir,] in Python is kd[bdir-1,] bottom = dem2[ int(river_summary[indr, 3]) + int(kd[bdir - 1, 0]), int(river_summary[indr, 4]) + int(kd[bdir - 1, 1]), ] # R: topmin=bottom+river.epsilon*length topmin = bottom + river_epsilon * seg_length # R: if(top<topmin){ if top < topmin: # R: top0=dem[river.summary[indr,2], river.summary[indr,3]] top0 = dem[ int(river_summary[indr, 1]), int(river_summary[indr, 2]), ] if rdown > 0: bdir = int( dir2[ int(river_summary[indr, 3]), int(river_summary[indr, 4]), ] ) bottom0 = dem[ int(river_summary[indr, 3]) + int(kd[bdir - 1, 0]), int(river_summary[indr, 4]) + int(kd[bdir - 1, 1]), ] else: bottom0 = dem[ int(river_summary[indr, 3]), int(river_summary[indr, 4]), ] # R: delta=max((top0-bottom0)/(length),river.epsilon) delta = max((top0 - bottom0) / seg_length, river_epsilon) # R: top=bottom+delta*length top = bottom + delta * seg_length # R: dem2[river.summary[indr,2], river.summary[indr,3]]=top dem2[ int(river_summary[indr, 1]), int(river_summary[indr, 2]), ] = top if printflag: print( f"River top elevation<river bottom elevation for segment {r}" ) print( f"Original top {round(top0, 2)} and original bottom {round(bottom0, 2)}" ) print( f"Adjusting the top elevation from {round(top0, 2)} to {round(top, 2)}" ) if printflag: print(f"River segment: {r}") print( f"Start: {river_summary[indr, 1]} {river_summary[indr, 2]} {round(top, 1)}" ) print( f"End: {river_summary[indr, 3]} {river_summary[indr, 4]} {round(bottom, 1)}" ) # walk from top to bottom smoothing out the river cells # R: indx=river.summary[indr,2] indy=river.summary[indr,3] indx = int(river_summary[indr, 1]) indy = int(river_summary[indr, 2]) # R: marked.matrix[indx,indy]=marked.matrix[indx,indy]+1 marked_matrix[indx, indy] = marked_matrix[indx, indy] + 1 # R: if(length>1){delta=(top-bottom)/(length)}else{delta=0} if seg_length > 1: delta = (top - bottom) / seg_length else: delta = 0.0 # R: if(delta<0){ print(...); delta=0 } if delta < 0: print( f"Warning: Calculated delta < 0, setting delta to 0 for segment {r}" ) delta = 0.0 # R: temp=top temp = top # R: riversmooth.summary[indr,7]=top [8]=bottom [9]=delta riversmooth_summary[indr, 6] = top riversmooth_summary[indr, 7] = bottom riversmooth_summary[indr, 8] = delta # R: if(length>1){ if seg_length > 1: # R: for(i in 2:length){ for i in range(1, seg_length): # R: temp=temp-delta temp = temp - delta # R: dirtemp=dir2[indx,indy] dirtemp = int(dir2[indx, indy]) # R: downindx=indx+kd[dirtemp,1] downindy=indy+kd[dirtemp,2] downindx = indx + int(kd[dirtemp - 1, 0]) downindy = indy + int(kd[dirtemp - 1, 1]) # R: if(river.segments[downindx,downindy]==r){ if river_segments[downindx, downindy] == r: # R: dem2[downindx,downindy]=temp dem2[downindx, downindy] = temp # R: marked.matrix[downindx,downindy]=... marked_matrix[downindx, downindy] = ( marked_matrix[downindx, downindy] + 1 ) # R: drainfix=FixDrainage(dem=dem2, direction=dir2, mask=hillmask, bank.epsilon=bank.epsilon, startpoint=c(downindx,downindy)) # R: dem2=drainfix$dem.adj drainfix = fix_drainage( dem=dem2, direction=dir2, mask=hillmask, bank_epsilon=bank_epsilon, startpoint=(downindx, downindy), d4=d4, ) dem2 = drainfix["dem.adj"] else: print(f"Warning: Check Segment for branches {r}") # R: indx=downindx indy=downindy indx = downindx indy = downindy # R: marked.segments[indr]= marked.segments[indr]+1 marked_segments[indr] = marked_segments[indr] + 1 # Find all of the river segments that drain to this segment # R: uplist=which(river.summary[,6]==r) uplist = np.where(river_summary[:, 5] == r)[0] # R: if(length(uplist>0)){queue=c(uplist,queue[-1])} else{queue=queue[-1]} # (R has typo length(uplist>0) should be length(uplist)>0) if uplist.size > 0: queue = np.concatenate([uplist, queue[1:]]) else: queue = queue[1:] # R: if(length(queue)==0){active=F} if queue.size == 0: active = False # Internal endpoint indices -> HydroFrame row/col for summary output summary_out = riversmooth_summary.copy() t = summary_out[:, 1].copy() summary_out[:, 1] = summary_out[:, 2] summary_out[:, 2] = t t = summary_out[:, 3].copy() summary_out[:, 3] = summary_out[:, 4] summary_out[:, 4] = t # Internal layout -> HydroFrame layout # R: output_list=list("dem.adj"=dem2, "processed"=marked.matrix, "summary"=riversmooth.summary) output_list = { "dem.adj": dem2.T, "processed": marked_matrix.T, "summary": summary_out, } return output_list