Skip to content

Pathsolvers

Generic A*

astar_search is the low-level A* implementation shared by GraphPlanner and grid_astar. It works on any hashable node type — you can use it directly to build a custom planner. See Using astar_search directly for a worked example.

Search a graph using the A* algorithm.

Parameters:

Name Type Description Default
start_node object

The starting node.

required
goal_node object

The goal node.

required
get_neighbors_func callable

A function that takes a node and returns an iterable of (neighbor, cost) tuples.

required
heuristic_func callable

A function that takes a node and returns the estimated cost to the goal.

required

Returns:

Name Type Description
path list or None

The path from start to goal as a list of nodes, or None if no path exists.

Source code in map_data/pathsolver/astar.py
def astar_search[N](
    start_node: N,
    goal_node: N,
    get_neighbors_func: Callable[[N], Iterable[tuple[N, float]]],
    heuristic_func: Callable[[N], float],
) -> list[N] | None:
    """
    Search a graph using the A* algorithm.

    Parameters
    ----------
    start_node : object
        The starting node.
    goal_node : object
        The goal node.
    get_neighbors_func : callable
        A function that takes a node and returns an iterable of (neighbor, cost) tuples.
    heuristic_func : callable
        A function that takes a node and returns the estimated cost to the goal.

    Returns
    -------
    path : list or None
        The path from start to goal as a list of nodes, or None if no path exists.

    """
    count = 0
    # Priority queue stores (f_score, count, current_node)
    # f_score = cost + heuristic  # noqa: ERA001
    q = [(0, count, start_node)]

    # visited maps node -> (cost, parent)
    visited = {start_node: (0, None)}
    closed = set()

    while q:
        (_, _, u) = heapq.heappop(q)
        if u in closed:
            continue
        closed.add(u)
        cost = visited[u][0]

        if u == goal_node:
            # Path found, reconstruct
            path = []
            curr = u
            while curr is not None:
                path.append(curr)
                curr = visited[curr][1]
            return path[::-1]

        for v, dist in get_neighbors_func(u):
            new_cost = cost + dist
            if v not in visited or new_cost < visited[v][0]:
                visited[v] = (new_cost, u)
                h = heuristic_func(v)
                count += 1
                heapq.heappush(q, (new_cost + h, count, v))

    return None

Graph Planner

Graph-based path planner that routes along the OSM road and footway network.

Builds an undirected weighted graph from the ways stored in a :class:~map_data.map_data.MapData instance. Manually annotated paths (negative-ID ways) are spliced into the graph by projecting their endpoints onto the nearest OSM edge, so annotations extend the traversable network seamlessly.

Planning is performed with A* (see :meth:plan). The planner operates entirely in UTM coordinates.

map_data.pathsolver.graph_planner.GraphPlanner.__init__(map_data, highway_types=None)

Initialize the graph planner.

Parameters:

Name Type Description Default
map_data MapData

Parsed map data containing the OSM ways and node cache.

required
highway_types list of str

Way categories to include in the graph. Supported values are "footway" and "road". Defaults to ["footway"].

None
Source code in map_data/pathsolver/graph_planner.py
def __init__(self, map_data: "MapData", highway_types: list[str] | None = None) -> None:
    """
    Initialize the graph planner.

    Parameters
    ----------
    map_data : MapData
        Parsed map data containing the OSM ways and node cache.
    highway_types : list of str, optional
        Way categories to include in the graph. Supported values are
        ``"footway"`` and ``"road"``. Defaults to ``["footway"]``.

    """
    self.map_data = map_data
    self.highway_types = highway_types or ["footway"]
    self.nodes: dict[int, np.ndarray] = self.map_data.get_points()
    self.graph: dict[int, list[tuple[int, float]]] = {}
    self._build_graph()

map_data.pathsolver.graph_planner.GraphPlanner.plan(path_utm)

Plan a path through a sequence of UTM waypoints along the graph.

Each consecutive pair of waypoints is routed independently. The waypoints are snapped to the nearest graph edge before planning, so they do not need to lie exactly on the network.

Parameters:

Name Type Description Default
path_utm ndarray

Array of shape (N, 2) containing [x, y] UTM coordinates of the desired waypoints, in order.

required

Returns:

Type Description
ndarray or None

Concatenated path as an (M, 2) UTM coordinate array, or None if any segment could not be routed.

