Skip to content

tesseract_robotics.tesseract_task_composer

Task composition and planning pipelines.

Overview

TaskComposer orchestrates complex planning tasks by chaining planners, post-processors, and validators into pipelines.

Python
from tesseract_robotics.tesseract_task_composer import (
    TaskComposerPluginFactory, TaskflowTaskComposerExecutor,
    TaskComposerDataStorage, TaskComposerContext,
    createTaskComposerPluginFactory, createTaskComposerDataStorage,
)

Quick Start

Python
from tesseract_robotics.tesseract_task_composer import (
    createTaskComposerPluginFactory, createTaskComposerDataStorage,
    TaskflowTaskComposerExecutor,
)
from tesseract_robotics.tesseract_command_language import (
    AnyPoly_wrap_CompositeInstruction, AnyPoly_wrap_ProfileDictionary,
    AnyPoly_as_CompositeInstruction,
)

# Create factory and executor
factory = createTaskComposerPluginFactory()
executor = TaskflowTaskComposerExecutor(factory, 4)  # 4 threads

# Get pipeline task
task = factory.createTaskComposerNode("TrajOptPipeline")

# Setup data storage
data = createTaskComposerDataStorage()
data.setData("planning_input", AnyPoly_wrap_CompositeInstruction(program))
data.setData("environment", AnyPoly_wrap_EnvironmentConst(env))
data.setData("profiles", AnyPoly_wrap_ProfileDictionary(profiles))

# Execute
future = executor.run(task, data)
future.wait()

# Get result
context = future.context
output_key = task.getOutputKeys().get("program")
result = AnyPoly_as_CompositeInstruction(context.data_storage.getData(output_key))

Components

TaskComposerPluginFactory

Loads pipeline definitions from YAML config.

Python
from tesseract_robotics.tesseract_task_composer import (
    TaskComposerPluginFactory, createTaskComposerPluginFactory
)

# Use bundled config (auto-configured)
factory = createTaskComposerPluginFactory()

# Or specify config file
factory = TaskComposerPluginFactory(config_path)

# Get available pipelines
# Common pipelines: TrajOptPipeline, OMPLPipeline, FreespaceMotionPipeline
task = factory.createTaskComposerNode("TrajOptPipeline")

TaskflowTaskComposerExecutor

Executes tasks with parallel support.

Python
from tesseract_robotics.tesseract_task_composer import TaskflowTaskComposerExecutor

executor = TaskflowTaskComposerExecutor(factory, num_threads=4)

future = executor.run(task, data_storage)
future.wait()

# Check success
if future.context.isSuccessful():
    print("Planning succeeded")

TaskComposerDataStorage

Key-value storage for pipeline data.

Python
from tesseract_robotics.tesseract_task_composer import createTaskComposerDataStorage
from tesseract_robotics.tesseract_command_language import (
    AnyPoly_wrap_CompositeInstruction, AnyPoly_wrap_ProfileDictionary,
)

data = createTaskComposerDataStorage()

# Set inputs
data.setData("planning_input", AnyPoly_wrap_CompositeInstruction(program))
data.setData("environment", AnyPoly_wrap_EnvironmentConst(env))
data.setData("profiles", AnyPoly_wrap_ProfileDictionary(profiles))

# Get outputs
result = data.getData("program")

TaskComposerContext

Execution context with results and status.

Python
context = future.context

# Check status
if context.isSuccessful():
    output = context.data_storage.getData(output_key)
else:
    print("Failed")

# Access task info
info = context.task_infos

TaskComposerFuture

Async execution handle.

Python
future = executor.run(task, data)

# Wait for completion
future.wait()

# Or wait with timeout (milliseconds)
completed = future.waitFor(5000)  # 5 seconds

# Get context
context = future.context

Available Pipelines

High-Level API (TaskComposer)

The TaskComposer class provides user-friendly pipeline names:

Pipeline Description
FreespaceMotionPipeline OMPL + TrajOpt smoothing + time param
CartesianMotionPipeline Cartesian path with TrajOpt
OMPLPipeline OMPL sampling-based planning only
TrajOptPipeline TrajOpt optimization only
DescartesPipeline Descartes graph search

Raw Config Pipelines

All pipelines available via createTaskComposerNode():

