DWA¶
src.python_motion_planning.controller.path_tracker.dwa.DWA
¶
Bases: PathTracker
Dynamic Window Approach (DWA) path-tracking controller.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args
|
see the parent class. |
()
|
|
robot_model
|
BaseRobot
|
robot model for kinematic simulation |
required |
obstacle_grid
|
Grid
|
occupancy grid map for collision checking |
None
|
vel_reso
|
float
|
resolution of velocity sampling |
array([0.2, 0.2, deg2rad(15)])
|
predict_time
|
float
|
forward simulation time horizon |
None
|
heading_weight
|
float
|
weight for heading term |
0.5
|
velocity_weight
|
float
|
weight for velocity term |
0.2
|
clearance_weight
|
float
|
weight for obstacle clearance term |
0.3
|
**kwargs
|
see the parent class. |
{}
|
Source code in src\python_motion_planning\controller\path_tracker\dwa.py
Python
class DWA(PathTracker):
"""
Dynamic Window Approach (DWA) path-tracking controller.
Args:
*args: see the parent class.
robot_model: robot model for kinematic simulation
obstacle_grid: occupancy grid map for collision checking
vel_reso: resolution of velocity sampling
predict_time: forward simulation time horizon
heading_weight: weight for heading term
velocity_weight: weight for velocity term
clearance_weight: weight for obstacle clearance term
**kwargs: see the parent class.
"""
def __init__(self,
*args,
robot_model: BaseRobot,
obstacle_grid: Grid = None,
vel_reso: float = np.array([0.2, 0.2, np.deg2rad(15)]),
predict_time: float = None,
heading_weight: float = 0.5,
velocity_weight: float = 0.2,
clearance_weight: float = 0.3,
**kwargs):
super().__init__(*args, **kwargs)
if robot_model.dim != self.dim:
raise ValueError("Dimension of robot model and controller must be the same")
self.robot_model = robot_model
if obstacle_grid.dim != self.dim:
raise ValueError("Dimension of obstacle grid and controller must be the same")
self.obstacle_grid = obstacle_grid
self.vel_reso = vel_reso
self.predict_time = predict_time if predict_time is not None else self.lookahead_distance / self.max_lin_speed
self.heading_weight = heading_weight
self.velocity_weight = velocity_weight
self.clearance_weight = clearance_weight
def get_action(self, obs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Get action from observation using Dynamic Window Approach.
Args:
obs: observation ([pos, orient, lin_vel, ang_vel])
Returns:
action: action in robot frame ([lin_acc, ang_acc])
target_pose: selected local goal ([pos, orient])
"""
if self.goal is None:
return np.zeros(self.action_space.shape), self.goal
pose, vel, pos, orient, lin_vel, ang_vel = self.get_pose_velocity(obs) # world frame
# Find the lookahead pose
target_pose = self._get_lookahead_pose(pos)
# search best control within dynamic window
best_vel, best_traj = self._evaluate_trajectories(pose, vel, target_pose)
# compute action (acceleration) to reach best velocity
desired_vel = best_vel # robot frame
desired_vel = self._stop_if_reached(desired_vel, pose) # robot frame
robot_vel = FrameTransformer.vel_world_to_robot(self.dim, vel, orient)
action = self._get_desired_action(desired_vel, robot_vel, orient) # robot frame
self.pred_traj = best_traj # for visualization
return action, target_pose
def _evaluate_trajectories(self, pose: np.ndarray, vel: np.ndarray, target_pose: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Evaluate trajectories in dynamic window and choose the best.
Args:
pose: current pose ([x, y, theta]) world frame
vel: current velocity ([vx, vy, omega]) world frame
target_pose: target pose ([x, y, theta]) world frame
Returns:
best_vel: best velocity command [vx, vy, omega] robot frame
best_traj: best simulated trajectory
"""
# v_min, v_max, w_min, w_max = dw
best_score = -float("inf")
best_vel = np.zeros_like(vel) # should be in world frame
best_scores = (0.0,0.0,0.0)
best_traj = []
orient = pose[self.dim:]
vel_robot = FrameTransformer.vel_world_to_robot(self.dim, vel, orient) # robot frame
vel_points = []
for d in range(self.action_space.shape[0]):
low = vel_robot[d] + self.action_space.low[d] * self.dt
high = vel_robot[d] + self.action_space.high[d] * self.dt
reso = self.vel_reso[d]
sample_points = np.arange(low, high, reso)
# make sure the high boundary velocity is included
sample_points = np.append(sample_points, [high])
if low < 0 and high > 0:
sample_points = np.append(sample_points, [0.0])
sample_points = np.unique(np.round(sample_points / self.eps) * self.eps) # unique with numerical precision
vel_points.append(sample_points)
vel_grid = np.meshgrid(*vel_points)
sampled_vels = np.stack(vel_grid, axis=-1).reshape(-1, vel_robot.shape[0]) # robot frame
for sampled_vel in sampled_vels:
# exclude velocities that exceed max speed limitation
sampled_lin_speed = np.linalg.norm(sampled_vel[:self.dim])
if sampled_lin_speed > self.max_lin_speed:
continue
sampled_ang_speed = np.linalg.norm(sampled_vel[self.dim:])
if sampled_ang_speed > self.max_ang_speed:
continue
vel_world = FrameTransformer.vel_robot_to_world(self.dim, sampled_vel, orient) # world frame
traj = self._forward_sim(pose, vel_world)
# evaluate cost terms
heading = self._heading_score(traj, target_pose)
velocity = self._velocity_score(sampled_lin_speed)
clearance = self._clearance_score(traj)
score = (self.heading_weight * heading +
self.velocity_weight * velocity +
self.clearance_weight * clearance)
if score > best_score:
best_score = score
best_scores = (self.heading_weight * heading, self.velocity_weight * velocity, self.clearance_weight * clearance)
best_vel = sampled_vel
best_traj = traj
return best_vel, np.array(best_traj)
def _forward_sim(self, pose: np.ndarray, vel: np.ndarray) -> List[np.ndarray]:
"""
Forward simulate trajectory using robot kinematic model.
Args:
pose: pose of robot (world frame)
vel: velocity of robot (world frame)
Returns:
traj: simulated trajectory (list of poses)
"""
traj = [pose.copy()]
time = 0.0
while time <= self.predict_time:
pose, vel, _ = self.robot_model.kinematic_model(pose, vel, np.zeros_like(vel), np.zeros_like(vel), self.dt)
traj.append(pose)
time += self.dt
return traj
def _heading_score(self, traj: List[np.ndarray], target_pose: np.ndarray) -> float:
"""
Compute heading cost (distance to goal).
Args:
traj: Trajectory.
target_pose: Target pose.
Returns:
float: Heading score.
"""
last_pose = traj[-1]
dist = np.linalg.norm(last_pose[:self.dim] - target_pose[:self.dim])
orient = np.linalg.norm(Geometry.regularize_orient(last_pose[self.dim:] - target_pose[self.dim:]))
normalized_dist = dist / self.goal_dist_tol
normalized_orient = orient / self.goal_orient_tol
dist_score = 0.5 * np.clip(1.0 / normalized_dist, 0.0, 1.0) # normalized_dist > 1.0
dist_score += 0.5 * np.clip(1.0 - normalized_dist, 0.0, 1.0) # 0.0 <= normalized_dist <= 1.0
orient_score = 0.5 * np.clip(1.0 / normalized_orient, 0.0, 1.0) # normalized_orient > 1.0
orient_score += 0.5 * np.clip(1.0 - normalized_orient, 0.0, 1.0) # 0.0 <= normalized_orient <= 1.0
total_score = self.k_theta * dist_score + (1 - self.k_theta) * orient_score
return total_score
def _velocity_score(self, lin_speed: np.ndarray) -> float:
"""
Compute the velocity score.
Args:
lin_speed (np.ndarray): The linear speed (robot frame)
Returns:
float: The velocity score
"""
return lin_speed / self.max_lin_speed
def _clearance_score(self, traj: List[np.ndarray]) -> float:
"""
Compute clearance score (min distance to obstacles).
If no grid map is provided, return 1.0.
Args:
traj: The trajectory
Returns:
float: The clearance score
"""
if self.obstacle_grid is None:
return 1.0
radius = self.robot_model.radius
min_dist = float("inf")
for p in traj:
grid_pt = self.obstacle_grid.world_to_map(tuple(p[:2]))
if not self.obstacle_grid.within_bounds(grid_pt) or self.obstacle_grid.type_map[grid_pt] == TYPES.OBSTACLE:
return -float("inf") # collision
# update min distance (Euclidean to occupied cells could be added here)
dist = self.obstacle_grid.esdf[grid_pt] * self.obstacle_grid.resolution # using ESDF to compute distance to nearest obstacle
min_dist = min(min_dist, dist)
normalized_min_dist = min_dist / self.robot_model.radius
if normalized_min_dist < self.eps:
return -float("inf")
return np.clip(1.0 - 1.0 / normalized_min_dist, 0.0, 1.0) # normalized
get_action(obs)
¶
Get action from observation using Dynamic Window Approach.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obs
|
ndarray
|
observation ([pos, orient, lin_vel, ang_vel]) |
required |
Returns:
Name | Type | Description |
---|---|---|
action |
ndarray
|
action in robot frame ([lin_acc, ang_acc]) |
target_pose |
ndarray
|
selected local goal ([pos, orient]) |
Source code in src\python_motion_planning\controller\path_tracker\dwa.py
Python
def get_action(self, obs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Get action from observation using Dynamic Window Approach.
Args:
obs: observation ([pos, orient, lin_vel, ang_vel])
Returns:
action: action in robot frame ([lin_acc, ang_acc])
target_pose: selected local goal ([pos, orient])
"""
if self.goal is None:
return np.zeros(self.action_space.shape), self.goal
pose, vel, pos, orient, lin_vel, ang_vel = self.get_pose_velocity(obs) # world frame
# Find the lookahead pose
target_pose = self._get_lookahead_pose(pos)
# search best control within dynamic window
best_vel, best_traj = self._evaluate_trajectories(pose, vel, target_pose)
# compute action (acceleration) to reach best velocity
desired_vel = best_vel # robot frame
desired_vel = self._stop_if_reached(desired_vel, pose) # robot frame
robot_vel = FrameTransformer.vel_world_to_robot(self.dim, vel, orient)
action = self._get_desired_action(desired_vel, robot_vel, orient) # robot frame
self.pred_traj = best_traj # for visualization
return action, target_pose