RRTConnect¶
src.python_motion_planning.path_planner.sample_search.rrt_connect.RRTConnect
¶
Bases: RRT
Class for RRT-Connect path planner, an improved version of RRT.
RRT-Connect uses two trees (one from start, one from goal) that grow towards each other, which typically results in faster convergence than standard RRT.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args
|
see the parent class. |
()
|
|
*kwargs
|
see the parent class. |
{}
|
References
[1] RRT-connect: An efficient approach to single-query path planning.
Examples:
Python Console Session
>>> map_ = Grid(bounds=[[0, 15], [0, 15]])
>>> planner = RRTConnect(map_=map_, start=(5, 5), goal=(10, 10))
>>> path, path_info = planner.plan()
>>> print(path_info['success'])
True
Python Console Session
>>> planner.map_.type_map[3:10, 6] = TYPES.OBSTACLE
>>> path, path_info = planner.plan()
>>> print(path_info['success'])
True
Source code in src\python_motion_planning\path_planner\sample_search\rrt_connect.py
Python
class RRTConnect(RRT):
"""
Class for RRT-Connect path planner, an improved version of RRT.
RRT-Connect uses two trees (one from start, one from goal) that grow towards
each other, which typically results in faster convergence than standard RRT.
Args:
*args: see the parent class.
*kwargs: see the parent class.
References:
[1] RRT-connect: An efficient approach to single-query path planning.
Examples:
>>> map_ = Grid(bounds=[[0, 15], [0, 15]])
>>> planner = RRTConnect(map_=map_, start=(5, 5), goal=(10, 10))
>>> path, path_info = planner.plan()
>>> print(path_info['success'])
True
>>> planner.map_.type_map[3:10, 6] = TYPES.OBSTACLE
>>> path, path_info = planner.plan()
>>> print(path_info['success'])
True
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
# Two trees for bidirectional search: one from start, one from goal
self.tree_a = None # Tree originating from start point
self.tree_b = None # Tree originating from goal point
if self.use_faiss:
self.index_a = None
self.index_b = None
self.nodes_a = []
self.nodes_b = []
def __str__(self) -> str:
return "RRT-Connect"
def plan(self) -> Union[List[Tuple[float, ...]], Dict[str, Any]]:
"""
RRT-Connect path planning algorithm implementation.
Returns:
path: A list containing the path waypoints
path_info: A dictionary containing path information
"""
# Initialize both trees with start and goal nodes respectively
self.tree_a = {self.start: Node(self.start, None, 0, 0)}
self.tree_b = {self.goal: Node(self.goal, None, 0, 0)}
# Initialize FAISS index
if self.use_faiss:
self.index_a = faiss.IndexFlatL2(self.dim)
self.index_b = faiss.IndexFlatL2(self.dim)
self.nodes_a = []
self.nodes_b = []
self._faiss_add_node(self.tree_a[self.start], self.index_a, self.nodes_a)
self._faiss_add_node(self.tree_b[self.goal], self.index_b, self.nodes_b)
# Main planning loop
for _ in range(self.sample_num):
# Generate random sample node
node_rand = self._generate_random_node()
# Extend tree A towards random sample
node_new_a = self._extend(self.tree_a, node_rand, self.index_a, self.nodes_a)
if node_new_a:
# Try to connect tree B to the new node from tree A
if self._connect(self.tree_b, node_new_a.current, self.index_b, self.nodes_b):
tree_a_goal = self.tree_a.get(self.goal)
if tree_a_goal is not None and tree_a_goal.parent is None:
self.tree_a, self.tree_b = self.tree_b, self.tree_a
# Path found - combine paths from both trees
path_a, length_a, cost_a = self._extract_subpath(self.tree_a, node_new_a.current, self.start)
path_b, length_b, cost_b = self._extract_subpath(self.tree_b, node_new_a.current, self.goal)
path_a = path_a[::-1]
# Combine paths (remove duplicate meeting point)
full_path = path_a + path_b[1:]
total_length = length_a + length_b
total_cost = cost_a + cost_b
return full_path, {
"success": True,
"start": self.start,
"goal": self.goal,
"length": total_length,
"cost": total_cost,
"expand": [self.tree_a, self.tree_b],
}
# Swap trees to maintain bidirectional growth
self.tree_a, self.tree_b = self.tree_b, self.tree_a
if self.use_faiss:
self.index_a, self.index_b = self.index_b, self.index_a
self.nodes_a, self.nodes_b = self.nodes_b, self.nodes_a
# If loop exits without return, planning failed
self.failed_info[1]["expand"] = [self.tree_a, self.tree_b]
return self.failed_info
def _extend(self, tree: Dict[Tuple[int, ...], Node], node_rand: Node, index=None, nodes=None) -> Union[Node, None]:
"""
Extend the tree towards a random node, adding at most one new node.
Args:
tree: The tree to extend
node_rand: The target node to extend towards
index: FAISS index (required when `use_faiss`=True)
nodes: List of nodes in FAISS index (required when `use_faiss`=True)
Returns:
The new node added to the tree, or None if no node was added
"""
# Find nearest node in the tree
node_near = self._get_nearest_node(tree, node_rand, index, nodes)
# Steer towards the random node
node_new = self._steer(node_near, node_rand)
if node_new is None:
return None
# Check if path is collision-free
if not self.map_.in_collision(
self.map_.point_float_to_int(node_near.current),
self.map_.point_float_to_int(node_new.current)
):
# Add new node to the tree
node_new.parent = node_near.current
node_new.g = node_near.g + self.get_cost(node_near.current, node_new.current)
tree[node_new.current] = node_new
if self.use_faiss:
self._faiss_add_node(node_new, index, nodes)
return node_new
return None
def _connect(self, tree: Dict[Tuple[int, ...], Node], target: Tuple, index=None, nodes=None) -> bool:
"""
Connect the tree to a target point by repeatedly extending towards it.
Args:
tree: The tree to connect
target: The target point to connect to
index: FAISS index (required when `use_faiss`=True)
nodes: List of nodes in FAISS index (required when `use_faiss`=True)
Returns:
True if connection successful, False otherwise
"""
while True:
# Create node for the target
node_target = Node(target, None, 0, 0)
# Find nearest node in the tree
node_near = self._get_nearest_node(tree, node_target, index, nodes)
# Check distance to target
dist = self.get_cost(node_near.current, target)
# If close enough, check final connection
if dist <= self.max_dist:
if not self.map_.in_collision(
self.map_.point_float_to_int(node_near.current),
self.map_.point_float_to_int(target)
):
# Add target to tree
node_new = Node(target, node_near.current,
node_near.g + dist, 0)
tree[target] = node_new
if self.use_faiss:
self._faiss_add_node(node_new, index, nodes)
return True
return False
# Otherwise, extend towards target
node_new = self._steer(node_near, node_target)
if node_new is None:
return False
# Check collision
if self.map_.in_collision(
self.map_.point_float_to_int(node_near.current),
self.map_.point_float_to_int(node_new.current)
):
return False
# Add new node to tree
node_new.parent = node_near.current
node_new.g = node_near.g + self.get_cost(node_near.current, node_new.current)
tree[node_new.current] = node_new
if self.use_faiss:
self._faiss_add_node(node_new, index, nodes)
def _extract_subpath(self, tree: dict, end_point: tuple, root: tuple) -> Tuple[List[Tuple[float, ...]], float, float]:
"""
Extract a subpath from the root of the tree to the end_point.
Args:
tree: Tree to extract path from
end_point: End point of the subpath
root: Root point of the tree
Returns:
path: the subpath
length: length of the subpath
cost: cost of the subpath
"""
length = 0
cost = 0
node = tree.get(end_point)
path = [node.current]
while node.current != root:
node_parent = tree.get(node.parent)
length += self.map_.get_distance(node.current, node_parent.current)
cost += self.get_cost(node.current, node_parent.current)
node = node_parent
path.append(node.current)
return path, length, cost
plan()
¶
RRT-Connect path planning algorithm implementation.
Returns:
| Name | Type | Description |
|---|---|---|
path |
Union[List[Tuple[float, ...]], Dict[str, Any]]
|
A list containing the path waypoints |
path_info |
Union[List[Tuple[float, ...]], Dict[str, Any]]
|
A dictionary containing path information |
Source code in src\python_motion_planning\path_planner\sample_search\rrt_connect.py
Python
def plan(self) -> Union[List[Tuple[float, ...]], Dict[str, Any]]:
"""
RRT-Connect path planning algorithm implementation.
Returns:
path: A list containing the path waypoints
path_info: A dictionary containing path information
"""
# Initialize both trees with start and goal nodes respectively
self.tree_a = {self.start: Node(self.start, None, 0, 0)}
self.tree_b = {self.goal: Node(self.goal, None, 0, 0)}
# Initialize FAISS index
if self.use_faiss:
self.index_a = faiss.IndexFlatL2(self.dim)
self.index_b = faiss.IndexFlatL2(self.dim)
self.nodes_a = []
self.nodes_b = []
self._faiss_add_node(self.tree_a[self.start], self.index_a, self.nodes_a)
self._faiss_add_node(self.tree_b[self.goal], self.index_b, self.nodes_b)
# Main planning loop
for _ in range(self.sample_num):
# Generate random sample node
node_rand = self._generate_random_node()
# Extend tree A towards random sample
node_new_a = self._extend(self.tree_a, node_rand, self.index_a, self.nodes_a)
if node_new_a:
# Try to connect tree B to the new node from tree A
if self._connect(self.tree_b, node_new_a.current, self.index_b, self.nodes_b):
tree_a_goal = self.tree_a.get(self.goal)
if tree_a_goal is not None and tree_a_goal.parent is None:
self.tree_a, self.tree_b = self.tree_b, self.tree_a
# Path found - combine paths from both trees
path_a, length_a, cost_a = self._extract_subpath(self.tree_a, node_new_a.current, self.start)
path_b, length_b, cost_b = self._extract_subpath(self.tree_b, node_new_a.current, self.goal)
path_a = path_a[::-1]
# Combine paths (remove duplicate meeting point)
full_path = path_a + path_b[1:]
total_length = length_a + length_b
total_cost = cost_a + cost_b
return full_path, {
"success": True,
"start": self.start,
"goal": self.goal,
"length": total_length,
"cost": total_cost,
"expand": [self.tree_a, self.tree_b],
}
# Swap trees to maintain bidirectional growth
self.tree_a, self.tree_b = self.tree_b, self.tree_a
if self.use_faiss:
self.index_a, self.index_b = self.index_b, self.index_a
self.nodes_a, self.nodes_b = self.nodes_b, self.nodes_a
# If loop exits without return, planning failed
self.failed_info[1]["expand"] = [self.tree_a, self.tree_b]
return self.failed_info