Source code in map_data/pathsolver/graph_planner.py
def plan(self, path_utm: np.ndarray) -> np.ndarray | None:
    """
    Plan a path through a sequence of UTM waypoints along the graph.

    Each consecutive pair of waypoints is routed independently. The
    waypoints are snapped to the nearest graph edge before planning,
    so they do not need to lie exactly on the network.

    Parameters
    ----------
    path_utm : np.ndarray
        Array of shape ``(N, 2)`` containing ``[x, y]`` UTM coordinates
        of the desired waypoints, in order.

    Returns
    -------
    np.ndarray or None
        Concatenated path as an ``(M, 2)`` UTM coordinate array, or
        ``None`` if any segment could not be routed.

    """
    full_path = []

    for i in range(len(path_utm) - 1):
        p_start = path_utm[i]
        p_goal = path_utm[i + 1]

        # Find nearest edges and projections
        edge_start_info, _ = self._find_closest_edge(p_start)
        edge_goal_info, _ = self._find_closest_edge(p_goal)

        if not edge_start_info or not edge_goal_info:
            return None

        id_s = "temp_start"
        id_g = "temp_goal"
        n_s1, n_s2, p_proj_s = edge_start_info
        n_g1, n_g2, p_proj_g = edge_goal_info

        # Construct local subgraph for snapped points
        extra = {
            "positions": {id_s: p_proj_s, id_g: p_proj_g},
            id_s: [
                (n_s1, np.linalg.norm(p_proj_s - self.nodes[n_s1].ravel()[:2])),
                (n_s2, np.linalg.norm(p_proj_s - self.nodes[n_s2].ravel()[:2])),
            ],
            id_g: [
                (n_g1, np.linalg.norm(p_proj_g - self.nodes[n_g1].ravel()[:2])),
                (n_g2, np.linalg.norm(p_proj_g - self.nodes[n_g2].ravel()[:2])),
            ],
            n_s1: [(id_s, np.linalg.norm(p_proj_s - self.nodes[n_s1].ravel()[:2]))],
            n_s2: [(id_s, np.linalg.norm(p_proj_s - self.nodes[n_s2].ravel()[:2]))],
            n_g1: [(id_g, np.linalg.norm(p_proj_g - self.nodes[n_g1].ravel()[:2]))],
            n_g2: [(id_g, np.linalg.norm(p_proj_g - self.nodes[n_g2].ravel()[:2]))],
        }

        # Special case: start and goal on the same edge
        if (n_s1 == n_g1 and n_s2 == n_g2) or (n_s1 == n_g2 and n_s2 == n_g1):
            dist_sg = np.linalg.norm(p_proj_s - p_proj_g)
            extra[id_s].append((id_g, dist_sg))
            extra[id_g].append((id_s, dist_sg))

        # Route between temporary nodes (projections)
        segment = self.a_star(id_s, id_g, extra)
        if not segment:
            return None

        # Combine: clicked point -> projection -> graph path -> projection -> clicked point
        # segment already contains [p_proj_s, ..., p_proj_g]

        final_segment = []
        final_segment.append(p_start)

        for p in segment:
            if np.linalg.norm(p - final_segment[-1]) > TOLERANCE:
                final_segment.append(p)

        if np.linalg.norm(p_goal - final_segment[-1]) > TOLERANCE:
            final_segment.append(p_goal)

        if i > 0:
            full_path.extend(final_segment[1:])
        else:
            full_path.extend(final_segment)

    return np.array(full_path)

map_data.pathsolver.graph_planner.GraphPlanner.a_star(start_node, goal_node, extra_nodes=None)

Run A* between two graph nodes and return the path as UTM coordinates.

Parameters:

Name Type Description Default
start_node int or str

ID of the start node in the graph.

required
goal_node int or str

ID of the goal node in the graph.

required
extra_nodes dict

Temporary adjacency entries and positions for nodes not permanently in the graph (e.g. snapped projection points). Expected keys: "positions" mapping node ID → np.ndarray, plus per-node adjacency lists.

None

Returns:

Type Description
list of np.ndarray or None

Sequence of [x, y] UTM positions along the path, or None if no path exists.

Source code in map_data/pathsolver/graph_planner.py
def a_star(
    self,
    start_node: int | str,
    goal_node: int | str,
    extra_nodes: dict | None = None,
) -> list[np.ndarray] | None:
    """
    Run A* between two graph nodes and return the path as UTM coordinates.

    Parameters
    ----------
    start_node : int or str
        ID of the start node in the graph.
    goal_node : int or str
        ID of the goal node in the graph.
    extra_nodes : dict, optional
        Temporary adjacency entries and positions for nodes not
        permanently in the graph (e.g. snapped projection points).
        Expected keys: ``"positions"`` mapping node ID → ``np.ndarray``,
        plus per-node adjacency lists.

    Returns
    -------
    list of np.ndarray or None
        Sequence of ``[x, y]`` UTM positions along the path, or
        ``None`` if no path exists.

    """

    def get_neighbors(u: int | str) -> list[tuple[int | str, float]]:
        neighs: list[tuple[int | str, float]] = []
        if isinstance(u, int):
            neighs.extend(self.graph.get(u, []))
        if extra_nodes and u in extra_nodes:
            neighs.extend(extra_nodes[u])
        return neighs

    if not get_neighbors(start_node) and start_node != goal_node:
        return None

    def heuristic(u: int | str) -> float:
        p1 = self._get_node_pos(u, extra_nodes)
        p2 = self._get_node_pos(goal_node, extra_nodes)
        return float(np.linalg.norm(p1 - p2))

    node_path = astar_search(start_node, goal_node, get_neighbors, heuristic)

    if node_path is None:
        return None

    return [self._get_node_pos(node, extra_nodes) for node in node_path]

Grid A*

map_data.pathsolver.grid_astar.grid_astar(grid, start_utm, goal_utm, low, cs, *, simplify_path=True, grid_cost_weight=GRID_COST_WEIGHT)

Optimized A* search on a 2D grid.

