DStar¶
src.python_motion_planning.global_planner.graph_search.d_star.DStar
¶
Bases: GraphSearcher
Class for D* motion planning.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start
|
tuple
|
start point coordinate |
required |
goal
|
tuple
|
goal point coordinate |
required |
env
|
Env
|
environment |
required |
Examples:
>>> import python_motion_planning as pmp
>>> planner = pmp.DStar((5, 5), (45, 25), pmp.Grid(51, 31))
>>> cost, path, _ = planner.plan() # planning results only
>>> planner.plot.animation(path, str(planner), cost) # animation
>>> planner.run() # run both planning and animation
References
[1]Optimal and Efficient Path Planning for Partially-Known Environments
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
class DStar(GraphSearcher):
"""
Class for D* motion planning.
Parameters:
start (tuple): start point coordinate
goal (tuple): goal point coordinate
env (Env): environment
Examples:
>>> import python_motion_planning as pmp
>>> planner = pmp.DStar((5, 5), (45, 25), pmp.Grid(51, 31))
>>> cost, path, _ = planner.plan() # planning results only
>>> planner.plot.animation(path, str(planner), cost) # animation
>>> planner.run() # run both planning and animation
References:
[1]Optimal and Efficient Path Planning for Partially-Known Environments
"""
def __init__(self, start: tuple, goal: tuple, env: Env) -> None:
super().__init__(start, goal, env, None)
self.start = DNode(start, None, 'NEW', float('inf'), float("inf"))
self.goal = DNode(goal, None, 'NEW', 0, float('inf'))
# allowed motions
self.motions = [DNode(motion.current, None, None, motion.g, 0) for motion in self.env.motions]
# OPEN list and EXPAND list
self.OPEN = []
self.EXPAND = []
# record history infomation of map grids
self.map = {s: DNode(s, None, 'NEW', float("inf"), float("inf")) for s in self.env.grid_map}
self.map[self.goal.current] = self.goal
self.map[self.start.current] = self.start
# intialize OPEN list
self.insert(self.goal, 0)
def __str__(self) -> str:
return "Dynamic A*(D*)"
def plan(self) -> tuple:
"""
D* static motion planning function.
Returns:
cost (float): path cost
path (list): planning path
_ (None): None
"""
while True:
self.processState()
if self.start.t == 'CLOSED':
break
cost, path = self.extractPath(self.map)
return cost, path, None
def run(self) -> None:
"""
Running both plannig and animation.
"""
# static planning
cost, path, _ = self.plan()
# animation
self.plot.connect('button_press_event', self.OnPress)
self.plot.animation(path, str(self), cost=cost)
def OnPress(self, event) -> None:
"""
Mouse button callback function.
Parameters:
event (MouseEvent): mouse event
"""
x, y = int(event.xdata), int(event.ydata)
if x < 0 or x > self.env.x_range - 1 or y < 0 or y > self.env.y_range - 1:
print("Please choose right area!")
else:
if (x, y) not in self.obstacles:
print("Add obstacle at: ({}, {})".format(x, y))
# update obstacles
self.obstacles.add((x, y))
self.env.update(self.obstacles)
# move from start to goal, replan locally when meeting collisions
node = self.start
self.EXPAND, path, cost = [], [], 0
while node != self.goal:
node_parent = self.map[node.parent]
if self.isCollision(node, node_parent):
self.modify(node, node_parent)
continue
path.append(node.current)
cost += self.cost(node, node_parent)
node = node_parent
self.plot.clean()
self.plot.animation(path, str(self), cost, self.EXPAND)
self.plot.update()
def extractPath(self, closed_list: dict) -> tuple:
"""
Extract the path based on the CLOSED list.
Parameters:
closed_list (dict): CLOSED list
Returns:
cost (float): the cost of planning path
path (list): the planning path
"""
cost = 0
node = self.start
path = [node.current]
while node != self.goal:
node_parent = closed_list[node.parent]
cost += self.cost(node, node_parent)
node = node_parent
path.append(node.current)
return cost, path
def processState(self) -> float:
"""
Broadcast dynamic obstacle information.
Returns:
min_k (float): minimum k value of map
"""
# get node in OPEN list with min k value
node = self.min_state
self.EXPAND.append(node)
if node is None:
return -1
# record the min k value of this iteration
k_old = self.min_k
# move node from OPEN list to CLOSED list
self.delete(node)
# k_min < h[x] --> x: RAISE state (try to reduce k value by neighbor)
if k_old < node.h:
for node_n in self.getNeighbor(node):
if node_n.h <= k_old and node.h > node_n.h + self.cost(node, node_n):
# update h_value and choose parent
node.parent = node_n.current
node.h = node_n.h + self.cost(node, node_n)
# k_min >= h[x] -- > x: LOWER state (cost reductions)
if k_old == node.h:
for node_n in self.getNeighbor(node):
if node_n.t == 'NEW' or \
(node_n.parent == node.current and node_n.h != node.h + self.cost(node, node_n)) or \
(node_n.parent != node.current and node_n.h > node.h + self.cost(node, node_n)):
# Condition:
# 1) t[node_n] == 'NEW': not visited
# 2) node_n's parent: cost reduction
# 3) node_n find a better parent
node_n.parent = node.current
self.insert(node_n, node.h + self.cost(node, node_n))
else:
for node_n in self.getNeighbor(node):
if node_n.t == 'NEW' or \
(node_n.parent == node.current and node_n.h != node.h + self.cost(node, node_n)):
# Condition:
# 1) t[node_n] == 'NEW': not visited
# 2) node_n's parent: cost reduction
node_n.parent = node.current
self.insert(node_n, node.h + self.cost(node, node_n))
else:
if node_n.parent != node.current and \
node_n.h > node.h + self.cost(node, node_n):
# Condition: LOWER happened in OPEN list (s), s should be explored again
self.insert(node, node.h)
else:
if node_n.parent != node.current and \
node.h > node_n.h + self.cost(node, node_n) and \
node_n.t == 'CLOSED' and \
node_n.h > k_old:
# Condition: LOWER happened in CLOSED list (s_n), s_n should be explored again
self.insert(node_n, node_n.h)
return self.min_k
@property
def min_state(self) -> DNode:
"""
Choose the node with the minimum k value in OPEN list.
"""
if not self.OPEN:
return None
return min(self.OPEN, key=lambda node: node.k)
@property
def min_k(self) -> float:
"""
Choose the minimum k value for nodes in OPEN list.
"""
return self.min_state.k
def insert(self, node: DNode, h_new: float) -> None:
"""
Insert node into OPEN list.
Parameters:
node (DNode): the node to insert
h_new (float): new or better cost to come value
"""
if node.t == 'NEW': node.k = h_new
elif node.t == 'OPEN': node.k = min(node.k, h_new)
elif node.t == 'CLOSED': node.k = min(node.h, h_new)
node.h, node.t = h_new, 'OPEN'
self.OPEN.append(node)
def delete(self, node: DNode) -> None:
"""
Delete node from OPEN list.
Parameters:
node (DNode): the node to delete
"""
if node.t == 'OPEN':
node.t = 'CLOSED'
self.OPEN.remove(node)
def modify(self, node: DNode, node_parent: DNode) -> None:
"""
Start processing from node.
Parameters:
node (DNode): the node to modify
node_parent (DNode): the parent node of `node`
"""
if node.t == 'CLOSED':
self.insert(node, node_parent.h + self.cost(node, node_parent))
while True:
k_min = self.processState()
if k_min >= node.h:
break
def getNeighbor(self, node: DNode) -> list:
"""
Find neighbors of node.
Parameters:
node (DNode): current node
Returns:
neighbors (list): neighbors of current node
"""
neighbors = []
for motion in self.motions:
n = self.map[(node + motion).current]
if not self.isCollision(node, n):
neighbors.append(n)
return neighbors
min_k: float
property
¶
Choose the minimum k value for nodes in OPEN list.
min_state: DNode
property
¶
Choose the node with the minimum k value in OPEN list.
OnPress(event)
¶
Mouse button callback function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
|
MouseEvent
|
mouse event |
required |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
def OnPress(self, event) -> None:
"""
Mouse button callback function.
Parameters:
event (MouseEvent): mouse event
"""
x, y = int(event.xdata), int(event.ydata)
if x < 0 or x > self.env.x_range - 1 or y < 0 or y > self.env.y_range - 1:
print("Please choose right area!")
else:
if (x, y) not in self.obstacles:
print("Add obstacle at: ({}, {})".format(x, y))
# update obstacles
self.obstacles.add((x, y))
self.env.update(self.obstacles)
# move from start to goal, replan locally when meeting collisions
node = self.start
self.EXPAND, path, cost = [], [], 0
while node != self.goal:
node_parent = self.map[node.parent]
if self.isCollision(node, node_parent):
self.modify(node, node_parent)
continue
path.append(node.current)
cost += self.cost(node, node_parent)
node = node_parent
self.plot.clean()
self.plot.animation(path, str(self), cost, self.EXPAND)
self.plot.update()
delete(node)
¶
Delete node from OPEN list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node
|
DNode
|
the node to delete |
required |
extractPath(closed_list)
¶
Extract the path based on the CLOSED list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
closed_list
|
dict
|
CLOSED list |
required |
Returns:
Name | Type | Description |
---|---|---|
cost |
float
|
the cost of planning path |
path |
list
|
the planning path |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
def extractPath(self, closed_list: dict) -> tuple:
"""
Extract the path based on the CLOSED list.
Parameters:
closed_list (dict): CLOSED list
Returns:
cost (float): the cost of planning path
path (list): the planning path
"""
cost = 0
node = self.start
path = [node.current]
while node != self.goal:
node_parent = closed_list[node.parent]
cost += self.cost(node, node_parent)
node = node_parent
path.append(node.current)
return cost, path
getNeighbor(node)
¶
Find neighbors of node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node
|
DNode
|
current node |
required |
Returns:
Name | Type | Description |
---|---|---|
neighbors |
list
|
neighbors of current node |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
def getNeighbor(self, node: DNode) -> list:
"""
Find neighbors of node.
Parameters:
node (DNode): current node
Returns:
neighbors (list): neighbors of current node
"""
neighbors = []
for motion in self.motions:
n = self.map[(node + motion).current]
if not self.isCollision(node, n):
neighbors.append(n)
return neighbors
insert(node, h_new)
¶
Insert node into OPEN list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node
|
DNode
|
the node to insert |
required |
h_new
|
float
|
new or better cost to come value |
required |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
def insert(self, node: DNode, h_new: float) -> None:
"""
Insert node into OPEN list.
Parameters:
node (DNode): the node to insert
h_new (float): new or better cost to come value
"""
if node.t == 'NEW': node.k = h_new
elif node.t == 'OPEN': node.k = min(node.k, h_new)
elif node.t == 'CLOSED': node.k = min(node.h, h_new)
node.h, node.t = h_new, 'OPEN'
self.OPEN.append(node)
modify(node, node_parent)
¶
Start processing from node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node
|
DNode
|
the node to modify |
required |
node_parent
|
DNode
|
the parent node of |
required |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
def modify(self, node: DNode, node_parent: DNode) -> None:
"""
Start processing from node.
Parameters:
node (DNode): the node to modify
node_parent (DNode): the parent node of `node`
"""
if node.t == 'CLOSED':
self.insert(node, node_parent.h + self.cost(node, node_parent))
while True:
k_min = self.processState()
if k_min >= node.h:
break
plan()
¶
D* static motion planning function.
Returns:
Name | Type | Description |
---|---|---|
cost |
float
|
path cost |
path |
list
|
planning path |
_ |
None
|
None |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
processState()
¶
Broadcast dynamic obstacle information.
Returns:
Name | Type | Description |
---|---|---|
min_k |
float
|
minimum k value of map |
Source code in src\python_motion_planning\global_planner\graph_search\d_star.py
def processState(self) -> float:
"""
Broadcast dynamic obstacle information.
Returns:
min_k (float): minimum k value of map
"""
# get node in OPEN list with min k value
node = self.min_state
self.EXPAND.append(node)
if node is None:
return -1
# record the min k value of this iteration
k_old = self.min_k
# move node from OPEN list to CLOSED list
self.delete(node)
# k_min < h[x] --> x: RAISE state (try to reduce k value by neighbor)
if k_old < node.h:
for node_n in self.getNeighbor(node):
if node_n.h <= k_old and node.h > node_n.h + self.cost(node, node_n):
# update h_value and choose parent
node.parent = node_n.current
node.h = node_n.h + self.cost(node, node_n)
# k_min >= h[x] -- > x: LOWER state (cost reductions)
if k_old == node.h:
for node_n in self.getNeighbor(node):
if node_n.t == 'NEW' or \
(node_n.parent == node.current and node_n.h != node.h + self.cost(node, node_n)) or \
(node_n.parent != node.current and node_n.h > node.h + self.cost(node, node_n)):
# Condition:
# 1) t[node_n] == 'NEW': not visited
# 2) node_n's parent: cost reduction
# 3) node_n find a better parent
node_n.parent = node.current
self.insert(node_n, node.h + self.cost(node, node_n))
else:
for node_n in self.getNeighbor(node):
if node_n.t == 'NEW' or \
(node_n.parent == node.current and node_n.h != node.h + self.cost(node, node_n)):
# Condition:
# 1) t[node_n] == 'NEW': not visited
# 2) node_n's parent: cost reduction
node_n.parent = node.current
self.insert(node_n, node.h + self.cost(node, node_n))
else:
if node_n.parent != node.current and \
node_n.h > node.h + self.cost(node, node_n):
# Condition: LOWER happened in OPEN list (s), s should be explored again
self.insert(node, node.h)
else:
if node_n.parent != node.current and \
node.h > node_n.h + self.cost(node, node_n) and \
node_n.t == 'CLOSED' and \
node_n.h > k_old:
# Condition: LOWER happened in CLOSED list (s_n), s_n should be explored again
self.insert(node_n, node_n.h)
return self.min_k
run()
¶
Running both plannig and animation.