Skip to content

JPS

src.python_motion_planning.global_planner.graph_search.jps.JPS

Bases: AStar

Class for JPS motion planning.

Parameters:

Name Type Description Default
start tuple

start point coordinate

required
goal tuple

goal point coordinate

required
env Env

environment

required
heuristic_type str

heuristic function type

'euclidean'

Examples:

Python Console Session
>>> import python_motion_planning as pmp
>>> planner = pmp.JPS((5, 5), (45, 25), pmp.Grid(51, 31))
>>> cost, path, expand = planner.plan()     # planning results only
>>> planner.plot.animation(path, str(planner), cost, expand)  # animation
>>> planner.run()       # run both planning and animation
References

[1] Online Graph Pruning for Pathfinding On Grid Maps

Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
class JPS(AStar):
    """
    Class for JPS motion planning.

    Parameters:
        start (tuple): start point coordinate
        goal (tuple): goal point coordinate
        env (Env): environment
        heuristic_type (str): heuristic function type

    Examples:
        >>> import python_motion_planning as pmp
        >>> planner = pmp.JPS((5, 5), (45, 25), pmp.Grid(51, 31))
        >>> cost, path, expand = planner.plan()     # planning results only
        >>> planner.plot.animation(path, str(planner), cost, expand)  # animation
        >>> planner.run()       # run both planning and animation

    References:
        [1] Online Graph Pruning for Pathfinding On Grid Maps
    """
    def __init__(self, start: tuple, goal: tuple, env: Env, heuristic_type: str = "euclidean") -> None:
        super().__init__(start, goal, env, heuristic_type)

    def __str__(self) -> str:
        return "Jump Point Search(JPS)"

    def plan(self) -> tuple:
        """
        JPS motion plan function.

        Returns:
            cost (float): path cost
            path (list): planning path
            expand (list): all nodes that planner has searched
        """
        # OPEN list (priority queue) and CLOSED list (hash table)
        OPEN = []
        heapq.heappush(OPEN, self.start)
        CLOSED = dict()

        while OPEN:
            node = heapq.heappop(OPEN)

            # exists in CLOSED list
            if node.current in CLOSED:
                continue

            # goal found
            if node == self.goal:
                CLOSED[node.current] = node
                cost, path = self.extractPath(CLOSED)
                return cost, path, list(CLOSED.values())

            jp_list = []
            for motion in self.motions:
                jp = self.jump(node, motion)
                # exists and not in CLOSED list
                if jp and jp.current not in CLOSED:
                    jp.parent = node.current
                    jp.h = self.h(jp, self.goal)
                    jp_list.append(jp)

            for jp in jp_list:
                # update OPEN list
                heapq.heappush(OPEN, jp)

                # goal found
                if jp == self.goal:
                    break

            CLOSED[node.current] = node
        return [], [], []

    def jump(self, node: Node, motion: Node):
        """
        Jumping search recursively.

        Parameters:
            node (Node): current node
            motion (Node): the motion that current node executes

        Returns:
            jump_point (Node): jump point or None if searching fails
        """
        # explore a new node
        new_node = node + motion
        new_node.parent = node.current
        new_node.h = self.h(new_node, self.goal)

        # hit the obstacle
        if new_node.current in self.obstacles:
            return None

        # goal found
        if new_node == self.goal:
            return new_node

        # diagonal
        if motion.x and motion.y:
            # if exists jump point at horizontal or vertical
            x_dir = Node((motion.x, 0), None, 1, None)
            y_dir = Node((0, motion.y), None, 1, None)
            if self.jump(new_node, x_dir) or self.jump(new_node, y_dir):
                return new_node

        # if exists forced neighbor
        if self.detectForceNeighbor(new_node, motion):
            return new_node
        else:
            return self.jump(new_node, motion)

    def detectForceNeighbor(self, node, motion):
        """
        Detect forced neighbor of node.

        Parameters:
            node (Node): current node
            motion (Node): the motion that current node executes

        Returns:
            flag (bool): True if current node has forced neighbor else Flase
        """
        x, y = node.current
        x_dir, y_dir = motion.current

        # horizontal
        if x_dir and not y_dir:
            if (x, y + 1) in self.obstacles and \
                (x + x_dir, y + 1) not in self.obstacles:
                return True
            if (x, y - 1) in self.obstacles and \
                (x + x_dir, y - 1) not in self.obstacles:
                return True

        # vertical
        if not x_dir and y_dir:
            if (x + 1, y) in self.obstacles and \
                (x + 1, y + y_dir) not in self.obstacles:
                return True
            if (x - 1, y) in self.obstacles and \
                (x - 1, y + y_dir) not in self.obstacles:
                return True

        # diagonal
        if x_dir and y_dir:
            if (x - x_dir, y) in self.obstacles and \
                (x - x_dir, y + y_dir) not in self.obstacles:
                return True
            if (x, y - y_dir) in self.obstacles and \
                (x + x_dir, y - y_dir) not in self.obstacles:
                return True

        return False