Parameters:

Name Type Description Default
grid ndarray

2D grid of costs (Y, X). inf means blocked.

required
start_utm tuple or ndarray

Starting point in UTM coordinates.

required
goal_utm tuple or ndarray

Goal point in UTM coordinates.

required
low tuple

(min_x, min_y) of the grid in UTM metres.

required
cs float

Cell size of the grid in metres.

required
simplify_path bool

Whether to simplify the resulting path with Douglas-Peucker.

True

Returns:

Type Description
ndarray or None

Found path as UTM coordinate array, or None if no path exists.

Source code in map_data/pathsolver/grid_astar.py
def grid_astar(
    grid: np.ndarray,
    start_utm: tuple[float, float] | np.ndarray,
    goal_utm: tuple[float, float] | np.ndarray,
    low: tuple[float, float],
    cs: float,
    *,
    simplify_path: bool = True,
    grid_cost_weight: float = GRID_COST_WEIGHT,
) -> np.ndarray | None:
    """
    Optimized A* search on a 2D grid.

    Parameters
    ----------
    grid : np.ndarray
        2D grid of costs (Y, X). ``inf`` means blocked.
    start_utm : tuple or np.ndarray
        Starting point in UTM coordinates.
    goal_utm : tuple or np.ndarray
        Goal point in UTM coordinates.
    low : tuple
        ``(min_x, min_y)`` of the grid in UTM metres.
    cs : float
        Cell size of the grid in metres.
    simplify_path : bool
        Whether to simplify the resulting path with Douglas-Peucker.

    Returns
    -------
    np.ndarray or None
        Found path as UTM coordinate array, or ``None`` if no path exists.

    """
    ny, nx = grid.shape

    # Convert UTM to grid indices
    def to_idx(p: tuple[float, float] | np.ndarray) -> tuple[int, int]:
        ix = int(np.floor((p[0] - low[0]) / cs))
        iy = int(np.floor((p[1] - low[1]) / cs))
        return ix, iy

    start_ix, start_iy = to_idx(start_utm)
    goal_ix, goal_iy = to_idx(goal_utm)

    if not (0 <= goal_ix < nx and 0 <= goal_iy < ny):
        logger.warning("Goal %s is outside the grid bounds; cannot plan path.", goal_utm)
        return None
    if not (0 <= start_ix < nx and 0 <= start_iy < ny):
        logger.warning("Start %s is outside the grid bounds; cannot plan path.", start_utm)
        return None

    if start_ix == goal_ix and start_iy == goal_iy:
        return np.array([start_utm, goal_utm])

    # Pre-calculate costs and pad with infinity to avoid boundary checks
    # grid is assumed to be 0.0 near paths, 1.0 away from paths.
    # Base traversal cost is 1.0 + grid_value * grid_cost_weight
    costs = 1.0 + grid * grid_cost_weight
    padded_costs = np.full((ny + 2, nx + 2), np.inf, dtype=np.float32)
    padded_costs[1:-1, 1:-1] = costs

    # Flattened grid size with padding
    p_nx = nx + 2
    p_ny = ny + 2

    g_scores = np.full(p_ny * p_nx, np.inf, dtype=np.float32)
    parents = np.full(p_ny * p_nx, -1, dtype=np.int32)

    start_flat = (start_iy + 1) * p_nx + (start_ix + 1)
    goal_flat = (goal_iy + 1) * p_nx + (goal_ix + 1)
    g_scores[start_flat] = 0.0

    # Priority queue: (f_score, g_score, ix, iy)
    h0 = np.sqrt((start_ix - goal_ix) ** 2 + (start_iy - goal_iy) ** 2)
    pq = [(h0, 0.0, start_ix, start_iy)]

    # Neighbor offsets in flattened padded grid (dy * p_nx + dx, dist)
    neighbors_data = []
    for dy in [-1, 0, 1]:
        for dx in [-1, 0, 1]:
            if dx == 0 and dy == 0:
                continue
            neighbors_data.append((dy * p_nx + dx, float(np.sqrt(dx**2 + dy**2))))

    flat_costs = padded_costs.ravel()

    while pq:
        _f, g_pushed, ix, iy = heapq.heappop(pq)

        u_flat = (iy + 1) * p_nx + (ix + 1)
        if g_scores[u_flat] < g_pushed - 1e-4:
            continue

        if u_flat == goal_flat:
            # Path found, reconstruct
            path_indices = []
            curr = u_flat
            while curr != -1:
                c_iy, c_ix = divmod(curr, p_nx)
                path_indices.append((c_ix - 1, c_iy - 1))
                curr = parents[curr]
            path_indices.reverse()

            # Convert back to UTM
            path = np.array([[ix * cs + low[0], iy * cs + low[1]] for ix, iy in path_indices])

            # Simplify path
            if simplify_path and len(path) > 2:
                path = np.array(LineString(path).simplify(cs / 2.0).coords)
            return path

        current_g = g_scores[u_flat]

        for offset, dist in neighbors_data:
            v_flat = u_flat + offset
            cost_val = flat_costs[v_flat]

            if np.isinf(cost_val):
                continue

            new_g = current_g + dist * cost_val
            if new_g < g_scores[v_flat]:
                g_scores[v_flat] = new_g
                parents[v_flat] = u_flat
                v_iy, v_ix = divmod(v_flat, p_nx)
                h = np.sqrt((v_ix - 1 - goal_ix) ** 2 + (v_iy - 1 - goal_iy) ** 2)
                heapq.heappush(pq, (new_g + h, new_g, v_ix - 1, v_iy - 1))

    return None

