RRT* with differential-drive dynamics implementation in Python

I spent time investigating and researching the differences between RRT (Rapidly Exploring Random Tree), an algorithm I learned during an undergraduate robotics course which I also ended up as a Grader for, and a more improved solution intended to find better solutions, RRT* (Rapidly Exploring Random Tree Star). I vaguely knew that RRT* tended to result in better solutions, and that it was a more commmon path planning algorithm in robotics, but I did not know the details.

In my research, I found that the two algorithms are very similar, with two key differences. The first of these is the largest, in that after sampling a new point and steering that point to the closest point, RRT* then rewires the tree by checking the local nodes, to see if any existing paths can be optimized by traversing through the new node. After a large number of samples, this allows the tree to tend towards the shortest straight line path from the start to any node. Second, typically in RRT once you find a path to the goal, you stop sampling as it is unlikely your path will get much better. In RRT*, due to the rewiring, after finding a path to the goal, you continue sampling as with more samples, the closer your path gets to the optimal path to the goal.

Lastly, although theoretically this is the best straight line path, due to the shape and location of obstacles it is possible for this path to perform poorly with robots, as the path may not be smooth or phyiscally possible due to the vehicle dynamics. This is why I also implemented RRT* with vehicle dynamics. Typically this is known as kinodynamic planning, which simply means we are involving the dynamics of the vehicle in the path planning as well. The main difference in this from typical RRT* is that the algorithm maintains a pose with each node as well, and to steer points, it randomly samples a number of forward and angular velocities within the bounds of the robot, and simulates that movement, and then chooses the commands which are closest to the sampled point.

Below, I show two jupyter notebook files where I first implemented RR*T, and then in the second I implement RRT* with differential drive robot dynamics.

%matplotlib widget
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

Below is the Node class, which is a simple data structure to represent each node in the RRT* tree, with the following member variables:

  • position, x,y position pair
  • parent, Node object represnting the parent Node in the RRT* tree
  • cost, Distance along the tree from the start Node to this Node
class Node:
    def __init__(self, position, parent=None):
        self.position = position
        self.parent = parent
        self.cost = 0.0

Below is the RRTStar class, which implements the RRT* algorithm. Within this class, there are various member variables and functions defined. This algorithm randomly samples points within a map, creating a tree for path planning in the map. If nodes are further away than a maximum distance, they will be moved closer to the node directly along a line connecting the sampled node and the nearest node. If this path is interrupted by an object, the node will not be added. The specific improvement of RRT* is that the parent of the new node is determined by checking the cost of each nearby node and choosing the parent which will result in the minimum cost. After this, it checks if any of the nearby nodes will benefit from having the new node as it’s parent node. Additionally, RRT* will continue sampling and updating the tree after a solution is found, converging towards an optimal path, unlike RRT which stops once a path is found.

