Runoff Trace

Introduction

I wrote the Runoff.py Python program as part of the Dumpster Project. The output of the program is a raster that shows the most direct downhill paths from given source points. This was used to visualize where the pollution from leaking dumpster goes. ArcMap did not seem to have this specific functionality readily available. I decided to use this as an opportunity to learn Python.

An interesting challenge was figuring out a way to convert the location of each source point from state plane coordinates to pixel coordinates. This was accomplished on lines 49 and 50 of the program. The pixel column coordinate is the quotient from dividing the distance the source point is from the left edge of the raster by the average width of a pixel. The pixel row coordinate is the same principle except it is the quotient after the distance from the top edge divided by average pixel height.

column = int((easting - left)//width)
row = int((top - northing)//height)

The Runoff.py program uses the value from a D8 flow raster at a given position, calculates the direction of flow travel, and then moves to the adjacent pixel in that direction. The program repeats this process until it goes off the edge of the flow raster or detects an infinite loop.

Output

The output of the program will resemble a network of lines such as the ones below. The hill-shade, legend and scale bar were added separately.

Runoff.py

import arcpy
import math
import numpy as np

project_db = "F:/dumpster/sm_dumpster.gdb"

flow_file = "flow_clipped"  # Name of D8 flow raster.
dumpster_file = "dumpsters_simplified"  # Name of source point feature class.
runoff_file = "dumpster_runoff_simp"  # Name of the runoff raster.


arcpy.env.overwriteOutput = True
arcpy.env.workspace = project_db
arcpy.env.outputCoordinateSystem = flow_file


flow_raster = arcpy.Raster(flow_file)
flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0)


# Prevent accessing indicies that are out of bounds.

def check_bounds(row, col):
    """Verify if (row, col) are within the boundaries of flow_raster.
    """

    # https://www.naftaliharris.com/blog/python-multiline-if-statements/
    if (
        0 <= row <= flow_raster.height - 1
        and 0 <= col <= flow_raster.width - 1
    ):
        return True
    else:
        return False


class SourcePoint:
    @staticmethod
    def convert_row_col(easting, northing):
        """Convert state plane coordinates to corresponding row and column
        location for the pixel that the coordinates fall within.
        """

        height = flow_raster.meanCellHeight
        width = flow_raster.meanCellWidth
        left = flow_raster.extent.XMin
        top = flow_raster.extent.YMax

        column = int((easting - left)//width)
        row = int((top - northing)//height)
        return (row, column)

    def __init__(self, obj_id, state_coords):
        self.id = obj_id
        self.cell_value = 1
        self.x, self.y = state_coords
        self.row, self.col = SourcePoint.convert_row_col(self.x, self.y)


class DownstreamTrace:
    """Trace runoff from a given SourcePoint.
    """

    def __init__(self, source_point):
        self.runoff_array = np.full(
                [flow_raster.height, flow_raster.width],
                0, dtype=np.uint8)

        self.row = source_point.row
        self.col = source_point.col
        self.cell_value = source_point.cell_value

        while True:

            # Tracing of runoff ends if (row, col) is out of bounds. This
            # happens when runoff flows off the edge of the DEM.
            if check_bounds(self.row, self.col):

                # Check if a cell in runoff_array has flow. This is to prevent
                # an infinite flow loop. A cell with value greater than zero
                # has flow.
                if self.runoff_array[self.row][self.col] == 0:
                    self.runoff_array[self.row][self.col] = \
                            source_point.cell_value
                    self.flow_value = flow_array[self.row][self.col]

                    # There is flow when flow_value is greather than zero.
                    # A value of zero is used to indicate no data/no flow.
                    # Checking is necessary to prevent a mathematical error
                    # when taking the log of flow_value.
                    if self.flow_value > 0:
                        # The flow_directions tuple contains tuples
                        # representing (Δrow, Δcol). This is used for
                        # converting the D8 flow_value to the next location.
                        flow_directions = (
                                (0, 1),
                                (1, 1),
                                (1, 0),
                                (1, -1),
                                (0, -1),
                                (-1,-1),
                                (-1, 0),
                                (-1, 1)
                        )

                        self.flow_dir_index = int(math.log(self.flow_value, 2))
                        self.delta_row, self.delta_col = flow_directions[
                                self.flow_dir_index]

                        # Update row and col to the next cell. The process will
                        # repeat itself until flow stops, loops back on itself
                        # or the runoff continues off the edge of the DEM.
                        self.row = self.row + self.delta_row
                        self.col = self.col + self.delta_col

                else:
                    break

            else:
                break

    def __add__(self, other):
        self.runoff_array = self.runoff_array + other.runoff_array

    def save_raster(self, file):
        self.runoff_raster = arcpy.NumPyArrayToRaster(
                self.runoff_array, flow_raster.extent.lowerLeft,
                flow_raster.meanCellWidth, flow_raster.meanCellHeight,
                value_to_nodata=0)
        self.runoff_raster.save(file)


search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total",
                                                 "SHAPE@XY"])

dumpster_list = []

for r in search_c:
    obj_id, total, coords = r
    dumpster_point = SourcePoint(obj_id, coords)
    dumpster_point.cell_value = total
    dumpster_list.append(dumpster_point)

dumpster_runoff = DownstreamTrace(dumpster_list.pop())

for x in dumpster_list:
    dumpster_runoff + DownstreamTrace(x)

dumpster_runoff.save_raster(runoff_file)