RRT*

Rapidly-exploring Random Tree Star (RRT*) path planner.

Builds a collision-free tree by randomly sampling the free space and rewiring edges to minimise path cost. The planner is asymptotically optimal: given enough iterations it converges to the shortest feasible path.

Collision checking uses two sources simultaneously:

  • A Shapely STRtree of barrier polygons (hard obstacles).
  • A 2-D cost grid where cells at or above traversability_threshold are treated as blocked.

Traversable cells contribute a weighted cost to the edge cost, so the planner naturally prefers low-cost corridors (e.g. footways) over open terrain.

map_data.pathsolver.rrt_star.RRTStar.__init__(start, goal, obstacles, obstacles_tree, grid, low, grid_scale=1.0, max_iter=2000, step_size=2.0, neighbor_radius=5.0, traversability_threshold=10.0, grid_cost_weight=GRID_COST_WEIGHT, *, simplify=True, transfer_id=None, improve_after_goal=False, informed=True, adaptive_radius=True)

Initialize the RRT* planner.

Parameters:

Name Type Description Default
start ndarray

Starting position as a 2-element array [x, y] in world (UTM) coordinates.

required
goal ndarray

Goal position as a 2-element array [x, y] in world coordinates.

required
obstacles list

List of Shapely geometries representing hard barriers. Used together with obstacles_tree for fast spatial queries.

required
obstacles_tree STRtree or None

Pre-built Shapely STRtree index over obstacles. Pass None to skip polygon-based collision checking (grid only).

required
grid ndarray

2-D cost array with shape (Y, X). A value of 0.0 means fully free; values at or above traversability_threshold are blocked. Intermediate values increase edge cost.

required
low tuple of float

(min_x, min_y) corner of the grid in world coordinates.

required
grid_scale float

Metres per grid cell (default 1.0).

1.0
max_iter int

Maximum number of RRT* iterations (default 2000).

2000
step_size float

Maximum distance the tree extends toward a sampled point per iteration in metres (default 2.0).

2.0
neighbor_radius float

Radius in metres within which nearby nodes are considered for rewiring (default 5.0).

5.0
traversability_threshold float

Grid cost at which a cell is considered an obstacle (default 10.0). np.inf marks cells as hard obstacles.

10.0
simplify bool

If True, post-process the raw node path with a greedy line-of-sight simplification before returning (default True).

True
transfer_id str or None

Optional identifier used to check for external cancellation signals during planning. Pass None to disable.

None
improve_after_goal bool

If True, continue iterating after the goal is first reached to find a lower-cost path. If False (default), return as soon as the goal is reached.