Pipeline Description
FreespacePipeline OMPL + TrajOpt + time param
FreespaceIfoptPipeline OMPL + TrajOptIfopt (OSQP)
CartesianPipeline Cartesian path planning
OMPLPipeline OMPL only
TrajOptPipeline TrajOpt only
TrajOptIfoptPipeline TrajOptIfopt (OSQP solver)
DescartesFPipeline Descartes forward search
DescartesDPipeline Descartes backward search
RasterFtPipeline Raster with freespace transitions
RasterCtPipeline Raster with cartesian transitions
RasterFtOnlyPipeline Raster freespace only
RasterCtOnlyPipeline Raster cartesian only

See task_composer_plugins.yaml for the full list including global variants.

Pipeline Input/Output Keys

Different pipelines use different data keys:

Python
# Get keys from task
input_keys = task.getInputKeys()   # TaskComposerKeys
output_keys = task.getOutputKeys()

# Common patterns:
# TrajOptPipeline: input="planning_input", output="program"
# OMPLPipeline: input="program", output="program"

# Check available keys
if input_keys.has("planning_input"):
    data.setData("planning_input", ...)
elif input_keys.has("program"):
    data.setData("program", ...)

AnyPoly Wrapping

TaskComposer uses type-erased AnyPoly for data storage:

Python
from tesseract_robotics.tesseract_task_composer import (
    AnyPoly_wrap_CompositeInstruction,
    AnyPoly_wrap_EnvironmentConst,
    AnyPoly_wrap_ProfileDictionary,
    AnyPoly_as_CompositeInstruction,
)

# Wrap for storage
wrapped_program = AnyPoly_wrap_CompositeInstruction(program)
wrapped_env = AnyPoly_wrap_EnvironmentConst(env)
wrapped_profiles = AnyPoly_wrap_ProfileDictionary(profiles)

# Unwrap from storage
program = AnyPoly_as_CompositeInstruction(any_poly)

Complete Example

Python
from tesseract_robotics.tesseract_task_composer import (
    createTaskComposerPluginFactory, createTaskComposerDataStorage,
    TaskflowTaskComposerExecutor, AnyPoly_wrap_EnvironmentConst,
)
from tesseract_robotics.tesseract_command_language import (
    AnyPoly_wrap_CompositeInstruction, AnyPoly_wrap_ProfileDictionary,
    AnyPoly_as_CompositeInstruction,
)

# Setup
factory = createTaskComposerPluginFactory()
executor = TaskflowTaskComposerExecutor(factory, 4)
task = factory.createTaskComposerNode("FreespaceMotionPipeline")

# Prepare data
data = createTaskComposerDataStorage()
data.setData("planning_input", AnyPoly_wrap_CompositeInstruction(program))
data.setData("environment", AnyPoly_wrap_EnvironmentConst(env))
data.setData("profiles", AnyPoly_wrap_ProfileDictionary(profiles))

# Execute pipeline
future = executor.run(task, data)
future.wait()

# Extract result
context = future.context
if context.isSuccessful():
    output_key = task.getOutputKeys().get("program")
    result = AnyPoly_as_CompositeInstruction(
        context.data_storage.getData(output_key)
    )
    print(f"Planned trajectory with {len(result)} instructions")
else:
    print("Planning failed")
    for name, info in context.task_infos.items():
        if info.return_value != 0:
            print(f"  {name}: {info.message}")

High-Level Alternative

For simpler usage, use the tesseract_robotics.planning module:

Python
from tesseract_robotics.planning import Robot, plan

robot = Robot.from_tesseract_support("abb_irb2400")
result = plan(robot, program, pipeline="FreespaceMotionPipeline")

if result.successful:
    trajectory = result.trajectory

Auto-generated API Reference

TaskComposerKeys

__init__

Python
__init__() -> None

add

Python
add(port: str, key: str) -> None

addKeys

Python
addKeys(port: str, keys: Sequence[str]) -> None

get

Python
get(port: str) -> str

has

Python
has(port: str) -> bool

empty

Python
empty() -> bool

size

Python
size() -> int

data

Python
data() -> dict[str, str | list[str]]

TaskComposerDataStorage

__init__

Python
__init__() -> None

getName

Python
getName() -> str

setName

Python
setName(name: str) -> None

hasKey

Python
hasKey(key: str) -> bool

setData

Python
setData(key: str, data: AnyPoly) -> None

getData

Python
getData(key: str) -> AnyPoly

