Skip to content

DStarLite

src.python_motion_planning.global_planner.graph_search.d_star_lite.DStarLite

Bases: LPAStar

Class for D* Lite motion planning.

Parameters:

Name Type Description Default
start tuple

start point coordinate

required
goal tuple

goal point coordinate

required
env Env

environment

required
heuristic_type str

heuristic function type

'euclidean'

Examples:

Python Console Session
>>> import python_motion_planning as pmp
>>> planner = pmp.DStarLite((5, 5), (45, 25), pmp.Grid(51, 31))
>>> cost, path, _ = planner.plan()     # planning results only
>>> planner.plot.animation(path, str(planner), cost)  # animation
>>> planner.run()       # run both planning and animation
References

[1] D* Lite

Source code in src\python_motion_planning\global_planner\graph_search\d_star_lite.py
Python
class DStarLite(LPAStar):
    """
    Class for D* Lite motion planning.

    Parameters:
        start (tuple): start point coordinate
        goal (tuple): goal point coordinate
        env (Env): environment
        heuristic_type (str): heuristic function type

    Examples:
        >>> import python_motion_planning as pmp
        >>> planner = pmp.DStarLite((5, 5), (45, 25), pmp.Grid(51, 31))
        >>> cost, path, _ = planner.plan()     # planning results only
        >>> planner.plot.animation(path, str(planner), cost)  # animation
        >>> planner.run()       # run both planning and animation

    References:
        [1] D* Lite
    """
    def __init__(self, start: tuple, goal: tuple, env: Env, heuristic_type: str = "euclidean") -> None:
        GraphSearcher.__init__(self, start, goal, env, heuristic_type)
        # start and goal
        self.start = LNode(start, float('inf'), float('inf'), None)
        self.goal = LNode(goal, float('inf'), 0.0, None)
        # correction
        self.km = 0
        # OPEN set and expand zone
        self.U, self.EXPAND = [], []

        # intialize global information, record history infomation of map grids
        self.map = {s: LNode(s, float('inf'), float('inf'), None) for s in self.env.grid_map}
        self.map[self.goal.current] = self.goal
        self.map[self.start.current] = self.start
        # OPEN set with priority
        self.goal.key = self.calculateKey(self.goal)
        heapq.heappush(self.U, self.goal)

    def __str__(self) -> str:
        return "D* Lite"

    def OnPress(self, event) -> None:
        """
        Mouse button callback function.

        Parameters:
            event (MouseEvent): mouse event
        """
        x, y = int(event.xdata), int(event.ydata)
        if x < 0 or x > self.env.x_range - 1 or y < 0 or y > self.env.y_range - 1:
            print("Please choose right area!")
        else:
            print("Change position: x = {}, y = {}".format(x, y))

            cur_start, new_start = self.start, self.start
            update_start = True
            cost, count = 0, 0
            path = [self.start.current]
            self.EXPAND = []

            while cur_start != self.goal:
                neighbors = [node_n for node_n in self.getNeighbor(cur_start)
                    if not self.isCollision(cur_start, node_n)]
                next_node = min(neighbors, key=lambda n: n.g)
                path.append(next_node.current)
                cost += self.cost(cur_start, next_node)
                count += 1
                cur_start = next_node

                if update_start:
                    update_start = False
                    self.km = self.h(cur_start, new_start)
                    new_start = cur_start

                    node_change = self.map[(x, y)]
                    if (x, y) not in self.obstacles:
                        self.obstacles.add((x, y))
                    else:
                        self.obstacles.remove((x, y))
                        self.updateVertex(node_change)

                    self.env.update(self.obstacles)
                    for node_n in self.getNeighbor(node_change):
                        self.updateVertex(node_n)

                    self.computeShortestPath()    

            # animation
            self.plot.clean()
            self.plot.animation(path, str(self), cost, self.EXPAND)
            self.plot.update()

    def computeShortestPath(self) -> None:
        """
        Perceived dynamic obstacle information to optimize global path.
        """
        while True:
            node = min(self.U, key=lambda node: node.key)
            if node.key >= self.calculateKey(self.start) and \
                    self.start.rhs == self.start.g:
                break

            self.U.remove(node)
            self.EXPAND.append(node)

            # affected by obstacles
            if node.key < self.calculateKey(node):
                node.key = self.calculateKey(node)
                heapq.heappush(self.U, node)
            # Locally over-consistent -> Locally consistent
            elif node.g > node.rhs:
                node.g = node.rhs
                for node_n in self.getNeighbor(node):
                    self.updateVertex(node_n)
            # Locally under-consistent -> Locally over-consistent
            else:
                node.g = float("inf")
                self.updateVertex(node)
                for node_n in self.getNeighbor(node):
                    self.updateVertex(node_n)

    def updateVertex(self, node: LNode) -> None:
        """
        Update the status and the current cost to node and it's neighbor.

        Parameters:
            node (LNode): the node to be updated
        """
        # greed correction(reverse searching)
        if node != self.goal:
            node.rhs = min([node_n.g + self.cost(node_n, node)
                        for node_n in self.getNeighbor(node)])

        if node in self.U:
            self.U.remove(node)

        # Locally unconsistent nodes should be added into OPEN set (set U)
        if node.g != node.rhs:
            node.key = self.calculateKey(node)
            heapq.heappush(self.U, node)

    def calculateKey(self, node: LNode) -> list:
        """
        Calculate priority of node.

        Parameters:
            node (LNode): the node to be calculated

        Returns:
            key (list): the priority of node
        """
        return [min(node.g, node.rhs) + self.h(node, self.start) + self.km,
                min(node.g, node.rhs)]

    def extractPath(self) -> tuple:
        """
        Extract the path based on greedy policy.

        Returns:
            cost (float): the cost of planning path
            path (list): the planning path
        """
        node = self.start
        path = [node.current]
        cost, count = 0, 0
        while node != self.goal:
            neighbors = [node_n for node_n in self.getNeighbor(node) if not self.isCollision(node, node_n)]
            next_node = min(neighbors, key=lambda n: n.g)
            path.append(next_node.current)
            cost += self.cost(node, next_node)
            node = next_node
            count += 1
            if count == 1000:
                return cost, []
        return cost, list(path)