False
Source code in map_data/pathsolver/rrt_star.py
def __init__(
    self,
    start: np.ndarray,
    goal: np.ndarray,
    obstacles: list[sh.geometry.base.BaseGeometry],
    obstacles_tree: STRtree | None,
    grid: np.ndarray,
    low: tuple[float, float],
    grid_scale: float = 1.0,
    max_iter: int = 2000,
    step_size: float = 2.0,
    neighbor_radius: float = 5.0,
    traversability_threshold: float = 10.0,  # inf is blocked, high values are expensive
    grid_cost_weight: float = GRID_COST_WEIGHT,
    *,
    simplify: bool = True,
    transfer_id: str | None = None,
    improve_after_goal: bool = False,
    informed: bool = True,
    adaptive_radius: bool = True,
) -> None:
    """
    Initialize the RRT* planner.

    Parameters
    ----------
    start : np.ndarray
        Starting position as a 2-element array ``[x, y]`` in world
        (UTM) coordinates.
    goal : np.ndarray
        Goal position as a 2-element array ``[x, y]`` in world
        coordinates.
    obstacles : list
        List of Shapely geometries representing hard barriers. Used
        together with *obstacles_tree* for fast spatial queries.
    obstacles_tree : STRtree or None
        Pre-built Shapely STRtree index over *obstacles*. Pass ``None``
        to skip polygon-based collision checking (grid only).
    grid : np.ndarray
        2-D cost array with shape ``(Y, X)``. A value of ``0.0`` means
        fully free; values at or above *traversability_threshold* are
        blocked. Intermediate values increase edge cost.
    low : tuple of float
        ``(min_x, min_y)`` corner of the grid in world coordinates.
    grid_scale : float
        Metres per grid cell (default ``1.0``).
    max_iter : int
        Maximum number of RRT* iterations (default ``2000``).
    step_size : float
        Maximum distance the tree extends toward a sampled point per
        iteration in metres (default ``2.0``).
    neighbor_radius : float
        Radius in metres within which nearby nodes are considered for
        rewiring (default ``5.0``).
    traversability_threshold : float
        Grid cost at which a cell is considered an obstacle
        (default ``10.0``). ``np.inf`` marks cells as hard obstacles.
    simplify : bool
        If ``True``, post-process the raw node path with a greedy
        line-of-sight simplification before returning (default ``True``).
    transfer_id : str or None
        Optional identifier used to check for external cancellation
        signals during planning. Pass ``None`` to disable.
    improve_after_goal : bool
        If ``True``, continue iterating after the goal is first reached
        to find a lower-cost path. If ``False`` (default), return as
        soon as the goal is reached.

    """
    self.start = start
    self.goal = goal
    self.obstacles = obstacles
    self.obstacles_tree = obstacles_tree
    self.grid = grid  # (Y, X)
    self.grid_shape = grid.shape
    self.low = np.array(low)
    self.grid_scale = grid_scale
    self.max_iter = max_iter
    self.step_size = step_size
    self.neighbor_radius = neighbor_radius
    self.nodes = [self.start]
    self.parent = {0: None}
    self.cost = {0: 0.0}

    self._nodes_buf = np.empty((max_iter + 2, 2), dtype=np.float64)
    self._nodes_buf[0] = self.start
    self.goal_tolerance = step_size
    self.traversability_threshold = traversability_threshold
    self.grid_cost_weight = grid_cost_weight
    self.simplify = simplify
    self.transfer_id = transfer_id
    self.improve_after_goal = improve_after_goal
    self.informed = informed
    self.adaptive_radius = adaptive_radius
    self._best_cost: float = float("inf")
    self._kdtree: cKDTree | None = None
    self._kdtree_n: int = 0

    # Limit sampling area
    dist = np.linalg.norm(self.goal - self.start)
    margin = max(dist * 0.5, step_size * 10)
    self._sample_min = np.minimum(self.start, self.goal) - margin
    self._sample_max = np.maximum(self.start, self.goal) + margin

    # Clip to grid
    grid_max_x = self.low[0] + self.grid_shape[1] * grid_scale
    grid_max_y = self.low[1] + self.grid_shape[0] * grid_scale
    self._sample_min = np.maximum(self._sample_min, self.low)
    self._sample_max = np.minimum(self._sample_max, [grid_max_x, grid_max_y])

    # Informed RRT*: precompute ellipse geometry
    d = self.goal - self.start
    self._c_min: float = float(np.linalg.norm(d))
    self._ellipse_center: np.ndarray = (self.start + self.goal) / 2.0
    theta = math.atan2(float(d[1]), float(d[0]))
    ct, st = math.cos(theta), math.sin(theta)
    self._C_be: np.ndarray = np.array([[ct, -st], [st, ct]])

    # Adaptive radius: gamma* for 2-D from the asymptotic optimality formula
    sample_area = float(np.prod(self._sample_max - self._sample_min))
    self._gamma: float = 2.449 * math.sqrt(max(sample_area, 1.0) / math.pi)

    # Precompute traversable cells for faster sampling
    _xi_lo = max(0, int((self._sample_min[0] - self.low[0]) / grid_scale))
    _xi_hi = min(
        self.grid_shape[1],
        int(np.ceil((self._sample_max[0] - self.low[0]) / grid_scale)),
    )
    _yi_lo = max(0, int((self._sample_min[1] - self.low[1]) / grid_scale))
    _yi_hi = min(
        self.grid_shape[0],
        int(np.ceil((self._sample_max[1] - self.low[1]) / grid_scale)),
    )

    _sub = self.grid[_yi_lo:_yi_hi, _xi_lo:_xi_hi]
    _ys, _xs = np.where(_sub < self.traversability_threshold)
    if len(_xs) > 0:
        self._trav_xs = (_xs + _xi_lo) * grid_scale + self.low[0]
        self._trav_ys = (_ys + _yi_lo) * grid_scale + self.low[1]
    else:
        self._trav_xs = None
        self._trav_ys = None

map_data.pathsolver.rrt_star.RRTStar.find_path()

Run the RRT* algorithm and return the planned path.

Iterates up to max_iter times, growing the tree from start toward randomly sampled points and rewiring edges to reduce cost. The goal is sampled directly 10 % of the time to encourage convergence.

Returns:

Type Description
ndarray or None

Path as an (N, 2) array of [x, y] world coordinates, or None if no collision-free path was found within the iteration budget or if planning was cancelled via transfer_id.

