Runoff Trace

Introduction

I wrote the Runoff.py Python program as part of the Dumpster Project. The output of the program is a raster that shows the most direct downhill paths from given source points. This was used to visualize where the pollution from leaking dumpster goes. ArcMap did not seem to have this specific functionality readily available. I decided to use this as an opportunity to learn Python.

An interesting challenge was figuring out a way to convert the location of each source point from state plane coordinates to pixel coordinates. This was accomplished on lines 49 and 50 of the program. The pixel column coordinate is the quotient from dividing the distance the source point is from the left edge of the raster by the average width of a pixel. The pixel row coordinate is the same principle except it is the quotient after the distance from the top edge divided by average pixel height.

column = int((easting - left)//width)
row = int((top - northing)//height)
column = int((easting - left)//width) row = int((top - northing)//height)
column = int((easting - left)//width)
row = int((top - northing)//height)

The Runoff.py program uses the value from a D8 flow raster at a given position, calculates the direction of flow travel, and then moves to the adjacent pixel in that direction. The program repeats this process until it goes off the edge of the flow raster or detects an infinite loop.

Output

The output of the program will resemble a network of lines such as the ones below. The hill-shade, legend and scale bar were added separately.

Runoff.py

import arcpy
import math
import numpy as np
project_db = "F:/dumpster/sm_dumpster.gdb"
flow_file = "flow_clipped" # Name of D8 flow raster.
dumpster_file = "dumpsters_simplified" # Name of source point feature class.
runoff_file = "dumpster_runoff_simp" # Name of the runoff raster.
arcpy.env.overwriteOutput = True
arcpy.env.workspace = project_db
arcpy.env.outputCoordinateSystem = flow_file
flow_raster = arcpy.Raster(flow_file)
flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0)
# Prevent accessing indicies that are out of bounds.
def check_bounds(row, col):
"""Verify if (row, col) are within the boundaries of flow_raster.
"""
# https://www.naftaliharris.com/blog/python-multiline-if-statements/
if (
0 <= row <= flow_raster.height - 1
and 0 <= col <= flow_raster.width - 1
):
return True
else:
return False
class SourcePoint:
@staticmethod
def convert_row_col(easting, northing):
"""Convert state plane coordinates to corresponding row and column
location for the pixel that the coordinates fall within.
"""
height = flow_raster.meanCellHeight
width = flow_raster.meanCellWidth
left = flow_raster.extent.XMin
top = flow_raster.extent.YMax
column = int((easting - left)//width)
row = int((top - northing)//height)
return (row, column)
def __init__(self, obj_id, state_coords):
self.id = obj_id
self.cell_value = 1
self.x, self.y = state_coords
self.row, self.col = SourcePoint.convert_row_col(self.x, self.y)
class DownstreamTrace:
"""Trace runoff from a given SourcePoint.
"""
def __init__(self, source_point):
self.runoff_array = np.full(
[flow_raster.height, flow_raster.width],
0, dtype=np.uint8)
self.row = source_point.row
self.col = source_point.col
self.cell_value = source_point.cell_value
while True:
# Tracing of runoff ends if (row, col) is out of bounds. This
# happens when runoff flows off the edge of the DEM.
if check_bounds(self.row, self.col):
# Check if a cell in runoff_array has flow. This is to prevent
# an infinite flow loop. A cell with value greater than zero
# has flow.
if self.runoff_array[self.row][self.col] == 0:
self.runoff_array[self.row][self.col] = \
source_point.cell_value
self.flow_value = flow_array[self.row][self.col]
# There is flow when flow_value is greather than zero.
# A value of zero is used to indicate no data/no flow.
# Checking is necessary to prevent a mathematical error
# when taking the log of flow_value.
if self.flow_value > 0:
# The flow_directions tuple contains tuples
# representing (Δrow, Δcol). This is used for
# converting the D8 flow_value to the next location.
flow_directions = (
(0, 1),
(1, 1),
(1, 0),
(1, -1),
(0, -1),
(-1,-1),
(-1, 0),
(-1, 1)
)
self.flow_dir_index = int(math.log(self.flow_value, 2))
self.delta_row, self.delta_col = flow_directions[
self.flow_dir_index]
# Update row and col to the next cell. The process will
# repeat itself until flow stops, loops back on itself
# or the runoff continues off the edge of the DEM.
self.row = self.row + self.delta_row
self.col = self.col + self.delta_col
else:
break
else:
break
def __add__(self, other):
self.runoff_array = self.runoff_array + other.runoff_array
def save_raster(self, file):
self.runoff_raster = arcpy.NumPyArrayToRaster(
self.runoff_array, flow_raster.extent.lowerLeft,
flow_raster.meanCellWidth, flow_raster.meanCellHeight,
value_to_nodata=0)
self.runoff_raster.save(file)
search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total",
"SHAPE@XY"])
dumpster_list = []
for r in search_c:
obj_id, total, coords = r
dumpster_point = SourcePoint(obj_id, coords)
dumpster_point.cell_value = total
dumpster_list.append(dumpster_point)
dumpster_runoff = DownstreamTrace(dumpster_list.pop())
for x in dumpster_list:
dumpster_runoff + DownstreamTrace(x)
dumpster_runoff.save_raster(runoff_file)
import arcpy import math import numpy as np project_db = "F:/dumpster/sm_dumpster.gdb" flow_file = "flow_clipped" # Name of D8 flow raster. dumpster_file = "dumpsters_simplified" # Name of source point feature class. runoff_file = "dumpster_runoff_simp" # Name of the runoff raster. arcpy.env.overwriteOutput = True arcpy.env.workspace = project_db arcpy.env.outputCoordinateSystem = flow_file flow_raster = arcpy.Raster(flow_file) flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0) # Prevent accessing indicies that are out of bounds. def check_bounds(row, col): """Verify if (row, col) are within the boundaries of flow_raster. """ # https://www.naftaliharris.com/blog/python-multiline-if-statements/ if ( 0 <= row <= flow_raster.height - 1 and 0 <= col <= flow_raster.width - 1 ): return True else: return False class SourcePoint: @staticmethod def convert_row_col(easting, northing): """Convert state plane coordinates to corresponding row and column location for the pixel that the coordinates fall within. """ height = flow_raster.meanCellHeight width = flow_raster.meanCellWidth left = flow_raster.extent.XMin top = flow_raster.extent.YMax column = int((easting - left)//width) row = int((top - northing)//height) return (row, column) def __init__(self, obj_id, state_coords): self.id = obj_id self.cell_value = 1 self.x, self.y = state_coords self.row, self.col = SourcePoint.convert_row_col(self.x, self.y) class DownstreamTrace: """Trace runoff from a given SourcePoint. """ def __init__(self, source_point): self.runoff_array = np.full( [flow_raster.height, flow_raster.width], 0, dtype=np.uint8) self.row = source_point.row self.col = source_point.col self.cell_value = source_point.cell_value while True: # Tracing of runoff ends if (row, col) is out of bounds. This # happens when runoff flows off the edge of the DEM. if check_bounds(self.row, self.col): # Check if a cell in runoff_array has flow. This is to prevent # an infinite flow loop. A cell with value greater than zero # has flow. if self.runoff_array[self.row][self.col] == 0: self.runoff_array[self.row][self.col] = \ source_point.cell_value self.flow_value = flow_array[self.row][self.col] # There is flow when flow_value is greather than zero. # A value of zero is used to indicate no data/no flow. # Checking is necessary to prevent a mathematical error # when taking the log of flow_value. if self.flow_value > 0: # The flow_directions tuple contains tuples # representing (Δrow, Δcol). This is used for # converting the D8 flow_value to the next location. flow_directions = ( (0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1,-1), (-1, 0), (-1, 1) ) self.flow_dir_index = int(math.log(self.flow_value, 2)) self.delta_row, self.delta_col = flow_directions[ self.flow_dir_index] # Update row and col to the next cell. The process will # repeat itself until flow stops, loops back on itself # or the runoff continues off the edge of the DEM. self.row = self.row + self.delta_row self.col = self.col + self.delta_col else: break else: break def __add__(self, other): self.runoff_array = self.runoff_array + other.runoff_array def save_raster(self, file): self.runoff_raster = arcpy.NumPyArrayToRaster( self.runoff_array, flow_raster.extent.lowerLeft, flow_raster.meanCellWidth, flow_raster.meanCellHeight, value_to_nodata=0) self.runoff_raster.save(file) search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total", "SHAPE@XY"]) dumpster_list = [] for r in search_c: obj_id, total, coords = r dumpster_point = SourcePoint(obj_id, coords) dumpster_point.cell_value = total dumpster_list.append(dumpster_point) dumpster_runoff = DownstreamTrace(dumpster_list.pop()) for x in dumpster_list: dumpster_runoff + DownstreamTrace(x) dumpster_runoff.save_raster(runoff_file)
import arcpy
import math
import numpy as np

project_db = "F:/dumpster/sm_dumpster.gdb"

flow_file = "flow_clipped"  # Name of D8 flow raster.
dumpster_file = "dumpsters_simplified"  # Name of source point feature class.
runoff_file = "dumpster_runoff_simp"  # Name of the runoff raster.


arcpy.env.overwriteOutput = True
arcpy.env.workspace = project_db
arcpy.env.outputCoordinateSystem = flow_file


flow_raster = arcpy.Raster(flow_file)
flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0)


# Prevent accessing indicies that are out of bounds.

def check_bounds(row, col):
    """Verify if (row, col) are within the boundaries of flow_raster.
    """

    # https://www.naftaliharris.com/blog/python-multiline-if-statements/
    if (
        0 <= row <= flow_raster.height - 1
        and 0 <= col <= flow_raster.width - 1
    ):
        return True
    else:
        return False


class SourcePoint:
    @staticmethod
    def convert_row_col(easting, northing):
        """Convert state plane coordinates to corresponding row and column
        location for the pixel that the coordinates fall within.
        """

        height = flow_raster.meanCellHeight
        width = flow_raster.meanCellWidth
        left = flow_raster.extent.XMin
        top = flow_raster.extent.YMax

        column = int((easting - left)//width)
        row = int((top - northing)//height)
        return (row, column)

    def __init__(self, obj_id, state_coords):
        self.id = obj_id
        self.cell_value = 1
        self.x, self.y = state_coords
        self.row, self.col = SourcePoint.convert_row_col(self.x, self.y)


class DownstreamTrace:
    """Trace runoff from a given SourcePoint.
    """

    def __init__(self, source_point):
        self.runoff_array = np.full(
                [flow_raster.height, flow_raster.width],
                0, dtype=np.uint8)

        self.row = source_point.row
        self.col = source_point.col
        self.cell_value = source_point.cell_value

        while True:

            # Tracing of runoff ends if (row, col) is out of bounds. This
            # happens when runoff flows off the edge of the DEM.
            if check_bounds(self.row, self.col):

                # Check if a cell in runoff_array has flow. This is to prevent
                # an infinite flow loop. A cell with value greater than zero
                # has flow.
                if self.runoff_array[self.row][self.col] == 0:
                    self.runoff_array[self.row][self.col] = \
                            source_point.cell_value
                    self.flow_value = flow_array[self.row][self.col]

                    # There is flow when flow_value is greather than zero.
                    # A value of zero is used to indicate no data/no flow.
                    # Checking is necessary to prevent a mathematical error
                    # when taking the log of flow_value.
                    if self.flow_value > 0:
                        # The flow_directions tuple contains tuples
                        # representing (Δrow, Δcol). This is used for
                        # converting the D8 flow_value to the next location.
                        flow_directions = (
                                (0, 1),
                                (1, 1),
                                (1, 0),
                                (1, -1),
                                (0, -1),
                                (-1,-1),
                                (-1, 0),
                                (-1, 1)
                        )

                        self.flow_dir_index = int(math.log(self.flow_value, 2))
                        self.delta_row, self.delta_col = flow_directions[
                                self.flow_dir_index]

                        # Update row and col to the next cell. The process will
                        # repeat itself until flow stops, loops back on itself
                        # or the runoff continues off the edge of the DEM.
                        self.row = self.row + self.delta_row
                        self.col = self.col + self.delta_col

                else:
                    break

            else:
                break

    def __add__(self, other):
        self.runoff_array = self.runoff_array + other.runoff_array

    def save_raster(self, file):
        self.runoff_raster = arcpy.NumPyArrayToRaster(
                self.runoff_array, flow_raster.extent.lowerLeft,
                flow_raster.meanCellWidth, flow_raster.meanCellHeight,
                value_to_nodata=0)
        self.runoff_raster.save(file)


search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total",
                                                 "SHAPE@XY"])

dumpster_list = []

for r in search_c:
    obj_id, total, coords = r
    dumpster_point = SourcePoint(obj_id, coords)
    dumpster_point.cell_value = total
    dumpster_list.append(dumpster_point)

dumpster_runoff = DownstreamTrace(dumpster_list.pop())

for x in dumpster_list:
    dumpster_runoff + DownstreamTrace(x)

dumpster_runoff.save_raster(runoff_file)