Introduction
I wrote the Runoff.py Python program as part of the Dumpster Project. The output of the program is a raster that shows the most direct downhill paths from given source points. This was used to visualize where the pollution from leaking dumpster goes. ArcMap did not seem to have this specific functionality readily available. I decided to use this as an opportunity to learn Python.
An interesting challenge was figuring out a way to convert the location of each source point from state plane coordinates to pixel coordinates. This was accomplished on lines 49 and 50 of the program. The pixel column coordinate is the quotient from dividing the distance the source point is from the left edge of the raster by the average width of a pixel. The pixel row coordinate is the same principle except it is the quotient after the distance from the top edge divided by average pixel height.
column = int((easting - left)//width)
row = int((top - northing)//height)
column = int((easting - left)//width)
row = int((top - northing)//height)
column = int((easting - left)//width)
row = int((top - northing)//height)
The Runoff.py program uses the value from a D8 flow raster at a given position, calculates the direction of flow travel, and then moves to the adjacent pixel in that direction. The program repeats this process until it goes off the edge of the flow raster or detects an infinite loop.
Output
The output of the program will resemble a network of lines such as the ones below. The hill-shade, legend and scale bar were added separately.
Runoff.py
project_db = "F:/dumpster/sm_dumpster.gdb"
flow_file = "flow_clipped" # Name of D8 flow raster.
dumpster_file = "dumpsters_simplified" # Name of source point feature class.
runoff_file = "dumpster_runoff_simp" # Name of the runoff raster.
arcpy.env.overwriteOutput = True
arcpy.env.workspace = project_db
arcpy.env.outputCoordinateSystem = flow_file
flow_raster = arcpy.Raster(flow_file)
flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0)
# Prevent accessing indicies that are out of bounds.
def check_bounds(row, col):
"""Verify if (row, col) are within the boundaries of flow_raster.
# https://www.naftaliharris.com/blog/python-multiline-if-statements/
0 <= row <= flow_raster.height - 1
and 0 <= col <= flow_raster.width - 1
def convert_row_col(easting, northing):
"""Convert state plane coordinates to corresponding row and column
location for the pixel that the coordinates fall within.
height = flow_raster.meanCellHeight
width = flow_raster.meanCellWidth
left = flow_raster.extent.XMin
top = flow_raster.extent.YMax
column = int((easting - left)//width)
row = int((top - northing)//height)
def __init__(self, obj_id, state_coords):
self.x, self.y = state_coords
self.row, self.col = SourcePoint.convert_row_col(self.x, self.y)
"""Trace runoff from a given SourcePoint.
def __init__(self, source_point):
self.runoff_array = np.full(
[flow_raster.height, flow_raster.width],
self.row = source_point.row
self.col = source_point.col
self.cell_value = source_point.cell_value
# Tracing of runoff ends if (row, col) is out of bounds. This
# happens when runoff flows off the edge of the DEM.
if check_bounds(self.row, self.col):
# Check if a cell in runoff_array has flow. This is to prevent
# an infinite flow loop. A cell with value greater than zero
if self.runoff_array[self.row][self.col] == 0:
self.runoff_array[self.row][self.col] = \
self.flow_value = flow_array[self.row][self.col]
# There is flow when flow_value is greather than zero.
# A value of zero is used to indicate no data/no flow.
# Checking is necessary to prevent a mathematical error
# when taking the log of flow_value.
# The flow_directions tuple contains tuples
# representing (Δrow, Δcol). This is used for
# converting the D8 flow_value to the next location.
self.flow_dir_index = int(math.log(self.flow_value, 2))
self.delta_row, self.delta_col = flow_directions[
# Update row and col to the next cell. The process will
# repeat itself until flow stops, loops back on itself
# or the runoff continues off the edge of the DEM.
self.row = self.row + self.delta_row
self.col = self.col + self.delta_col
def __add__(self, other):
self.runoff_array = self.runoff_array + other.runoff_array
def save_raster(self, file):
self.runoff_raster = arcpy.NumPyArrayToRaster(
self.runoff_array, flow_raster.extent.lowerLeft,
flow_raster.meanCellWidth, flow_raster.meanCellHeight,
self.runoff_raster.save(file)
search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total",
obj_id, total, coords = r
dumpster_point = SourcePoint(obj_id, coords)
dumpster_point.cell_value = total
dumpster_list.append(dumpster_point)
dumpster_runoff = DownstreamTrace(dumpster_list.pop())
dumpster_runoff + DownstreamTrace(x)
dumpster_runoff.save_raster(runoff_file)
import arcpy
import math
import numpy as np
project_db = "F:/dumpster/sm_dumpster.gdb"
flow_file = "flow_clipped" # Name of D8 flow raster.
dumpster_file = "dumpsters_simplified" # Name of source point feature class.
runoff_file = "dumpster_runoff_simp" # Name of the runoff raster.
arcpy.env.overwriteOutput = True
arcpy.env.workspace = project_db
arcpy.env.outputCoordinateSystem = flow_file
flow_raster = arcpy.Raster(flow_file)
flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0)
# Prevent accessing indicies that are out of bounds.
def check_bounds(row, col):
"""Verify if (row, col) are within the boundaries of flow_raster.
"""
# https://www.naftaliharris.com/blog/python-multiline-if-statements/
if (
0 <= row <= flow_raster.height - 1
and 0 <= col <= flow_raster.width - 1
):
return True
else:
return False
class SourcePoint:
@staticmethod
def convert_row_col(easting, northing):
"""Convert state plane coordinates to corresponding row and column
location for the pixel that the coordinates fall within.
"""
height = flow_raster.meanCellHeight
width = flow_raster.meanCellWidth
left = flow_raster.extent.XMin
top = flow_raster.extent.YMax
column = int((easting - left)//width)
row = int((top - northing)//height)
return (row, column)
def __init__(self, obj_id, state_coords):
self.id = obj_id
self.cell_value = 1
self.x, self.y = state_coords
self.row, self.col = SourcePoint.convert_row_col(self.x, self.y)
class DownstreamTrace:
"""Trace runoff from a given SourcePoint.
"""
def __init__(self, source_point):
self.runoff_array = np.full(
[flow_raster.height, flow_raster.width],
0, dtype=np.uint8)
self.row = source_point.row
self.col = source_point.col
self.cell_value = source_point.cell_value
while True:
# Tracing of runoff ends if (row, col) is out of bounds. This
# happens when runoff flows off the edge of the DEM.
if check_bounds(self.row, self.col):
# Check if a cell in runoff_array has flow. This is to prevent
# an infinite flow loop. A cell with value greater than zero
# has flow.
if self.runoff_array[self.row][self.col] == 0:
self.runoff_array[self.row][self.col] = \
source_point.cell_value
self.flow_value = flow_array[self.row][self.col]
# There is flow when flow_value is greather than zero.
# A value of zero is used to indicate no data/no flow.
# Checking is necessary to prevent a mathematical error
# when taking the log of flow_value.
if self.flow_value > 0:
# The flow_directions tuple contains tuples
# representing (Δrow, Δcol). This is used for
# converting the D8 flow_value to the next location.
flow_directions = (
(0, 1),
(1, 1),
(1, 0),
(1, -1),
(0, -1),
(-1,-1),
(-1, 0),
(-1, 1)
)
self.flow_dir_index = int(math.log(self.flow_value, 2))
self.delta_row, self.delta_col = flow_directions[
self.flow_dir_index]
# Update row and col to the next cell. The process will
# repeat itself until flow stops, loops back on itself
# or the runoff continues off the edge of the DEM.
self.row = self.row + self.delta_row
self.col = self.col + self.delta_col
else:
break
else:
break
def __add__(self, other):
self.runoff_array = self.runoff_array + other.runoff_array
def save_raster(self, file):
self.runoff_raster = arcpy.NumPyArrayToRaster(
self.runoff_array, flow_raster.extent.lowerLeft,
flow_raster.meanCellWidth, flow_raster.meanCellHeight,
value_to_nodata=0)
self.runoff_raster.save(file)
search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total",
"SHAPE@XY"])
dumpster_list = []
for r in search_c:
obj_id, total, coords = r
dumpster_point = SourcePoint(obj_id, coords)
dumpster_point.cell_value = total
dumpster_list.append(dumpster_point)
dumpster_runoff = DownstreamTrace(dumpster_list.pop())
for x in dumpster_list:
dumpster_runoff + DownstreamTrace(x)
dumpster_runoff.save_raster(runoff_file)
import arcpy
import math
import numpy as np
project_db = "F:/dumpster/sm_dumpster.gdb"
flow_file = "flow_clipped" # Name of D8 flow raster.
dumpster_file = "dumpsters_simplified" # Name of source point feature class.
runoff_file = "dumpster_runoff_simp" # Name of the runoff raster.
arcpy.env.overwriteOutput = True
arcpy.env.workspace = project_db
arcpy.env.outputCoordinateSystem = flow_file
flow_raster = arcpy.Raster(flow_file)
flow_array = arcpy.RasterToNumPyArray(flow_raster, nodata_to_value=0)
# Prevent accessing indicies that are out of bounds.
def check_bounds(row, col):
"""Verify if (row, col) are within the boundaries of flow_raster.
"""
# https://www.naftaliharris.com/blog/python-multiline-if-statements/
if (
0 <= row <= flow_raster.height - 1
and 0 <= col <= flow_raster.width - 1
):
return True
else:
return False
class SourcePoint:
@staticmethod
def convert_row_col(easting, northing):
"""Convert state plane coordinates to corresponding row and column
location for the pixel that the coordinates fall within.
"""
height = flow_raster.meanCellHeight
width = flow_raster.meanCellWidth
left = flow_raster.extent.XMin
top = flow_raster.extent.YMax
column = int((easting - left)//width)
row = int((top - northing)//height)
return (row, column)
def __init__(self, obj_id, state_coords):
self.id = obj_id
self.cell_value = 1
self.x, self.y = state_coords
self.row, self.col = SourcePoint.convert_row_col(self.x, self.y)
class DownstreamTrace:
"""Trace runoff from a given SourcePoint.
"""
def __init__(self, source_point):
self.runoff_array = np.full(
[flow_raster.height, flow_raster.width],
0, dtype=np.uint8)
self.row = source_point.row
self.col = source_point.col
self.cell_value = source_point.cell_value
while True:
# Tracing of runoff ends if (row, col) is out of bounds. This
# happens when runoff flows off the edge of the DEM.
if check_bounds(self.row, self.col):
# Check if a cell in runoff_array has flow. This is to prevent
# an infinite flow loop. A cell with value greater than zero
# has flow.
if self.runoff_array[self.row][self.col] == 0:
self.runoff_array[self.row][self.col] = \
source_point.cell_value
self.flow_value = flow_array[self.row][self.col]
# There is flow when flow_value is greather than zero.
# A value of zero is used to indicate no data/no flow.
# Checking is necessary to prevent a mathematical error
# when taking the log of flow_value.
if self.flow_value > 0:
# The flow_directions tuple contains tuples
# representing (Δrow, Δcol). This is used for
# converting the D8 flow_value to the next location.
flow_directions = (
(0, 1),
(1, 1),
(1, 0),
(1, -1),
(0, -1),
(-1,-1),
(-1, 0),
(-1, 1)
)
self.flow_dir_index = int(math.log(self.flow_value, 2))
self.delta_row, self.delta_col = flow_directions[
self.flow_dir_index]
# Update row and col to the next cell. The process will
# repeat itself until flow stops, loops back on itself
# or the runoff continues off the edge of the DEM.
self.row = self.row + self.delta_row
self.col = self.col + self.delta_col
else:
break
else:
break
def __add__(self, other):
self.runoff_array = self.runoff_array + other.runoff_array
def save_raster(self, file):
self.runoff_raster = arcpy.NumPyArrayToRaster(
self.runoff_array, flow_raster.extent.lowerLeft,
flow_raster.meanCellWidth, flow_raster.meanCellHeight,
value_to_nodata=0)
self.runoff_raster.save(file)
search_c = arcpy.da.SearchCursor(dumpster_file, ["OBJECTID", "total",
"SHAPE@XY"])
dumpster_list = []
for r in search_c:
obj_id, total, coords = r
dumpster_point = SourcePoint(obj_id, coords)
dumpster_point.cell_value = total
dumpster_list.append(dumpster_point)
dumpster_runoff = DownstreamTrace(dumpster_list.pop())
for x in dumpster_list:
dumpster_runoff + DownstreamTrace(x)
dumpster_runoff.save_raster(runoff_file)