Source code in map_data/pathsolver/rrt_star.py
def find_path(self) -> np.ndarray | None:
    """
    Run the RRT* algorithm and return the planned path.

    Iterates up to *max_iter* times, growing the tree from ``start``
    toward randomly sampled points and rewiring edges to reduce cost.
    The goal is sampled directly 10 % of the time to encourage
    convergence.

    Returns
    -------
    np.ndarray or None
        Path as an ``(N, 2)`` array of ``[x, y]`` world coordinates,
        or ``None`` if no collision-free path was found within the
        iteration budget or if planning was cancelled via *transfer_id*.

    """
    from .replan import _is_cancelled

    goal_idx = None

    for _ in range(self.max_iter):
        if _is_cancelled(self.transfer_id):
            return None

        rand_point = self.goal if random.random() < GOAL_SAMPLE_BIAS else self._sample_point()
        nearest_idx = self._nearest_node(rand_point)
        new_point = self._steer(self.nodes[nearest_idx], rand_point)

        if self._is_collision(new_point):
            continue

        collision, nearest_seg_cost = self._segment_cost(self.nodes[nearest_idx], new_point)
        if collision:
            continue

        new_idx = len(self.nodes)
        self.nodes.append(new_point)
        self._nodes_buf[new_idx] = new_point
        min_cost = self.cost[nearest_idx] + nearest_seg_cost
        min_parent = nearest_idx

        if self.adaptive_radius:
            n_eff = max(new_idx, int(math.e) + 1)
            r = min(self._gamma * math.sqrt(math.log(n_eff) / n_eff), self.neighbor_radius)
        else:
            r = self.neighbor_radius
        near_indices = self._get_near_nodes(new_point, r)
        for idx in near_indices:
            # Reuse already-computed cost for the nearest node
            if idx == nearest_idx:
                col, sc = False, nearest_seg_cost
            else:
                col, sc = self._segment_cost(self.nodes[idx], new_point)
            if not col:
                c = self.cost[idx] + sc
                if c < min_cost:
                    min_cost = c
                    min_parent = idx

        self.parent[new_idx] = min_parent
        self.cost[new_idx] = min_cost

        # Rewire
        for idx in near_indices:
            if idx == min_parent:
                continue
            col, sc = self._segment_cost(new_point, self.nodes[idx])
            if not col:
                new_c = self.cost[new_idx] + sc
                if new_c < self.cost[idx]:
                    self.parent[idx] = new_idx
                    self.cost[idx] = new_c

        if np.linalg.norm(new_point - self.goal) < self.goal_tolerance:
            col, sc = self._segment_cost(new_point, self.goal)
            if not col:
                new_goal_cost = self.cost[new_idx] + sc
                if goal_idx is None:
                    goal_idx = len(self.nodes)
                    self.nodes.append(self.goal)
                    self._nodes_buf[goal_idx] = self.goal
                if new_goal_cost < self.cost.get(goal_idx, float("inf")):
                    self.parent[goal_idx] = new_idx
                    self.cost[goal_idx] = new_goal_cost
                    self._best_cost = new_goal_cost
                if not self.improve_after_goal:
                    path = self._reconstruct_path(goal_idx)
                    return np.array(path)

    if goal_idx is not None:
        path = self._reconstruct_path(goal_idx)
        return np.array(path)
    return None

map_data.pathsolver.rrt_star.RRTStar._is_collision(point1, point2=None)

Return True if a point or segment intersects an obstacle.

Checks both the Shapely obstacle polygons (via STRtree) and the cost grid (via Bresenham rasterisation). A point is in collision if its grid cost meets or exceeds traversability_threshold; a segment is in collision if any traversed cell does.

Source code in map_data/pathsolver/rrt_star.py
def _is_collision(self, point1: np.ndarray, point2: np.ndarray | None = None) -> bool:
    """
    Return ``True`` if a point or segment intersects an obstacle.

    Checks both the Shapely obstacle polygons (via STRtree) and the cost
    grid (via Bresenham rasterisation). A point is in collision if its
    grid cost meets or exceeds *traversability_threshold*; a segment is
    in collision if any traversed cell does.
    """
    if self.obstacles_tree:
        geom = Point(point1) if point2 is None else LineString([point1, point2])
        if len(self.obstacles_tree.query(geom, predicate="intersects")) > 0:
            return True

    if point2 is None:
        return self._get_grid_cost(point1) >= self.traversability_threshold

    p1_grid = (
        int((point1[0] - self.low[0]) / self.grid_scale),
        int((point1[1] - self.low[1]) / self.grid_scale),
    )
    p2_grid = (
        int((point2[0] - self.low[0]) / self.grid_scale),
        int((point2[1] - self.low[1]) / self.grid_scale),
    )

    for px, py in self._bresenham(p1_grid, p2_grid):
        in_bounds = 0 <= px < self.grid_shape[1] and 0 <= py < self.grid_shape[0]
        if in_bounds and self.grid[py, px] >= self.traversability_threshold:
            return True
    return False

map_data.pathsolver.rrt_star.RRTStar._sample_point()

Sample a random point.

When a solution exists and informed is enabled, samples uniformly from the informed ellipse. Otherwise biases toward traversable grid cells 90 % of the time.

Source code in map_data/pathsolver/rrt_star.py
def _sample_point(self) -> np.ndarray:
    """
    Sample a random point.

    When a solution exists and *informed* is enabled, samples uniformly
    from the informed ellipse.  Otherwise biases toward traversable grid
    cells 90 % of the time.
    """
    if self.informed and self._best_cost < float("inf"):
        return self._sample_informed()
    if self._trav_xs is not None and random.random() > GOAL_SAMPLE_BIAS:
        idx = random.randrange(len(self._trav_xs))
        return np.array(
            [
                self._trav_xs[idx] + random.uniform(-self.grid_scale / 2, self.grid_scale / 2),
                self._trav_ys[idx] + random.uniform(-self.grid_scale / 2, self.grid_scale / 2),
            ],
        )
    return np.array(
        [
            random.uniform(self._sample_min[0], self._sample_max[0]),
            random.uniform(self._sample_min[1], self._sample_max[1]),
        ],
    )