detectForceNeighbor(node, motion)

Detect forced neighbor of node.

Parameters:

Name Type Description Default
node Node

current node

required
motion Node

the motion that current node executes

required

Returns:

Name Type Description
flag bool

True if current node has forced neighbor else Flase

Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
def detectForceNeighbor(self, node, motion):
    """
    Detect forced neighbor of node.

    Parameters:
        node (Node): current node
        motion (Node): the motion that current node executes

    Returns:
        flag (bool): True if current node has forced neighbor else Flase
    """
    x, y = node.current
    x_dir, y_dir = motion.current

    # horizontal
    if x_dir and not y_dir:
        if (x, y + 1) in self.obstacles and \
            (x + x_dir, y + 1) not in self.obstacles:
            return True
        if (x, y - 1) in self.obstacles and \
            (x + x_dir, y - 1) not in self.obstacles:
            return True

    # vertical
    if not x_dir and y_dir:
        if (x + 1, y) in self.obstacles and \
            (x + 1, y + y_dir) not in self.obstacles:
            return True
        if (x - 1, y) in self.obstacles and \
            (x - 1, y + y_dir) not in self.obstacles:
            return True

    # diagonal
    if x_dir and y_dir:
        if (x - x_dir, y) in self.obstacles and \
            (x - x_dir, y + y_dir) not in self.obstacles:
            return True
        if (x, y - y_dir) in self.obstacles and \
            (x + x_dir, y - y_dir) not in self.obstacles:
            return True

    return False

jump(node, motion)

Jumping search recursively.

Parameters:

Name Type Description Default
node Node

current node

required
motion Node

the motion that current node executes

required

Returns:

Name Type Description
jump_point Node

jump point or None if searching fails

Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
def jump(self, node: Node, motion: Node):
    """
    Jumping search recursively.

    Parameters:
        node (Node): current node
        motion (Node): the motion that current node executes

    Returns:
        jump_point (Node): jump point or None if searching fails
    """
    # explore a new node
    new_node = node + motion
    new_node.parent = node.current
    new_node.h = self.h(new_node, self.goal)

    # hit the obstacle
    if new_node.current in self.obstacles:
        return None

    # goal found
    if new_node == self.goal:
        return new_node

    # diagonal
    if motion.x and motion.y:
        # if exists jump point at horizontal or vertical
        x_dir = Node((motion.x, 0), None, 1, None)
        y_dir = Node((0, motion.y), None, 1, None)
        if self.jump(new_node, x_dir) or self.jump(new_node, y_dir):
            return new_node

    # if exists forced neighbor
    if self.detectForceNeighbor(new_node, motion):
        return new_node
    else:
        return self.jump(new_node, motion)

plan()

JPS motion plan function.

Returns:

Name Type Description
cost float

path cost

path list

planning path

expand list

all nodes that planner has searched

Source code in src\python_motion_planning\global_planner\graph_search\jps.py
Python
def plan(self) -> tuple:
    """
    JPS motion plan function.

    Returns:
        cost (float): path cost
        path (list): planning path
        expand (list): all nodes that planner has searched
    """
    # OPEN list (priority queue) and CLOSED list (hash table)
    OPEN = []
    heapq.heappush(OPEN, self.start)
    CLOSED = dict()

    while OPEN:
        node = heapq.heappop(OPEN)

        # exists in CLOSED list
        if node.current in CLOSED:
            continue

        # goal found
        if node == self.goal:
            CLOSED[node.current] = node
            cost, path = self.extractPath(CLOSED)
            return cost, path, list(CLOSED.values())

        jp_list = []
        for motion in self.motions:
            jp = self.jump(node, motion)
            # exists and not in CLOSED list
            if jp and jp.current not in CLOSED:
                jp.parent = node.current
                jp.h = self.h(jp, self.goal)
                jp_list.append(jp)

        for jp in jp_list:
            # update OPEN list
            heapq.heappush(OPEN, jp)

            # goal found
            if jp == self.goal:
                break

        CLOSED[node.current] = node
    return [], [], []