removeData

Python
removeData(key: str) -> None

TaskComposerNodeInfo

name property

Python
name: str

return_value property

Python
return_value: int

status_code property

Python
status_code: int

status_message property

Python
status_message: str

elapsed_time property

Python
elapsed_time: float

TaskComposerNodeInfoContainer

__init__

Python
__init__() -> None

getAbortingNodeInfo

Python
getAbortingNodeInfo() -> object

getAllInfos

Python
getAllInfos() -> list

TaskComposerContext

name property writable

Python
name: str

dotgraph property writable

Python
dotgraph: bool

data_storage property writable

Python
data_storage: TaskComposerDataStorage

task_infos property

Python
task_infos: TaskComposerNodeInfoContainer

isAborted

Python
isAborted() -> bool

isSuccessful

Python
isSuccessful() -> bool

TaskComposerNode

getName

Python
getName() -> str

getNamespace

Python
getNamespace() -> str

getUUIDString

Python
getUUIDString() -> str

isConditional

Python
isConditional() -> bool

getInputKeys

Python
getInputKeys() -> TaskComposerKeys

getOutputKeys

Python
getOutputKeys() -> TaskComposerKeys

setInputKeys

Python
setInputKeys(input_keys: TaskComposerKeys) -> None

setOutputKeys

Python
setOutputKeys(output_keys: TaskComposerKeys) -> None

TaskComposerFuture

context property writable

Python
context: TaskComposerContext

valid

Python
valid() -> bool

ready

Python
ready() -> bool

wait

Python
wait() -> None

waitFor

Python
waitFor(duration: timedelta | float) -> 'std::__1::future_status'

TaskComposerExecutor

getName

Python
getName() -> str

getWorkerCount

Python
getWorkerCount() -> int

getTaskCount

Python
getTaskCount() -> int

TaskflowTaskComposerExecutor

getName

Python
getName() -> str

getWorkerCount

Python
getWorkerCount() -> int

getTaskCount

Python
getTaskCount() -> int

TaskComposerPluginFactory

__init__

Python
__init__(config: str, locator: object) -> None

Create from config file path (string) and GeneralResourceLocator

createTaskComposerExecutor

Python
createTaskComposerExecutor(name: str) -> TaskComposerExecutor

Create a task composer executor by name

createTaskComposerNode

Python
createTaskComposerNode(name: str) -> TaskComposerNode

Create a task composer node by name

hasTaskComposerExecutorPlugins

Python
hasTaskComposerExecutorPlugins() -> bool

hasTaskComposerNodePlugins

Python
hasTaskComposerNodePlugins() -> bool

getDefaultTaskComposerExecutorPlugin

Python
getDefaultTaskComposerExecutorPlugin() -> str

getDefaultTaskComposerNodePlugin

Python
getDefaultTaskComposerNodePlugin() -> str

AnyPoly

__init__

Python
__init__() -> None

isNull

Python
isNull() -> bool

getTypeName

Python
getTypeName() -> str

createTaskComposerDataStorage

Python
createTaskComposerDataStorage() -> TaskComposerDataStorage

Create a TaskComposerDataStorage as shared_ptr

createTaskComposerPluginFactory

Python
createTaskComposerPluginFactory(config: str, locator: object) -> TaskComposerPluginFactory

Create a TaskComposerPluginFactory from a config file path (string) and GeneralResourceLocator

AnyPoly_wrap_CompositeInstruction

Python
AnyPoly_wrap_CompositeInstruction(instruction: 'tesseract_planning::CompositeInstruction') -> AnyPoly

Wrap a CompositeInstruction into an AnyPoly

AnyPoly_wrap_ProfileDictionary

Python
AnyPoly_wrap_ProfileDictionary(profiles: 'tesseract_common::ProfileDictionary') -> AnyPoly

Wrap a ProfileDictionary shared_ptr into an AnyPoly

AnyPoly_wrap_EnvironmentConst

Python
AnyPoly_wrap_EnvironmentConst(environment: 'tesseract_environment::Environment') -> AnyPoly

Wrap a const Environment shared_ptr into an AnyPoly

AnyPoly_as_CompositeInstruction

Python
AnyPoly_as_CompositeInstruction(any_poly: AnyPoly) -> 'tesseract_planning::CompositeInstruction'

Extract a CompositeInstruction from an AnyPoly