map_data.pathsolver.rrt_star.RRTStar._nearest_node(point)

Return the index of the tree node closest to point.

Uses a lazily rebuilt KD-tree for the bulk of the tree, plus a linear scan over nodes added since the last rebuild.

Source code in map_data/pathsolver/rrt_star.py
def _nearest_node(self, point: np.ndarray) -> int:
    """
    Return the index of the tree node closest to *point*.

    Uses a lazily rebuilt KD-tree for the bulk of the tree, plus a
    linear scan over nodes added since the last rebuild.
    """
    n = len(self.nodes)
    if self._kdtree is None or n - self._kdtree_n >= _KDTREE_REBUILD_INTERVAL:
        self._kdtree = cKDTree(self._nodes_buf[:n])
        self._kdtree_n = n

    _, best_idx = self._kdtree.query(point)
    best_d2 = float(((self._nodes_buf[best_idx] - point) ** 2).sum())

    # Linear scan over nodes added since the last rebuild
    for i in range(self._kdtree_n, n):
        d2 = float(((self._nodes_buf[i] - point) ** 2).sum())
        if d2 < best_d2:
            best_d2 = d2
            best_idx = i

    return int(best_idx)

map_data.pathsolver.rrt_star.RRTStar._steer(start, target)

Return a point at most step_size metres from start toward target.

Source code in map_data/pathsolver/rrt_star.py
def _steer(self, start: np.ndarray, target: np.ndarray) -> np.ndarray:
    """
    Return a point at most *step_size* metres from *start* toward *target*.
    """
    direction = target - start
    dist = np.linalg.norm(direction)
    if dist < self.step_size:
        return target
    return start + (direction / dist) * self.step_size

map_data.pathsolver.rrt_star.RRTStar._get_near_nodes(new_point, radius=None)

Return indices of all tree nodes within radius of new_point.

radius defaults to self.neighbor_radius when not supplied.

Source code in map_data/pathsolver/rrt_star.py
def _get_near_nodes(self, new_point: np.ndarray, radius: float | None = None) -> list[int]:
    """
    Return indices of all tree nodes within *radius* of *new_point*.

    *radius* defaults to ``self.neighbor_radius`` when not supplied.
    """
    r = radius if radius is not None else self.neighbor_radius
    n = len(self.nodes)
    new_idx = n - 1  # node just appended by the caller
    r2 = r ** 2

    if self._kdtree is None:
        sq_dists = ((self._nodes_buf[:n] - new_point) ** 2).sum(axis=1)
        return list(np.where((sq_dists < r2) & (sq_dists > 0))[0])

    # KD-tree covers [0, _kdtree_n); _nearest_node always rebuilds first so
    # new_point is never included in the tree.
    result: list[int] = list(self._kdtree.query_ball_point(new_point, r))

    # Linear scan over nodes added since the last rebuild, excluding new_point itself
    for i in range(self._kdtree_n, n):
        if i == new_idx:
            continue
        d2 = float(((self._nodes_buf[i] - new_point) ** 2).sum())
        if d2 < r2:
            result.append(i)

    return result

map_data.pathsolver.rrt_star.RRTStar._segment_cost(start, end)

Compute the cost of the segment from start to end.

Returns:

Type Description
tuple of (bool, float)

(collision, cost) where collision is True if the segment intersects an obstacle or blocked grid cell, and cost is the weighted traversal cost dist * (1 + avg_grid_cost * 5). Returns (True, inf) on collision.

Source code in map_data/pathsolver/rrt_star.py
def _segment_cost(self, start: np.ndarray, end: np.ndarray) -> tuple[bool, float]:
    """
    Compute the cost of the segment from *start* to *end*.

    Returns
    -------
    tuple of (bool, float)
        ``(collision, cost)`` where *collision* is ``True`` if the
        segment intersects an obstacle or blocked grid cell, and *cost*
        is the weighted traversal cost ``dist * (1 + avg_grid_cost * 5)``.
        Returns ``(True, inf)`` on collision.

    """
    if self.obstacles_tree and len(
        self.obstacles_tree.query(LineString([start, end]), predicate="intersects"),
    ) > 0:
        return True, float("inf")

    p1_grid = (
        int((start[0] - self.low[0]) / self.grid_scale),
        int((start[1] - self.low[1]) / self.grid_scale),
    )
    p2_grid = (
        int((end[0] - self.low[0]) / self.grid_scale),
        int((end[1] - self.low[1]) / self.grid_scale),
    )
    bres_line = self._bresenham(p1_grid, p2_grid)

    total_grid_cost = 0.0
    count = 0
    for x, y in bres_line:
        if 0 <= x < self.grid_shape[1] and 0 <= y < self.grid_shape[0]:
            c = self.grid[y, x]
            if c >= self.traversability_threshold:
                return True, float("inf")
            total_grid_cost += c
            count += 1

    avg_c = total_grid_cost / count if count > 0 else 0.0
    # Cost = dist * (1 + avg_grid_cost * penalty)  # noqa: ERA001
    # We use grid_cost_weight to match A* logic
    return False, np.linalg.norm(end - start) * (1.0 + avg_c * self.grid_cost_weight)

