I wrote the Runoff.py Python program as part of the Dumpster Project. The output of the program is a raster that shows the most direct downhill paths from given source points. This was used to visualize where the pollution from leaking dumpster goes. ArcMap did not seem to have this specific functionality readily available. I decided to use this as an opportunity to learn Python.
An interesting challenge was figuring out a way to convert the location of each source point from state plane coordinates to pixel coordinates. This was accomplished on lines 49 and 50 of the program. The pixel column coordinate is the quotient from dividing the distance the source point is from the left edge of the raster by the average width of a pixel. The pixel row coordinate is the same principle except it is the quotient after the distance from the top edge divided by average pixel height.
column = int((easting - left)//width) row = int((top - northing)//height)
The Runoff.py program uses the value from a D8 flow raster at a given position, calculates the direction of flow travel, and then moves to the adjacent pixel in that direction. The program repeats this process until it goes off the edge of the flow raster or detects an infinite loop.
The output of the program will resemble a network of lines such as the ones below. The hill-shade, legend and scale bar were added separately.
import arcpy import math import numpy as np project_db = "F:/dumpster/sm_dumpster.gdb" flow_file = "flow_clipped" # Name of D8 flow raster. dumpster_file = "dumpsters_simplified" # Name of source point feature class. runoff_file = "dumpster_runoff_simp" # Name of the runoff raster. arcpy.env.overwriteOutput = True arcpy.env.workspace = project_db arcpy.env.outputCoordinateSystem = flow_file flow_raster = arcpy.Raster(flow_file) flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0) # Prevent accessing indicies that are out of bounds. def check_bounds(row, col): """Verify if (row, col) are within the boundaries of flow_raster. """ # https://www.naftaliharris.com/blog/python-multiline-if-statements/ if ( 0 <= row <= flow_raster.height - 1 and 0 <= col <= flow_raster.width - 1 ): return True else: return False class SourcePoint: @staticmethod def convert_row_col(easting, northing): """Convert state plane coordinates to corresponding row and column location for the pixel that the coordinates fall within. """ height = flow_raster.meanCellHeight width = flow_raster.meanCellWidth left = flow_raster.extent.XMin top = flow_raster.extent.YMax column = int((easting - left)//width) row = int((top - northing)//height) return (row, column) def __init__(self, obj_id, state_coords): self.id = obj_id self.cell_value = 1 self.x, self.y = state_coords self.row, self.col = SourcePoint.convert_row_col(self.x, self.y) class DownstreamTrace: """Trace runoff from a given SourcePoint. """ def __init__(self, source_point): self.runoff_array = np.full( [flow_raster.height, flow_raster.width], 0, dtype=np.uint8) self.row = source_point.row self.col = source_point.col self.cell_value = source_point.cell_value while True: # Tracing of runoff ends if (row, col) is out of bounds. This # happens when runoff flows off the edge of the DEM. if check_bounds(self.row, self.col): # Check if a cell in runoff_array has flow. This is to prevent # an infinite flow loop. A cell with value greater than zero # has flow. if self.runoff_array[self.row][self.col] == 0: self.runoff_array[self.row][self.col] = \ source_point.cell_value self.flow_value = flow_array[self.row][self.col] # There is flow when flow_value is greather than zero. # A value of zero is used to indicate no data/no flow. # Checking is necessary to prevent a mathematical error # when taking the log of flow_value. if self.flow_value > 0: # The flow_directions tuple contains tuples # representing (Δrow, Δcol). This is used for # converting the D8 flow_value to the next location. flow_directions = ( (0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1,-1), (-1, 0), (-1, 1) ) self.flow_dir_index = int(math.log(self.flow_value, 2)) self.delta_row, self.delta_col = flow_directions[ self.flow_dir_index] # Update row and col to the next cell. The process will # repeat itself until flow stops, loops back on itself # or the runoff continues off the edge of the DEM. self.row = self.row + self.delta_row self.col = self.col + self.delta_col else: break else: break def __add__(self, other): self.runoff_array = self.runoff_array + other.runoff_array def save_raster(self, file): self.runoff_raster = arcpy.NumPyArrayToRaster( self.runoff_array, flow_raster.extent.lowerLeft, flow_raster.meanCellWidth, flow_raster.meanCellHeight, value_to_nodata=0) self.runoff_raster.save(file) search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total", "SHAPE@XY"]) dumpster_list =  for r in search_c: obj_id, total, coords = r dumpster_point = SourcePoint(obj_id, coords) dumpster_point.cell_value = total dumpster_list.append(dumpster_point) dumpster_runoff = DownstreamTrace(dumpster_list.pop()) for x in dumpster_list: dumpster_runoff + DownstreamTrace(x) dumpster_runoff.save_raster(runoff_file)