Source code for priority_flow.linear_distance

"""
Linear Distance functions for PriorityFlow.

This module provides functions to calculate the minimum linear distance between
every point in a 2D array and a mask of target points using a step-by-step
distance checking algorithm.
"""

import numpy as np
from typing import Optional


[docs] def lin_dist( target_points: np.ndarray, mask: Optional[np.ndarray] = None, cell_size: float = 1.0, max_dist: Optional[float] = None, printflag: bool = False ) -> np.ndarray: """ Calculate the minimum linear distance from a feature set. Calculates the minimum linear distance between every point in a 2D array and a mask of target points. Note that this function assumes dx=dy, i.e., square grid cells. If this is not the case, it will not work correctly. NOTE: This function has not yet been tested and is not fully implemented. Parameters ---------- target_points : np.ndarray Matrix with a value of 1 for all cells that you would like to calculate the distance to and 0 for all other cells mask : np.ndarray, optional Processing mask. Defaults to processing everything cell_size : float, optional The size of a grid cell. Note that this function assumes dx=dy, i.e., square grid cells. Defaults to 1.0 max_dist : float, optional Maximum distance to check. All cells with no distance found <= max_dist will be assigned max_dist. If None, checks out to the total size of the domain printflag : bool, optional Whether to print progress information. Defaults to False Returns ------- np.ndarray Matrix containing the minimum distance from each cell to the nearest target point. Cells outside the mask are assigned NaN. Notes ----- This function implements a step-by-step distance checking algorithm that: 1. Creates an ordered list of distance steps and rotations to check 2. Iteratively checks each distance incrementally 3. Uses array shifting to efficiently count target points at each distance 4. Assigns distances to cells as they are found 5. Continues until all cells have been assigned distances or max_dist is reached The algorithm handles: - Square grid cells (dx = dy) - Configurable maximum distance limits - Efficient array operations for distance checking - Mask-based domain processing - Progressive distance assignment The function assumes square grid cells and uses Euclidean distance calculations. For non-square grids, results may be incorrect. """ nx, ny = target_points.shape if mask is None: mask = np.ones((nx, ny)) # Default to processing everything # Incomplete matrix for cells still needing to be processed # Target points and points outside the mask are initialized to 0 incomplete = (1 - target_points) * mask n_missing = np.sum(incomplete) distance = np.zeros((nx, ny)) # Figure out the order of steps (horizontal and vertical) and rotations # (distance from horizontal and vertical) to check # If max_dist is unspecified, check out to the total size of the domain if max_dist is None: max_dist = np.sqrt((nx * cell_size)**2 + (ny * cell_size)**2) order_list = [] for s in range(1, max(nx, ny) + 1): for r in range(s + 1): dist = np.sqrt((r * cell_size)**2 + (s * cell_size)**2) if dist <= max_dist: order_list.append([s, r, dist]) # Convert to numpy array and sort by distance if order_list: order = np.array(order_list) sort_indices = np.argsort(order[:, 2]) order = order[sort_indices] else: order = np.empty((0, 3)) ndist = order.shape[0] # Number of unique distances to test if printflag: print("Orders created") while n_missing > 0: for d in range(ndist): s = int(order[d, 0]) r = int(order[d, 1]) d_temp = order[d, 2] if printflag: print(f"Step={s}, Rotation={r}, N_missing={n_missing}, distance={d_temp:.4f}") # Counting up the number of target points within a given step (s = later distance from center) # and rotation (r = distance off vertical or horizontal axes) temp_count = np.zeros((nx, ny)) # Check if s and r are within domain bounds if s < nx and r < ny: # Apply target point counting with array shifting temp_count[0:(nx-s), 0:(ny-r)] += target_points[s:nx, r:ny] temp_count[0:(nx-s), r:ny] += target_points[s:nx, 0:(ny-r)] temp_count[s:nx, 0:(ny-r)] += target_points[0:(nx-s), r:ny] temp_count[s:nx, r:ny] += target_points[0:(nx-s), 0:(ny-r)] # Check if r and s are within domain bounds (for rotation) if r < nx and s < ny: temp_count[0:(nx-r), 0:(ny-s)] += target_points[r:nx, s:ny] temp_count[0:(nx-r), s:ny] += target_points[r:nx, 0:(ny-s)] temp_count[r:nx, 0:(ny-s)] += target_points[0:(nx-r), s:ny] temp_count[r:nx, s:ny] += target_points[0:(nx-r), 0:(ny-s)] # Convert to binary (any target points found = 1) temp_count[temp_count > 0] = 1 # Record the distance for any cell with a temp_count=1 that is still incomplete distance += temp_count * incomplete * d_temp # Set all the cells that had a temp_count=1 to complete incomplete = incomplete - temp_count incomplete[incomplete < 0] = 0 # Count up the number of cells still needing a distance n_missing = np.sum(incomplete) # Stop if you are out of missing cells if n_missing == 0: break if printflag: print("Distance calculated") # Set cells outside the mask to NaN distance[mask == 0] = np.nan return distance