Source code for priority_flow.drainage_area

"""
Calculated drainage area.

Line-by-line translation of drainage_area.R (drainageArea) from the R PriorityFlow package.
Calculates the number of cells draining to any cell given a flow direction file.
R uses 1-based indexing; we use 0-based.
"""

import numpy as np
from typing import Optional, Tuple


[docs] def drainage_area( direction: np.ndarray, mask: Optional[np.ndarray] = None, d4: Tuple[int, int, int, int] = (1, 2, 3, 4), printflag: bool = False, ) -> np.ndarray: """ Calculate drainage area (number of upstream cells) for each grid cell given a flow direction file. Parameters ---------- direction : np.ndarray 2D array of D4 flow directions for each cell. Direction codes follow the ``d4`` numbering scheme. mask : np.ndarray, optional 2D mask with ones for cells to be processed and zeros for everything else. Cells outside the mask are excluded from the drainage-area computation. If ``None``, a mask of all ones with the same shape as ``direction`` is used. d4 : tuple of int, optional Direction numbering system for the D4 neighbors, given as ``(down, left, up, right)``. Defaults to ``(1, 2, 3, 4)`` to match the original R implementation. printflag : bool, optional If ``True``, print basic progress information as the algorithm walks downstream through the queue of cells. Returns ------- np.ndarray 2D array of drainage areas with the same shape as ``direction``. Each entry gives the number of cells (including itself) that drain to that cell, multiplied by ``mask`` so cells outside the mask have value 0. """ # HydroFrame layout -> internal R-style layout direction = direction.T.copy() if mask is not None: mask = mask.T.copy() # R: nx=nrow(direction) ny=ncol(direction) nx = direction.shape[0] ny = direction.shape[1] # R: if(missing(mask)){mask=matrix(1, nrow=nx, ncol=ny)} if mask is None: mask = np.ones((nx, ny)) # Setup the border # R: border=matrix(1, nrow=nx, ncol=ny) border = np.ones((nx, ny)) # R: border[2:(nx-1), 2:(ny-1)]= mask[1:(nx-2), 2:(ny-1)] + mask[3:nx, 2:(ny-1)] + ... border[1 : (nx - 1), 1 : (ny - 1)] = ( mask[0 : (nx - 2), 1 : (ny - 1)] + mask[2:nx, 1 : (ny - 1)] + mask[1 : (nx - 1), 0 : (ny - 2)] + mask[1 : (nx - 1), 2:ny] ) # R: border=border*mask border[which(border<4 & border!=0)]=1 border[border==4]=0 border = border * mask border[(border < 4) & (border != 0)] = 1 border[border == 4] = 0 # initialize drainage area matrix # R: drainarea=matrix(1, nrow=nx, ncol=ny) drainarea = np.ones((nx, ny)) # D4 neighbors # R: kd=matrix(0, nrow=4, ncol=2) ordered down, left top right kd = np.zeros((4, 2)) kd[:, 0] = [0, -1, 0, 1] kd[:, 1] = [-1, 0, 1, 0] # make masks of which cells drain down, up, left right # R: down=up=left=right=matrix(0, nrow=nx, ncol=ny) down = np.zeros((nx, ny)) up = np.zeros((nx, ny)) left = np.zeros((nx, ny)) right = np.zeros((nx, ny)) # R: down[which(direction==d4[1])]=1 etc. down[direction == d4[0]] = 1 left[direction == d4[1]] = 1 up[direction == d4[2]] = 1 right[direction == d4[3]] = 1 # calculate the number of cells draining to any cell # R: draincount=matrix(0, nrow=nx, ncol=ny) draincount = np.zeros((nx, ny)) # R: draincount[,1:(ny-1)]=draincount[,1:(ny-1)]+down[,2:ny] draincount[:, 0 : (ny - 1)] = draincount[:, 0 : (ny - 1)] + down[:, 1:ny] # R: draincount[,2:ny]=draincount[,2:ny]+up[,1:(ny-1)] draincount[:, 1:ny] = draincount[:, 1:ny] + up[:, 0 : (ny - 1)] # R: draincount[1:(nx-1),]=draincount[1:(nx-1),]+left[2:nx,] draincount[0 : (nx - 1), :] = draincount[0 : (nx - 1), :] + left[1:nx, :] # R: draincount[2:nx, ]=draincount[2:nx,]+right[1:(nx-1),] draincount[1:nx, :] = draincount[1:nx, :] + right[0 : (nx - 1), :] # give values outside the mask and on the border a negative count so they aren't processed # R: draincount[which(mask==0)]=(-99) draincount[mask == 0] = -99 # initialize a queue with all the headwater cells (i.e. cells with zero upstream neighbors) # R: draintemp=draincount draintemp = draincount.copy() # R: queue=which(draintemp==0, arr.ind=T) -> (row, col) for each queue_rows, queue_cols = np.where(draintemp == 0) queue = np.column_stack((queue_rows, queue_cols)) # R: qlist=which(draintemp==0) (linear indices - not used later in R, we can skip or keep) # R: blist=cbind(which(draintemp>0), which(draintemp>0, arr.ind=T)) -> cols: linear_idx, row, col # In R, which(draintemp>0) is column-major linear index. We need same for consistent indexing. # blist is used as: draintemp[blist[,1]] in R -> index by linear; and queue=blist[ilist,2:3]. # So we need (linear_index, row, col). R linear index: column-major. For indexing we use # ilist=which(draintemp[blist[,1]]==0) - so we need to index draintemp by linear index. # In Python we can use (row,col) to index: draintemp[blist[:,1], blist[:,2]]. So blist cols: (dummy, row, col) or (row, col) only. But then queue=blist[ilist,2:3] means we need 3 cols. So blist has (linear, row, col). For draintemp[blist[ilist,1], blist[ilist,2]] we get values. So we don't need linear in the check - we need row,col. So blist = (row, col) for draintemp>0, 2 cols. Then ilist = where(draintemp[blist[:,0], blist[:,1]]==0). queue = blist[ilist]. But R has 3 cols so that length(blist)/3 is nrow. So R blist is (linear, row, col). So queue = blist[ilist, 2:3]. So we need 3 columns. Build: rows, cols = where(draintemp>0); linear = rows + cols*nx for column-major? No - R column-major: (i,j) -> (j-1)*nx + i in 1-based, so 0-based (i,j) -> j*nx + i. So linear = rows + cols * nx. Then blist = column_stack((linear, rows, cols)). linear_idx = np.where(draintemp.ravel(order="F") > 0)[0] rows_b = linear_idx % nx cols_b = linear_idx // nx blist = np.column_stack((linear_idx, rows_b, cols_b)) # R: nqueue=nrow(queue) nqueue = queue.shape[0] ii = 1 # R: while(nqueue>0){ while nqueue > 0: if printflag: print("lap", ii, "ncell", nqueue) # loop through the queue # R: for(i in 1:nqueue){ for i in range(nqueue): # look downstream add 1 to the area and subtract 1 from the drainage # # R: xtemp=queue[i,1] ytemp=queue[i,2] xtemp = int(queue[i, 0]) ytemp = int(queue[i, 1]) # if it has a flow direction # R: if(is.na(direction[xtemp,ytemp])==F){ if not np.isnan(direction[xtemp, ytemp]): # R: dirtemp=which(d4==direction[xtemp,ytemp]) dirtemp = int(np.where(np.array(d4) == direction[xtemp, ytemp])[0][0]) # R: xds=xtemp+kd[dirtemp,1] yds=ytemp+kd[dirtemp,2] xds = int(xtemp + kd[dirtemp, 0]) yds = int(ytemp + kd[dirtemp, 1]) # add one to the area of the downstream cell as long as that cell is in the domain # R: if(xds<=nx & xds>=1 & yds<=ny & yds>=1){ if 0 <= xds < nx and 0 <= yds < ny: # R: drainarea[xds, yds]=drainarea[xds, yds]+drainarea[xtemp,ytemp] drainarea[xds, yds] = ( drainarea[xds, yds] + drainarea[xtemp, ytemp] ) # subtract one from the number of upstream cells from the downstream cell # R: draintemp[xds,yds]= draintemp[xds,yds] - 1 draintemp[xds, yds] = draintemp[xds, yds] - 1 # R: } #end if in the domain extent # R: } #end if not na # set the drain temp to -99 for current cell to indicate its been done # R: draintemp[xtemp,ytemp]=-99 draintemp[xtemp, ytemp] = -99 # R: } #end for i in 1:nqueue # make a new queue with the cells with zero upstream drains left # R: ilist=which(draintemp[blist[,1]]==0) # In R blist[,1] is column-major linear index. So draintemp[blist[,1]] are values at those cells. # In Python: values at (blist[:,1], blist[:,2]) if blist.shape[0] > 0: vals = draintemp[blist[:, 1].astype(int), blist[:, 2].astype(int)] ilist = np.where(vals == 0)[0] else: ilist = np.array([], dtype=int) # R: queue=blist[ilist,2:3] if ilist.size > 0: queue = blist[ilist, 1:3].copy() else: queue = np.empty((0, 2)) # R: if(length(ilist)!=length(blist)/3){ blist=blist[-ilist,] } else{ blist=NULL } if ilist.size != blist.shape[0]: blist = np.delete(blist, ilist, axis=0) else: blist = np.empty((0, 3)) if printflag: print("blist empty") # R: nqueue=length(queue)/2 nqueue = queue.shape[0] # R: if(nqueue==1){queue=matrix(queue, ncol=2, nrow=1)} if nqueue == 1: queue = queue.reshape(1, 2) # R: if(length(blist)/3==1){blist=matrix(blist, ncol=3, nrow=1)} if blist.shape[0] == 1: blist = blist.reshape(1, 3) ii = ii + 1 # R: } # R: drainarea=drainarea*mask return(drainarea) drainarea = drainarea * mask # Internal layout -> HydroFrame layout return drainarea.T