OnPress(event)

Mouse button callback function.

Parameters:

Name Type Description Default
event MouseEvent

mouse event

required
Source code in src\python_motion_planning\global_planner\graph_search\d_star_lite.py
Python
def OnPress(self, event) -> None:
    """
    Mouse button callback function.

    Parameters:
        event (MouseEvent): mouse event
    """
    x, y = int(event.xdata), int(event.ydata)
    if x < 0 or x > self.env.x_range - 1 or y < 0 or y > self.env.y_range - 1:
        print("Please choose right area!")
    else:
        print("Change position: x = {}, y = {}".format(x, y))

        cur_start, new_start = self.start, self.start
        update_start = True
        cost, count = 0, 0
        path = [self.start.current]
        self.EXPAND = []

        while cur_start != self.goal:
            neighbors = [node_n for node_n in self.getNeighbor(cur_start)
                if not self.isCollision(cur_start, node_n)]
            next_node = min(neighbors, key=lambda n: n.g)
            path.append(next_node.current)
            cost += self.cost(cur_start, next_node)
            count += 1
            cur_start = next_node

            if update_start:
                update_start = False
                self.km = self.h(cur_start, new_start)
                new_start = cur_start

                node_change = self.map[(x, y)]
                if (x, y) not in self.obstacles:
                    self.obstacles.add((x, y))
                else:
                    self.obstacles.remove((x, y))
                    self.updateVertex(node_change)

                self.env.update(self.obstacles)
                for node_n in self.getNeighbor(node_change):
                    self.updateVertex(node_n)

                self.computeShortestPath()    

        # animation
        self.plot.clean()
        self.plot.animation(path, str(self), cost, self.EXPAND)
        self.plot.update()

calculateKey(node)

Calculate priority of node.

Parameters:

Name Type Description Default
node LNode

the node to be calculated

required

Returns:

Name Type Description
key list

the priority of node

Source code in src\python_motion_planning\global_planner\graph_search\d_star_lite.py
Python
def calculateKey(self, node: LNode) -> list:
    """
    Calculate priority of node.

    Parameters:
        node (LNode): the node to be calculated

    Returns:
        key (list): the priority of node
    """
    return [min(node.g, node.rhs) + self.h(node, self.start) + self.km,
            min(node.g, node.rhs)]

computeShortestPath()

Perceived dynamic obstacle information to optimize global path.

Source code in src\python_motion_planning\global_planner\graph_search\d_star_lite.py
Python
def computeShortestPath(self) -> None:
    """
    Perceived dynamic obstacle information to optimize global path.
    """
    while True:
        node = min(self.U, key=lambda node: node.key)
        if node.key >= self.calculateKey(self.start) and \
                self.start.rhs == self.start.g:
            break

        self.U.remove(node)
        self.EXPAND.append(node)

        # affected by obstacles
        if node.key < self.calculateKey(node):
            node.key = self.calculateKey(node)
            heapq.heappush(self.U, node)
        # Locally over-consistent -> Locally consistent
        elif node.g > node.rhs:
            node.g = node.rhs
            for node_n in self.getNeighbor(node):
                self.updateVertex(node_n)
        # Locally under-consistent -> Locally over-consistent
        else:
            node.g = float("inf")
            self.updateVertex(node)
            for node_n in self.getNeighbor(node):
                self.updateVertex(node_n)

extractPath()

Extract the path based on greedy policy.

Returns:

Name Type Description
cost float

the cost of planning path

path list

the planning path

Source code in src\python_motion_planning\global_planner\graph_search\d_star_lite.py
Python
def extractPath(self) -> tuple:
    """
    Extract the path based on greedy policy.

    Returns:
        cost (float): the cost of planning path
        path (list): the planning path
    """
    node = self.start
    path = [node.current]
    cost, count = 0, 0
    while node != self.goal:
        neighbors = [node_n for node_n in self.getNeighbor(node) if not self.isCollision(node, node_n)]
        next_node = min(neighbors, key=lambda n: n.g)
        path.append(next_node.current)
        cost += self.cost(node, next_node)
        node = next_node
        count += 1
        if count == 1000:
            return cost, []
    return cost, list(path)

updateVertex(node)

Update the status and the current cost to node and it's neighbor.

Parameters:

Name Type Description Default
node LNode

the node to be updated

required
Source code in src\python_motion_planning\global_planner\graph_search\d_star_lite.py
Python
def updateVertex(self, node: LNode) -> None:
    """
    Update the status and the current cost to node and it's neighbor.

    Parameters:
        node (LNode): the node to be updated
    """
    # greed correction(reverse searching)
    if node != self.goal:
        node.rhs = min([node_n.g + self.cost(node_n, node)
                    for node_n in self.getNeighbor(node)])

    if node in self.U:
        self.U.remove(node)

    # Locally unconsistent nodes should be added into OPEN set (set U)
    if node.g != node.rhs:
        node.key = self.calculateKey(node)
        heapq.heappush(self.U, node)