map_data.pathsolver.rrt_star.RRTStar._reconstruct_path(goal_idx)

Walk the parent chain from goal_idx back to the root and return the path.

Source code in map_data/pathsolver/rrt_star.py
def _reconstruct_path(self, goal_idx: int) -> list[np.ndarray]:
    """
    Walk the parent chain from *goal_idx* back to the root and return the path.
    """
    path = []
    curr = goal_idx
    while curr is not None:
        path.append(self.nodes[curr])
        curr = self.parent[curr]
    path.reverse()
    if self.simplify and len(path) > 2:
        return self._simplify_path(path)
    return path

map_data.pathsolver.rrt_star.RRTStar._simplify_path(path)

Greedily remove intermediate waypoints that have line-of-sight to a later node.

Source code in map_data/pathsolver/rrt_star.py
def _simplify_path(self, path: list[np.ndarray]) -> list[np.ndarray]:
    """
    Greedily remove intermediate waypoints that have line-of-sight to a later node.
    """
    if len(path) <= 2:
        return path
    simplified = [path[0]]
    curr = 0
    while curr < len(path) - 1:
        next_best = curr + 1
        for i in range(len(path) - 1, curr + 1, -1):
            col, _ = self._segment_cost(path[curr], path[i])
            if not col:
                next_best = i
                break
        simplified.append(path[next_best])
        curr = next_best
    return simplified

ReplanPath

ReplanPath is the local replanning engine used by the viewer's Planner mode and the replan CLI tool. It discretizes the area around the input waypoints into a cost grid, assigns traversal costs based on OSM highway and surface types, then finds a collision-free path using either Grid A or RRT.

Class attributes

These cost tables are loaded from config/planner_defaults.yaml at import time and can be overridden per-instance:

Attribute Type Description
HIGHWAY_COSTS dict[str, float] Per OSM highway value cost (0.0 = free, 1.0 = obstacle)
SURFACE_COSTS dict[str, float] Extra penalty per OSM surface value
DEFAULT_OFF_PATH_COST float Cost for cells not near any known way (default 0.9)
PATH_COST_CAP float Maximum cost a way cell can have (default 0.85, keeps ways preferred over off-path)

Constructor

ReplanPath(args, obstacles=None, transfer_id=None)
Parameter Type Description
args argparse.Namespace Planning parameters. Use parse_args([]) to get defaults.
obstacles list[shapely.Geometry] Obstacle geometries (from ways_to_shapely(md.barriers_list)).
transfer_id str \| None Optional UUID for cancellation via cancel_replan_backend().

args attributes used by ReplanPath:

Attribute Default Description
low (min_x, min_y) lower bound of the planning area in UTM metres
high (max_x, max_y) upper bound of the planning area in UTM metres
cell_size 0.25 Grid resolution in metres
inflate_obstacles 0.25 Buffer added to obstacle geometries in metres
simplify_path True Apply Douglas-Peucker simplification to the output
smooth_path False Apply gradient-descent smoothing after planning

Methods

fill_grid(map_data, highway_types=None, max_path_dist=2.0)

Populate the cost grid from map data. Must be called before replan().

Parameter Default Description
map_data A loaded MapData object
highway_types ["footway"] Which way categories to use. Pass ["footway", "road"] to include roads.
max_path_dist 2.0 Cells within this distance (m) of a known way receive an interpolated cost. Cells beyond receive DEFAULT_OFF_PATH_COST.

Cost model per cell: way_cost + (DEFAULT_OFF_PATH_COST - way_cost) × (dist / max_path_dist)², where way_cost = min(PATH_COST_CAP, HIGHWAY_COSTS[highway] + SURFACE_COSTS[surface]). Obstacle cells are set to inf.

replan(path, algorithm="astar")

Plan a path through the cost grid.

Parameter Description
path np.ndarray of shape (N, 2+) — input waypoints in UTM metres
algorithm "astar" (Grid A) or "rrt" (RRT)

Returns np.ndarray (replanned path), None (cancelled), or False (no path found). Segments between consecutive waypoints are processed in parallel via joblib.

visualize(path, old_path=None)

Save a matplotlib debug plot as replan.png showing the cost grid, obstacles, and path.

Cancellation

To cancel a running replan() call from another thread, pass a transfer_id UUID to the constructor and call:

from map_data.pathsolver.replan import cancel_replan_backend

cancel_replan_backend(transfer_id)

Minimal usage example

from map_data.map_data import MapData
from map_data.pathsolver.replan import ReplanPath, parse_args
from map_data.utils.parsing import ways_to_shapely
from map_data.utils.gpx import parse_path

md = MapData.load("coords.mapdata")
path_data = parse_path("waypoints.gpx")   # (utm_array, zone_num, zone_let)

args = parse_args([])
args.low  = (md.min_x, md.min_y)
args.high = (md.max_x, md.max_y)

replanner = ReplanPath(args, ways_to_shapely(md.barriers_list))
replanner.fill_grid(md, highway_types=["footway"], max_path_dist=2.0)

new_path = replanner.replan(path_data[0], algorithm="astar")