Task Composer¶
The Task Composer orchestrates complex planning tasks by chaining planners together.
Overview¶
graph TD
A[Input Program] --> B[MinLengthTask]
B --> C{Freespace?}
C -->|Yes| D[OMPLTask]
C -->|No| E[DescartesTask]
D --> F[TrajOptTask]
E --> F
F --> G[TimeParameterizationTask]
G --> H[ContactCheckTask]
H --> I[Output Trajectory]
Basic Usage¶
from tesseract_robotics.planning import Robot, Composer
import numpy as np
robot = Robot.from_tesseract_support("abb_irb2400")
composer = Composer(robot)
# Define waypoints
start = np.zeros(6)
via = np.array([0.5, 0, 0, 0, 0, 0])
goal = np.array([0.5, -0.5, 0.5, 0, 0.5, 0])
# Add segments
composer.add_freespace(goal_joints=via)
composer.add_freespace(goal_joints=goal)
composer.add_freespace(goal_joints=start) # Return home
# Plan all segments
result = composer.plan()
if result.success:
trajectories = result.get_trajectories()
print(f"Planned {len(trajectories)} segments")
from tesseract_robotics.tesseract_task_composer import (
TaskComposerPluginFactory,
TaskComposerDataStorage,
TaskComposerContext,
TaskComposerFuture
)
# Load plugin factory
factory = TaskComposerPluginFactory()
# Create executor
executor = factory.createTaskComposerExecutor("TaskflowExecutor")
# Create pipeline
pipeline = factory.createTaskComposerGraph("FreespaceMotionPipeline")
# Set up data storage
storage = TaskComposerDataStorage()
storage.setData("planning_input", program)
storage.setData("environment", env)
storage.setData("profiles", profiles)
# Execute
future = executor.run(pipeline, storage)
future.wait()
# Get result
context = future.context
trajectory = context.data_storage.getData("output_program")
Pipeline Types¶
FreespaceMotionPipeline¶
For joint-space motion without Cartesian constraints:
CartesianMotionPipeline¶
For Cartesian-constrained motion:
from tesseract_robotics.tesseract_common import Isometry3d
target_pose = Isometry3d.Identity()
target_pose.translate([0.8, 0.2, 0.5])
composer.add_cartesian(goal_pose=target_pose)
RasterMotionPipeline¶
For industrial raster patterns (welding, painting):
composer.add_raster(
approach_pose=approach,
raster_poses=raster_waypoints,
departure_pose=depart
)
Task Types¶
| Task | Purpose |
|---|---|
MinLengthTask |
Ensure minimum waypoints |
OMPLMotionPlannerTask |
Sampling-based planning |
TrajOptMotionPlannerTask |
Trajectory optimization |
DescartesMotionPlannerTask |
Cartesian graph search |
SimpleMotionPlannerTask |
Joint interpolation |
TimeOptimalParameterizationTask |
Add timing |
IterativeSplineParameterizationTask |
Smooth timing |
ContactCheckTask |
Collision validation |
FixStateBoundsTask |
Clamp to joint limits |
FixStateCollisionTask |
Push out of collision |
Profiles¶
Profiles configure each task's behavior:
from tesseract_robotics.tesseract_motion_planners_ompl import (
OMPLDefaultPlanProfile
)
from tesseract_robotics.tesseract_motion_planners_trajopt import (
TrajOptDefaultPlanProfile,
TrajOptDefaultCompositeProfile
)
# OMPL profile
ompl_profile = OMPLDefaultPlanProfile()
ompl_profile.planning_time = 5.0
# TrajOpt profiles
trajopt_plan = TrajOptDefaultPlanProfile()
trajopt_composite = TrajOptDefaultCompositeProfile()
trajopt_composite.smooth_velocities = True
# Add to profile dictionary
profiles = ProfileDictionary()
profiles.addProfile("ompl", "DEFAULT", ompl_profile)
profiles.addProfile("trajopt", "DEFAULT", trajopt_plan)
profiles.addProfile("trajopt", "DEFAULT", trajopt_composite)
Error Handling¶
result = composer.plan()
if not result.success:
print(f"Planning failed: {result.message}")
# Check which task failed
for task_name, task_result in result.task_results.items():
if not task_result.success:
print(f" Failed task: {task_name}")
print(f" Reason: {task_result.message}")
Common Failures¶
| Error | Cause | Solution |
|---|---|---|
| "OMPL failed to find solution" | Start/goal in collision or unreachable | Check collision, increase time |
| "TrajOpt failed to converge" | Bad initial trajectory | Use simpler planner first |
| "Contact check failed" | Trajectory has collisions | Increase collision margin |
| "IK failed" | Cartesian target unreachable | Check workspace limits |
Parallel Execution¶
The Task Composer uses Taskflow for parallel execution:
# Configure thread pool
executor = factory.createTaskComposerExecutor(
"TaskflowExecutor",
num_threads=8
)
Automatic Parallelization
Independent tasks (like IK for different waypoints) run in parallel automatically. The task graph determines dependencies.
Custom Pipelines¶
Define custom task graphs in YAML:
# custom_pipeline.yaml
task_composer_plugins:
tasks:
MyCustomPipeline:
type: graph
nodes:
- name: plan_ompl
type: OMPLMotionPlannerTask
input: planning_input
output: ompl_output
- name: smooth_trajopt
type: TrajOptMotionPlannerTask
input: ompl_output
output: trajopt_output
depends: [plan_ompl]
- name: time_param
type: TimeOptimalParameterizationTask
input: trajopt_output
output: output_program
depends: [smooth_trajopt]
Integration Example¶
Complete pick-and-place with Task Composer:
from tesseract_robotics.planning import Robot, Composer
import numpy as np
robot = Robot.from_tesseract_support("abb_irb2400")
composer = Composer(robot)
# Define poses
home = np.zeros(6)
pick_approach = np.array([0.3, -0.4, 0.3, 0, 0.5, 0])
pick = np.array([0.3, -0.4, 0.5, 0, 0.5, 0])
place_approach = np.array([-0.3, -0.4, 0.3, 0, 0.5, 0])
place = np.array([-0.3, -0.4, 0.5, 0, 0.5, 0])
# Build program
composer.add_freespace(goal_joints=pick_approach) # Approach pick
composer.add_linear(goal_joints=pick) # Linear to pick
# ... attach object ...
composer.add_linear(goal_joints=pick_approach) # Retreat
composer.add_freespace(goal_joints=place_approach) # Transit to place
composer.add_linear(goal_joints=place) # Linear to place
# ... detach object ...
composer.add_linear(goal_joints=place_approach) # Retreat
composer.add_freespace(goal_joints=home) # Return home
# Execute planning
result = composer.plan()
if result.success:
# Visualize
from tesseract_robotics.viewer import TesseractViewer
viewer = TesseractViewer()
viewer.update_environment(robot.env, [0, 0, 0])
viewer.update_trajectory(result.raw_results)
viewer.start_serve_background()
Performance Tips¶
Plugin Loading Overhead¶
First plan() call is slow (~10-15s)
The Task Composer uses dynamic plugin loading (dlopen). The first call to
plan() loads shared libraries for TrajOpt, OMPL, collision checking, etc.
Subsequent calls in the same process are fast (plugins cached in memory).
This is inherent to the plugin architecture (same behavior in C++).
Use warmup() for interactive applications:
from tesseract_robotics.planning import TaskComposer
# Option 1: Explicit warmup (recommended for timing visibility)
composer = TaskComposer.from_config()
composer.warmup(["TrajOptPipeline"]) # ~10-15s upfront
result = composer.plan(robot, program) # Now fast (~1-2s)
# Option 2: warmup=True parameter
composer = TaskComposer.from_config(warmup=True) # Loads common pipelines
# Option 3: Specific pipelines only
composer = TaskComposer.from_config(warmup=["TrajOptPipeline", "OMPLPipeline"])
When warmup helps:
- Interactive use (REPL, Jupyter) - pay once, plan many times
- GUI/service applications - initialize at startup, fast responses after
- Benchmarking - separate loading time from algorithm time
When warmup doesn't help:
- Single-run scripts (same total time either way)
Reuse the Composer¶
Create once, reuse many times
Warm Starting¶
Use previous solution as seed
For similar plans, use previous solution as seed:
Next Steps¶
- Low-Level SQP - Real-time trajectory optimization
- Online Planning Example - Dynamic replanning