task

langroid/agent/task.py

TaskConfig

Bases: BaseModel

Configuration for a Task. This is a container for any params that we didn't include in the task __init__ method. We may eventually move all the task init params to this class, analogous to how we have config classes for Agent, ChatAgent, LanguageModel, etc.

Attributes:

Name Type Description
inf_loop_cycle_len int

max exact-loop cycle length: 0 => no inf loop test

inf_loop_dominance_factor float

dominance factor for exact-loop detection

inf_loop_wait_factor int

wait this * cycle_len msgs before loop-check

restart_as_subtask bool

whether to restart every run of this task when run as a subtask.

addressing_prefix str

"@"-like prefix an agent can use to address other agents, or entities of the agent. E.g., if this is "@", the addressing string would be "@Alice", or "@user", "@llm", "@agent", etc. If this is an empty string, then addressing is disabled. Default is empty string "". CAUTION: this is a deprecated practice, since normal prompts can accidentally contain such addressing prefixes, and will break your runs. This could happen especially when your prompt/context contains code, but of course could occur in normal text as well. Instead, use the RecipientTool to have agents address other agents or entities. If you do choose to use addressing_prefix, the recommended setting is to use langroid.utils.constants.AT, which currently is "|@|". Note that this setting does NOT affect the use of constants.SEND_TO -- this is always enabled since this is a critical way for responders to indicate that the message should be sent to a specific entity/agent. (Search for "SEND_TO" in the examples/ dir to see how this is used.)

allow_subtask_multi_oai_tools bool

whether to allow multiple OpenAI tool-calls to be sent to a sub-task.

recognize_string_signals bool

whether to recognize string-based signaling like DONE, SEND_TO, PASS, etc. Default is True, but note that we don't need to use string-based signaling, and it is recommended to use the new Orchestration tools instead (see agent/tools/orchestration.py), e.g. DoneTool, SendTool, etc.
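
As a rough illustration of how the loop-detection settings interact, the sketch below mirrors the bounded history buffer that `Task.__init__` (listed further down this page) creates with `maxlen = inf_loop_cycle_len * inf_loop_wait_factor`. The helper name `make_history` and the numeric values are hypothetical, chosen only for the example; they are not the library defaults.

```python
from collections import deque
from typing import Deque


def make_history(inf_loop_cycle_len: int, inf_loop_wait_factor: int) -> Deque[str]:
    """Bounded message history, sized the same way as in Task.__init__."""
    return deque(maxlen=inf_loop_cycle_len * inf_loop_wait_factor)


# Illustrative values only (not the library defaults).
history = make_history(inf_loop_cycle_len=3, inf_loop_wait_factor=2)
for i in range(10):
    history.append(str(i))  # older entries fall off the left end

# A cycle length of 0 gives a zero-length buffer, so nothing is retained.
disabled = make_history(inf_loop_cycle_len=0, inf_loop_wait_factor=5)
disabled.append("ignored")
```

With these values the buffer keeps only the last six messages, which is the window a loop check would have available to inspect.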

Task(agent=None, name='', llm_delegate=False, single_round=False, system_message='', user_message='', restart=True, default_human_response=None, interactive=True, only_user_quits_root=True, erase_substeps=False, allow_null_result=False, max_stalled_steps=5, default_return_type=None, done_if_no_response=[], done_if_response=[], config=TaskConfig(), **kwargs)

A Task wraps an Agent object, and sets up the Agent's goals and instructions. A Task maintains two key variables:

  • self.pending_message, which is the message awaiting a response, and
  • self.pending_sender, which is the entity that sent the pending message.

The possible responders to self.pending_message are the Agent's own "native" responders (agent_response, llm_response, and user_response), and the run() methods of any sub-tasks. All responders have the same type-signature (somewhat simplified):

str | ChatDocument -> ChatDocument

Responders may or may not specify an intended recipient of their generated response.

The main top-level method in the Task class is run(), which repeatedly calls step() until done() returns true. The step() represents a "turn" in the conversation: this method sequentially (in round-robin fashion) calls the responders until it finds one that generates a valid response to the pending_message (as determined by the valid() method). Once a valid response is found, step() updates the pending_message and pending_sender variables, and on the next iteration, step() re-starts its search for a valid response from the beginning of the list of responders (the exception being that the human user always gets a chance to respond after each non-human valid response). This process repeats until done() returns true, at which point run() returns the value of result(), which is the final result of the task.
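
The control flow described above can be sketched in plain Python. This is a simplified toy model, not the library's implementation: the names `step`, `run`, `toy_llm`, `toy_agent` and `RESPONDERS` are hypothetical, messages are plain strings, a `None` return stands in for an invalid response, a `"DONE"` prefix stands in for `done()`, and skipping the sender of the pending message is an assumption made for the example.

```python
from typing import Callable, List, Optional, Tuple

# (name, function) pairs; a function returns None when it has no valid response.
Responder = Tuple[str, Callable[[str], Optional[str]]]


def step(pending: str, sender: str, responders: List[Responder]) -> Tuple[str, str]:
    """One "turn": try responders in order; the first valid response wins."""
    for name, respond in responders:
        if name == sender:
            continue  # assumption: a responder does not answer its own message
        response = respond(pending)
        if response is not None:  # stand-in for the valid() check
            return response, name
    return pending, sender  # nobody responded: state is unchanged


def run(msg: str, responders: List[Responder], max_turns: int = 10) -> str:
    """Call step() repeatedly until the toy done-condition holds."""
    pending, sender = msg, "user"
    for _ in range(max_turns):
        pending, sender = step(pending, sender, responders)
        if pending.startswith("DONE"):  # stand-in for done()
            break
    return pending  # stand-in for result()


def toy_llm(msg: str) -> Optional[str]:
    if msg == "2+2?":
        return "call tool"
    if msg == "tool says 4":
        return "DONE 4"
    return None


def toy_agent(msg: str) -> Optional[str]:
    return "tool says 4" if msg == "call tool" else None


RESPONDERS: List[Responder] = [("agent", toy_agent), ("llm", toy_llm)]
final = run("2+2?", RESPONDERS)
```

Each call to `step` restarts the search from the front of the responder list, which is why the agent gets the first chance to handle the LLM's tool request.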

Parameters:

Name Type Description Default
agent Agent

agent associated with the task

None
name str

name of the task

''
llm_delegate bool

Whether to delegate "control" to LLM; conceptually, the "controlling entity" is the one "seeking" responses to its queries, and has a goal it is aiming to achieve, and decides when a task is done. The "controlling entity" is either the LLM or the USER. (Note within a Task there is just one LLM, and all other entities are proxies of the "User" entity). See also: done_if_response, done_if_no_response for more granular control of task termination.

False
single_round bool

If true, task runs until one message by "controller" (i.e. LLM if llm_delegate is true, otherwise USER) and subsequent response by non-controller. [When a tool is involved, this will not give the intended results; see done_if_response, done_if_no_response below.] If false, runs for the specified number of turns in run, or until done() is true. One run of step() is considered a "turn". See also: done_if_response, done_if_no_response for more granular control of task termination.

False
system_message str

if not empty, overrides agent's system_message

''
user_message str

if not empty, overrides agent's user_message

''
restart bool

if true, resets the agent's message history at every run.

True
default_human_response str | None

default response from user; useful for testing, to avoid interactive input from user. [Instead of this, setting interactive usually suffices]

None
default_return_type Optional[type]

if not None, extracts a value of this type from the result of self.run()

None
interactive bool

if true, wait for human input after each non-human response (prevents infinite loop of non-human responses). Default is true. If false, then default_human_response is set to "". Note: When interactive = False, the one exception is when the user is explicitly addressed, via "@user" or using RecipientTool, in which case the system will wait for a user response. In other words, use interactive=False when you want a "largely non-interactive" run, with the exception of explicit user addressing.

True
only_user_quits_root bool

if true, when interactive=True, only user can quit the root task (Ignored when interactive=False).

True
erase_substeps bool

if true, when task completes, erase intermediate conversation with subtasks from this agent's message_history, and also erase all subtask agents' message_history. Note: erasing can reduce prompt sizes, but results in repetitive sub-task delegation.

False
allow_null_result bool

If true, create dummy NO_ANSWER response when no valid response is found in a step. Optional, default is False. Note: In non-interactive mode, when this is set to True, you can have a situation where an LLM generates (non-tool) text, and no other responders have valid responses, and a "Null result" is inserted as a dummy response from the User entity, so the LLM will now respond to this Null result, and this will continue until the LLM emits a DONE signal (if instructed to do so), otherwise langroid detects a potential infinite loop after a certain number of such steps (= TaskConfig.inf_loop_wait_factor) and will raise an InfiniteLoopException.

False
max_stalled_steps int

task considered done after this many consecutive steps with no progress. Default is 5.

5
done_if_no_response List[Responder]

consider task done if NULL response from any of these responders. Default is empty list.

[]
done_if_response List[Responder]

consider task done if NON-NULL response from any of these responders. Default is empty list.

[]
Source code in langroid/agent/task.py
def __init__(
    self,
    agent: Optional[Agent] = None,
    name: str = "",
    llm_delegate: bool = False,
    single_round: bool = False,
    system_message: str = "",
    user_message: str | None = "",
    restart: bool = True,
    default_human_response: Optional[str] = None,
    interactive: bool = True,
    only_user_quits_root: bool = True,
    erase_substeps: bool = False,
    allow_null_result: bool = False,
    max_stalled_steps: int = 5,
    default_return_type: Optional[type] = None,
    done_if_no_response: List[Responder] = [],
    done_if_response: List[Responder] = [],
    config: TaskConfig = TaskConfig(),
    **kwargs: Any,  # catch-all for any legacy params, for backwards compatibility
):
    """
    A task to be performed by an agent.

    Args:
        agent (Agent): agent associated with the task
        name (str): name of the task
        llm_delegate (bool):
            Whether to delegate "control" to LLM; conceptually,
            the "controlling entity" is the one "seeking" responses to its queries,
            and has a goal it is aiming to achieve, and decides when a task is done.
            The "controlling entity" is either the LLM or the USER.
            (Note within a Task there is just one
            LLM, and all other entities are proxies of the "User" entity).
            See also: `done_if_response`, `done_if_no_response` for more granular
            control of task termination.
        single_round (bool):
            If true, task runs until one message by "controller"
            (i.e. LLM if `llm_delegate` is true, otherwise USER)
            and subsequent response by non-controller. [When a tool is involved,
            this will not give the intended results; see `done_if_response`,
            `done_if_no_response` below.]
            If false, runs for the specified number of turns in
            `run`, or until `done()` is true.
            One run of step() is considered a "turn".
            See also: `done_if_response`, `done_if_no_response` for more granular
            control of task termination.
        system_message (str): if not empty, overrides agent's system_message
        user_message (str): if not empty, overrides agent's user_message
        restart (bool): if true, resets the agent's message history *at every run*.
        default_human_response (str|None): default response from user; useful for
            testing, to avoid interactive input from user.
            [Instead of this, setting `interactive` usually suffices]
        default_return_type: if not None, extracts a value of this type from the
            result of self.run()
        interactive (bool): if true, wait for human input after each non-human
            response (prevents infinite loop of non-human responses).
            Default is true. If false, then `default_human_response` is set to "".
            Note: When interactive = False, the one exception is when the user
            is explicitly addressed, via "@user" or using RecipientTool, in which
            case the system will wait for a user response. In other words, use
            `interactive=False` when you want a "largely non-interactive"
            run, with the exception of explicit user addressing.
        only_user_quits_root (bool): if true, when interactive=True, only user can
            quit the root task (Ignored when interactive=False).
        erase_substeps (bool): if true, when task completes, erase intermediate
            conversation with subtasks from this agent's `message_history`, and also
            erase all subtask agents' `message_history`.
            Note: erasing can reduce prompt sizes, but results in repetitive
            sub-task delegation.
        allow_null_result (bool):
            If true, create dummy NO_ANSWER response when no valid response is found
            in a step.
            Optional, default is False.
            *Note:* In non-interactive mode, when this is set to True,
            you can have a situation where an LLM generates (non-tool) text,
            and no other responders have valid responses, and a "Null result"
            is inserted as a dummy response from the User entity, so the LLM
            will now respond to this Null result, and this will continue
            until the LLM emits a DONE signal (if instructed to do so),
            otherwise langroid detects a potential infinite loop after
            a certain number of such steps (= `TaskConfig.inf_loop_wait_factor`)
            and will raise an InfiniteLoopException.
        max_stalled_steps (int): task considered done after this many consecutive
            steps with no progress. Default is 5.
        done_if_no_response (List[Responder]): consider task done if NULL
            response from any of these responders. Default is empty list.
        done_if_response (List[Responder]): consider task done if NON-NULL
            response from any of these responders. Default is empty list.
    """
    if agent is None:
        agent = ChatAgent()
    self.callbacks = SimpleNamespace(
        show_subtask_response=noop_fn,
        set_parent_agent=noop_fn,
    )
    self.config = config
    # how to behave as a sub-task; can be overridden by `add_sub_task()`
    self.config_sub_task = copy.deepcopy(config)
    # counts of distinct pending messages in history,
    # to help detect (exact) infinite loops
    self.message_counter: Counter[str] = Counter()
    self._init_message_counter()

    self.history: Deque[str] = deque(
        maxlen=self.config.inf_loop_cycle_len * self.config.inf_loop_wait_factor
    )
    # copy the agent's config, so that we don't modify the original agent's config,
    # which may be shared by other agents.
    try:
        config_copy = copy.deepcopy(agent.config)
        agent.config = config_copy
    except Exception:
        logger.warning(
            """
            Failed to deep-copy Agent config during task creation, 
            proceeding with original config. Be aware that changes to 
            the config may affect other agents using the same config.
            """
        )
    self.restart = restart
    agent = cast(ChatAgent, agent)
    self.agent: ChatAgent = agent
    if isinstance(agent, ChatAgent) and len(agent.message_history) == 0 or restart:
        self.agent.init_state()
        # possibly change the system and user messages
        if system_message:
            # we always have at least 1 task_message
            self.agent.set_system_message(system_message)
        if user_message:
            self.agent.set_user_message(user_message)
    self.max_cost: float = 0
    self.max_tokens: int = 0
    self.session_id: str = ""
    self.logger: None | RichFileLogger = None
    self.tsv_logger: None | logging.Logger = None
    self.color_log: bool = False if settings.notebook else True

    self.n_stalled_steps = 0  # how many consecutive steps with no progress?
    # how many 2-step-apart alternations of no_answer step-result have we had,
    # i.e. x1, N/A, x2, N/A, x3, N/A ...
    self.n_no_answer_alternations = 0
    self._no_answer_step: int = -5
    self._step_idx = -1  # current step index
    self.max_stalled_steps = max_stalled_steps
    self.done_if_response = [r.value for r in done_if_response]
    self.done_if_no_response = [r.value for r in done_if_no_response]
    self.is_done = False  # is task done (based on response)?
    self.is_pass_thru = False  # is current response a pass-thru?
    if name:
        # task name overrides name in agent config
        agent.config.name = name
    self.name = name or agent.config.name
    self.value: str = self.name

    self.default_human_response = default_human_response
    if default_human_response is not None:
        # only override agent's default_human_response if it is explicitly set
        self.agent.default_human_response = default_human_response
    self.interactive = interactive
    self.agent.interactive = interactive
    self.only_user_quits_root = only_user_quits_root
    self.message_history_idx = -1
    self.default_return_type = default_return_type

    # set to True if we want to collapse multi-turn conversation with sub-tasks into
    # just the first outgoing message and last incoming message.
    # Note this also completely erases sub-task agents' message_history.
    self.erase_substeps = erase_substeps
    self.allow_null_result = allow_null_result

    agent_entity_responders = agent.entity_responders()
    agent_entity_responders_async = agent.entity_responders_async()
    self.responders: List[Responder] = [e for e, _ in agent_entity_responders]
    self.responders_async: List[Responder] = [
        e for e, _ in agent_entity_responders_async
    ]
    self.non_human_responders: List[Responder] = [
        r for r in self.responders if r != Entity.USER
    ]
    self.non_human_responders_async: List[Responder] = [
        r for r in self.responders_async if r != Entity.USER
    ]

    self.human_tried = False  # did human get a chance to respond in last step?
    self._entity_responder_map: Dict[
        Entity, Callable[..., Optional[ChatDocument]]
    ] = dict(agent_entity_responders)

    self._entity_responder_async_map: Dict[
        Entity, Callable[..., Coroutine[Any, Any, Optional[ChatDocument]]]
    ] = dict(agent_entity_responders_async)

    self.name_sub_task_map: Dict[str, Task] = {}
    # latest message in a conversation among entities and agents.
    self.pending_message: Optional[ChatDocument] = None
    self.pending_sender: Responder = Entity.USER
    self.single_round = single_round
    self.turns = -1  # no limit
    self.llm_delegate = llm_delegate
    if llm_delegate:
        if self.single_round:
            # 0: User instructs (delegating to LLM);
            # 1: LLM (as the Controller) asks;
            # 2: user replies.
            self.turns = 2
    else:
        if self.single_round:
            # 0: User (as Controller) asks,
            # 1: LLM replies.
            self.turns = 1
    # other sub_tasks this task can delegate to
    self.sub_tasks: List[Task] = []
    self.caller: Task | None = None  # which task called this task's `run` method
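
The tail of the constructor above derives a turn limit from `single_round` and `llm_delegate`. The same mapping, pulled out into a standalone function for illustration (the name `turns_for` is hypothetical and does not exist in the library):

```python
def turns_for(single_round: bool, llm_delegate: bool) -> int:
    """Turn limit implied by the constructor flags; -1 means no limit."""
    if not single_round:
        return -1  # run until done() or an explicit `turns` argument
    if llm_delegate:
        # 0: user instructs, 1: LLM (as controller) asks, 2: user replies
        return 2
    # 0: user (as controller) asks, 1: LLM replies
    return 1


limits = {
    (single_round, llm_delegate): turns_for(single_round, llm_delegate)
    for single_round in (False, True)
    for llm_delegate in (False, True)
}
```

Note that `llm_delegate` has no effect on the limit unless `single_round` is also set.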

clone(i)

Returns a copy of this task, with a new agent.

Source code in langroid/agent/task.py
def clone(self, i: int) -> "Task":
    """
    Returns a copy of this task, with a new agent.
    """
    assert isinstance(self.agent, ChatAgent), "Task clone only works for ChatAgent"
    agent: ChatAgent = self.agent.clone(i)
    return Task(
        agent,
        name=self.name + f"-{i}",
        llm_delegate=self.llm_delegate,
        single_round=self.single_round,
        system_message=self.agent.system_message,
        user_message=self.agent.user_message,
        restart=self.restart,
        default_human_response=self.default_human_response,
        interactive=self.interactive,
        erase_substeps=self.erase_substeps,
        allow_null_result=self.allow_null_result,
        max_stalled_steps=self.max_stalled_steps,
        done_if_no_response=[Entity(s) for s in self.done_if_no_response],
        done_if_response=[Entity(s) for s in self.done_if_response],
        config=self.config,
    )

kill_session(session_id='') classmethod

Kill the session with the given session_id.

Source code in langroid/agent/task.py
@classmethod
def kill_session(cls, session_id: str = "") -> None:
    """
    Kill the session with the given session_id.
    """
    session_id_kill_key = f"{session_id}:kill"
    cls.cache().store(session_id_kill_key, "1")

kill()

Kill the task run associated with the current session.

Source code in langroid/agent/task.py
def kill(self) -> None:
    """
    Kill the task run associated with the current session.
    """
    self._cache_session_store("kill", "1")

add_sub_task(task)

Add a sub-task (or list of subtasks) that this task can delegate (or fail-over) to. Note that the sequence of sub-tasks is important, since these are tried in order, as the parent task searches for a valid response (unless a sub-task is explicitly addressed).

Parameters:

Name Type Description Default
task Task | List[Task] | Tuple[Task, TaskConfig] | List[Tuple[Task, TaskConfig]]

A task, or list of tasks, or a tuple of task and task config, or a list of tuples of task and task config. These tasks are added as sub-tasks of the current task. The task configs (if any) dictate how the tasks are run when invoked as sub-tasks of other tasks. This allows users to specify behavior applicable only in the context of a particular task-subtask combination.

required
Source code in langroid/agent/task.py
def add_sub_task(
    self,
    task: (
        Task | List[Task] | Tuple[Task, TaskConfig] | List[Tuple[Task, TaskConfig]]
    ),
) -> None:
    """
    Add a sub-task (or list of subtasks) that this task can delegate
    (or fail-over) to. Note that the sequence of sub-tasks is important,
    since these are tried in order, as the parent task searches for a valid
    response (unless a sub-task is explicitly addressed).

    Args:
        task: A task, or list of tasks, or a tuple of task and task config,
            or a list of tuples of task and task config.
            These tasks are added as sub-tasks of the current task.
            The task configs (if any) dictate how the tasks are run when
            invoked as sub-tasks of other tasks. This allows users to specify
            behavior applicable only in the context of a particular task-subtask
            combination.
    """
    if isinstance(task, list):
        for t in task:
            self.add_sub_task(t)
        return

    if isinstance(task, tuple):
        task, config = task
    else:
        config = TaskConfig()
    task.config_sub_task = config
    self.sub_tasks.append(task)
    self.name_sub_task_map[task.name] = task
    self.responders.append(cast(Responder, task))
    self.responders_async.append(cast(Responder, task))
    self.non_human_responders.append(cast(Responder, task))
    self.non_human_responders_async.append(cast(Responder, task))
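
The argument handling in `add_sub_task` can be tried out with toy stand-ins. In the sketch below, `ToyTask` and `ToyConfig` are hypothetical classes that keep only the fields needed to show how lists are flattened recursively and how a `(task, config)` tuple attaches a per-subtask config; responder registration is omitted.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class ToyConfig:
    restart_as_subtask: bool = False


@dataclass
class ToyTask:
    name: str
    config_sub_task: ToyConfig = field(default_factory=ToyConfig)
    sub_tasks: List["ToyTask"] = field(default_factory=list)

    def add_sub_task(self, task: Any) -> None:
        if isinstance(task, list):
            for t in task:  # each element may itself be a task or a tuple
                self.add_sub_task(t)
            return
        if isinstance(task, tuple):
            task, config = task
        else:
            config = ToyConfig()  # default behavior when no config is given
        task.config_sub_task = config
        self.sub_tasks.append(task)  # order matters: tried in this sequence


parent = ToyTask("main")
parent.add_sub_task(
    [ToyTask("a"), (ToyTask("b"), ToyConfig(restart_as_subtask=True))]
)
```

One consequence visible in the source is that a bare task always receives a fresh default config, replacing whatever `config_sub_task` it had before.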

init(msg=None)

Initialize the task, with an optional message to start the conversation. Initializes self.pending_message and self.pending_sender.

Parameters:

Name Type Description Default
msg None | str | ChatDocument

optional message to start the conversation.

None

Returns:

Type Description
ChatDocument | None

the initialized self.pending_message. Currently not used in the code, but provided for convenience.

Source code in langroid/agent/task.py
def init(self, msg: None | str | ChatDocument = None) -> ChatDocument | None:
    """
    Initialize the task, with an optional message to start the conversation.
    Initializes `self.pending_message` and `self.pending_sender`.
    Args:
        msg (str|ChatDocument): optional message to start the conversation.

    Returns:
        (ChatDocument|None): the initialized `self.pending_message`.
        Currently not used in the code, but provided for convenience.
    """
    self.pending_sender = Entity.USER
    if isinstance(msg, str):
        self.pending_message = ChatDocument(
            content=msg,
            metadata=ChatDocMetaData(
                sender=Entity.USER,
            ),
        )
    elif msg is None and len(self.agent.message_history) > 1:
        # if agent has a history beyond system msg, set the
        # pending message to the ChatDocument linked from
        # last message in the history
        last_agent_msg = self.agent.message_history[-1]
        self.pending_message = ChatDocument.from_id(last_agent_msg.chat_document_id)
        if self.pending_message is not None:
            self.pending_sender = self.pending_message.metadata.sender
    else:
        if isinstance(msg, ChatDocument):
            # carefully deep-copy: fresh metadata.id, register
            # as new obj in registry
            self.pending_message = ChatDocument.deepcopy(msg)
        if self.pending_message is not None and self.caller is not None:
            # msg may have come from `caller`, so we pretend this is from
            # the CURRENT task's USER entity
            self.pending_message.metadata.sender = Entity.USER
            # update parent, child, agent pointers
            if msg is not None:
                msg.metadata.child_id = self.pending_message.metadata.id
                self.pending_message.metadata.parent_id = msg.metadata.id
            self.pending_message.metadata.agent_id = self.agent.id

    self._show_pending_message_if_debug()

    if self.caller is not None and self.caller.logger is not None:
        self.logger = self.caller.logger
    elif self.logger is None:
        self.logger = RichFileLogger(
            str(Path(self.config.logs_dir) / f"{self.name}.log"),
            color=self.color_log,
        )

    if self.caller is not None and self.caller.tsv_logger is not None:
        self.tsv_logger = self.caller.tsv_logger
    elif self.tsv_logger is None:
        self.tsv_logger = setup_file_logger(
            "tsv_logger",
            str(Path(self.config.logs_dir) / f"{self.name}.tsv"),
        )
        header = ChatDocLoggerFields().tsv_header()
        self.tsv_logger.info(f" \tTask\tResponder\t{header}")

    self.log_message(Entity.USER, self.pending_message)
    return self.pending_message

reset_all_sub_tasks()

Recursively reset message history & state of own agent and those of all sub-tasks.

Source code in langroid/agent/task.py
def reset_all_sub_tasks(self) -> None:
    """
    Recursively reset message history & state of own agent and
    those of all sub-tasks.
    """
    self.agent.init_state()
    for t in self.sub_tasks:
        t.reset_all_sub_tasks()

run(msg=None, turns=-1, caller=None, max_cost=0, max_tokens=0, session_id='', allow_restart=True, return_type=None)

Synchronous version of run_async(). See run_async() for details.

Source code in langroid/agent/task.py
def run(
    self,
    msg: Any = None,
    turns: int = -1,
    caller: None | Task = None,
    max_cost: float = 0,
    max_tokens: int = 0,
    session_id: str = "",
    allow_restart: bool = True,
    return_type: Optional[Type[T]] = None,
) -> Optional[ChatDocument | T]:
    """Synchronous version of `run_async()`.
    See `run_async()` for details."""
    if allow_restart and (
        (self.restart and caller is None)
        or (self.config_sub_task.restart_as_subtask and caller is not None)
    ):
        # We are either at top level, with restart = True, OR
        # we are a sub-task with restart_as_subtask = True,
        # so reset own agent and recursively for all sub-tasks
        self.reset_all_sub_tasks()

    self.n_stalled_steps = 0
    self._no_answer_step = -5  # last step where the best explicit response was N/A
    # how many N/A alternations have we had so far? (for Inf loop detection)
    self.n_no_answer_alternations = 0
    self.max_cost = max_cost
    self.max_tokens = max_tokens
    self.session_id = session_id
    self._set_alive()
    self._init_message_counter()
    self.history.clear()

    msg_input = self.agent.to_ChatDocument(msg, author_entity=Entity.USER)

    if (
        isinstance(msg_input, ChatDocument)
        and msg_input.metadata.recipient != ""
        and msg_input.metadata.recipient != self.name
    ):
        # this task is not the intended recipient so return None
        return None

    self._pre_run_loop(
        msg=msg_input,
        caller=caller,
        is_async=False,
    )
    # self.turns overrides if it is > 0 and turns not set (i.e. = -1)
    turns = self.turns if turns < 0 else turns
    i = 0
    while True:
        self._step_idx = i  # used in step() below
        self.step()
        done, status = self.done()
        if done:
            if self._level == 0 and not settings.quiet:
                print("[magenta]Bye, hope this was useful!")
            break
        i += 1
        max_turns = (
            min(turns, settings.max_turns)
            if turns > 0 and settings.max_turns > 0
            else max(turns, settings.max_turns)
        )
        if max_turns > 0 and i >= max_turns:
            # Important to distinguish between:
            # (a) intentional run for a
            #     fixed number of turns, where we expect the pending message
            #     at that stage to be the desired result, and
            # (b) hitting max_turns limit, which is not intentional, and is an
            #     exception, resulting in a None task result
            status = (
                StatusCode.MAX_TURNS
                if i == settings.max_turns
                else StatusCode.FIXED_TURNS
            )
            break
        if (
            self.config.inf_loop_cycle_len > 0
            and i % self.config.inf_loop_cycle_len == 0
            and self._maybe_infinite_loop()
            or self.n_no_answer_alternations > self.config.inf_loop_wait_factor
        ):
            raise InfiniteLoopException(
                """Possible infinite loop detected!
                You can adjust infinite loop detection (or turn it off)
                by changing the params in the TaskConfig passed to the Task 
                constructor; see here:
                https://langroid.github.io/langroid/reference/agent/task/#langroid.agent.task.TaskConfig
                """
            )

    final_result = self.result(status)
    self._post_run_loop()
    if final_result is None:
        return None

    if return_type is None:
        return_type = self.default_return_type

    # If possible, take a final strict decoding step
    # when the output does not match `return_type`
    if return_type is not None and return_type != ChatDocument:
        parsed_result = self.agent.from_ChatDocument(final_result, return_type)

        if (
            parsed_result is None
            and isinstance(self.agent, ChatAgent)
            and self.agent._json_schema_available()
        ):
            strict_agent = self.agent[return_type]
            output_args = strict_agent._function_args()[-1]
            if output_args is not None:
                schema = output_args.function.parameters
                strict_result = strict_agent.llm_response(
                    f"""
                    A response adhering to the following JSON schema was expected:
                    {schema}

                    Please resubmit with the correct schema. 
                    """
                )

                if strict_result is not None:
                    return cast(
                        Optional[T],
                        strict_agent.from_ChatDocument(strict_result, return_type),
                    )

        return parsed_result

    return final_result
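
The turn-limit arithmetic inside the loop above combines the per-run `turns` value with the global `settings.max_turns`. A standalone sketch of that expression, where `effective_max_turns` is a hypothetical name and `global_max` stands in for `settings.max_turns`:

```python
def effective_max_turns(turns: int, global_max: int) -> int:
    """Combine a per-run limit with a global limit, as in Task.run.

    A value <= 0 means "no limit" on that side.
    """
    if turns > 0 and global_max > 0:
        return min(turns, global_max)  # both set: the stricter one wins
    return max(turns, global_max)  # at most one set: use it, if any


cases = {
    (3, 10): effective_max_turns(3, 10),
    (-1, 10): effective_max_turns(-1, 10),
    (3, -1): effective_max_turns(3, -1),
    (-1, -1): effective_max_turns(-1, -1),
}
```

The loop only stops on this limit when the result is positive, so a result of 0 or -1 leaves the run unbounded by turn count.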

run_async(msg=None, turns=-1, caller=None, max_cost=0, max_tokens=0, session_id='', allow_restart=True, return_type=None) async

Loop over step() until task is considered done or turns is reached. Runs asynchronously.

Parameters:

Name Type Description Default
msg Any

initial user-role message to process; if None, the LLM will respond to its initial self.task_messages which set up and kick off the overall task. The agent tries to achieve this goal by looping over self.step() until the task is considered done; this can involve a series of messages produced by Agent, LLM or Human (User). Note that msg, if passed, is treated as message with role user; a "system" role message should not be passed here.

None
turns int

number of turns to run the task for; default is -1, which means run until task is done.

-1
caller Task | None

the calling task, if any

None
max_cost float

max cost allowed for the task (default 0 -> no limit)

0
max_tokens int

max tokens allowed for the task (default 0 -> no limit)

0
session_id str

session id for the task

''
allow_restart bool

whether to allow restarting the task

True
return_type Optional[Type[T]]

desired final result type

None

Returns:

Type Description
Optional[ChatDocument | T]

valid result of the task.

Source code in langroid/agent/task.py
async def run_async(
    self,
    msg: Any = None,
    turns: int = -1,
    caller: None | Task = None,
    max_cost: float = 0,
    max_tokens: int = 0,
    session_id: str = "",
    allow_restart: bool = True,
    return_type: Optional[Type[T]] = None,
) -> Optional[ChatDocument | T]:
    """
    Loop over `step()` until task is considered done or `turns` is reached.
    Runs asynchronously.

    Args:
        msg (Any): initial *user-role* message to process; if None,
            the LLM will respond to its initial `self.task_messages`
            which set up and kick off the overall task.
            The agent tries to achieve this goal by looping
            over `self.step()` until the task is considered
            done; this can involve a series of messages produced by Agent,
            LLM or Human (User). Note that `msg`, if passed, is treated as
            message with role `user`; a "system" role message should not be
            passed here.
        turns (int): number of turns to run the task for;
            default is -1, which means run until task is done.
        caller (Task|None): the calling task, if any
        max_cost (float): max cost allowed for the task (default 0 -> no limit)
        max_tokens (int): max tokens allowed for the task (default 0 -> no limit)
        session_id (str): session id for the task
        allow_restart (bool): whether to allow restarting the task
        return_type (Optional[Type[T]]): desired final result type

    Returns:
        Optional[ChatDocument]: valid result of the task.
    """

    # Even if the initial "sender" is not literally the USER (since the task could
    # have come from another LLM), as far as this agent is concerned, the initial
    # message can be considered to be from the USER
    # (from the POV of this agent's LLM).

    if allow_restart and (
        (self.restart and caller is None)
        or (self.config_sub_task.restart_as_subtask and caller is not None)
    ):
        # We are either at top level, with restart = True, OR
        # we are a sub-task with restart_as_subtask = True,
        # so reset own agent and recursively for all sub-tasks
        self.reset_all_sub_tasks()

    self.n_stalled_steps = 0
    self._no_answer_step = -5  # last step where the best explicit response was N/A
    # how many N/A alternations have we had so far? (for Inf loop detection)
    self.n_no_answer_alternations = 0
    self.max_cost = max_cost
    self.max_tokens = max_tokens
    self.session_id = session_id
    self._set_alive()
    self._init_message_counter()
    self.history.clear()

    msg_input = self.agent.to_ChatDocument(msg, author_entity=Entity.USER)

    if (
        isinstance(msg_input, ChatDocument)
        and msg_input.metadata.recipient != ""
        and msg_input.metadata.recipient != self.name
    ):
        # this task is not the intended recipient so return None
        return None

    self._pre_run_loop(
        msg=msg_input,
        caller=caller,
        is_async=False,
    )
    # self.turns overrides if it is > 0 and turns not set (i.e. = -1)
    turns = self.turns if turns < 0 else turns
    i = 0
    while True:
        self._step_idx = i  # used in step() below
        await self.step_async()
        await asyncio.sleep(0.01)  # temp yield to avoid blocking
        done, status = self.done()
        if done:
            if self._level == 0 and not settings.quiet:
                print("[magenta]Bye, hope this was useful!")
            break
        i += 1
        max_turns = (
            min(turns, settings.max_turns)
            if turns > 0 and settings.max_turns > 0
            else max(turns, settings.max_turns)
        )
        if max_turns > 0 and i >= max_turns:
            # Important to distinguish between:
            # (a) intentional run for a
            #     fixed number of turns, where we expect the pending message
            #     at that stage to be the desired result, and
            # (b) hitting max_turns limit, which is not intentional, and is an
            #     exception, resulting in a None task result
            status = (
                StatusCode.MAX_TURNS
                if i == settings.max_turns
                else StatusCode.FIXED_TURNS
            )
            break
        if (
            self.config.inf_loop_cycle_len > 0
            and i % self.config.inf_loop_cycle_len == 0
            and self._maybe_infinite_loop()
            or self.n_no_answer_alternations > self.config.inf_loop_wait_factor
        ):
            raise InfiniteLoopException(
                """Possible infinite loop detected!
                You can adjust infinite loop detection (or turn it off)
                by changing the params in the TaskConfig passed to the Task 
                constructor; see here:
                https://langroid.github.io/langroid/reference/agent/task/#langroid.agent.task.TaskConfig
                """
            )

    final_result = self.result(status)
    self._post_run_loop()
    if final_result is None:
        return None

    if return_type is None:
        return_type = self.default_return_type

    # If possible, take a final strict decoding step
    # when the output does not match `return_type`
    if return_type is not None and return_type != ChatDocument:
        parsed_result = self.agent.from_ChatDocument(final_result, return_type)

        if (
            parsed_result is None
            and isinstance(self.agent, ChatAgent)
            and self.agent._json_schema_available()
        ):
            strict_agent = self.agent[return_type]
            output_args = strict_agent._function_args()[-1]
            if output_args is not None:
                schema = output_args.function.parameters
                strict_result = await strict_agent.llm_response_async(
                    f"""
                    A response adhering to the following JSON schema was expected:
                    {schema}

                    Please resubmit with the correct schema. 
                    """
                )

                if strict_result is not None:
                    return cast(
                        Optional[T],
                        strict_agent.from_ChatDocument(strict_result, return_type),
                    )

        return parsed_result

    return final_result
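The turn-limit expression in the run_async() listing above can be read in isolation. The sketch below restates it as a standalone function; `effective_max_turns` and `global_max_turns` are illustrative names (the latter stands in for `settings.max_turns`), not part of the Langroid API.

```python
def effective_max_turns(turns: int, global_max_turns: int) -> int:
    """Restates the `max_turns` expression from the listing.

    If both limits are positive, the smaller one applies; otherwise
    whichever one is set (positive) applies. A result <= 0 means that
    no turn limit is enforced.
    """
    if turns > 0 and global_max_turns > 0:
        return min(turns, global_max_turns)
    return max(turns, global_max_turns)


both_set = effective_max_turns(5, 3)
only_turns = effective_max_turns(5, 0)
only_global = effective_max_turns(-1, 4)
neither = effective_max_turns(-1, 0)
```

Taking `max` when at most one limit is positive is what lets an unset value (0 or -1) defer to the other limit.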

step(turns=-1)

Synchronous version of step_async(). See step_async() for details. TODO: Except for the self.response() calls, this fn should be identical to step_async(). Consider refactoring to avoid duplication.

Source code in langroid/agent/task.py
def step(self, turns: int = -1) -> ChatDocument | None:
    """
    Synchronous version of `step_async()`. See `step_async()` for details.
    TODO: Except for the self.response() calls, this fn should be identical to
    `step_async()`. Consider refactoring to avoid duplication.
    """
    self.is_done = False
    parent = self.pending_message
    recipient = (
        ""
        if self.pending_message is None
        else self.pending_message.metadata.recipient
    )
    if not self._valid_recipient(recipient):
        logger.warning(f"Invalid recipient: {recipient}")
        error_doc = ChatDocument(
            content=f"Invalid recipient: {recipient}",
            metadata=ChatDocMetaData(
                sender=Entity.AGENT,
                sender_name=Entity.AGENT,
            ),
        )
        self._process_valid_responder_result(Entity.AGENT, parent, error_doc)
        return error_doc

    responders: List[Responder] = self.non_human_responders.copy()

    if (
        Entity.USER in self.responders
        and not self.human_tried
        and not self.agent.has_tool_message_attempt(self.pending_message)
    ):
        # Give human first chance if they haven't been tried in last step,
        # and the msg is not a tool-call attempt;
        # (When `interactive=False`, human is allowed to respond only
        #  if explicitly addressed)
        # This ensures human gets a chance to respond,
        #   other than to a LLM tool-call.
        # When there's a tool msg attempt we want the
        #  Agent to be the next responder; this only makes a difference in an
        #  interactive setting: LLM generates tool, then we don't want user to
        #  have to respond, and instead let the agent_response handle the tool.

        responders.insert(0, Entity.USER)

    found_response = False
    # (responder, result) from a responder who explicitly said NO_ANSWER
    no_answer_response: None | Tuple[Responder, ChatDocument] = None
    n_non_responders = 0
    for r in responders:
        self.is_pass_thru = False
        if not self._can_respond(r):
            n_non_responders += 1
            # create dummy msg for logging
            log_doc = ChatDocument(
                content="[CANNOT RESPOND]",
                metadata=ChatDocMetaData(
                    sender=r if isinstance(r, Entity) else Entity.USER,
                    sender_name=str(r),
                    recipient=recipient,
                ),
            )
            # no need to register this dummy msg in ObjectRegistry
            ChatDocument.delete_id(log_doc.id())
            self.log_message(r, log_doc)
            if n_non_responders == len(responders):
                # don't stay in this "non-response" loop forever
                break
            continue
        self.human_tried = r == Entity.USER
        result = self.response(r, turns)
        if result and NO_ANSWER in result.content:
            no_answer_response = (r, result)
        self.is_done = self._is_done_response(result, r)
        self.is_pass_thru = PASS in result.content if result else False
        if self.valid(result, r):
            found_response = True
            assert result is not None
            self._process_valid_responder_result(r, parent, result)
            break
        else:
            self.log_message(r, result)
        if self.is_done:
            # skip trying other responders in this step
            break
    if not found_response:  # did not find a valid response
        if no_answer_response:
            # even though there was no valid response from anyone in this step,
            # if there was at least one who EXPLICITLY said NO_ANSWER, then
            # we process that as a valid response.
            r, result = no_answer_response
            self._process_valid_responder_result(r, parent, result)
        else:
            self._process_invalid_step_result(parent)
    self._show_pending_message_if_debug()
    return self.pending_message

step_async(turns=-1) async

A single "turn" in the task conversation: The "allowed" responders in this turn (which can be either the 3 "entities", or one of the sub-tasks) are tried in sequence, until a valid response is obtained; a valid response is one that contributes to the task, either by ending it, or producing a response to be further acted on. Update self.pending_message to the latest valid response (or NO_ANSWER if no valid response was obtained from any responder).

Parameters:

Name Type Description Default
turns int

number of turns to process. Typically used in testing where there is no human to "quit out" of current level, or in cases where we want to limit the number of turns of a delegated agent.

-1

Returns (ChatDocument|None): Updated self.pending_message. Currently the return value is not used by the task.run() method, but we return this as a convenience for other use-cases, e.g. where we want to run a task step by step in a different context.
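The "try responders in sequence until one gives a valid response" idea can be sketched without any Langroid machinery. Everything below is a simplified illustration: `toy_step`, `is_valid`, and the string-based responders are hypothetical, and the value assigned to `NO_ANSWER` is an assumption made for the example.

```python
from typing import Callable, List, Optional, Tuple

NO_ANSWER = "DO-NOT-KNOW"  # assumed value, for illustration only

Responder = Callable[[str], Optional[str]]


def is_valid(result: Optional[str]) -> bool:
    # Simplified validity test: non-empty and not an explicit NO_ANSWER.
    return result is not None and result.strip() not in ("", NO_ANSWER)


def toy_step(pending: str, responders: List[Tuple[str, Responder]]) -> Tuple[str, str]:
    """Return (responder_name, new_pending_message)."""
    no_answer: Optional[Tuple[str, str]] = None
    for name, respond in responders:
        result = respond(pending)
        if result is not None and NO_ANSWER in result:
            no_answer = (name, result)  # remember an explicit NO_ANSWER
        if is_valid(result):
            assert result is not None
            return name, result  # first valid response ends the step
    if no_answer is not None:
        return no_answer  # an explicit NO_ANSWER is accepted as the response
    return "", NO_ANSWER  # nobody produced anything usable


responders: List[Tuple[str, Responder]] = [
    ("user", lambda msg: None),
    ("agent", lambda msg: NO_ANSWER),
    ("llm", lambda msg: f"echo: {msg}"),
]
winner = toy_step("hello", responders)
```

Here the third responder wins because the first returns nothing and the second explicitly declines; if the third were removed, the explicit `NO_ANSWER` from the second would become the step's result.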

Source code in langroid/agent/task.py
async def step_async(self, turns: int = -1) -> ChatDocument | None:
    """
    A single "turn" in the task conversation: The "allowed" responders in this
    turn (which can be either the 3 "entities", or one of the sub-tasks) are
    tried in sequence, until a _valid_ response is obtained; a _valid_
    response is one that contributes to the task, either by ending it,
    or producing a response to be further acted on.
    Update `self.pending_message` to the latest valid response (or NO_ANSWER
    if no valid response was obtained from any responder).

    Args:
        turns (int): number of turns to process. Typically used in testing
            where there is no human to "quit out" of current level, or in cases
            where we want to limit the number of turns of a delegated agent.

    Returns (ChatDocument|None):
        Updated `self.pending_message`. Currently the return value is not used
            by the `task.run()` method, but we return this as a convenience for
            other use-cases, e.g. where we want to run a task step by step in a
            different context.
    """
    self.is_done = False
    parent = self.pending_message
    recipient = (
        ""
        if self.pending_message is None
        else self.pending_message.metadata.recipient
    )
    if not self._valid_recipient(recipient):
        logger.warning(f"Invalid recipient: {recipient}")
        error_doc = ChatDocument(
            content=f"Invalid recipient: {recipient}",
            metadata=ChatDocMetaData(
                sender=Entity.AGENT,
                sender_name=Entity.AGENT,
            ),
        )
        self._process_valid_responder_result(Entity.AGENT, parent, error_doc)
        return error_doc

    responders: List[Responder] = self.non_human_responders_async.copy()

    if (
        Entity.USER in self.responders
        and not self.human_tried
        and not self.agent.has_tool_message_attempt(self.pending_message)
    ):
        # Give human first chance if they haven't been tried in last step,
        # and the msg is not a tool-call attempt;
        # This ensures human gets a chance to respond,
        #   other than to a LLM tool-call.
        # When there's a tool msg attempt we want the
        #  Agent to be the next responder; this only makes a difference in an
        #  interactive setting: LLM generates tool, then we don't want user to
        #  have to respond, and instead let the agent_response handle the tool.
        responders.insert(0, Entity.USER)

    found_response = False
    # (responder, result) from a responder who explicitly said NO_ANSWER
    no_answer_response: None | Tuple[Responder, ChatDocument] = None
    for r in responders:
        self.is_pass_thru = False
        if not self._can_respond(r):
            # create dummy msg for logging
            log_doc = ChatDocument(
                content="[CANNOT RESPOND]",
                metadata=ChatDocMetaData(
                    sender=r if isinstance(r, Entity) else Entity.USER,
                    sender_name=str(r),
                    recipient=recipient,
                ),
            )
            # no need to register this dummy msg in ObjectRegistry
            ChatDocument.delete_id(log_doc.id())
            self.log_message(r, log_doc)
            continue
        self.human_tried = r == Entity.USER
        result = await self.response_async(r, turns)
        if result and NO_ANSWER in result.content:
            no_answer_response = (r, result)
        self.is_done = self._is_done_response(result, r)
        self.is_pass_thru = PASS in result.content if result else False
        if self.valid(result, r):
            found_response = True
            assert result is not None
            self._process_valid_responder_result(r, parent, result)
            break
        else:
            self.log_message(r, result)
        if self.is_done:
            # skip trying other responders in this step
            break
    if not found_response:
        if no_answer_response:
            # even though there was no valid response from anyone in this step,
            # if there was at least one who EXPLICITLY said NO_ANSWER, then
            # we process that as a valid response.
            r, result = no_answer_response
            self._process_valid_responder_result(r, parent, result)
        else:
            self._process_invalid_step_result(parent)
    self._show_pending_message_if_debug()
    return self.pending_message

response(e, turns=-1)

Sync version of response_async(). See response_async() for details.

Source code in langroid/agent/task.py
def response(
    self,
    e: Responder,
    turns: int = -1,
) -> Optional[ChatDocument]:
    """
    Sync version of `response_async()`. See `response_async()` for details.
    """
    if isinstance(e, Task):
        actual_turns = e.turns if e.turns > 0 else turns
        e.agent.callbacks.set_parent_agent(self.agent)
        # e.callbacks.set_parent_agent(self.agent)
        pending_tools = self.agent.try_get_tool_messages(self.pending_message)
        # TODO disable this
        if (
            len(pending_tools) > 1
            and len(self.agent.oai_tool_calls) > 1
            and not self.config.allow_subtask_multi_oai_tools
        ):
            result = self._forbid_multi_oai_tools(e)
        else:
            result = e.run(
                self.pending_message,
                turns=actual_turns,
                caller=self,
                max_cost=self.max_cost,
                max_tokens=self.max_tokens,
            )
            # update result.tool_messages if any
            if isinstance(result, ChatDocument):
                self.agent.try_get_tool_messages(result)
            if result is not None:
                content, id2result, oai_tool_id = self.agent.process_tool_results(
                    result.content,
                    result.oai_tool_id2result,
                    (
                        self.pending_message.oai_tool_calls
                        if isinstance(self.pending_message, ChatDocument)
                        else None
                    ),
                )
                result.content = content
                result.oai_tool_id2result = id2result
                result.metadata.oai_tool_id = oai_tool_id

        result_str = (  # only used by callback to display content and possible tool
            "NONE"
            if result is None
            else "\n\n".join(str(m) for m in ChatDocument.to_LLMMessage(result))
        )
        maybe_tool = len(extract_top_level_json(result_str)) > 0
        self.callbacks.show_subtask_response(
            task=e,
            content=result_str,
            is_tool=maybe_tool,
        )
    else:
        response_fn = self._entity_responder_map[cast(Entity, e)]
        result = response_fn(self.pending_message)
        # update result.tool_messages if any
        if isinstance(result, ChatDocument):
            self.agent.try_get_tool_messages(result)

    result_chat_doc = self.agent.to_ChatDocument(
        result,
        chat_doc=self.pending_message,
        author_entity=e if isinstance(e, Entity) else Entity.USER,
    )
    return self._process_result_routing(result_chat_doc, e)

response_async(e, turns=-1) async

Get response to self.pending_message from a responder. If the response is valid (i.e. it ends the current turn of seeking responses), return it as a ChatDocument object; otherwise return None.

Parameters:

Name Type Description Default
e Responder

responder to get response from.

required
turns int

number of turns to run the task for. Default is -1, which means run until task is done.

-1

Returns:

Type Description
Optional[ChatDocument]

response to self.pending_message from entity if valid, None otherwise
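When the responder is a sub-task, the listing below picks the number of turns with a one-line rule: the sub-task's own positive `turns` setting takes precedence over the value passed in. A minimal restatement, using the hypothetical helper name `resolve_subtask_turns`:

```python
def resolve_subtask_turns(subtask_turns: int, requested_turns: int) -> int:
    """A sub-task's own positive `turns` wins; otherwise use the requested value."""
    return subtask_turns if subtask_turns > 0 else requested_turns


own_limit = resolve_subtask_turns(3, -1)
inherited = resolve_subtask_turns(-1, 5)
```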

Source code in langroid/agent/task.py
async def response_async(
    self,
    e: Responder,
    turns: int = -1,
) -> Optional[ChatDocument]:
    """
    Get response to `self.pending_message` from a responder.
    If response is __valid__ (i.e. it ends the current turn of seeking
    responses):
        -then return the response as a ChatDocument object,
        -otherwise return None.
    Args:
        e (Responder): responder to get response from.
        turns (int): number of turns to run the task for.
            Default is -1, which means run until task is done.

    Returns:
        Optional[ChatDocument]: response to `self.pending_message` from entity if
        valid, None otherwise
    """
    if isinstance(e, Task):
        actual_turns = e.turns if e.turns > 0 else turns
        e.agent.callbacks.set_parent_agent(self.agent)
        pending_tools = self.agent.try_get_tool_messages(self.pending_message)
        # TODO disable this
        if (
            len(pending_tools) > 1
            and len(self.agent.oai_tool_calls) > 1
            and not self.config.allow_subtask_multi_oai_tools
        ):
            result = self._forbid_multi_oai_tools(e)
        else:
            # e.callbacks.set_parent_agent(self.agent)
            result = await e.run_async(
                self.pending_message,
                turns=actual_turns,
                caller=self,
                max_cost=self.max_cost,
                max_tokens=self.max_tokens,
            )
            # update result.tool_messages if any
            if isinstance(result, ChatDocument):
                self.agent.try_get_tool_messages(result)
            if result is not None:
                content, id2result, oai_tool_id = self.agent.process_tool_results(
                    result.content,
                    result.oai_tool_id2result,
                    (
                        self.pending_message.oai_tool_calls
                        if isinstance(self.pending_message, ChatDocument)
                        else None
                    ),
                )
                result.content = content
                result.oai_tool_id2result = id2result
                result.metadata.oai_tool_id = oai_tool_id

        result_str = (  # only used by callback to display content and possible tool
            "NONE"
            if result is None
            else "\n\n".join(str(m) for m in ChatDocument.to_LLMMessage(result))
        )
        maybe_tool = len(extract_top_level_json(result_str)) > 0
        self.callbacks.show_subtask_response(
            task=e,
            content=result_str,
            is_tool=maybe_tool,
        )
    else:
        response_fn = self._entity_responder_async_map[cast(Entity, e)]
        result = await response_fn(self.pending_message)
        # update result.tool_messages if any
        if isinstance(result, ChatDocument):
            self.agent.try_get_tool_messages(result)

    result_chat_doc = self.agent.to_ChatDocument(
        result,
        chat_doc=self.pending_message,
        author_entity=e if isinstance(e, Entity) else Entity.USER,
    )
    return self._process_result_routing(result_chat_doc, e)

result(status=None)

Get result of task. This is the default behavior. Derived classes can override this.

Note the result of a task is returned as if it is from the User entity.

Parameters:

Name Type Description Default
status StatusCode

status of the task when it ended

None

Returns:

Type Description
ChatDocument | None

result of task; None if the task ended with status STALLED, MAX_TURNS, or INF_LOOP
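Two behaviours of result() are easy to miss: some ending statuses yield no result at all, and any result is attributed to the User entity. The sketch below illustrates both with toy types; `ToyStatus` and `toy_result` are hypothetical and only borrow the status names that appear in the listing below.

```python
from enum import Enum
from typing import Optional


class ToyStatus(Enum):
    """Hypothetical subset of status codes, for illustration only."""

    OK = "OK"
    DONE = "DONE"
    STALLED = "STALLED"
    MAX_TURNS = "MAX_TURNS"
    INF_LOOP = "INF_LOOP"


NO_RESULT_STATUSES = {ToyStatus.STALLED, ToyStatus.MAX_TURNS, ToyStatus.INF_LOOP}


def toy_result(pending: Optional[str], status: Optional[ToyStatus] = None) -> Optional[dict]:
    if status in NO_RESULT_STATUSES:
        return None  # abnormal ending: do not try to guess a result
    return {
        "content": pending or "",
        "sender": "USER",  # presented to the parent as if it came from the user
        "status": status,
    }


stalled = toy_result("partial answer", ToyStatus.STALLED)
finished = toy_result("final answer", ToyStatus.DONE)
```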

Source code in langroid/agent/task.py
def result(self, status: StatusCode | None = None) -> ChatDocument | None:
    """
    Get result of task. This is the default behavior.
    Derived classes can override this.

    Note the result of a task is returned as if it is from the User entity.

    Args:
        status (StatusCode): status of the task when it ended
    Returns:
        ChatDocument: result of task
    """
    if status in [StatusCode.STALLED, StatusCode.MAX_TURNS, StatusCode.INF_LOOP]:
        # In these cases we don't know (and don't want to try to guess)
        # what the task result should be, so we return None
        return None

    result_msg = self.pending_message

    content = result_msg.content if result_msg else ""
    content_any = result_msg.content_any if result_msg else None
    if DONE in content and self.config.recognize_string_signals:
        # assuming it is of the form "DONE: <content>"
        content = content.replace(DONE, "").strip()
    oai_tool_calls = result_msg.oai_tool_calls if result_msg else None
    oai_tool_id2result = result_msg.oai_tool_id2result if result_msg else None
    fun_call = result_msg.function_call if result_msg else None
    tool_messages = result_msg.tool_messages if result_msg else []
    # if there is a DoneTool or AgentDoneTool among these,
    # we extract content and tools from here, and ignore all others
    for t in tool_messages:
        if isinstance(t, FinalResultTool):
            content = ""
            content_any = None
            tool_messages = [t]  # pass it on to parent so it also quits
            break
        elif isinstance(t, (AgentDoneTool, DoneTool)):
            # there shouldn't be multiple tools like this; just take the first
            content = to_string(t.content)
            content_any = t.content
            fun_call = None
            oai_tool_calls = None
            if isinstance(t, AgentDoneTool):
                # AgentDoneTool may have tools, unlike DoneTool
                tool_messages = t.tools
            break
    # drop the "Done" tools since they should not be part of the task result,
    # or else they would cause the parent task to get unintentionally done!
    tool_messages = [
        t for t in tool_messages if not isinstance(t, (DoneTool, AgentDoneTool))
    ]
    block = result_msg.metadata.block if result_msg else None
    recipient = result_msg.metadata.recipient if result_msg else ""
    tool_ids = result_msg.metadata.tool_ids if result_msg else []

    # regardless of which entity actually produced the result,
    # when we return the result, we set entity to USER
    # since to the "parent" task, this result is equivalent to a response from USER
    result_doc = ChatDocument(
        content=content,
        content_any=content_any,
        oai_tool_calls=oai_tool_calls,
        oai_tool_id2result=oai_tool_id2result,
        function_call=fun_call,
        tool_messages=tool_messages,
        metadata=ChatDocMetaData(
            source=Entity.USER,
            sender=Entity.USER,
            block=block,
            status=status or (result_msg.metadata.status if result_msg else None),
            sender_name=self.name,
            recipient=recipient,
            tool_ids=tool_ids,
            parent_id=result_msg.id() if result_msg else "",
            agent_id=str(self.agent.id),
        ),
    )
    if self.pending_message is not None:
        self.pending_message.metadata.child_id = result_doc.id()

    return result_doc

done(result=None, r=None)

Check if task is done. This is the default behavior. Derived classes can override this.

Parameters:

Name Type Description Default
result ChatDocument | None

result from a responder

None
r Responder | None

responder that produced the result. Not used here, but could be used by derived classes.

None

Returns:

Type Description
Tuple[bool, StatusCode]

bool: True if task is done, False otherwise; StatusCode: status code indicating why task is done
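Among the checks in done(), the cost and token budgets follow the same pattern: a limit of 0 disables the check, and exceeding a positive limit ends the task with a dedicated status. The following is a rough sketch of just that pattern; `toy_budget_check` and its plain-string statuses are hypothetical simplifications, not the Langroid implementation.

```python
from typing import Tuple


def toy_budget_check(
    cost: float,
    tokens: int,
    max_cost: float = 0,
    max_tokens: int = 0,
) -> Tuple[bool, str]:
    """Return (done, status) based only on cost and token budgets."""
    if max_cost > 0 and cost > max_cost:
        return True, "MAX_COST"
    if max_tokens > 0 and tokens > max_tokens:
        return True, "MAX_TOKENS"
    return False, "OK"  # no limit set, or still within budget


no_limits = toy_budget_check(cost=9.5, tokens=100_000)
over_cost = toy_budget_check(cost=1.2, tokens=500, max_cost=1.0)
over_tokens = toy_budget_check(cost=0.1, tokens=500, max_cost=1.0, max_tokens=400)
```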

Source code in langroid/agent/task.py
def done(
    self, result: ChatDocument | None = None, r: Responder | None = None
) -> Tuple[bool, StatusCode]:
    """
    Check if task is done. This is the default behavior.
    Derived classes can override this.
    Args:
        result (ChatDocument|None): result from a responder
        r (Responder|None): responder that produced the result
            Not used here, but could be used by derived classes.
    Returns:
        bool: True if task is done, False otherwise
        StatusCode: status code indicating why task is done
    """
    if self._is_kill():
        return (True, StatusCode.KILL)
    result = result or self.pending_message
    allow_done_string = self.config.recognize_string_signals
    # An entity decided task is done, either via DoneTool,
    # or by explicitly saying DONE
    done_result = result is not None and (
        (
            DONE in (result.content if isinstance(result, str) else result.content)
            and allow_done_string
        )
        or any(
            isinstance(t, (DoneTool, AgentDoneTool, FinalResultTool))
            for t in result.tool_messages
        )
    )

    user_quit = (
        result is not None
        and (result.content in USER_QUIT_STRINGS or done_result)
        and result.metadata.sender == Entity.USER
    )
    if self._level == 0 and self._user_can_respond() and self.only_user_quits_root:
        # for top-level task, only user can quit out
        return (user_quit, StatusCode.USER_QUIT if user_quit else StatusCode.OK)

    if self.is_done:
        return (True, StatusCode.DONE)

    if self.n_stalled_steps >= self.max_stalled_steps:
        # we are stuck, so bail to avoid infinite loop
        logger.warning(
            f"Task {self.name} stuck for {self.max_stalled_steps} steps; exiting."
        )
        return (True, StatusCode.STALLED)

    if self.max_cost > 0 and self.agent.llm is not None:
        try:
            if self.agent.llm.tot_tokens_cost()[1] > self.max_cost:
                logger.warning(
                    f"Task {self.name} cost exceeded {self.max_cost}; exiting."
                )
                return (True, StatusCode.MAX_COST)
        except Exception:
            pass

    if self.max_tokens > 0 and self.agent.llm is not None:
        try:
            if self.agent.llm.tot_tokens_cost()[0] > self.max_tokens:
                logger.warning(
                    f"Task {self.name} uses > {self.max_tokens} tokens; exiting."
                )
                return (True, StatusCode.MAX_TOKENS)
        except Exception:
            pass
    final = (
        # no valid response from any entity/agent in current turn
        result is None
        or done_result
        or (  # current task is addressing message to caller task
            self.caller is not None
            and self.caller.name != ""
            and result.metadata.recipient == self.caller.name
        )
        or user_quit
    )
    return (final, StatusCode.OK)

valid(result, r)

Is the result from a Responder (i.e. an entity or sub-task) such that we can stop searching for responses in this step?
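One detail of the validity test is that an explicit "no answer" is recognised even when the LLM appends punctuation. The snippet below isolates that comparison using the same regular expression as the listing below; the helper name `is_explicit_no_answer` is hypothetical, and the value of `NO_ANSWER` is an assumption inferred from the source comment.

```python
import re

NO_ANSWER = "DO-NOT-KNOW"  # assumed value of the constant, for illustration


def is_explicit_no_answer(content: str) -> bool:
    """True if content is NO_ANSWER once surrounding whitespace and punctuation are removed."""
    return re.sub(r"[,.!?:]", "", content.strip()) == NO_ANSWER


with_period = is_explicit_no_answer("DO-NOT-KNOW.")
padded = is_explicit_no_answer("  DO-NOT-KNOW!  ")
in_sentence = is_explicit_no_answer("I really DO-NOT-KNOW.")
```

Note that the comparison is against the whole message, so a longer sentence that merely contains the marker does not match.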

Source code in langroid/agent/task.py
def valid(
    self,
    result: Optional[ChatDocument],
    r: Responder,
) -> bool:
    """
    Is the result from a Responder (i.e. an entity or sub-task)
    such that we can stop searching for responses in this step?
    """
    # TODO caution we should ensure that no handler method (tool) returns simply
    # an empty string (e.g when showing contents of an empty file), since that
    # would be considered an invalid response, and other responders will wrongly
    # be given a chance to respond.

    # if task would be considered done given responder r's `result`,
    # then consider the result valid.
    if result is not None and self.done(result, r)[0]:
        return True
    return (
        result is not None
        and not self._is_empty_message(result)
        # some weaker LLMs, including even GPT-4o, may say "DO-NOT-KNOW."
        # (with a punctuation at the end), so need to strip out punctuation
        and re.sub(r"[,.!?:]", "", result.content.strip()) != NO_ANSWER
    )

log_message(resp, msg=None, mark=False)

Log current pending message, and related state, for lineage/debugging purposes.

Parameters:

Name Type Description Default
resp Responder

Responder that generated the msg

required
msg ChatDocument

Message to log. Defaults to None.

None
mark bool

Whether to mark the message as the final result of a task.step() call. Defaults to False.

False
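The tab-separated log line written at the end of log_message() has a simple shape: mark, task name (or "root" for an unnamed task), responder, then the message's own TSV fields. A small sketch of that layout, with the hypothetical helper `toy_tsv_line` standing in for the real logger call:

```python
def toy_tsv_line(task_name: str, responder: str, msg_tsv: str, mark: bool = False) -> str:
    """Build one tab-separated log line in the layout used by the listing below."""
    mark_str = "*" if mark else " "
    name = task_name if task_name != "" else "root"
    return f"{mark_str}\t{name}\t{responder}\t{msg_tsv}"


marked = toy_tsv_line("", "LLM", "field1\tfield2", mark=True)
unmarked = toy_tsv_line("Planner", "USER", "field1\tfield2")
```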
Source code in langroid/agent/task.py
def log_message(
    self,
    resp: Responder,
    msg: ChatDocument | None = None,
    mark: bool = False,
) -> None:
    """
    Log current pending message, and related state, for lineage/debugging purposes.

    Args:
        resp (Responder): Responder that generated the `msg`
        msg (ChatDocument, optional): Message to log. Defaults to None.
        mark (bool, optional): Whether to mark the message as the final result of
            a `task.step()` call. Defaults to False.
    """
    default_values = ChatDocLoggerFields().dict().values()
    msg_str_tsv = "\t".join(str(v) for v in default_values)
    if msg is not None:
        msg_str_tsv = msg.tsv_str()

    mark_str = "*" if mark else " "
    task_name = self.name if self.name != "" else "root"
    resp_color = "white" if mark else "red"
    resp_str = f"[{resp_color}] {resp} [/{resp_color}]"

    if msg is None:
        msg_str = f"{mark_str}({task_name}) {resp_str}"
    else:
        color = {
            Entity.LLM: "green",
            Entity.USER: "blue",
            Entity.AGENT: "red",
            Entity.SYSTEM: "magenta",
        }[msg.metadata.sender]
        f = msg.log_fields()
        tool_type = f.tool_type.rjust(6)
        tool_name = f.tool.rjust(10)
        # check the raw field: the padded name is never an empty string
        tool_str = f"{tool_type}({tool_name})" if f.tool != "" else ""
        sender = f"[{color}]" + str(f.sender_entity).rjust(10) + f"[/{color}]"
        sender_name = f.sender_name.rjust(10)
        recipient = "=>" + str(f.recipient).rjust(10)
        block = "X " + str(f.block or "").rjust(10)
        content = f"[{color}]{f.content}[/{color}]"
        msg_str = (
            f"{mark_str}({task_name}) "
            f"{resp_str} {sender}({sender_name}) "
            f"({recipient}) ({block}) {tool_str} {content}"
        )

    if self.logger is not None:
        self.logger.log(msg_str)
    if self.tsv_logger is not None:
        resp_str = str(resp)
        self.tsv_logger.info(f"{mark_str}\t{task_name}\t{resp_str}\t{msg_str_tsv}")
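The tab-separated row written to the TSV logger can be sketched on its own. This is an illustrative stand-in under stated assumptions: `tsv_row` is a hypothetical function, and the message fields are passed in as plain strings rather than taken from a `ChatDocument`.

```python
from typing import Sequence


def tsv_row(mark: bool, task_name: str, resp: str, msg_fields: Sequence[str]) -> str:
    """Hypothetical helper mirroring the row layout in the listing above:
    mark, task name, responder, then the message fields, tab-separated."""
    mark_str = "*" if mark else " "
    # An unnamed task is logged under the name "root".
    name = task_name if task_name != "" else "root"
    return "\t".join([mark_str, name, resp, *msg_fields])


if __name__ == "__main__":
    print(repr(tsv_row(True, "", "Entity.LLM", ["LLM", "hello"])))
```

Keeping the mark in its own leading column makes it straightforward to filter a log file down to the final result of each step.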

set_color_log(enable=True)

Flag to enable/disable color logging using rich.console. In some contexts, such as Colab notebooks, we may want to disable color logging, since those logs show up in the cell output rather than in the log file. Turning off this feature will still create logs, but without the color formatting from rich.console.

Args:
enable (bool): value to set self.color_log to, which enables/disables rich logging

Source code in langroid/agent/task.py
def set_color_log(self, enable: bool = True) -> None:
    """
    Flag to enable/disable color logging using rich.console.
    In some contexts, such as Colab notebooks, we may want to disable color logging
    using rich.console, since those logs show up in the cell output rather than
    in the log file. Turning off this feature will still create logs, but without
    the color formatting from rich.console.

    Args:
        enable (bool): value to set `self.color_log` to,
            which enables/disables rich logging

    """
    self.color_log = enable
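The effect of the flag can be sketched with a small stand-in class. This is not Langroid's implementation: `DemoTask` and its `render` method are hypothetical, and the tag-stripping regex is only an assumption about what "without the color formatting" might look like for a rich-style markup string.

```python
import re


class DemoTask:
    """Hypothetical stand-in for a Task; models only the color_log flag."""

    def __init__(self) -> None:
        self.color_log = True

    def set_color_log(self, enable: bool = True) -> None:
        self.color_log = enable

    def render(self, line: str) -> str:
        """Return the line unchanged when color logging is on; otherwise
        drop simple [color] / [/color] markup tags."""
        if self.color_log:
            return line
        return re.sub(r"\[/?[a-z]+\]", "", line)


if __name__ == "__main__":
    task = DemoTask()
    task.set_color_log(False)  # e.g. when running inside a notebook
    print(repr(task.render("[red] LLM [/red]")))
```

With a real task object, the equivalent call would simply be `task.set_color_log(False)` before running the task.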