Source code for priority_flow.stream_distance

"""
Find the distance to the nearest stream point following drainage directions.

Line-by-line translation of Stream_Distance.R (StreamDist) from the R
PriorityFlow package. R uses 1-based indexing; we use 0-based.
"""

import numpy as np
from typing import Dict, Optional, Tuple


[docs] def stream_dist( direction: np.ndarray, streammask: np.ndarray, domainmask: Optional[np.ndarray] = None, d4: Tuple[int, int, int, int] = (1, 2, 3, 4), ) -> Dict[str, np.ndarray]: """ Find stream distance using flow-direction traversal upstream from streams. """ # HydroFrame layout -> internal R-style layout direction = direction.T.copy() streammask = streammask.T.copy() if domainmask is not None: domainmask = domainmask.T.copy() # R: nx=dim(direction)[1] ; ny=dim(direction)[2] nx = direction.shape[0] ny = direction.shape[1] # R: # ku=matrix(0, nrow=4, ncol=3) # ku[,1]=c(0,1,0,-1) # ku[,2]=c(1,0,-1,0) # ku[,3]=c(1,2,3,4) ku = np.zeros((4, 3), dtype=int) ku[:, 0] = [0, 1, 0, -1] ku[:, 1] = [1, 0, -1, 0] ku[:, 2] = [1, 2, 3, 4] # R: if(missing(domainmask)){domainmask=matrix(1, nrow=nx, ncol=ny)} if domainmask is None: domainmask = np.ones((nx, ny)) # R: renumber directions to 1,2,3,4 if custom d4 was passed dir2 = direction.copy() if d4[0] != 1: dir2[direction == d4[0]] = 1 if d4[1] != 2: dir2[direction == d4[1]] = 2 if d4[2] != 3: dir2[direction == d4[2]] = 3 if d4[3] != 4: dir2[direction == d4[3]] = 4 # R: queue=which(streammask==1, arr.ind=T) # R arr.ind gives 1-based matrix indices; convert to 0-based. queue = np.argwhere(streammask == 1) # R: distance=matrix(NA, nrow=nx, ncol=ny) distance = np.full((nx, ny), np.nan) # R: distance[which(streammask==1)]=0 distance[streammask == 1] = 0 # R: # streamy=matrix(rep(1:ny,nx), ncol=ny, byrow=T) # streamx=matrix(rep(1:nx,ny), ncol=ny, byrow=F) # Keep R's 1-based index values in outputs. streamy = np.tile(np.arange(1, ny + 1), (nx, 1)).astype(float) streamx = np.tile(np.arange(1, nx + 1), (ny, 1)).T.astype(float) streamx[streammask == 0] = np.nan streamy[streammask == 0] = np.nan # R: active=TRUE active = True # R: while(active==T){ ... } while active: # Safety for empty queue (R version assumes non-empty) if queue.shape[0] == 0: break # R: indx=queue[1,1] ; indy=queue[1,2] indx = int(queue[0, 0]) indy = int(queue[0, 1]) # R: queuetemp=NULL queuetemp = [] # R: for(d in 1:4){ ... } for d in range(4): # R: tempx=indx+ku[d,1] ; tempy=indy+ku[d,2] tempx = indx + ku[d, 0] tempy = indy + ku[d, 1] # R: if(tempx*tempy>0 & tempx<nx & tempy<ny){ # 1-based strict interior bounds => 0-based: tempx>0,tempy>0,tempx<nx-1,tempy<ny-1 if tempx > 0 and tempy > 0 and tempx < (nx - 1) and tempy < (ny - 1): # R: if((d-dir2[tempx,tempy])==0 & streammask==0 & domainmask==1) if ( ((d + 1) - dir2[tempx, tempy]) == 0 and streammask[tempx, tempy] == 0 and domainmask[tempx, tempy] == 1 ): # R: distance[tempx,tempy]=distance[indx,indy]+1 distance[tempx, tempy] = distance[indx, indy] + 1 streamx[tempx, tempy] = streamx[indx, indy] streamy[tempx, tempy] = streamy[indx, indy] # R: queuetemp=rbind(c(tempx,tempy),queuetemp) queuetemp.insert(0, [tempx, tempy]) # R: if(length(queuetemp>0)){ queue=rbind(queuetemp,queue[-1,]) } else ... if len(queuetemp) > 0: queuetemp_arr = np.array(queuetemp, dtype=int) if queue.shape[0] > 1: queue = np.vstack((queuetemp_arr, queue[1:, :])) else: queue = queuetemp_arr else: # R: if(nrow(queue)>1){ queue=queue[-1,] ... } else {active=F} if queue.shape[0] > 1: queue = queue[1:, :] # R: if(length(queue)==2){queue=matrix(queue, ncol=2, byrow=T)} if queue.ndim == 1: queue = queue.reshape(1, 2) else: active = False # Internal layout -> HydroFrame layout output_list = { "stream.dist": distance.T, "stream.xind": streamx.T, "stream.yind": streamy.T, } return output_list