Skip to content

BaseVisualizer

src.python_motion_planning.common.visualizer.base_visualizer.BaseVisualizer

Bases: ABC

Base visualizer for motion planning.

Source code in src\python_motion_planning\common\visualizer\base_visualizer.py
Python
class BaseVisualizer(ABC):
    """
    Base visualizer for motion planning.
    """
    def __init__(self):
        self.dim = None
        self.trajs = {}

    def __del__(self):
        self.close()

    @abstractmethod
    def show(self):
        """
        Show plot.
        """
        pass

    @abstractmethod
    def close(self):
        """
        Close plot.
        """
        pass

    def get_traj_info(self, 
            rid: int, 
            ref_path: List[Tuple[float, ...]], 
            goal_pose: np.ndarray, 
            goal_dist_tol: float, 
            goal_orient_tol: float
        ) -> Dict[str, Any]:
        """
        Get trajectory information.

        Args:
            rid: Robot ID.
            ref_path: Reference pose path (world frame).
            goal_pose: Goal pose.
            goal_dist_tol: Distance tolerance for goal.
            goal_orient_tol: Orientation tolerance for goal.
        """
        traj = self.trajs[rid]

        info = {
            "traj_length": 0.0,
            "navigation_error": None,
            "DTW": None,
            "nDTW": None,
            "success": False,
            "dist_success": False, 
            "oracle_success": False,
            "oracle_dist_success": False,
            "success_time": None,
            "dist_success_time": None,
            "oracle_success_time": None,
            "oracle_dist_success_time": None,
        }

        goal_pos = goal_pose[:self.dim]
        goal_orient = goal_pose[self.dim:]

        for i in range(len(traj["poses"])):
            pose = traj["poses"][i]
            time = traj["time"][i]

            pos = pose[:self.dim]
            orient = pose[self.dim:]

            if i > 0:
                info["traj_length"] += np.linalg.norm(pos - traj["poses"][i-1][:self.dim])

            if np.linalg.norm(pos - goal_pos) <= goal_dist_tol:
                if not info["oracle_dist_success"]:
                    info["oracle_dist_success"] = True
                    info["oracle_dist_success_time"] = time

                if not info["dist_success"]:
                    info["dist_success"] = True
                    info["dist_success_time"] = time

                if np.abs(Geometry.regularize_orient(orient - goal_orient)) <= goal_orient_tol:
                    if not info["oracle_success"]:
                        info["oracle_success"] = True
                        info["oracle_success_time"] = time  

                    if not info["success"]:
                        info["success"] = True
                        info["success_time"] = time

                else:
                    info["success"] = False
                    info["success_time"] = None

            else:
                info["success"] = False
                info["success_time"] = None
                info["dist_success"] = False
                info["dist_success_time"] = None

        info["navigation_error"] = float(np.linalg.norm(traj["poses"][-1][:self.dim] - goal_pos))
        info["traj_length"] = float(info["traj_length"])
        info["DTW"], info["nDTW"] = self.calc_dtw_ndtw(np.array(traj["poses"])[:, :self.dim], np.array(ref_path)[:, :self.dim])
        return info

    def calc_dtw_ndtw(self, path1: np.ndarray, path2: np.ndarray) -> Tuple[float, float]:
        """
        Compute the Dynamic Time Warping (DTW) and normalized DTW (nDTW)
        between two N-dimensional paths.

        Args:
            path1 (np.ndarray): Path 1, shape (N, D)
            path2 (np.ndarray): Path 2, shape (M, D)

        Returns:
            dtw: accumulated dynamic time warping distance
            ndtw: normalized DTW in [0, 1], higher means more similar

        Reference:
            [1] General Evaluation for Instruction Conditioned Navigation using Dynamic Time Warping
        """
        # Input validation
        if path1.ndim != 2 or path2.ndim != 2:
            raise ValueError("Both paths must be 2D arrays with shape (T, D).")
        if path1.shape[1] != path2.shape[1]:
            raise ValueError("Paths must have the same dimensionality.")

        N, M = len(path1), len(path2)

        # Initialize DTW cost matrix
        dtw_matrix = np.full((N + 1, M + 1), np.inf)
        dtw_matrix[0, 0] = 0.0

        # Fill the DTW matrix
        for i in range(1, N + 1):
            for j in range(1, M + 1):
                # Euclidean distance between points
                cost = np.linalg.norm(path1[i - 1] - path2[j - 1])
                # Accumulate cost with the best previous step
                dtw_matrix[i, j] = cost + min(
                    dtw_matrix[i - 1, j],      # insertion
                    dtw_matrix[i, j - 1],      # deletion
                    dtw_matrix[i - 1, j - 1]   # match
                )

        # Final DTW distance
        dtw = dtw_matrix[N, M]

        # Normalized DTW: exponential decay with respect to path length
        max_len = max(N, M)
        ndtw = np.exp(-dtw / (max_len + 1e-8))  # nDTW ∈ (0, 1]

        return float(dtw), float(ndtw)

calc_dtw_ndtw(path1, path2)

Compute the Dynamic Time Warping (DTW) and normalized DTW (nDTW) between two N-dimensional paths.

Parameters:

Name Type Description Default
path1 ndarray

Path 1, shape (N, D)

required
path2 ndarray

Path 2, shape (M, D)

required

Returns:

Name Type Description
dtw float

accumulated dynamic time warping distance

ndtw float

normalized DTW in [0, 1], higher means more similar