class RRTStar:
    def __init__(self, start, goal, world_size, obstacles=[], step_size=0.5, max_iter=5000):
        self.start = Node(start)
        self.goal = Node(goal)
        self.world_size = world_size
        self.step_size = step_size
        self.max_iter = max_iter
        self.obstacles = obstacles
        self.node_list = [self.start]
        self.reached_goal = False

    def generate_random_node(self):
        # generate a random node within the map
        return Node(np.random.uniform(0, self.world_size, size=2))
    
    def steer(self, from_node, to_node):
        # calculate angle between from_node and to_node
        theta = np.arctan2(to_node.position[1] - from_node.position[1],
                           to_node.position[0] - from_node.position[0])
        
        # clip distance to step_size
        if np.linalg.norm(to_node.position - from_node.position) < self.step_size:
            new_position = to_node.position
        else:
            new_position = from_node.position + self.step_size * np.array([np.cos(theta), np.sin(theta)])

        # create new node and update its cost and parent
        new_node = Node(new_position, from_node)
        new_node.cost = from_node.cost + np.linalg.norm(new_position - from_node.position)
        new_node.parent = from_node
        return new_node
    
    def nearest_node(self, node):
        # find the nearest node in the map to a given node
        dlist = [np.linalg.norm(n.position - node.position) for n in self.node_list]
        min_index = dlist.index(min(dlist))
        return self.node_list[min_index]
    
    def find_nearby_nodes(self, new_node, radius):
        # find nearby nodes in the map to new_node within a specified radius
        nearby_nodes = []
        for node in self.node_list:
            if np.linalg.norm(node.position - new_node.position) <= radius:
                nearby_nodes.append(node)
        return nearby_nodes
    
    def choose_parent(self, new_node, nearby_nodes):
        # choose the best parent for new_node from nearby_nodes based on cost
        if not nearby_nodes:
            return new_node
        
        # check collision before considering this node as a parent
        costs = []
        valid_nodes = []
        for node in nearby_nodes:
            if not self.check_collision(node, new_node):
                cost = node.cost + np.linalg.norm(node.position - new_node.position)
                costs.append(cost)
                valid_nodes.append(node)
        
        if not valid_nodes:
            return new_node

        # select parent with minimum cost
        min_cost_index = costs.index(min(costs))
        best_parent = valid_nodes[min_cost_index]
        new_node.parent = best_parent
        new_node.cost = costs[min_cost_index]
        return new_node
    
    def rewire(self, new_node, nearby_nodes):

        # check all nearby_nodes to see if we can decrease their cost by rewiring through new_node
        for node in nearby_nodes:
            cost = new_node.cost + np.linalg.norm(node.position - new_node.position)
            if (cost < node.cost) and (self.check_collision(new_node, node) == False):
                node.parent = new_node
                node.cost = cost

                # update costs of all descendants
                self.update_descendants_cost(node)
    
    def update_descendants_cost(self, node):
        # update costs of all descendants of the given node
        for n in self.node_list:
            if n.parent == node:
                n.cost = node.cost + np.linalg.norm(n.position - node.position)
                self.update_descendants_cost(n)

    def check_collision(self, node1, node2):
        # check collision between line segment of node1 to node2, and each obstacle
        p1 = np.asarray(node1.position, dtype=float)
        p2 = np.asarray(node2.position, dtype=float)
        v = p2 - p1
        seg_len2 = np.dot(v, v)
        eps = 1e-8

        for center, radius in self.obstacles:
            c = np.asarray(center, dtype=float)
            if seg_len2 <= eps:
                if np.linalg.norm(p1 - c) <= radius:
                    return True
                continue

            t = np.dot(c - p1, v) / seg_len2
            t_clamped = np.clip(t, 0.0, 1.0)
            closest = p1 + t_clamped * v
            if np.linalg.norm(closest - c) <= radius:
                return True

        return False

Below is an animation function, which executes the RRT* algorithm, and plots the tree, obstacles, start, and goal within matplotlib.

