Skip to content

RRTStar

src.python_motion_planning.path_planner.sample_search.rrt_star.RRTStar

Bases: RRT

Class for RRT* (Rapidly-exploring Random Tree Star) path planner.

RRT* extends RRT by: 1. Selecting the best parent (minimum cost) for a new node. 2. Rewiring nearby nodes through the new node if it improves their cost.

Parameters:

Name Type Description Default
*args

see parent class.

()
rewire_radius float

Neighborhood radius for rewiring (If None, adaptively calculated with gamma factor).

None
gamma float

A factor for calculating rewire_radius. For details, see [1]. (Disabled when rewire_radius is not None)

50.0
stop_until_sample_num int

Stop until sample number limitation is reached, otherwise stop when goal is found.

False
propagate_cost_to_children bool

Whether to propagate cost to children. This is a fix for ensuring the correctness of g-value of each node. But it may slow the algorithm a little.

True
*kwargs

see parent class.

{}
References

[1] Sampling-based Algorithms for Optimal Motion Planning

Examples:

Python Console Session
>>> map_ = Grid(bounds=[[0, 15], [0, 15]])
>>> planner = RRTStar(map_=map_, start=(5, 5), goal=(10, 10))
>>> path, path_info = planner.plan()
>>> print(path_info['success'])
True
Python Console Session
>>> planner = RRTStar(map_=map_, start=(5, 5), goal=(10, 10), sample_num=1000, stop_until_sample_num=True)
>>> path, path_info = planner.plan()
>>> print(path_info['success'])
True
Source code in src\python_motion_planning\path_planner\sample_search\rrt_star.py
Python
class RRTStar(RRT):
    """
    Class for RRT* (Rapidly-exploring Random Tree Star) path planner.

    RRT* extends RRT by:
        1. Selecting the best parent (minimum cost) for a new node.
        2. Rewiring nearby nodes through the new node if it improves their cost.

    Args:
        *args: see parent class.
        rewire_radius: Neighborhood radius for rewiring (If None, adaptively calculated with gamma factor).
        gamma: A factor for calculating rewire_radius. For details, see [1]. (Disabled when rewire_radius is not None)
        stop_until_sample_num: Stop until sample number limitation is reached, otherwise stop when goal is found.
        propagate_cost_to_children: Whether to propagate cost to children. This is a fix for ensuring the correctness of g-value of each node. But it may slow the algorithm a little.
        *kwargs: see parent class.

    References:
        [1] Sampling-based Algorithms for Optimal Motion Planning

    Examples:
        >>> map_ = Grid(bounds=[[0, 15], [0, 15]])
        >>> planner = RRTStar(map_=map_, start=(5, 5), goal=(10, 10))
        >>> path, path_info = planner.plan()
        >>> print(path_info['success'])
        True

        >>> planner = RRTStar(map_=map_, start=(5, 5), goal=(10, 10), sample_num=1000, stop_until_sample_num=True)
        >>> path, path_info = planner.plan()
        >>> print(path_info['success'])
        True
    """

    def __init__(self, *args,
                 rewire_radius: float = None,
                 gamma: float = 50.0,
                 stop_until_sample_num: int = False,
                 propagate_cost_to_children: bool = True,
                 **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.rewire_radius = rewire_radius
        self.gamma = gamma
        self.stop_until_sample_num = stop_until_sample_num
        self.best_results = self.failed_info
        self.propagate_cost_to_children = propagate_cost_to_children

        self._tree = None
        self._child = None

    def __str__(self) -> str:
        return "RRT*"

    def plan(self) -> Union[List[Tuple[float, ...]], Dict[str, Any]]:
        """
        RRT* path planning algorithm implementation.

        Returns:
            path: A list containing the path waypoints
            path_info: A dictionary containing path information
        """
        # Initialize tree with start node
        self._tree = {}
        self._child = ChildTree()
        start_node = Node(self.start, None, 0, 0)
        self._tree[self.start] = start_node

        # Initialize FAISS index
        if self.use_faiss:
            faiss_index = faiss.IndexFlatL2(self.dim)
            faiss_nodes = []
            self._faiss_add_node(start_node, faiss_index, faiss_nodes)

        for i in range(self.sample_num):
            # Generate random sample
            node_rand = self._generate_random_node()

            if node_rand.current in self._tree:
                continue

            # Find nearest node
            node_nearest = self._get_nearest_node(self._tree, node_rand, faiss_index, faiss_nodes)

            # Steer towards random sample
            node_new = self._steer(node_nearest, node_rand)
            if node_new is None:
                continue

            # Collision check
            if self.map_.in_collision(
                self.map_.point_float_to_int(node_nearest.current), 
                self.map_.point_float_to_int(node_new.current)
                ):
                continue

            # Find nearby nodes for choosing best parent
            near_nodes = self._get_near_nodes(node_new, faiss_index, faiss_nodes)

            # Choose parent with minimum cost
            min_parent = node_nearest
            min_cost = node_nearest.g + self.get_cost(node_nearest.current, node_new.current)

            for node in near_nodes:
                if self.map_.in_collision(
                    self.map_.point_float_to_int(node.current), 
                    self.map_.point_float_to_int(node_new.current)
                    ):
                    continue
                cost = node.g + self.get_cost(node.current, node_new.current)
                if cost < min_cost:
                    min_parent = node
                    min_cost = cost

            # Add new node
            if self.propagate_cost_to_children:
                self._child.remove(node_new.parent, node_new.current)
                self._child.add(min_parent.current, node_new.current)

            node_new.parent = min_parent.current
            node_new.g = min_cost
            self._tree[node_new.current] = node_new
            if self.use_faiss:
                self._faiss_add_node(node_new, faiss_index, faiss_nodes)

            # Rewire nearby nodes through new node
            for node in near_nodes:
                if node.current == min_parent.current:
                    continue
                if self.map_.in_collision(
                    self.map_.point_float_to_int(node.current),
                    self.map_.point_float_to_int(node_new.current)
                    ):
                    continue

                new_cost = node_new.g + self.get_cost(node_new.current, node.current)
                if new_cost < node.g:
                    if self.propagate_cost_to_children:
                        self._child.remove(node.parent, node.current)
                        self._child.add(node_new.current, node.current)

                    node.parent = node_new.current
                    node.g = new_cost
                    self._tree[node.current] = node

                    if self.propagate_cost_to_children:
                        self._propagate_cost_to_children(node)

            # Check goal connection
            dist_to_goal = self.get_cost(node_new.current, self.goal)
            if dist_to_goal <= self.max_dist:
                if not self.map_.in_collision(
                    self.map_.point_float_to_int(node_new.current), 
                    self.map_.point_float_to_int(self.goal)
                    ):
                    goal_cost = node_new.g + dist_to_goal
                    if self.goal not in self._tree or goal_cost < self._tree[self.goal].g:
                        if node_new.current == self.goal:
                            self._tree[self.goal] = node_new
                        else:
                            self._tree[self.goal] = Node(
                                self.goal,
                                node_new.current,
                                goal_cost,
                                0
                            )
                        path, length, cost = self.extract_path(self._tree)
                        self.best_results = path, {
                            "success": True,
                            "start": self.start,
                            "goal": self.goal,
                            "length": length,
                            "cost": cost,
                            "expand": self._tree,
                        }

                        if not self.stop_until_sample_num:
                            return self.best_results

        n = len(self._tree) + 1
        radius = self.gamma * ((math.log(n) / n) ** (1 / self.dim))

        # Planning stopped
        self.best_results[1]["expand"] = self._tree
        return self.best_results

    def _get_near_nodes(self, node_new: Node, index=None, nodes=None) -> List[Node]:
        """
        Get nearby nodes within rewiring radius.

        Args:
            node_new: Newly added node
            index: FAISS index (required when `use_faiss`=True)
            nodes: List of nodes in FAISS index (required when `use_faiss`=True)

        Returns:
            near_nodes: List of nearby nodes
        """
        # Adaptive radius from RRT* theory
        if self.rewire_radius is None:
            n = len(self._tree) + 1
            radius = self.gamma * ((math.log(n) / n) ** (1 / self.dim))
            if self.discrete:
                radius = max(1, radius)
        else:
            radius = self.rewire_radius

        near_nodes = []

        if self.use_faiss:
            # range search using faiss
            query = np.asarray(node_new.current, dtype=np.float32).reshape(1, -1)
            lims, D, I = index.range_search(query, radius * radius)
            for idx in I:
                near_nodes.append(nodes[idx])

        else:
            # brute force radius search
            for node in self._tree.values():
                if self.get_cost(node.current, node_new.current) <= radius:
                    near_nodes.append(node)

        return near_nodes

    def _propagate_cost_to_children(self, node: Node):
        """
        Propagate cost update to children of a node.

        Args:
            node: Node to propagate cost to children
        """
        child_set = self._child[node.current]
        if child_set is None:
            return

        for child in child_set:
            node_child = self._tree.get(child)

            if node_child is not None:
                old_g = node_child.g
                node_child.g = node.g + self.get_cost(
                    node.current, node_child.current
                )
                if not math.isclose(node_child.g, old_g):
                    self._propagate_cost_to_children(node_child)

plan()

RRT* path planning algorithm implementation.

Returns:

Name Type Description
path Union[List[Tuple[float, ...]], Dict[str, Any]]

A list containing the path waypoints

path_info Union[List[Tuple[float, ...]], Dict[str, Any]]

A dictionary containing path information

Source code in src\python_motion_planning\path_planner\sample_search\rrt_star.py
Python
def plan(self) -> Union[List[Tuple[float, ...]], Dict[str, Any]]:
    """
    RRT* path planning algorithm implementation.

    Returns:
        path: A list containing the path waypoints
        path_info: A dictionary containing path information
    """
    # Initialize tree with start node
    self._tree = {}
    self._child = ChildTree()
    start_node = Node(self.start, None, 0, 0)
    self._tree[self.start] = start_node

    # Initialize FAISS index
    if self.use_faiss:
        faiss_index = faiss.IndexFlatL2(self.dim)
        faiss_nodes = []
        self._faiss_add_node(start_node, faiss_index, faiss_nodes)

    for i in range(self.sample_num):
        # Generate random sample
        node_rand = self._generate_random_node()

        if node_rand.current in self._tree:
            continue

        # Find nearest node
        node_nearest = self._get_nearest_node(self._tree, node_rand, faiss_index, faiss_nodes)

        # Steer towards random sample
        node_new = self._steer(node_nearest, node_rand)
        if node_new is None:
            continue

        # Collision check
        if self.map_.in_collision(
            self.map_.point_float_to_int(node_nearest.current), 
            self.map_.point_float_to_int(node_new.current)
            ):
            continue

        # Find nearby nodes for choosing best parent
        near_nodes = self._get_near_nodes(node_new, faiss_index, faiss_nodes)

        # Choose parent with minimum cost
        min_parent = node_nearest
        min_cost = node_nearest.g + self.get_cost(node_nearest.current, node_new.current)

        for node in near_nodes:
            if self.map_.in_collision(
                self.map_.point_float_to_int(node.current), 
                self.map_.point_float_to_int(node_new.current)
                ):
                continue
            cost = node.g + self.get_cost(node.current, node_new.current)
            if cost < min_cost:
                min_parent = node
                min_cost = cost

        # Add new node
        if self.propagate_cost_to_children:
            self._child.remove(node_new.parent, node_new.current)
            self._child.add(min_parent.current, node_new.current)

        node_new.parent = min_parent.current
        node_new.g = min_cost
        self._tree[node_new.current] = node_new
        if self.use_faiss:
            self._faiss_add_node(node_new, faiss_index, faiss_nodes)

        # Rewire nearby nodes through new node
        for node in near_nodes:
            if node.current == min_parent.current:
                continue
            if self.map_.in_collision(
                self.map_.point_float_to_int(node.current),
                self.map_.point_float_to_int(node_new.current)
                ):
                continue

            new_cost = node_new.g + self.get_cost(node_new.current, node.current)
            if new_cost < node.g:
                if self.propagate_cost_to_children:
                    self._child.remove(node.parent, node.current)
                    self._child.add(node_new.current, node.current)

                node.parent = node_new.current
                node.g = new_cost
                self._tree[node.current] = node

                if self.propagate_cost_to_children:
                    self._propagate_cost_to_children(node)

        # Check goal connection
        dist_to_goal = self.get_cost(node_new.current, self.goal)
        if dist_to_goal <= self.max_dist:
            if not self.map_.in_collision(
                self.map_.point_float_to_int(node_new.current), 
                self.map_.point_float_to_int(self.goal)
                ):
                goal_cost = node_new.g + dist_to_goal
                if self.goal not in self._tree or goal_cost < self._tree[self.goal].g:
                    if node_new.current == self.goal:
                        self._tree[self.goal] = node_new
                    else:
                        self._tree[self.goal] = Node(
                            self.goal,
                            node_new.current,
                            goal_cost,
                            0
                        )
                    path, length, cost = self.extract_path(self._tree)
                    self.best_results = path, {
                        "success": True,
                        "start": self.start,
                        "goal": self.goal,
                        "length": length,
                        "cost": cost,
                        "expand": self._tree,
                    }

                    if not self.stop_until_sample_num:
                        return self.best_results

    n = len(self._tree) + 1
    radius = self.gamma * ((math.log(n) / n) ** (1 / self.dim))

    # Planning stopped
    self.best_results[1]["expand"] = self._tree
    return self.best_results