Topographic Analysis Algorithms¶
Overview¶
Topographic analysis in EEMT quantifies how landscape morphology controls water flow, energy distribution, and mass transfer processes. The framework implements multiple topographic indices including the Topographic Wetness Index (TWI), Mass Conservative Wetness Index (MCWI), and various flow routing algorithms.
Topographic Wetness Index (TWI)¶
Mathematical Definition¶
The Topographic Wetness Index quantifies the tendency of water to accumulate at any point in the landscape:
Where: - As = Specific contributing area [m²/m] (upslope area per unit contour width) - β = Local slope angle [radians] - tan(β) = Local slope gradient
Physical Interpretation¶
TWI values indicate moisture conditions:
| TWI Range | Interpretation | Typical Locations |
|---|---|---|
| < 4 | Very dry | Ridges, steep slopes |
| 4-6 | Dry | Upper hillslopes |
| 6-8 | Moderate | Mid-slopes |
| 8-10 | Moist | Lower slopes, flats |
| 10-12 | Wet | Convergent areas |
| > 12 | Very wet | Valley bottoms, streams |
Implementation¶
def calculate_twi(dem, flow_accumulation=None):
"""
Calculate Topographic Wetness Index
Parameters:
- dem: Digital elevation model
- flow_accumulation: Pre-calculated flow accumulation (optional)
Returns:
- twi: Topographic wetness index
"""
# Calculate slope in radians
slope_degrees = calculate_slope(dem)
slope_radians = np.deg2rad(slope_degrees)
# Avoid division by zero
slope_radians = np.maximum(slope_radians, 0.001)
# Calculate flow accumulation if not provided
if flow_accumulation is None:
flow_accumulation = calculate_flow_accumulation(dem)
# Get cell size
cell_size = get_cell_size(dem)
# Specific contributing area (m²/m)
# Flow accumulation × cell area / cell width
specific_area = flow_accumulation * cell_size
# TWI calculation
twi = np.log(specific_area / np.tan(slope_radians))
# Handle invalid values
twi[np.isinf(twi)] = np.nan
twi[twi < 0] = 0
return twi
Mass Conservative Wetness Index (MCWI)¶
Theoretical Foundation¶
The MCWI improves upon TWI by ensuring mass conservation across the landscape:
Where: - MCWIi = Mass conservative wetness index at location i - TWIi = Traditional wetness index at location i - P̄ = Mean precipitation over the domain - TWI = Mean TWI over the domain
This normalization ensures:
Enhanced MCWI with Precipitation¶
For spatially variable precipitation:
def calculate_mcwi(twi, precipitation=None):
"""
Calculate Mass Conservative Wetness Index
Parameters:
- twi: Topographic wetness index
- precipitation: Spatial precipitation field (optional)
Returns:
- mcwi: Mass conservative wetness index
"""
if precipitation is None:
# Assume uniform precipitation
precipitation = np.ones_like(twi)
# Calculate means
mean_twi = np.nanmean(twi)
mean_precip = np.nanmean(precipitation)
# Normalize TWI to conserve mass
mcwi = twi * (mean_precip / mean_twi)
# Apply precipitation weighting
mcwi = mcwi * (precipitation / mean_precip)
# Verify mass conservation
total_input = np.nansum(precipitation)
total_output = np.nansum(mcwi)
conservation_error = abs(total_input - total_output) / total_input
if conservation_error > 0.01: # 1% tolerance
print(f"Warning: Mass conservation error = {conservation_error:.2%}")
return mcwi
Lateral Redistribution¶
MCWI enables lateral water redistribution:
def redistribute_water_mcwi(precipitation, mcwi, convergence_factor=1.0):
"""
Redistribute water based on MCWI
Parameters:
- precipitation: Input precipitation field
- mcwi: Mass conservative wetness index
- convergence_factor: Strength of redistribution (0=none, 1=full)
Returns:
- redistributed: Water after lateral redistribution
"""
# Normalize MCWI to [0, 1]
mcwi_norm = (mcwi - np.nanmin(mcwi)) / (np.nanmax(mcwi) - np.nanmin(mcwi))
# Calculate redistribution weights
weights = convergence_factor * mcwi_norm + (1 - convergence_factor)
# Ensure mass conservation
weights = weights * (np.nansum(precipitation) / np.nansum(precipitation * weights))
# Apply redistribution
redistributed = precipitation * weights
return redistributed
Flow Accumulation Algorithms¶
D8 (Eight-Direction) Flow¶
The simplest flow routing method, directing all flow to the steepest downslope neighbor:
def d8_flow_direction(dem):
"""
D8 flow direction algorithm
Returns flow direction codes:
32 64 128
16 X 1
8 4 2
"""
# Define neighbor offsets
neighbors = [
(-1, 1, 32), (0, 1, 64), (1, 1, 128),
(-1, 0, 16), (1, 0, 1),
(-1, -1, 8), (0, -1, 4), (1, -1, 2)
]
flow_dir = np.zeros_like(dem, dtype=np.int32)
for i in range(dem.shape[0]):
for j in range(dem.shape[1]):
max_drop = 0
direction = 0
for di, dj, code in neighbors:
ni, nj = i + di, j + dj
if 0 <= ni < dem.shape[0] and 0 <= nj < dem.shape[1]:
# Calculate slope
distance = np.sqrt(di**2 + dj**2) * cell_size
drop = (dem[i, j] - dem[ni, nj]) / distance
if drop > max_drop:
max_drop = drop
direction = code
flow_dir[i, j] = direction
return flow_dir
D-Infinity (Continuous Flow Direction)¶
D-infinity allows flow dispersion across multiple neighbors:
def dinf_flow_direction(dem):
"""
D-infinity flow direction algorithm
Returns:
- flow_angle: Flow direction in radians (0-2π)
- slope: Maximum downslope gradient
"""
# Calculate slopes to 8 neighbors
e0 = dem[1:-1, 2:] - dem[1:-1, 1:-1] # E
e1 = dem[:-2, 2:] - dem[1:-1, 1:-1] # NE
e2 = dem[:-2, 1:-1] - dem[1:-1, 1:-1] # N
e3 = dem[:-2, :-2] - dem[1:-1, 1:-1] # NW
e4 = dem[1:-1, :-2] - dem[1:-1, 1:-1] # W
e5 = dem[2:, :-2] - dem[1:-1, 1:-1] # SW
e6 = dem[2:, 1:-1] - dem[1:-1, 1:-1] # S
e7 = dem[2:, 2:] - dem[1:-1, 1:-1] # SE
# Calculate flow angle for each triangular facet
flow_angles = []
slopes = []
for k in range(8):
# Get adjacent edges
e1_k = eval(f'e{k}')
e2_k = eval(f'e{(k+1)%8}')
# Calculate flow direction within facet
angle, slope = calculate_facet_flow(e1_k, e2_k, k)
flow_angles.append(angle)
slopes.append(slope)
# Select steepest facet
max_slope_idx = np.argmax(slopes, axis=0)
flow_angle = np.choose(max_slope_idx, flow_angles)
max_slope = np.max(slopes, axis=0)
return flow_angle, max_slope
Multiple Flow Direction (MFD)¶
MFD distributes flow to all downslope neighbors:
def mfd_flow_distribution(dem, exponent=1.1):
"""
Multiple Flow Direction algorithm
Parameters:
- dem: Digital elevation model
- exponent: Flow partition exponent (higher = more convergent)
Returns:
- flow_fractions: Dict of flow fractions to each neighbor
"""
flow_fractions = {}
for i in range(dem.shape[0]):
for j in range(dem.shape[1]):
# Calculate slope to all neighbors
slopes = []
valid_neighbors = []
for di in [-1, 0, 1]:
for dj in [-1, 0, 1]:
if di == 0 and dj == 0:
continue
ni, nj = i + di, j + dj
if 0 <= ni < dem.shape[0] and 0 <= nj < dem.shape[1]:
if dem[ni, nj] < dem[i, j]: # Downslope
distance = np.sqrt(di**2 + dj**2) * cell_size
slope = (dem[i, j] - dem[ni, nj]) / distance
slopes.append(slope ** exponent)
valid_neighbors.append((ni, nj))
# Normalize to sum to 1
if slopes:
total = sum(slopes)
fractions = [s / total for s in slopes]
flow_fractions[(i, j)] = dict(zip(valid_neighbors, fractions))
else:
flow_fractions[(i, j)] = {} # Pit or flat
return flow_fractions
Flow Accumulation Calculation¶
Recursive Algorithm¶
def calculate_flow_accumulation(flow_direction, weights=None):
"""
Calculate flow accumulation from flow direction
Parameters:
- flow_direction: Flow direction grid (D8 codes or MFD fractions)
- weights: Optional weight grid (e.g., precipitation)
Returns:
- accumulation: Flow accumulation grid
"""
if weights is None:
weights = np.ones_like(flow_direction, dtype=np.float32)
accumulation = weights.copy()
# Find outlets (cells with no upstream neighbors)
outlets = find_outlets(flow_direction)
# Process from outlets upstream
processed = np.zeros_like(flow_direction, dtype=bool)
def accumulate_recursive(i, j):
if processed[i, j]:
return accumulation[i, j]
# Find upstream cells
upstream = find_upstream_cells(i, j, flow_direction)
# Accumulate from upstream
for ui, uj in upstream:
if not processed[ui, uj]:
accumulate_recursive(ui, uj)
accumulation[i, j] += accumulation[ui, uj]
processed[i, j] = True
return accumulation[i, j]
# Process all cells
for i in range(flow_direction.shape[0]):
for j in range(flow_direction.shape[1]):
accumulate_recursive(i, j)
return accumulation
Slope and Aspect Calculation¶
Slope Algorithms¶
def calculate_slope(dem, method='horn'):
"""
Calculate slope from DEM
Methods:
- 'horn': Horn (1981) 3rd-order finite difference
- 'zevenbergen': Zevenbergen & Thorne (1987)
- 'average': Simple average method
"""
cell_size = get_cell_size(dem)
if method == 'horn':
# Horn's method (most accurate)
dz_dx = ((dem[:-2, 2:] + 2*dem[1:-1, 2:] + dem[2:, 2:]) -
(dem[:-2, :-2] + 2*dem[1:-1, :-2] + dem[2:, :-2])) / (8 * cell_size)
dz_dy = ((dem[2:, :-2] + 2*dem[2:, 1:-1] + dem[2:, 2:]) -
(dem[:-2, :-2] + 2*dem[:-2, 1:-1] + dem[:-2, 2:])) / (8 * cell_size)
elif method == 'zevenbergen':
# Zevenbergen & Thorne method
dz_dx = (dem[1:-1, 2:] - dem[1:-1, :-2]) / (2 * cell_size)
dz_dy = (dem[2:, 1:-1] - dem[:-2, 1:-1]) / (2 * cell_size)
else: # average
# Simple average
dz_dx = np.diff(dem, axis=1) / cell_size
dz_dy = np.diff(dem, axis=0) / cell_size
# Calculate slope magnitude
slope_radians = np.arctan(np.sqrt(dz_dx**2 + dz_dy**2))
slope_degrees = np.rad2deg(slope_radians)
return slope_degrees
Aspect Calculation¶
def calculate_aspect(dem):
"""
Calculate aspect (flow direction azimuth)
Returns:
- aspect: Degrees clockwise from north (0-360)
"""
# Calculate gradients
dz_dx = np.gradient(dem, axis=1)
dz_dy = np.gradient(dem, axis=0)
# Calculate aspect (mathematical convention: CCW from East)
aspect_math = np.arctan2(dz_dy, -dz_dx)
# Convert to geographic convention (CW from North)
aspect_geo = np.rad2deg(aspect_math)
aspect_geo = 90 - aspect_geo
# Normalize to 0-360
aspect_geo[aspect_geo < 0] += 360
# Flat areas have undefined aspect
flat_mask = (dz_dx == 0) & (dz_dy == 0)
aspect_geo[flat_mask] = -1 # Undefined
return aspect_geo
Curvature Analysis¶
Profile and Plan Curvature¶
def calculate_curvature(dem):
"""
Calculate terrain curvature
Returns:
- profile_curvature: Curvature in flow direction
- plan_curvature: Curvature perpendicular to flow
- mean_curvature: Average curvature
"""
cell_size = get_cell_size(dem)
# Second derivatives
d2z_dx2 = np.diff(dem, n=2, axis=1) / (cell_size**2)
d2z_dy2 = np.diff(dem, n=2, axis=0) / (cell_size**2)
# Cross derivative (using central differences)
dz_dx = np.gradient(dem, cell_size, axis=1)
dz_dy = np.gradient(dem, cell_size, axis=0)
d2z_dxdy = np.gradient(dz_dx, cell_size, axis=0)
# First derivatives squared
p = dz_dx**2
q = dz_dy**2
# Profile curvature (curvature in direction of maximum slope)
profile_curv = -(p * d2z_dx2 + 2 * dz_dx * dz_dy * d2z_dxdy + q * d2z_dy2) / \
((p + q) * np.sqrt(1 + p + q)**3)
# Plan curvature (curvature perpendicular to slope direction)
plan_curv = -(q * d2z_dx2 - 2 * dz_dx * dz_dy * d2z_dxdy + p * d2z_dy2) / \
((p + q)**(3/2))
# Mean curvature
mean_curv = -(d2z_dx2 + d2z_dy2) / 2
return profile_curv, plan_curv, mean_curv
Curvature Classification¶
def classify_landforms(profile_curv, plan_curv):
"""
Classify landforms based on curvature
Returns landform classes:
1. Peak/Ridge (convex-convex)
2. Ridge (convex-linear)
3. Shoulder (convex-concave)
4. Planar (linear-linear)
5. Pass (linear-concave)
6. Channel (concave-concave)
7. Footslope (concave-linear)
8. Hollow (concave-convex)
9. Valley/Pit (linear-convex)
"""
# Define thresholds
threshold = 0.1 # Curvature threshold for classification
# Initialize landform grid
landforms = np.zeros_like(profile_curv, dtype=np.int8)
# Classify based on curvature combinations
landforms[(profile_curv > threshold) & (plan_curv > threshold)] = 1 # Peak
landforms[(profile_curv > threshold) & (np.abs(plan_curv) <= threshold)] = 2 # Ridge
landforms[(profile_curv > threshold) & (plan_curv < -threshold)] = 3 # Shoulder
landforms[(np.abs(profile_curv) <= threshold) & (np.abs(plan_curv) <= threshold)] = 4 # Planar
landforms[(np.abs(profile_curv) <= threshold) & (plan_curv < -threshold)] = 5 # Pass
landforms[(profile_curv < -threshold) & (plan_curv < -threshold)] = 6 # Channel
landforms[(profile_curv < -threshold) & (np.abs(plan_curv) <= threshold)] = 7 # Footslope
landforms[(profile_curv < -threshold) & (plan_curv > threshold)] = 8 # Hollow
landforms[(np.abs(profile_curv) <= threshold) & (plan_curv > threshold)] = 9 # Valley
return landforms
Topographic Position Index¶
def calculate_tpi(dem, outer_radius, inner_radius=0):
"""
Calculate Topographic Position Index
TPI = elevation - mean(neighborhood elevation)
Parameters:
- dem: Digital elevation model
- outer_radius: Outer radius of annulus (cells)
- inner_radius: Inner radius of annulus (cells)
Returns:
- tpi: Topographic position index
"""
from scipy.ndimage import generic_filter
# Create annulus kernel
y, x = np.ogrid[-outer_radius:outer_radius+1, -outer_radius:outer_radius+1]
mask = (x**2 + y**2 <= outer_radius**2) & (x**2 + y**2 > inner_radius**2)
# Calculate neighborhood mean
def mean_filter(values):
return np.mean(values[mask.flatten()])
neighborhood_mean = generic_filter(dem, mean_filter, size=2*outer_radius+1)
# Calculate TPI
tpi = dem - neighborhood_mean
return tpi
Integration with EEMT¶
Topographic Controls on Energy¶
def apply_topographic_controls(solar_radiation, temperature, precipitation, twi):
"""
Apply topographic modifications to climate variables
Parameters:
- solar_radiation: Base solar radiation
- temperature: Base temperature
- precipitation: Base precipitation
- twi: Topographic wetness index
Returns:
- Modified climate variables
"""
# Solar radiation already includes topographic effects
solar_modified = solar_radiation
# Temperature modification based on cold air pooling
# Cold air accumulates in high TWI areas
cold_pool_effect = np.where(twi > 10, -2.0, 0.0) # °C cooling
temp_modified = temperature + cold_pool_effect
# Precipitation enhancement in convergent zones
# Use MCWI for mass-conservative redistribution
mcwi = calculate_mcwi(twi, precipitation)
precip_modified = redistribute_water_mcwi(precipitation, mcwi)
return {
'solar': solar_modified,
'temperature': temp_modified,
'precipitation': precip_modified
}
Quality Assurance¶
Validation Checks¶
def validate_topographic_indices(dem, twi, flow_acc):
"""
Quality control for topographic calculations
"""
checks = {}
# Check TWI range
twi_range = (np.nanmin(twi), np.nanmax(twi))
checks['twi_range_valid'] = (0 <= twi_range[0]) and (twi_range[1] <= 20)
# Check flow accumulation consistency
total_cells = dem.size
max_accumulation = np.nanmax(flow_acc)
checks['flow_acc_valid'] = max_accumulation <= total_cells
# Check for sinks/pits
sinks = identify_sinks(dem)
checks['sink_percentage'] = (np.sum(sinks) / dem.size) * 100
# Check slope calculation
slopes = calculate_slope(dem)
checks['max_slope_valid'] = np.nanmax(slopes) <= 90
return checks
References¶
-
Beven, K. J., & Kirkby, M. J. (1979). A physically based, variable contributing area model of basin hydrology. Hydrological Sciences Bulletin, 24(1), 43-69.
-
Tarboton, D. G. (1997). A new method for the determination of flow directions and upslope areas in grid digital elevation models. Water Resources Research, 33(2), 309-319.
-
Quinn, P., et al. (1991). The prediction of hillslope flow paths for distributed hydrological modelling using digital terrain models. Hydrological Processes, 5(1), 59-79.
Next: EEMT Calculations →