def animate(i, rrt_star, ax, save_counter):
    # perform multiple iterations per animation frame for faster and smoother growth
    iters_per_frame = 100
    for _ in range(iters_per_frame):
        if len(rrt_star.node_list) >= rrt_star.max_iter:
            break
        rand_node = rrt_star.generate_random_node()
        nearest_node = rrt_star.nearest_node(rand_node)
        new_node = rrt_star.steer(nearest_node, rand_node)
        
        if rrt_star.check_collision(nearest_node, new_node):
            continue 
        nearby_nodes = rrt_star.find_nearby_nodes(new_node, radius=2.0)
        new_node = rrt_star.choose_parent(new_node, nearby_nodes)
        rrt_star.rewire(new_node, nearby_nodes)
        rrt_star.node_list.append(new_node)

        # check if the goal is reached
        if np.linalg.norm(new_node.position - rrt_star.goal.position) < rrt_star.step_size:
            rrt_star.reached_goal = True
            rrt_star.goal.parent = new_node
            rrt_star.goal.cost = new_node.cost + np.linalg.norm(new_node.position - rrt_star.goal.position)
            rrt_star.node_list.append(rrt_star.goal)

    # reset plot
    ax.clear()
    ax.set_xlim(0, rrt_star.world_size)
    ax.set_ylim(0, rrt_star.world_size)
    ax.set_title(f"RRT* Growth - Nodes: {len(rrt_star.node_list)}")

    # draw obstacles
    for center, radius in rrt_star.obstacles:
        circle = plt.Circle(center, radius, color='gray', alpha=0.5)
        ax.add_patch(circle)

    # draw edges
    for node in rrt_star.node_list:
        if node.parent is not None:
            x = [node.position[0], node.parent.position[0]]
            y = [node.position[1], node.parent.position[1]]
            ax.plot(x, y, color='blue', linewidth=0.8)

    # draw nodes
    pts = np.array([n.position for n in rrt_star.node_list])
    if pts.size:
        ax.scatter(pts[:,0], pts[:,1], c='black', s=5)

    # draw start and goal
    ax.scatter(rrt_star.start.position[0], rrt_star.start.position[1], c='green', s=50, label='Start')
    ax.scatter(rrt_star.goal.position[0], rrt_star.goal.position[1], c='red', s=50, label='Goal')

    # if reached goal, draw path
    if rrt_star.reached_goal:
        path = []
        node = rrt_star.goal
        while node is not None:
            path.append(node.position)
            node = node.parent
        path = np.array(path[::-1])
        ax.plot(path[:,0], path[:,1], color='magenta', linewidth=2, label='Path')
        ax.legend()
    
    plt.savefig('rrt_star_progress' + str(i) + '.png')

Below is the initialization of the RRT* class, matplotlib plot, and the function call to begin and animate the RRT* algorithm.

start = np.array([5,5])
goal = np.array([17,17])

obstacles = [
    (np.array([10, 10]), 2.0),
    (np.array([6, 14]), 1.5),
    (np.array([14, 6]), 1.5),
    (np.array([12, 16]), 2.0),
    (np.array([16, 12]), 2.0),
    (np.array([8, 8]), 1.5)
]
world_size = 20

rrt_star = RRTStar(start, goal, world_size, obstacles=obstacles, step_size=0.5, max_iter=5000)

fig, ax = plt.subplots(figsize=(6,6))
ax.set_xlim(0, rrt_star.world_size)
ax.set_ylim(0, rrt_star.world_size)
ax.set_aspect('equal')

save_counter = 0
animated_rrt = animation.FuncAnimation(fig, animate, fargs=(rrt_star, ax, save_counter), frames=10000, interval=30)
plt.show()

This implementation of the RRT* algorithm, with the dynamics of a differential-drive robot is a continuation of the RRT* implementation I have made. This allows for dynamically possible motions for the robot, as opposed to straight line connections between nodes.

%matplotlib widget
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

Below is the Node class from my previous implementation of RRT*. This was adjusted to store the pose (x, y, theta) of the node, instead of simply the position (x,y). Additionally, the trajectory from the parent to this node is stored.

class Node:
    def __init__(self, pose, parent=None):
        self.pose = pose
        self.parent = parent
        self.cost = 0.0
        self.trajectory = []

Below is the RRTStarDynamics class. This class contains many similar functions as the previous RRTStar class I implemented. For this specific implementation, main difference is the changes to the steer() function, the rewire() function, the choose_parent() function, and the check_collision() function. The simulate_trajectory() function now utilizes the differential-drive robot dynamics to simulate a discretized trajectory.