Reference

[1] General Evaluation for Instruction Conditioned Navigation using Dynamic Time Warping

Source code in src\python_motion_planning\common\visualizer\base_visualizer.py
Python
def calc_dtw_ndtw(self, path1: np.ndarray, path2: np.ndarray) -> Tuple[float, float]:
    """
    Compute the Dynamic Time Warping (DTW) and normalized DTW (nDTW)
    between two N-dimensional paths.

    Args:
        path1 (np.ndarray): Path 1, shape (N, D)
        path2 (np.ndarray): Path 2, shape (M, D)

    Returns:
        dtw: accumulated dynamic time warping distance
        ndtw: normalized DTW in [0, 1], higher means more similar

    Reference:
        [1] General Evaluation for Instruction Conditioned Navigation using Dynamic Time Warping
    """
    # Input validation
    if path1.ndim != 2 or path2.ndim != 2:
        raise ValueError("Both paths must be 2D arrays with shape (T, D).")
    if path1.shape[1] != path2.shape[1]:
        raise ValueError("Paths must have the same dimensionality.")

    N, M = len(path1), len(path2)

    # Initialize DTW cost matrix
    dtw_matrix = np.full((N + 1, M + 1), np.inf)
    dtw_matrix[0, 0] = 0.0

    # Fill the DTW matrix
    for i in range(1, N + 1):
        for j in range(1, M + 1):
            # Euclidean distance between points
            cost = np.linalg.norm(path1[i - 1] - path2[j - 1])
            # Accumulate cost with the best previous step
            dtw_matrix[i, j] = cost + min(
                dtw_matrix[i - 1, j],      # insertion
                dtw_matrix[i, j - 1],      # deletion
                dtw_matrix[i - 1, j - 1]   # match
            )

    # Final DTW distance
    dtw = dtw_matrix[N, M]

    # Normalized DTW: exponential decay with respect to path length
    max_len = max(N, M)
    ndtw = np.exp(-dtw / (max_len + 1e-8))  # nDTW ∈ (0, 1]

    return float(dtw), float(ndtw)

close() abstractmethod

Close plot.

Source code in src\python_motion_planning\common\visualizer\base_visualizer.py
Python
@abstractmethod
def close(self):
    """
    Close plot.
    """
    pass

get_traj_info(rid, ref_path, goal_pose, goal_dist_tol, goal_orient_tol)

Get trajectory information.

Parameters:

Name Type Description Default
rid int

Robot ID.

required
ref_path List[Tuple[float, ...]]

Reference pose path (world frame).

required
goal_pose ndarray

Goal pose.

required
goal_dist_tol float

Distance tolerance for goal.

required
goal_orient_tol float

Orientation tolerance for goal.

required
Source code in src\python_motion_planning\common\visualizer\base_visualizer.py
Python
def get_traj_info(self, 
        rid: int, 
        ref_path: List[Tuple[float, ...]], 
        goal_pose: np.ndarray, 
        goal_dist_tol: float, 
        goal_orient_tol: float
    ) -> Dict[str, Any]:
    """
    Get trajectory information.

    Args:
        rid: Robot ID.
        ref_path: Reference pose path (world frame).
        goal_pose: Goal pose.
        goal_dist_tol: Distance tolerance for goal.
        goal_orient_tol: Orientation tolerance for goal.
    """
    traj = self.trajs[rid]

    info = {
        "traj_length": 0.0,
        "navigation_error": None,
        "DTW": None,
        "nDTW": None,
        "success": False,
        "dist_success": False, 
        "oracle_success": False,
        "oracle_dist_success": False,
        "success_time": None,
        "dist_success_time": None,
        "oracle_success_time": None,
        "oracle_dist_success_time": None,
    }

    goal_pos = goal_pose[:self.dim]
    goal_orient = goal_pose[self.dim:]

    for i in range(len(traj["poses"])):
        pose = traj["poses"][i]
        time = traj["time"][i]

        pos = pose[:self.dim]
        orient = pose[self.dim:]

        if i > 0:
            info["traj_length"] += np.linalg.norm(pos - traj["poses"][i-1][:self.dim])

        if np.linalg.norm(pos - goal_pos) <= goal_dist_tol:
            if not info["oracle_dist_success"]:
                info["oracle_dist_success"] = True
                info["oracle_dist_success_time"] = time

            if not info["dist_success"]:
                info["dist_success"] = True
                info["dist_success_time"] = time

            if np.abs(Geometry.regularize_orient(orient - goal_orient)) <= goal_orient_tol:
                if not info["oracle_success"]:
                    info["oracle_success"] = True
                    info["oracle_success_time"] = time  

                if not info["success"]:
                    info["success"] = True
                    info["success_time"] = time

            else:
                info["success"] = False
                info["success_time"] = None

        else:
            info["success"] = False
            info["success_time"] = None
            info["dist_success"] = False
            info["dist_success_time"] = None

    info["navigation_error"] = float(np.linalg.norm(traj["poses"][-1][:self.dim] - goal_pos))
    info["traj_length"] = float(info["traj_length"])
    info["DTW"], info["nDTW"] = self.calc_dtw_ndtw(np.array(traj["poses"])[:, :self.dim], np.array(ref_path)[:, :self.dim])
    return info

show() abstractmethod

Show plot.

Source code in src\python_motion_planning\common\visualizer\base_visualizer.py
Python
@abstractmethod
def show(self):
    """
    Show plot.
    """
    pass