JPS¶
src.python_motion_planning.global_planner.graph_search.jps.JPS
¶
Bases: AStar
Class for JPS motion planning.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start
|
tuple
|
start point coordinate |
required |
goal
|
tuple
|
goal point coordinate |
required |
env
|
Env
|
environment |
required |
heuristic_type
|
str
|
heuristic function type |
'euclidean'
|
Examples:
Python Console Session
>>> import python_motion_planning as pmp
>>> planner = pmp.JPS((5, 5), (45, 25), pmp.Grid(51, 31))
>>> cost, path, expand = planner.plan() # planning results only
>>> planner.plot.animation(path, str(planner), cost, expand) # animation
>>> planner.run() # run both planning and animation
References
[1] Online Graph Pruning for Pathfinding On Grid Maps
Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
class JPS(AStar):
"""
Class for JPS motion planning.
Parameters:
start (tuple): start point coordinate
goal (tuple): goal point coordinate
env (Env): environment
heuristic_type (str): heuristic function type
Examples:
>>> import python_motion_planning as pmp
>>> planner = pmp.JPS((5, 5), (45, 25), pmp.Grid(51, 31))
>>> cost, path, expand = planner.plan() # planning results only
>>> planner.plot.animation(path, str(planner), cost, expand) # animation
>>> planner.run() # run both planning and animation
References:
[1] Online Graph Pruning for Pathfinding On Grid Maps
"""
def __init__(self, start: tuple, goal: tuple, env: Env, heuristic_type: str = "euclidean") -> None:
super().__init__(start, goal, env, heuristic_type)
def __str__(self) -> str:
return "Jump Point Search(JPS)"
def plan(self) -> tuple:
"""
JPS motion plan function.
Returns:
cost (float): path cost
path (list): planning path
expand (list): all nodes that planner has searched
"""
# OPEN list (priority queue) and CLOSED list (hash table)
OPEN = []
heapq.heappush(OPEN, self.start)
CLOSED = dict()
while OPEN:
node = heapq.heappop(OPEN)
# exists in CLOSED list
if node.current in CLOSED:
continue
# goal found
if node == self.goal:
CLOSED[node.current] = node
cost, path = self.extractPath(CLOSED)
return cost, path, list(CLOSED.values())
jp_list = []
for motion in self.motions:
jp = self.jump(node, motion)
# exists and not in CLOSED list
if jp and jp.current not in CLOSED:
jp.parent = node.current
jp.h = self.h(jp, self.goal)
jp_list.append(jp)
for jp in jp_list:
# update OPEN list
heapq.heappush(OPEN, jp)
# goal found
if jp == self.goal:
break
CLOSED[node.current] = node
return [], [], []
def jump(self, node: Node, motion: Node):
"""
Jumping search recursively.
Parameters:
node (Node): current node
motion (Node): the motion that current node executes
Returns:
jump_point (Node): jump point or None if searching fails
"""
# explore a new node
new_node = node + motion
new_node.parent = node.current
new_node.h = self.h(new_node, self.goal)
# hit the obstacle
if new_node.current in self.obstacles:
return None
# goal found
if new_node == self.goal:
return new_node
# diagonal
if motion.x and motion.y:
# if exists jump point at horizontal or vertical
x_dir = Node((motion.x, 0), None, 1, None)
y_dir = Node((0, motion.y), None, 1, None)
if self.jump(new_node, x_dir) or self.jump(new_node, y_dir):
return new_node
# if exists forced neighbor
if self.detectForceNeighbor(new_node, motion):
return new_node
else:
return self.jump(new_node, motion)
def detectForceNeighbor(self, node, motion):
"""
Detect forced neighbor of node.
Parameters:
node (Node): current node
motion (Node): the motion that current node executes
Returns:
flag (bool): True if current node has forced neighbor else Flase
"""
x, y = node.current
x_dir, y_dir = motion.current
# horizontal
if x_dir and not y_dir:
if (x, y + 1) in self.obstacles and \
(x + x_dir, y + 1) not in self.obstacles:
return True
if (x, y - 1) in self.obstacles and \
(x + x_dir, y - 1) not in self.obstacles:
return True
# vertical
if not x_dir and y_dir:
if (x + 1, y) in self.obstacles and \
(x + 1, y + y_dir) not in self.obstacles:
return True
if (x - 1, y) in self.obstacles and \
(x - 1, y + y_dir) not in self.obstacles:
return True
# diagonal
if x_dir and y_dir:
if (x - x_dir, y) in self.obstacles and \
(x - x_dir, y + y_dir) not in self.obstacles:
return True
if (x, y - y_dir) in self.obstacles and \
(x + x_dir, y - y_dir) not in self.obstacles:
return True
return False
detectForceNeighbor(node, motion)
¶
Detect forced neighbor of node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node
|
Node
|
current node |
required |
motion
|
Node
|
the motion that current node executes |
required |
Returns:
Name | Type | Description |
---|---|---|
flag |
bool
|
True if current node has forced neighbor else Flase |
Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
def detectForceNeighbor(self, node, motion):
"""
Detect forced neighbor of node.
Parameters:
node (Node): current node
motion (Node): the motion that current node executes
Returns:
flag (bool): True if current node has forced neighbor else Flase
"""
x, y = node.current
x_dir, y_dir = motion.current
# horizontal
if x_dir and not y_dir:
if (x, y + 1) in self.obstacles and \
(x + x_dir, y + 1) not in self.obstacles:
return True
if (x, y - 1) in self.obstacles and \
(x + x_dir, y - 1) not in self.obstacles:
return True
# vertical
if not x_dir and y_dir:
if (x + 1, y) in self.obstacles and \
(x + 1, y + y_dir) not in self.obstacles:
return True
if (x - 1, y) in self.obstacles and \
(x - 1, y + y_dir) not in self.obstacles:
return True
# diagonal
if x_dir and y_dir:
if (x - x_dir, y) in self.obstacles and \
(x - x_dir, y + y_dir) not in self.obstacles:
return True
if (x, y - y_dir) in self.obstacles and \
(x + x_dir, y - y_dir) not in self.obstacles:
return True
return False
jump(node, motion)
¶
Jumping search recursively.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node
|
Node
|
current node |
required |
motion
|
Node
|
the motion that current node executes |
required |
Returns:
Name | Type | Description |
---|---|---|
jump_point |
Node
|
jump point or None if searching fails |
Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
def jump(self, node: Node, motion: Node):
"""
Jumping search recursively.
Parameters:
node (Node): current node
motion (Node): the motion that current node executes
Returns:
jump_point (Node): jump point or None if searching fails
"""
# explore a new node
new_node = node + motion
new_node.parent = node.current
new_node.h = self.h(new_node, self.goal)
# hit the obstacle
if new_node.current in self.obstacles:
return None
# goal found
if new_node == self.goal:
return new_node
# diagonal
if motion.x and motion.y:
# if exists jump point at horizontal or vertical
x_dir = Node((motion.x, 0), None, 1, None)
y_dir = Node((0, motion.y), None, 1, None)
if self.jump(new_node, x_dir) or self.jump(new_node, y_dir):
return new_node
# if exists forced neighbor
if self.detectForceNeighbor(new_node, motion):
return new_node
else:
return self.jump(new_node, motion)
plan()
¶
JPS motion plan function.
Returns:
Name | Type | Description |
---|---|---|
cost |
float
|
path cost |
path |
list
|
planning path |
expand |
list
|
all nodes that planner has searched |
Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
def plan(self) -> tuple:
"""
JPS motion plan function.
Returns:
cost (float): path cost
path (list): planning path
expand (list): all nodes that planner has searched
"""
# OPEN list (priority queue) and CLOSED list (hash table)
OPEN = []
heapq.heappush(OPEN, self.start)
CLOSED = dict()
while OPEN:
node = heapq.heappop(OPEN)
# exists in CLOSED list
if node.current in CLOSED:
continue
# goal found
if node == self.goal:
CLOSED[node.current] = node
cost, path = self.extractPath(CLOSED)
return cost, path, list(CLOSED.values())
jp_list = []
for motion in self.motions:
jp = self.jump(node, motion)
# exists and not in CLOSED list
if jp and jp.current not in CLOSED:
jp.parent = node.current
jp.h = self.h(jp, self.goal)
jp_list.append(jp)
for jp in jp_list:
# update OPEN list
heapq.heappush(OPEN, jp)
# goal found
if jp == self.goal:
break
CLOSED[node.current] = node
return [], [], []