The steer() function now samples num_samples trajectories with simulate_trajectory(). This is done with a random control input v, w based upon the robot’s control limits. The trajectory that ends closest to the sampled node is chosen as the location of the new node, alongside the sampled trajectory.
The simulate_trajectory() function creates a trajectory with a given control input.
The rewire() function now simulates num_samples trajectories for each nearby node, and if a trajectory ends within a threshold of the new node, if the cost is lower that node will be chosen as the new parent.
The choose_parent() function now simulates a trajectory similar to steer() to determine a new nodes parent.
The check_collision() function now checks if each pose in the trajectory is within an object, instead of just the mid-point of the trajectory.

class RRTStarDynamics:
    def __init__(self, start, goal, world_size, obstacles=[], threshold=0.05, step_size=0.5, max_iter=5000):
        self.start = Node(start)
        self.goal = Node(goal)
        self.world_size = world_size
        self.obstacles = obstacles
        self.threshold = threshold
        self.step_size = step_size
        self.max_iter = max_iter
        self.node_list = [self.start]
        self.reached_goal = False

    def generate_random_node(self):
        # generate a random node within the map with random orientation
        position = np.random.uniform(0, self.world_size, size=2)
        theta = np.random.uniform(-np.pi, np.pi)
        return Node(np.array([position[0], position[1], theta]))

    def steer(self, from_node, to_node, num_samples=10, delta_t=0.1, max_time=1.0, v_range=np.array([0.0, 1.0]), w_range=np.array([-np.pi/4, np.pi/4])):
        # calculate the best trajectory from from_node to to_node using sampled control inputs
        best_trajectory = None
        min_cost = float('inf')

        for _ in range(num_samples):
            # sample control inputs
            v = np.random.uniform(v_range[0], v_range[1])
            w = np.random.uniform(w_range[0], w_range[1])

            # simulate the motion for current control inputs
            trajectory = self.simulate_trajectory(from_node, v, w, delta_t, max_time)

            # if trajectory is not valid, skip this sample
            if not trajectory or len(trajectory) == 0:
                continue

            # compute the cost of this trajectory
            dist = np.linalg.norm(trajectory[-1][:2] - to_node.pose[:2])

            # check if this is the best trajectory, if so save it
            if dist < min_cost:
                min_cost = dist
                best_trajectory = trajectory

        return best_trajectory

    def simulate_trajectory(self, from_node, v, w, delta_t, max_time):
        # simulate the trajectory of the robot given the control inputs, using the differential drive model
        trajectory = []
        current_pose = from_node.pose

        # for each timestep, update the pose based on the control inputs and vehicle dynamics
        for _ in np.arange(0, max_time, delta_t):
            x = current_pose[0]
            y = current_pose[1]
            theta = current_pose[2]

            x += v * np.cos(theta) * delta_t
            y += v * np.sin(theta) * delta_t
            theta += w * delta_t

            if(x < 0 or x > self.world_size or y < 0 or y > self.world_size):
                break  # stop if trajectory goes out of bounds

            current_pose = np.array([x, y, theta])
            trajectory.append(current_pose)

        return trajectory

    def nearest_node(self, node):
        # find the nearest node in the map to a given node
        dlist = [np.linalg.norm(n.pose[:2] - node.pose[:2]) for n in self.node_list]
        min_index = np.argmin(dlist)
        return self.node_list[min_index]
    
    def find_nearby_nodes(self, new_node, radius):
        # find nearby nodes in the map to new_node within a specified radius
        nearby_nodes = []
        for node in self.node_list:
            dist = np.linalg.norm(node.pose[:2] - new_node.pose[:2])
            if dist <= radius:
                nearby_nodes.append(node)
        return nearby_nodes
    
    def choose_parent(self, new_node, nearby_nodes):
        # choose the best parent for new_node from nearby_nodes based on cost
        if not nearby_nodes:
            return new_node
        
        # check collision before considering this node as a parent
        min_cost = float('inf')
        best_parent = None
        best_traj = None
        
        # for each nearby node, simulate a trajectory to new_node and compute the cost
        for node in nearby_nodes:
            traj = self.steer(node, new_node)
            if traj and len(traj) > 0 and not self.check_collision(traj):

                endpoint = traj[-1][:2]
                target = new_node.pose[:2]

                # ensure the trajectory ends within self.threshold of new_node
                if np.linalg.norm(endpoint - target) > self.threshold:
                    continue

                # compute the cost of this trajectory, and if it's the best, save it
                cost = node.cost + self.trajectory_cost(traj)
                if cost < min_cost:
                    min_cost = cost
                    best_parent = node
                    best_traj = traj

        # update the nodes parent if a better parent was found
        if best_parent is not None:
            new_node.parent = best_parent
            new_node.cost = min_cost
            new_node.trajectory = best_traj

        return

    def rewire(self, new_node, nearby_nodes):

        # check all nearby_nodes to see if we can decrease their cost by rewiring through new_node
        for nearby_node in nearby_nodes:

            # skip if nearby_node is the parent of new_node or is the start node
            if nearby_node == new_node.parent or nearby_node == self.start:
                continue
            
            # steer from new_node to nearby_node, check if new_node is a better parent for any nearby node
            traj = self.steer(new_node, nearby_node)
            if traj and len(traj) > 0 and not self.check_collision(traj):

                # ensure that trajectory ends within self.threshold of nearby_node
                endpoint = traj[-1][:2]
                target = nearby_node.pose[:2]
                if np.linalg.norm(endpoint - target) > self.threshold:
                    continue
                
                # compute the cost of this trajectory, and if it's better, rewire nearby_node
                cost = nearby_node.cost + self.trajectory_cost(traj)
                if cost < nearby_node.cost:
                    nearby_node.parent = new_node
                    nearby_node.cost = cost
                    nearby_node.trajectory = traj
                    self.update_descendants_cost(nearby_node)

        return
    
    def update_descendants_cost(self, curr_node):
        # update costs of all descendants of the given node
        for node in self.node_list:
            if node.parent == curr_node:
                
                traj = node.trajectory
                if traj and len(traj) > 0:
                    node.cost = curr_node.cost + self.trajectory_cost(traj)
                    self.update_descendants_cost(node)

        return

    def trajectory_cost(self, trajectory):
        # compute the cost of a trajectory by summing the Euclidean distances between consecutive poses
        cost = 0.0
        for i in np.arange(1, len(trajectory)):
            cost += np.linalg.norm(trajectory[i][:2] - trajectory[i-1][:2])
        return cost
    
    def check_collision(self, trajectory):
        # check if any part of the trajectory collides with any obstacles
        for obs in self.obstacles:
            for pose in trajectory:
                if np.linalg.norm(pose[:2] - obs[0][:2]) <= obs[1]:
                    return True
        return False

    def get_path(self):
        # retrieve the path from start to goal by backtracking from goal to start
        if not self.reached_goal:
            return None
        path = []
        node = self.goal
        while node is not None:
            if node.trajectory and len(node.trajectory) > 0:
                path.extend(reversed(node.trajectory))
            else:
                path.append(node.pose)
            node = node.parent
        
        path.reverse()
        return path

The animate() function performs the same task as in the default RRT* implementation, calling each function to implement the RRT* with robot dynamics, as well as plotting the growth of the algorithm.

def animate(i, rrt_star, ax, save_counter):
    # perform multiple iterations per animation frame for faster and smoother growth
    iters_per_frame = 10

    for _ in range(iters_per_frame):
        # check if max iterations reached
        if len(rrt_star.node_list) >= rrt_star.max_iter:
            break

        # biased sampling towards goal
        if(np.random.random() < 0.05):
            rand_node = Node(rrt_star.goal.pose.copy())
        else:
            rand_node = rrt_star.generate_random_node()

        # find nearest node and steer towards random node
        nearest_node = rrt_star.nearest_node(rand_node)
        new_trajectory = rrt_star.steer(nearest_node, rand_node)

        # if trajectory is invalid or collides, skip
        if not new_trajectory or len(new_trajectory) == 0 or rrt_star.check_collision(new_trajectory):
            continue

        # create a new node at the end of the steered trajectory
        new_pose = new_trajectory[-1]
        new_node = Node(new_pose)
        new_node.trajectory = new_trajectory
        new_node.parent = nearest_node
        new_node.cost = nearest_node.cost + rrt_star.trajectory_cost(new_trajectory)

        # choose the parent for the new node and rewire the tree
        nearby_nodes = rrt_star.find_nearby_nodes(new_node, radius=2.0)
        rrt_star.choose_parent(new_node, nearby_nodes)
        rrt_star.node_list.append(new_node)
        rrt_star.rewire(new_node, nearby_nodes)

        # check if the goal is reached within step_size
        if np.linalg.norm(new_node.pose[:2] - rrt_star.goal.pose[:2]) < rrt_star.step_size:
            new_cost = new_node.cost + np.linalg.norm(new_node.pose[:2] - rrt_star.goal.pose[:2])
            if not rrt_star.reached_goal or new_cost < rrt_star.goal.cost:
                rrt_star.reached_goal = True
                rrt_star.goal.parent = new_node
                rrt_star.goal.cost = new_cost

    # reset plot
    ax.clear()
    ax.set_xlim(0, rrt_star.world_size)
    ax.set_ylim(0, rrt_star.world_size)
    ax.set_title(f"RRT* With Dynamics")

    # draw obstacles
    for obs in rrt_star.obstacles:
        center, radius = obs[0], obs[1]
        circle = plt.Circle(center, radius, color='gray', alpha=0.5)
        ax.add_patch(circle)

    # draw trajectories between nodes
    for node in rrt_star.node_list:

        if node.parent and len(node.trajectory) > 0:
            traj = np.array(node.trajectory)
            ax.plot(traj[:, 0], traj[:, 1], color='blue', linewidth=0.8, alpha=0.7)

    # draw nodes
    pts = np.array([n.pose[:2] for n in rrt_star.node_list])
    if pts.size:
        ax.scatter(pts[:,0], pts[:,1], c='black', s=5, zorder=1)
    
    # draw start and goal
    ax.scatter(rrt_star.start.pose[0], rrt_star.start.pose[1], c='green', s=100, label='Start', zorder=2)
    ax.scatter(rrt_star.goal.pose[0], rrt_star.goal.pose[1], c='red', s=100, label='Goal', zorder=2)

    # if reached goal, draw path from start to goal
    if rrt_star.reached_goal:
        path = rrt_star.get_path()
        if path is not None and len(path) > 0:
            path = np.array(path)
            ax.plot(path[:,0], path[:,1], color='magenta', linewidth=2, label='Path')

    ax.legend(loc='lower right')
# define start and goal poses [x, y, theta]
start = np.array([2.0, 2.0, 0.0])
goal = np.array([18.0, 18.0, 0.0])

# define obstacles [[x, y], radius]
obstacles = [
    (np.array([10, 10]), 2.0),
    (np.array([6, 14]), 1.5),
    (np.array([14, 6]), 1.5),
    (np.array([12, 16]), 2.0),
    (np.array([16, 12]), 2.0),
    (np.array([8, 8]), 1.5)
]

# Create and run RRT*
rrt_star = RRTStarDynamics(start, goal, world_size=20, 
                                obstacles=obstacles, 
                                step_size=0.5, 
                                max_iter=5000)

# Animate the planning process
fig, ax = plt.subplots(figsize=(6,6))
ax.set_xlim(0, rrt_star.world_size)
ax.set_ylim(0, rrt_star.world_size)
ax.set_aspect('equal')

animated_rrt = animation.FuncAnimation(fig, animate, fargs=(rrt_star, ax, 0), frames=250, interval=60)
# animated_rrt.save('rrt_star_dynamics_website.gif', writer='pillow', fps=30) # Use 'pillow' writer for GIFs

plt.show()