
agent

langroid/agent/__init__.py

Agent(config=AgentConfig())

Bases: ABC

An Agent is an abstraction that encapsulates mainly two components:

  • a language model (LLM)
  • a vector store (vecdb)

plus associated components such as a parser, and variables that hold information about any tool/function-calling messages that have been defined.
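For orientation, here is a minimal usage sketch. It is illustrative, not canonical: it assumes langroid's top-level exports (Agent, AgentConfig) and a configured LLM API key; "MyAgent" is just a placeholder name.

import langroid as lr

# Construct a bare Agent; vecdb defaults to None unless configured.
config = lr.AgentConfig(name="MyAgent")
agent = lr.Agent(config)

# Ask the LLM directly; returns a ChatDocument (or None).
response = agent.llm_response("What is the capital of France?")
if response is not None:
    print(response.content)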

Source code in langroid/agent/base.py
def __init__(self, config: AgentConfig = AgentConfig()):
    self.config = config
    self.lock = asyncio.Lock()  # for async access to update self.llm.usage_cost
    self.dialog: List[Tuple[str, str]] = []  # seq of LLM (prompt, response) tuples
    self.llm_tools_map: Dict[str, Type[ToolMessage]] = {}
    self.llm_tools_handled: Set[str] = set()
    self.llm_tools_usable: Set[str] = set()
    self.llm_tools_known: Set[str] = set()  # all known tools, handled/used or not
    # Indicates which tool-names are allowed to be inferred when
    # the LLM "forgets" to include the request field in its
    # tool-call.
    self.enabled_requests_for_inference: Optional[Set[str]] = (
        None  # If None, we allow all
    )
    self.interactive: bool = True  # may be modified by Task wrapper
    self.token_stats_str = ""
    self.default_human_response: Optional[str] = None
    self._indent = ""
    self.llm = LanguageModel.create(config.llm)
    self.vecdb = VectorStore.create(config.vecdb) if config.vecdb else None
    self.tool_error = False
    if config.parsing is not None and self.config.llm is not None:
        # token_encoding_model is used to obtain the tokenizer,
        # so in case it's an OpenAI model, we ensure that the tokenizer
        # corresponding to the model is used.
        if isinstance(self.llm, OpenAIGPT) and self.llm.is_openai_chat_model():
            config.parsing.token_encoding_model = self.llm.config.chat_model
    self.parser: Optional[Parser] = (
        Parser(config.parsing) if config.parsing else None
    )
    if config.add_to_registry:
        ObjectRegistry.register_object(self)

    self.callbacks = SimpleNamespace(
        start_llm_stream=lambda: noop_fn,
        start_llm_stream_async=async_lambda_noop_fn,
        cancel_llm_stream=noop_fn,
        finish_llm_stream=noop_fn,
        show_llm_response=noop_fn,
        show_agent_response=noop_fn,
        get_user_response=None,
        get_user_response_async=None,
        get_last_step=noop_fn,
        set_parent_agent=noop_fn,
        show_error_message=noop_fn,
        show_start_response=noop_fn,
    )
    Agent.init_state(self)

indent: str property writable

Indentation to print before any responses from the agent's entities.

all_llm_tools_known: set[str] property

All known tools; this may extend self.llm_tools_known.

init_state()

Initialize all state vars. Called by Task.run() if restart is True

Source code in langroid/agent/base.py
def init_state(self) -> None:
    """Initialize all state vars. Called by Task.run() if restart is True"""
    self.total_llm_token_cost = 0.0
    self.total_llm_token_usage = 0

entity_responders()

Sequence of (entity, response_method) pairs. This sequence is used in a Task to respond to the current pending message. See Task.step() for details.

Returns: Sequence of (entity, response_method) pairs.

Source code in langroid/agent/base.py
def entity_responders(
    self,
) -> List[
    Tuple[Entity, Callable[[None | str | ChatDocument], None | ChatDocument]]
]:
    """
    Sequence of (entity, response_method) pairs. This sequence is used
        in a `Task` to respond to the current pending message.
        See `Task.step()` for details.
    Returns:
        Sequence of (entity, response_method) pairs.
    """
    return [
        (Entity.AGENT, self.agent_response),
        (Entity.LLM, self.llm_response),
        (Entity.USER, self.user_response),
    ]
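
A sketch of how these pairs might be consumed, roughly mirroring what Task.step() does (hedged: the real Task logic also handles validity checks and orchestration):

def first_response(agent, msg):
    # Try each (entity, responder) in priority order; stop at the
    # first non-None response, as a Task-like loop would.
    for entity, responder in agent.entity_responders():
        result = responder(msg)
        if result is not None:
            return entity, result
    return None, None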

entity_responders_async()

Async version of entity_responders. See there for details.

Source code in langroid/agent/base.py
def entity_responders_async(
    self,
) -> List[
    Tuple[
        Entity,
        Callable[
            [None | str | ChatDocument], Coroutine[Any, Any, None | ChatDocument]
        ],
    ]
]:
    """
    Async version of `entity_responders`. See there for details.
    """
    return [
        (Entity.AGENT, self.agent_response_async),
        (Entity.LLM, self.llm_response_async),
        (Entity.USER, self.user_response_async),
    ]

enable_message_handling(message_class=None)

Enable an agent to RESPOND to (i.e. handle) a "tool" message of a specific type from the LLM. Also "registers" (i.e. adds) the message_class to the self.llm_tools_map dict.

Parameters:

Name Type Description Default
message_class Optional[Type[ToolMessage]]

The message class to enable; Optional; if None, all known message classes are enabled for handling.

None
Source code in langroid/agent/base.py
def enable_message_handling(
    self, message_class: Optional[Type[ToolMessage]] = None
) -> None:
    """
    Enable an agent to RESPOND to (i.e. handle) a "tool" message of a specific
        type from the LLM. Also "registers" (i.e. adds) the `message_class` to the
        `self.llm_tools_map` dict.

    Args:
        message_class (Optional[Type[ToolMessage]]): The message class to enable;
            Optional; if None, all known message classes are enabled for handling.

    """
    for t in self._get_tool_list(message_class):
        self.llm_tools_handled.add(t)

disable_message_handling(message_class=None)

Disable a message class from being handled by this Agent.

Parameters:

Name Type Description Default
message_class Optional[Type[ToolMessage]]

The message class to disable. If None, all message classes are disabled.

None
Source code in langroid/agent/base.py
def disable_message_handling(
    self,
    message_class: Optional[Type[ToolMessage]] = None,
) -> None:
    """
    Disable a message class from being handled by this Agent.

    Args:
        message_class (Optional[Type[ToolMessage]]): The message class to disable.
            If None, all message classes are disabled.
    """
    for t in self._get_tool_list(message_class):
        self.llm_tools_handled.discard(t)
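
A sketch of enabling and disabling handling for a hypothetical tool (SquareTool below is illustrative, not part of Langroid; it reuses the agent from the earlier sketch):

from langroid.agent.tool_message import ToolMessage

class SquareTool(ToolMessage):
    request: str = "square"
    purpose: str = "To compute the square of a <number>."
    number: float

agent.enable_message_handling(SquareTool)
assert "square" in agent.llm_tools_handled

agent.disable_message_handling(SquareTool)
assert "square" not in agent.llm_tools_handled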

sample_multi_round_dialog()

Generate a sample multi-round dialog based on enabled message classes.

Returns: str: The sample dialog string.

Source code in langroid/agent/base.py
def sample_multi_round_dialog(self) -> str:
    """
    Generate a sample multi-round dialog based on enabled message classes.
    Returns:
        str: The sample dialog string.
    """
    enabled_classes: List[Type[ToolMessage]] = list(self.llm_tools_map.values())
    # use at most 2 sample conversations, no need to be exhaustive;
    sample_convo = [
        msg_cls().usage_examples(random=True)  # type: ignore
        for i, msg_cls in enumerate(enabled_classes)
        if i < 2
    ]
    return "\n\n".join(sample_convo)

create_agent_response(content=None, content_any=None, tool_messages=[], oai_tool_calls=None, oai_tool_choice='auto', oai_tool_id2result=None, function_call=None, recipient='')

Template for agent_response.

Source code in langroid/agent/base.py
def create_agent_response(
    self,
    content: str | None = None,
    content_any: Any = None,
    tool_messages: List[ToolMessage] = [],
    oai_tool_calls: Optional[List[OpenAIToolCall]] = None,
    oai_tool_choice: ToolChoiceTypes | Dict[str, Dict[str, str] | str] = "auto",
    oai_tool_id2result: OrderedDict[str, str] | None = None,
    function_call: LLMFunctionCall | None = None,
    recipient: str = "",
) -> ChatDocument:
    """Template for agent_response."""
    return self.response_template(
        Entity.AGENT,
        content=content,
        content_any=content_any,
        tool_messages=tool_messages,
        oai_tool_calls=oai_tool_calls,
        oai_tool_choice=oai_tool_choice,
        oai_tool_id2result=oai_tool_id2result,
        function_call=function_call,
        recipient=recipient,
    )
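
A sketch of using this template to fabricate an agent-authored message (assuming the agent from the earlier sketches):

from langroid.mytypes import Entity

doc = agent.create_agent_response(content="Done.")
assert doc.content == "Done."
assert doc.metadata.sender == Entity.AGENT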

agent_response_async(msg=None) async

Async version of agent_response. See there for details.

Source code in langroid/agent/base.py
async def agent_response_async(
    self,
    msg: Optional[str | ChatDocument] = None,
) -> Optional[ChatDocument]:
    """
    Async version of `agent_response`. See there for details.
    """
    if msg is None:
        return None

    results = await self.handle_message_async(msg)

    return self._agent_response_final(msg, results)

agent_response(msg=None)

Response from the "agent itself", typically (but not only) used to handle the LLM's "tool message" or function_call (e.g. OpenAI function_call).

Args: msg (str|ChatDocument): the input to respond to: either a string containing a valid JSON-structured "tool message", or a ChatDocument containing a function_call.

Returns: Optional[ChatDocument]: the response, packaged as a ChatDocument

Source code in langroid/agent/base.py
def agent_response(
    self,
    msg: Optional[str | ChatDocument] = None,
) -> Optional[ChatDocument]:
    """
    Response from the "agent itself", typically (but not only)
    used to handle LLM's "tool message" or `function_call`
    (e.g. OpenAI `function_call`).
    Args:
        msg (str|ChatDocument): the input to respond to: if msg is a string,
            and it contains a valid JSON-structured "tool message", or
            if msg is a ChatDocument, and it contains a `function_call`.
    Returns:
        Optional[ChatDocument]: the response, packaged as a ChatDocument

    """
    if msg is None:
        return None

    results = self.handle_message(msg)

    return self._agent_response_final(msg, results)

process_tool_results(results, id2result, tool_calls=None)

Process results from a response, based on whether they are results of OpenAI tool-calls from THIS agent, so that we can construct an appropriate LLMMessage that contains tool results.

Parameters:

Name Type Description Default
results str

A possible string result from handling tool(s)

required
id2result OrderedDict[str, str] | None

A dict of OpenAI tool id -> result, if there are multiple tool results.

required
tool_calls List[OpenAIToolCall] | None

List of OpenAI tool-calls that the results are a response to.

None
Returns:
  • str: The response string
  • Dict[str,str]|None: A dict of OpenAI tool id -> result, if there are multiple tool results.
  • str|None: tool_id if there was a single tool result
Source code in langroid/agent/base.py
def process_tool_results(
    self,
    results: str,
    id2result: OrderedDict[str, str] | None,
    tool_calls: List[OpenAIToolCall] | None = None,
) -> Tuple[str, Dict[str, str] | None, str | None]:
    """
    Process results from a response, based on whether
    they are results of OpenAI tool-calls from THIS agent, so that
    we can construct an appropriate LLMMessage that contains tool results.

    Args:
        results (str): A possible string result from handling tool(s)
        id2result (OrderedDict[str,str]|None): A dict of OpenAI tool id -> result,
            if there are multiple tool results.
        tool_calls (List[OpenAIToolCall]|None): List of OpenAI tool-calls that the
            results are a response to.

    Return:
        - str: The response string
        - Dict[str,str]|None: A dict of OpenAI tool id -> result, if there are
            multiple tool results.
        - str|None: tool_id if there was a single tool result

    """
    id2result_ = copy.deepcopy(id2result) if id2result is not None else None
    results_str = ""
    oai_tool_id = None

    if results != "":
        # in this case ignore id2result
        assert (
            id2result is None
        ), "id2result should be None when results string is non-empty!"
        results_str = results
        if len(self.oai_tool_calls) > 0:
            # We only have one result, so in case there is a
            # "pending" OpenAI tool-call, we expect no more than 1 such.
            assert (
                len(self.oai_tool_calls) == 1
            ), "There are multiple pending tool-calls, but only one result!"
            # We record the tool_id of the tool-call that
            # the result is a response to, so that ChatDocument.to_LLMMessage
            # can properly set the `tool_call_id` field of the LLMMessage.
            oai_tool_id = self.oai_tool_calls[0].id
    elif id2result is not None and id2result_ is not None:  # appease mypy
        if len(id2result_) == len(self.oai_tool_calls):
            # if the number of pending tool calls equals the number of results,
            # then ignore the ids in id2result, and use the results in order,
            # which is preserved since id2result is an OrderedDict.
            assert len(id2result_) > 1, "Expected to see > 1 result in id2result!"
            results_str = ""
            id2result_ = OrderedDict(
                zip(
                    [tc.id or "" for tc in self.oai_tool_calls], id2result_.values()
                )
            )
        else:
            assert (
                tool_calls is not None
            ), "tool_calls cannot be None when id2result is not None!"
            # This must be an OpenAI tool id -> result map;
            # However some ids may not correspond to the tool-calls in the list of
            # pending tool-calls (self.oai_tool_calls).
            # Such results are concatenated into a simple string, to store in the
            # ChatDocument.content, and the rest
            # (i.e. those that DO correspond to tools in self.oai_tool_calls)
            # are stored as a dict in ChatDocument.oai_tool_id2result.

            # OAI tools from THIS agent, awaiting response
            pending_tool_ids = [tc.id for tc in self.oai_tool_calls]
            # tool_calls that the results are a response to
            # (but these may have been sent from another agent, hence may not be in
            # self.oai_tool_calls)
            parent_tool_id2name = {
                tc.id: tc.function.name
                for tc in tool_calls or []
                if tc.function is not None
            }

            # (id, result) for result NOT corresponding to self.oai_tool_calls,
            # i.e. these are results of EXTERNAL tool-calls from another agent.
            external_tool_id_results = []

            for tc_id, result in id2result.items():
                if tc_id not in pending_tool_ids:
                    external_tool_id_results.append((tc_id, result))
                    id2result_.pop(tc_id)
            if len(external_tool_id_results) == 0:
                results_str = ""
            elif len(external_tool_id_results) == 1:
                results_str = external_tool_id_results[0][1]
            else:
                results_str = "\n\n".join(
                    [
                        f"Result from tool/function "
                        f"{parent_tool_id2name[id]}: {result}"
                        for id, result in external_tool_id_results
                    ]
                )

            if len(id2result_) == 0:
                id2result_ = None
            elif len(id2result_) == 1 and len(external_tool_id_results) == 0:
                results_str = list(id2result_.values())[0]
                oai_tool_id = list(id2result_.keys())[0]
                id2result_ = None

    return results_str, id2result_, oai_tool_id

response_template(e, content=None, content_any=None, tool_messages=[], oai_tool_calls=None, oai_tool_choice='auto', oai_tool_id2result=None, function_call=None, recipient='')

Template for response from entity e.

Source code in langroid/agent/base.py
def response_template(
    self,
    e: Entity,
    content: str | None = None,
    content_any: Any = None,
    tool_messages: List[ToolMessage] = [],
    oai_tool_calls: Optional[List[OpenAIToolCall]] = None,
    oai_tool_choice: ToolChoiceTypes | Dict[str, Dict[str, str] | str] = "auto",
    oai_tool_id2result: OrderedDict[str, str] | None = None,
    function_call: LLMFunctionCall | None = None,
    recipient: str = "",
) -> ChatDocument:
    """Template for response from entity `e`."""
    return ChatDocument(
        content=content or "",
        content_any=content_any,
        tool_messages=tool_messages,
        oai_tool_calls=oai_tool_calls,
        oai_tool_id2result=oai_tool_id2result,
        function_call=function_call,
        oai_tool_choice=oai_tool_choice,
        metadata=ChatDocMetaData(
            source=e, sender=e, sender_name=self.config.name, recipient=recipient
        ),
    )

create_user_response(content=None, content_any=None, tool_messages=[], oai_tool_calls=None, oai_tool_choice='auto', oai_tool_id2result=None, function_call=None, recipient='')

Template for user_response.

Source code in langroid/agent/base.py
def create_user_response(
    self,
    content: str | None = None,
    content_any: Any = None,
    tool_messages: List[ToolMessage] = [],
    oai_tool_calls: List[OpenAIToolCall] | None = None,
    oai_tool_choice: ToolChoiceTypes | Dict[str, Dict[str, str] | str] = "auto",
    oai_tool_id2result: OrderedDict[str, str] | None = None,
    function_call: LLMFunctionCall | None = None,
    recipient: str = "",
) -> ChatDocument:
    """Template for user_response."""
    return self.response_template(
        e=Entity.USER,
        content=content,
        content_any=content_any,
        tool_messages=tool_messages,
        oai_tool_calls=oai_tool_calls,
        oai_tool_choice=oai_tool_choice,
        oai_tool_id2result=oai_tool_id2result,
        function_call=function_call,
        recipient=recipient,
    )

user_can_respond(msg=None)

Whether the user can respond to a message.

Parameters:

Name Type Description Default
msg str | ChatDocument

the string to respond to.

None

Returns: bool: whether the user can respond to the message.

Source code in langroid/agent/base.py
def user_can_respond(self, msg: Optional[str | ChatDocument] = None) -> bool:
    """
    Whether the user can respond to a message.

    Args:
        msg (str|ChatDocument): the string to respond to.

    Returns:

    """
    # When msg explicitly addressed to user, this means an actual human response
    # is being sought.
    need_human_response = (
        isinstance(msg, ChatDocument) and msg.metadata.recipient == Entity.USER
    )

    if not self.interactive and not need_human_response:
        return False

    return True

user_response_async(msg=None) async

Async version of user_response. See there for details.

Source code in langroid/agent/base.py
async def user_response_async(
    self,
    msg: Optional[str | ChatDocument] = None,
) -> Optional[ChatDocument]:
    """
    Async version of `user_response`. See there for details.
    """
    if not self.user_can_respond(msg):
        return None

    if self.default_human_response is not None:
        user_msg = self.default_human_response
    else:
        if (
            self.callbacks.get_user_response_async is not None
            and self.callbacks.get_user_response_async is not async_noop_fn
        ):
            user_msg = await self.callbacks.get_user_response_async(prompt="")
        elif self.callbacks.get_user_response is not None:
            user_msg = self.callbacks.get_user_response(prompt="")
        else:
            user_msg = Prompt.ask(
                f"[blue]{self.indent}"
                + self.config.human_prompt
                + f"\n{self.indent}"
            )

    return self._user_response_final(msg, user_msg)

user_response(msg=None)

Get the user's response to the current message. Allows the (human) user to intervene with an actual answer, or to quit by entering "q" or "x".

Parameters:

Name Type Description Default
msg str | ChatDocument

the string to respond to.

None

Returns:

Type Description
Optional[ChatDocument]

User response, packaged as a ChatDocument (or None).

Source code in langroid/agent/base.py
def user_response(
    self,
    msg: Optional[str | ChatDocument] = None,
) -> Optional[ChatDocument]:
    """
    Get user response to current message. Could allow (human) user to intervene
    with an actual answer, or quit using "q" or "x"

    Args:
        msg (str|ChatDocument): the string to respond to.

    Returns:
        (str) User response, packaged as a ChatDocument

    """

    if not self.user_can_respond(msg):
        return None

    if self.default_human_response is not None:
        user_msg = self.default_human_response
    else:
        if self.callbacks.get_user_response is not None:
            # ask user with empty prompt: no need for prompt
            # since user has seen the conversation so far.
            # But non-empty prompt can be useful when Agent
            # uses a tool that requires user input, or in other scenarios.
            user_msg = self.callbacks.get_user_response(prompt="")
        else:
            user_msg = Prompt.ask(
                f"[blue]{self.indent}"
                + self.config.human_prompt
                + f"\n{self.indent}"
            )

    return self._user_response_final(msg, user_msg)
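
In scripted or test settings, default_human_response bypasses the interactive prompt; a sketch (reusing the agent from the earlier sketches):

agent.default_human_response = "ok"   # no human prompt will be shown
reply = agent.user_response("anything")
print(reply.content if reply else None)   # expected to print: ok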

llm_can_respond(message=None)

Whether the LLM can respond to a message. Args: message (str|ChatDocument): message or ChatDocument object to respond to.

Returns: bool: whether the LLM can respond to the message.

Source code in langroid/agent/base.py
@no_type_check
def llm_can_respond(self, message: Optional[str | ChatDocument] = None) -> bool:
    """
    Whether the LLM can respond to a message.
    Args:
        message (str|ChatDocument): message or ChatDocument object to respond to.

    Returns:

    """
    if self.llm is None:
        return False

    if message is not None and len(self.try_get_tool_messages(message)) > 0:
        # if there is a valid "tool" message (either JSON or via `function_call`)
        # then LLM cannot respond to it
        return False

    return True

can_respond(message=None)

Whether the agent can respond to a message. Used in Task.py to skip a sub-task when we know it would not respond. Args: message (str|ChatDocument): message or ChatDocument object to respond to.

Source code in langroid/agent/base.py
def can_respond(self, message: Optional[str | ChatDocument] = None) -> bool:
    """
    Whether the agent can respond to a message.
    Used in Task.py to skip a sub-task when we know it would not respond.
    Args:
        message (str|ChatDocument): message or ChatDocument object to respond to.
    """
    tools = self.try_get_tool_messages(message)
    if len(tools) == 0 and self.config.respond_tools_only:
        return False
    if message is not None and self.has_only_unhandled_tools(message):
        # The message has tools that are NOT enabled to be handled by this agent,
        # which means the agent cannot respond to it.
        return False
    return True

create_llm_response(content=None, content_any=None, tool_messages=[], oai_tool_calls=None, oai_tool_choice='auto', oai_tool_id2result=None, function_call=None, recipient='')

Template for llm_response.

Source code in langroid/agent/base.py
def create_llm_response(
    self,
    content: str | None = None,
    content_any: Any = None,
    tool_messages: List[ToolMessage] = [],
    oai_tool_calls: None | List[OpenAIToolCall] = None,
    oai_tool_choice: ToolChoiceTypes | Dict[str, Dict[str, str] | str] = "auto",
    oai_tool_id2result: OrderedDict[str, str] | None = None,
    function_call: LLMFunctionCall | None = None,
    recipient: str = "",
) -> ChatDocument:
    """Template for llm_response."""
    return self.response_template(
        Entity.LLM,
        content=content,
        content_any=content_any,
        tool_messages=tool_messages,
        oai_tool_calls=oai_tool_calls,
        oai_tool_choice=oai_tool_choice,
        oai_tool_id2result=oai_tool_id2result,
        function_call=function_call,
        recipient=recipient,
    )

llm_response_async(message=None) async

Async version of llm_response. See there for details.

Source code in langroid/agent/base.py
@no_type_check
async def llm_response_async(
    self,
    message: Optional[str | ChatDocument] = None,
) -> Optional[ChatDocument]:
    """
    Async version of `llm_response`. See there for details.
    """
    if message is None or not self.llm_can_respond(message):
        return None

    if isinstance(message, ChatDocument):
        prompt = message.content
    else:
        prompt = message

    output_len = self.config.llm.max_output_tokens
    if self.num_tokens(prompt) + output_len > self.llm.completion_context_length():
        output_len = self.llm.completion_context_length() - self.num_tokens(prompt)
        if output_len < self.config.llm.min_output_tokens:
            raise ValueError(
                """
            Token-length of Prompt + Output is longer than the
            completion context length of the LLM!
            """
            )
        else:
            logger.warning(
                f"""
            Requested output length has been shortened to {output_len}
            so that the total length of Prompt + Output is less than
            the completion context length of the LLM.
            """
            )

    with StreamingIfAllowed(self.llm, self.llm.get_stream()):
        response = await self.llm.agenerate(prompt, output_len)

    if (not self.llm.get_stream() or response.cached) and not settings.quiet:
        # We would have already displayed the msg "live" ONLY if
        # streaming was enabled, AND we did not find a cached response.
        # If we are here, it means the response has not yet been displayed.
        cached = f"[red]{self.indent}(cached)[/red]" if response.cached else ""
        print(cached + "[green]" + escape(response.message))
    async with self.lock:
        self.update_token_usage(
            response,
            prompt,
            self.llm.get_stream(),
            chat=False,  # i.e. it's a completion model not chat model
            print_response_stats=self.config.show_stats and not settings.quiet,
        )
    cdoc = ChatDocument.from_LLMResponse(response, displayed=True)
    # Preserve trail of tool_ids for OpenAI Assistant fn-calls
    cdoc.metadata.tool_ids = (
        [] if isinstance(message, str) else message.metadata.tool_ids
    )
    return cdoc
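
A sketch of calling the async variant (same assumptions as the earlier synchronous sketch):

import asyncio

async def main():
    doc = await agent.llm_response_async("Say hello in French.")
    print(doc.content if doc else None)

asyncio.run(main())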

llm_response(message=None)

LLM response to a prompt.

Args: message (str|ChatDocument): prompt string, or ChatDocument object

Returns:

Type Description
Optional[ChatDocument]

Response from LLM, packaged as a ChatDocument

Source code in langroid/agent/base.py
@no_type_check
def llm_response(
    self,
    message: Optional[str | ChatDocument] = None,
) -> Optional[ChatDocument]:
    """
    LLM response to a prompt.
    Args:
        message (str|ChatDocument): prompt string, or ChatDocument object

    Returns:
        Response from LLM, packaged as a ChatDocument
    """
    if message is None or not self.llm_can_respond(message):
        return None

    if isinstance(message, ChatDocument):
        prompt = message.content
    else:
        prompt = message

    with ExitStack() as stack:  # for conditionally using rich spinner
        if not self.llm.get_stream():
            # show rich spinner only if not streaming!
            cm = status("LLM responding to message...")
            stack.enter_context(cm)
        output_len = self.config.llm.max_output_tokens
        if (
            self.num_tokens(prompt) + output_len
            > self.llm.completion_context_length()
        ):
            output_len = self.llm.completion_context_length() - self.num_tokens(
                prompt
            )
            if output_len < self.config.llm.min_output_tokens:
                raise ValueError(
                    """
                Token-length of Prompt + Output is longer than the
                completion context length of the LLM!
                """
                )
            else:
                logger.warning(
                    f"""
                Requested output length has been shortened to {output_len}
                so that the total length of Prompt + Output is less than
                the completion context length of the LLM.
                """
                )
        if self.llm.get_stream() and not settings.quiet:
            console.print(f"[green]{self.indent}", end="")
        response = self.llm.generate(prompt, output_len)

    if (not self.llm.get_stream() or response.cached) and not settings.quiet:
        # we would have already displayed the msg "live" ONLY if
        # streaming was enabled, AND we did not find a cached response
        # If we are here, it means the response has not yet been displayed.
        cached = f"[red]{self.indent}(cached)[/red]" if response.cached else ""
        console.print(f"[green]{self.indent}", end="")
        print(cached + "[green]" + escape(response.message))
    self.update_token_usage(
        response,
        prompt,
        self.llm.get_stream(),
        chat=False,  # i.e. it's a completion model not chat model
        print_response_stats=self.config.show_stats and not settings.quiet,
    )
    cdoc = ChatDocument.from_LLMResponse(response, displayed=True)
    # Preserve trail of tool_ids for OpenAI Assistant fn-calls
    cdoc.metadata.tool_ids = (
        [] if isinstance(message, str) else message.metadata.tool_ids
    )
    return cdoc

has_tool_message_attempt(msg)

Check whether msg contains a Tool/fn-call attempt (by the LLM).

CAUTION: This uses self.get_tool_messages(msg) which as a side-effect may update msg.tool_messages when msg is a ChatDocument, if there are any tools in msg.

Source code in langroid/agent/base.py
def has_tool_message_attempt(self, msg: str | ChatDocument | None) -> bool:
    """
    Check whether msg contains a Tool/fn-call attempt (by the LLM).

    CAUTION: This uses self.get_tool_messages(msg) which as a side-effect
    may update msg.tool_messages when msg is a ChatDocument, if there are
    any tools in msg.
    """
    if msg is None:
        return False
    try:
        tools = self.get_tool_messages(msg)
        return len(tools) > 0
    except (ValidationError, XMLException):
        # there is a tool/fn-call attempt but had a validation error,
        # so we still consider this a tool message "attempt"
        return True

has_only_unhandled_tools(msg)

Does the msg have at least one tool, and ALL tools are disabled for handling by this agent?

Source code in langroid/agent/base.py
def has_only_unhandled_tools(self, msg: str | ChatDocument) -> bool:
    """
    Does the msg have at least one tool, and ALL tools are
    disabled for handling by this agent?
    """
    if msg is None:
        return False
    tools = self.try_get_tool_messages(msg, all_tools=True)
    if len(tools) == 0:
        return False
    return all(not self._tool_recipient_match(t) for t in tools)

get_tool_messages(msg, all_tools=False)

Get ToolMessages recognized in msg, handle-able by this agent. NOTE: as a side-effect, this will update msg.tool_messages when msg is a ChatDocument and msg contains tool messages. The intent is that this side-effect update should happen ONLY within the agent_response() or agent_response_async() methods. In other words, we want to persist msg.tool_messages only AFTER the agent has had a chance to handle the tools.

Parameters:

Name Type Description Default
msg str | ChatDocument

the message to extract tools from.

required
all_tools bool
  • if True, return all tools, i.e. any recognized tool in self.llm_tools_known, whether it is handled by this agent or not;
  • otherwise, return only the tools handled by this agent.
False

Returns:

Type Description
List[ToolMessage]

List[ToolMessage]: list of ToolMessage objects

Source code in langroid/agent/base.py
def get_tool_messages(
    self,
    msg: str | ChatDocument | None,
    all_tools: bool = False,
) -> List[ToolMessage]:
    """
    Get ToolMessages recognized in msg, handle-able by this agent.
    NOTE: as a side-effect, this will update msg.tool_messages
    when msg is a ChatDocument and msg contains tool messages.
    The intent here is that update=True should be set ONLY within agent_response()
    or agent_response_async() methods. In other words, we want to persist the
    msg.tool_messages only AFTER the agent has had a chance to handle the tools.

    Args:
        msg (str|ChatDocument): the message to extract tools from.
        all_tools (bool):
            - if True, return all tools,
                i.e. any recognized tool in self.llm_tools_known,
                whether it is handled by this agent or not;
            - otherwise, return only the tools handled by this agent.

    Returns:
        List[ToolMessage]: list of ToolMessage objects
    """

    if msg is None:
        return []

    if isinstance(msg, str):
        json_tools = self.get_formatted_tool_messages(msg)
        if all_tools:
            return json_tools
        else:
            return [
                t
                for t in json_tools
                if self._tool_recipient_match(t) and t.default_value("request")
            ]

    if all_tools and len(msg.all_tool_messages) > 0:
        # We've already identified all_tool_messages in the msg;
        # return the corresponding ToolMessage objects
        return msg.all_tool_messages
    if len(msg.tool_messages) > 0:
        # We've already found tool_messages,
        # (either via OpenAI Fn-call or Langroid-native ToolMessage);
        # or they were added by an agent_response.
        # note these could be from a forwarded msg from another agent,
        # so return ONLY the messages THIS agent is enabled to handle.
        return msg.tool_messages
    assert isinstance(msg, ChatDocument)
    if (
        msg.content != ""
        and msg.oai_tool_calls is None
        and msg.function_call is None
    ):

        tools = self.get_formatted_tool_messages(msg.content)
        msg.all_tool_messages = tools
        # filter for actually handle-able tools, and recipient is this agent
        my_tools = [t for t in tools if self._tool_recipient_match(t)]
        msg.tool_messages = my_tools

        if all_tools:
            return tools
        else:
            return my_tools

    # otherwise, we look for `tool_calls` (possibly multiple)
    tools = self.get_oai_tool_calls_classes(msg)
    msg.all_tool_messages = tools
    my_tools = [t for t in tools if self._tool_recipient_match(t)]
    msg.tool_messages = my_tools

    if len(tools) == 0:
        # otherwise, we look for a `function_call`
        fun_call_cls = self.get_function_call_class(msg)
        tools = [fun_call_cls] if fun_call_cls is not None else []
        msg.all_tool_messages = tools
        my_tools = [t for t in tools if self._tool_recipient_match(t)]
        msg.tool_messages = my_tools
    if all_tools:
        return tools
    else:
        return my_tools
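
A sketch of the string case, reusing the hypothetical SquareTool from the earlier sketch (the tool must be enabled for handling first):

agent.enable_message_handling(SquareTool)
tools = agent.get_tool_messages('{"request": "square", "number": 3}')
# Expected: [SquareTool(number=3)], since the JSON parses and the
# tool is enabled for handling by this agent.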

get_formatted_tool_messages(input_str)

Returns ToolMessage objects (tools) corresponding to tool-formatted substrings, if any. ASSUMPTION - These tools are either ALL JSON-based, or ALL XML-based (i.e. not a mix of both). Terminology: a "formatted tool msg" is one which the LLM generates as part of its raw string output, rather than within a JSON object in the API response (i.e. this method does not extract tools/fns returned by OpenAI's tools/fns API or similar APIs).

Parameters:

Name Type Description Default
input_str str

input string, typically a message sent by an LLM

required

Returns:

Type Description
List[ToolMessage]

List[ToolMessage]: list of ToolMessage objects

Source code in langroid/agent/base.py
def get_formatted_tool_messages(self, input_str: str) -> List[ToolMessage]:
    """
    Returns ToolMessage objects (tools) corresponding to
    tool-formatted substrings, if any.
    ASSUMPTION - These tools are either ALL JSON-based, or ALL XML-based
    (i.e. not a mix of both).
    Terminology: a "formatted tool msg" is one which the LLM generates as
        part of its raw string output, rather than within a JSON object
        in the API response (i.e. this method does not extract tools/fns returned
        by OpenAI's tools/fns API or similar APIs).

    Args:
        input_str (str): input string, typically a message sent by an LLM

    Returns:
        List[ToolMessage]: list of ToolMessage objects
    """
    self.tool_error = False
    substrings = XMLToolMessage.find_candidates(input_str)
    is_json = False
    if len(substrings) == 0:
        substrings = extract_top_level_json(input_str)
        is_json = len(substrings) > 0
        if not is_json:
            return []

    results = [self._get_one_tool_message(j, is_json) for j in substrings]
    valid_results = [r for r in results if r is not None]
    # If any tool is correctly formed we do not set the flag
    if len(valid_results) > 0:
        self.tool_error = False
    return valid_results

get_function_call_class(msg)

From ChatDocument (constructed from an LLM Response), get the ToolMessage corresponding to the function_call if it exists.

Source code in langroid/agent/base.py
def get_function_call_class(self, msg: ChatDocument) -> Optional[ToolMessage]:
    """
    From ChatDocument (constructed from an LLM Response), get the `ToolMessage`
    corresponding to the `function_call` if it exists.
    """
    if msg.function_call is None:
        return None
    tool_name = msg.function_call.name
    tool_msg = msg.function_call.arguments or {}
    if tool_name not in self.llm_tools_handled:
        logger.warning(
            f"""
            The function_call '{tool_name}' is not handled
            by the agent named '{self.config.name}'!
            If you intended this agent to handle this function_call,
            either the fn-call name is incorrectly generated by the LLM,
            (in which case you may need to adjust your LLM instructions),
            or you need to enable this agent to handle this fn-call.
            """
        )
        if tool_name not in self.all_llm_tools_known:
            self.tool_error = True
        return None
    self.tool_error = False
    tool_class = self.llm_tools_map[tool_name]
    tool_msg.update(dict(request=tool_name))
    tool = tool_class.parse_obj(tool_msg)
    return tool

get_oai_tool_calls_classes(msg)

From ChatDocument (constructed from an LLM Response), get a list of ToolMessages corresponding to the tool_calls, if any.

Source code in langroid/agent/base.py
def get_oai_tool_calls_classes(self, msg: ChatDocument) -> List[ToolMessage]:
    """
    From ChatDocument (constructed from an LLM Response), get
     a list of ToolMessages corresponding to the `tool_calls`, if any.
    """

    if msg.oai_tool_calls is None:
        return []
    tools = []
    all_errors = True
    for tc in msg.oai_tool_calls:
        if tc.function is None:
            continue
        tool_name = tc.function.name
        tool_msg = tc.function.arguments or {}
        if tool_name not in self.llm_tools_handled:
            logger.warning(
                f"""
                The tool_call '{tool_name}' is not handled
                by the agent named '{self.config.name}'!
                If you intended this agent to handle this function_call,
                either the fn-call name is incorrectly generated by the LLM,
                (in which case you may need to adjust your LLM instructions),
                or you need to enable this agent to handle this fn-call.
                """
            )
            continue
        all_errors = False
        tool_class = self.llm_tools_map[tool_name]
        tool_msg.update(dict(request=tool_name))
        tool = tool_class.parse_obj(tool_msg)
        tool.id = tc.id or ""
        tools.append(tool)
    # When no tool is valid, set the recovery flag
    self.tool_error = all_errors
    return tools

tool_validation_error(ve)

Handle a validation error raised when parsing a tool message, when there is a legit tool name used, but it has missing/bad fields. Args: tool (ToolMessage): The tool message that failed validation ve (ValidationError): The exception raised

Returns:

Name Type Description
str str

The error message to send back to the LLM

Source code in langroid/agent/base.py
def tool_validation_error(self, ve: ValidationError) -> str:
    """
    Handle a validation error raised when parsing a tool message,
        when there is a legit tool name used, but it has missing/bad fields.
    Args:
        tool (ToolMessage): The tool message that failed validation
        ve (ValidationError): The exception raised

    Returns:
        str: The error message to send back to the LLM
    """
    tool_name = cast(ToolMessage, ve.model).default_value("request")
    bad_field_errors = "\n".join(
        [f"{e['loc']}: {e['msg']}" for e in ve.errors() if "loc" in e]
    )
    return f"""
    There were one or more errors in your attempt to use the
    TOOL or function_call named '{tool_name}':
    {bad_field_errors}
    Please write your message again, correcting the errors.
    """

handle_message_async(msg) async

Async version of handle_message. See there for details.

Source code in langroid/agent/base.py
async def handle_message_async(
    self, msg: str | ChatDocument
) -> None | str | OrderedDict[str, str] | ChatDocument:
    """
    Async version of `handle_message`. See there for details.
    """
    try:
        tools = self.get_tool_messages(msg)
        tools = [t for t in tools if self._tool_recipient_match(t)]
    except ValidationError as ve:
        # correct tool name but bad fields
        return self.tool_validation_error(ve)
    except XMLException as xe:  # from XMLToolMessage parsing
        return str(xe)
    except ValueError:
        # invalid tool name
        # We return None since returning "invalid tool name" would
        # be considered a valid result in task loop, and would be treated
        # as a response to the tool message even though the tool was not intended
        # for this agent.
        return None
    if len(tools) > 1 and not self.config.allow_multiple_tools:
        return self.to_ChatDocument("ERROR: Use ONE tool at a time!")
    if len(tools) == 0:
        fallback_result = self.handle_message_fallback(msg)
        if fallback_result is None:
            return None
        return self.to_ChatDocument(
            fallback_result,
            chat_doc=msg if isinstance(msg, ChatDocument) else None,
        )
    chat_doc = msg if isinstance(msg, ChatDocument) else None

    results = self._get_multiple_orch_tool_errs(tools)
    if not results:
        results = [
            await self.handle_tool_message_async(t, chat_doc=chat_doc)
            for t in tools
        ]
        # if there's a solitary ChatDocument|str result, return it as is
        if len(results) == 1 and isinstance(results[0], (str, ChatDocument)):
            return results[0]

    return self._handle_message_final(tools, results)

handle_message(msg)

Handle a "tool" message either a string containing one or more valid "tool" JSON substrings, or a ChatDocument containing a function_call attribute. Handle with the corresponding handler method, and return the results as a combined string.

Parameters:

Name Type Description Default
msg str | ChatDocument

The string or ChatDocument to handle

required

Returns:

Type Description
None | str | OrderedDict[str, str] | ChatDocument

The result of the handler method can be:

  • None if no tools were successfully handled, or no tools are present
  • str if langroid-native JSON tools were handled and results concatenated, OR there is a SINGLE OpenAI tool-call (we do this so the common scenario of a single tool/fn-call has a simple behavior)
  • Dict[str, str] if multiple OpenAI tool-calls were handled (the dict is an id -> result map)
  • ChatDocument if a handler returned a ChatDocument, intended to be the final response of the agent_response method

Source code in langroid/agent/base.py
def handle_message(
    self, msg: str | ChatDocument
) -> None | str | OrderedDict[str, str] | ChatDocument:
    """
    Handle a "tool" message either a string containing one or more
    valid "tool" JSON substrings,  or a
    ChatDocument containing a `function_call` attribute.
    Handle with the corresponding handler method, and return
    the results as a combined string.

    Args:
        msg (str | ChatDocument): The string or ChatDocument to handle

    Returns:
        The result of the handler method can be:
         - None if no tools successfully handled, or no tools present
         - str if langroid-native JSON tools were handled, and results concatenated,
             OR there's a SINGLE OpenAI tool-call.
            (We do this so the common scenario of a single tool/fn-call
             has a simple behavior).
         - Dict[str, str] if multiple OpenAI tool-calls were handled
             (dict is an id->result map)
         - ChatDocument if a handler returned a ChatDocument, intended to be the
             final response of the `agent_response` method.
    """
    try:
        tools = self.get_tool_messages(msg)
        tools = [t for t in tools if self._tool_recipient_match(t)]
    except ValidationError as ve:
        # correct tool name but bad fields
        return self.tool_validation_error(ve)
    except XMLException as xe:  # from XMLToolMessage parsing
        return str(xe)
    except ValueError:
        # invalid tool name
        # We return None since returning "invalid tool name" would
        # be considered a valid result in task loop, and would be treated
        # as a response to the tool message even though the tool was not intended
        # for this agent.
        return None
    if len(tools) > 1 and not self.config.allow_multiple_tools:
        return self.to_ChatDocument("ERROR: Use ONE tool at a time!")
    if len(tools) == 0:
        fallback_result = self.handle_message_fallback(msg)
        if fallback_result is None:
            return None
        return self.to_ChatDocument(
            fallback_result,
            chat_doc=msg if isinstance(msg, ChatDocument) else None,
        )
    chat_doc = msg if isinstance(msg, ChatDocument) else None

    results = self._get_multiple_orch_tool_errs(tools)
    if not results:
        results = [self.handle_tool_message(t, chat_doc=chat_doc) for t in tools]
        # if there's a solitary ChatDocument|str result, return it as is
        if len(results) == 1 and isinstance(results[0], (str, ChatDocument)):
            return results[0]

    return self._handle_message_final(tools, results)

handle_message_fallback(msg)

Fallback method for the "no-tools" scenario. This method can be overridden by subclasses, e.g., to create a "reminder" message when a tool is expected but the LLM "forgot" to generate one.

Parameters:

Name Type Description Default
msg str | ChatDocument

The input msg to handle

required

Returns: Any: The result of the handler method

Source code in langroid/agent/base.py
def handle_message_fallback(self, msg: str | ChatDocument) -> Any:
    """
    Fallback method for the "no-tools" scenario.
    This method can be overridden by subclasses, e.g.,
    to create a "reminder" message when a tool is expected but the LLM "forgot"
    to generate one.

    Args:
        msg (str | ChatDocument): The input msg to handle
    Returns:
        Any: The result of the handler method
    """
    return None
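
A typical override, sketched below: remind the LLM when it responds with plain text even though a tool was expected (hedged: assumes langroid's top-level exports; the reminder wording is illustrative):

from langroid import Agent, ChatDocument
from langroid.mytypes import Entity

class ReminderAgent(Agent):
    def handle_message_fallback(self, msg):
        # Nudge only when the non-tool message came from the LLM itself.
        if isinstance(msg, ChatDocument) and msg.metadata.sender == Entity.LLM:
            return "You forgot to use one of your TOOLs; please try again."
        return None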

to_ChatDocument(msg, orig_tool_name=None, chat_doc=None, author_entity=Entity.AGENT)

Convert result of a responder (agent_response or llm_response, or task.run()), or tool handler, or handle_message_fallback, to a ChatDocument, to enable handling by other responders/tasks in a task loop possibly involving multiple agents.

Parameters:

Name Type Description Default
msg Any

The result of a responder or tool handler or task.run()

required
orig_tool_name str

The original tool name that generated the response, if any.

None
chat_doc ChatDocument

The original ChatDocument object that msg is a response to.

None
author_entity Entity

The intended author of the result ChatDocument

AGENT
Source code in langroid/agent/base.py
def to_ChatDocument(
    self,
    msg: Any,
    orig_tool_name: str | None = None,
    chat_doc: Optional[ChatDocument] = None,
    author_entity: Entity = Entity.AGENT,
) -> Optional[ChatDocument]:
    """
    Convert result of a responder (agent_response or llm_response, or task.run()),
    or tool handler, or handle_message_fallback,
    to a ChatDocument, to enable handling by other
    responders/tasks in a task loop possibly involving multiple agents.

    Args:
        msg (Any): The result of a responder or tool handler or task.run()
        orig_tool_name (str): The original tool name that generated the response,
            if any.
        chat_doc (ChatDocument): The original ChatDocument object that `msg`
            is a response to.
        author_entity (Entity): The intended author of the result ChatDocument
    """
    if msg is None or isinstance(msg, ChatDocument):
        return msg

    is_agent_author = author_entity == Entity.AGENT

    if isinstance(msg, str):
        return self.response_template(author_entity, content=msg, content_any=msg)
    elif isinstance(msg, ToolMessage):
        # result is a ToolMessage, so...
        result_tool_name = msg.default_value("request")
        if (
            is_agent_author
            and result_tool_name in self.llm_tools_handled
            and (orig_tool_name is None or orig_tool_name != result_tool_name)
        ):
            # TODO: do we need to remove the tool message from the chat_doc?
            # if (chat_doc is not None and
            #     msg in chat_doc.tool_messages):
            #    chat_doc.tool_messages.remove(msg)
            # if we can handle it, do so
            result = self.handle_tool_message(msg, chat_doc=chat_doc)
            if result is not None and isinstance(result, ChatDocument):
                return result
        else:
            # else wrap it in an agent response and return it so
            # orchestrator can find a respondent
            return self.response_template(author_entity, tool_messages=[msg])
    else:
        result = to_string(msg)

    return (
        None
        if result is None
        else self.response_template(author_entity, content=result, content_any=msg)
    )

from_ChatDocument(msg, output_type)

Extract a desired output_type from a ChatDocument object. We use this fallback order:

  • if msg.content_any exists and matches the output_type, return it
  • if msg.content exists and output_type is str, return it
  • if output_type is a ToolMessage, return the first tool in msg.tool_messages
  • if output_type is a list of ToolMessage, return all tools in msg.tool_messages
  • search for a tool in msg.tool_messages that has a field of output_type, and if found, return that field value
  • return None if all the above fail

Source code in langroid/agent/base.py
def from_ChatDocument(self, msg: ChatDocument, output_type: Type[T]) -> Optional[T]:
    """
    Extract a desired output_type from a ChatDocument object.
    We use this fallback order:
    - if `msg.content_any` exists and matches the output_type, return it
    - if `msg.content` exists and output_type is str return it
    - if output_type is a ToolMessage, return the first tool in `msg.tool_messages`
    - if output_type is a list of ToolMessage,
        return all tools in `msg.tool_messages`
    - search for a tool in `msg.tool_messages` that has a field of output_type,
         and if found, return that field value
    - return None if all the above fail
    """
    content = msg.content
    if output_type is str and content != "":
        return cast(T, content)
    content_any = msg.content_any
    if content_any is not None and isinstance(content_any, output_type):
        return cast(T, content_any)

    tools = self.try_get_tool_messages(msg, all_tools=True)

    if get_origin(output_type) is list:
        list_element_type = get_args(output_type)[0]
        if issubclass(list_element_type, ToolMessage):
            # list_element_type is a subclass of ToolMessage:
            # We output a list of objects derived from list_element_type
            return cast(
                T,
                [t for t in tools if isinstance(t, list_element_type)],
            )
    elif get_origin(output_type) is None and issubclass(output_type, ToolMessage):
        # output_type is a subclass of ToolMessage:
        # return the first tool that has this specific output_type
        for tool in tools:
            if isinstance(tool, output_type):
                return cast(T, tool)
        return None
    elif get_origin(output_type) is None and output_type in (str, int, float, bool):
        # attempt to get the output_type from the content,
        # if it's a primitive type
        primitive_value = from_string(content, output_type)  # type: ignore
        if primitive_value is not None:
            return cast(T, primitive_value)

    # then search for output_type as a field in a tool
    for tool in tools:
        value = tool.get_value_of_type(output_type)
        if value is not None:
            return cast(T, value)
    return None
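
A sketch of the primitive-type case (reusing the agent from the earlier sketches):

doc = agent.create_agent_response(content="42")
n = agent.from_ChatDocument(doc, int)
# Expected: n == 42, parsed from doc.content per the fallback order above.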

handle_tool_message_async(tool, chat_doc=None) async

Async version of handle_tool_message. See there for details.

Source code in langroid/agent/base.py
async def handle_tool_message_async(
    self,
    tool: ToolMessage,
    chat_doc: Optional[ChatDocument] = None,
) -> None | str | ChatDocument:
    """
    Async version of `handle_tool_message`. See there for details.
    """
    tool_name = tool.default_value("request")
    if hasattr(tool, "_handler"):
        handler_name = getattr(tool, "_handler", tool_name)
    else:
        handler_name = tool_name
    handler_method = getattr(self, handler_name + "_async", None)
    if handler_method is None:
        return self.handle_tool_message(tool, chat_doc=chat_doc)
    has_chat_doc_arg = (
        chat_doc is not None
        and "chat_doc" in inspect.signature(handler_method).parameters
    )
    try:
        if has_chat_doc_arg:
            maybe_result = await handler_method(tool, chat_doc=chat_doc)
        else:
            maybe_result = await handler_method(tool)
        result = self.to_ChatDocument(maybe_result, tool_name, chat_doc)
    except Exception as e:
        # raise the error here since we are sure it's
        # not a pydantic validation error,
        # which we check in `handle_message`
        raise e
    return self._maybe_truncate_result(
        result, tool._max_result_tokens
    )  # type: ignore

handle_tool_message(tool, chat_doc=None)

Respond to a tool request from the LLM, in the form of a ToolMessage object. Args: tool: ToolMessage object representing the tool request. chat_doc: Optional ChatDocument object containing the tool request. This is passed to the tool-handler method only if it has a chat_doc argument.

Returns: The result of the tool-handler method: None, a str, or a ChatDocument.

Source code in langroid/agent/base.py
def handle_tool_message(
    self,
    tool: ToolMessage,
    chat_doc: Optional[ChatDocument] = None,
) -> None | str | ChatDocument:
    """
    Respond to a tool request from the LLM, in the form of a ToolMessage object.
    Args:
        tool: ToolMessage object representing the tool request.
        chat_doc: Optional ChatDocument object containing the tool request.
            This is passed to the tool-handler method only if it has a `chat_doc`
            argument.

    Returns:

    """
    tool_name = tool.default_value("request")
    if hasattr(tool, "_handler"):
        handler_name = getattr(tool, "_handler", tool_name)
    else:
        handler_name = tool_name
    handler_method = getattr(self, handler_name, None)
    if handler_method is None:
        return None
    has_chat_doc_arg = (
        chat_doc is not None
        and "chat_doc" in inspect.signature(handler_method).parameters
    )
    try:
        if has_chat_doc_arg:
            maybe_result = handler_method(tool, chat_doc=chat_doc)
        else:
            maybe_result = handler_method(tool)
        result = self.to_ChatDocument(maybe_result, tool_name, chat_doc)
    except Exception as e:
        # raise the error here since we are sure it's
        # not a pydantic validation error,
        # which we check in `handle_message`
        raise e
    return self._maybe_truncate_result(
        result, tool._max_result_tokens
    )  # type: ignore
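
A sketch of a tool handler: the method name matches the tool's request value (SquareTool is the hypothetical tool from the earlier sketch; constructing the agent assumes a valid default config, e.g. an API key is set):

from langroid import Agent, AgentConfig

class SquaringAgent(Agent):
    def square(self, tool: SquareTool) -> str:
        # Invoked by handle_tool_message, since the method name
        # matches SquareTool's `request` value.
        return str(tool.number ** 2)

sq_agent = SquaringAgent(AgentConfig())
sq_agent.enable_message_handling(SquareTool)
print(sq_agent.handle_message('{"request": "square", "number": 4}'))
# Expected output: 16.0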

update_token_usage(response, prompt, stream, chat=True, print_response_stats=True)

Updates the response.usage object (token usage and cost fields). It updates the cost after checking the cache, and computes the prompt and completion token counts when the response was streamed, because OpenAI doesn't return these fields for streamed responses.

Parameters:

- response (LLMResponse): LLMResponse object. Required.
- prompt (str | List[LLMMessage]): prompt or list of LLMMessage objects. Required.
- stream (bool): whether the response was streamed; if so, usage is computed here (and is zero if the response was cached). Required.
- chat (bool): whether this is a chat model or a completion model. Default: True.
- print_response_stats (bool): whether to print the response stats. Default: True.
Source code in langroid/agent/base.py
def update_token_usage(
    self,
    response: LLMResponse,
    prompt: str | List[LLMMessage],
    stream: bool,
    chat: bool = True,
    print_response_stats: bool = True,
) -> None:
    """
    Updates `response.usage` obj (token usage and cost fields).
    It updates the cost after checking the cache, and updates the
    tokens (prompt and completion) if the response was streamed,
    because OpenAI doesn't return these fields for streamed responses.

    Args:
        response (LLMResponse): LLMResponse object
        prompt (str | List[LLMMessage]): prompt or list of LLMMessage objects
        stream (bool): whether the response was streamed; if so, usage is
            computed here (and is zero if the response was cached).
        chat (bool): whether this is a chat model or a completion model
        print_response_stats (bool): whether to print the response stats
    """
    if response is None or self.llm is None:
        return

    # Note: If response was not streamed, then
    # `response.usage` would already have been set by the API,
    # so we only need to update in the stream case.
    if stream:
        # usage, cost = 0 when response is from cache
        prompt_tokens = 0
        completion_tokens = 0
        cost = 0.0
        if not response.cached:
            prompt_tokens = self.num_tokens(prompt)
            completion_tokens = self.num_tokens(response.message)
            if response.function_call is not None:
                completion_tokens += self.num_tokens(str(response.function_call))
            cost = self.compute_token_cost(prompt_tokens, completion_tokens)
        response.usage = LLMTokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost=cost,
        )

    # update total counters
    if response.usage is not None:
        self.total_llm_token_cost += response.usage.cost
        self.total_llm_token_usage += response.usage.total_tokens
        self.llm.update_usage_cost(
            chat,
            response.usage.prompt_tokens,
            response.usage.completion_tokens,
            response.usage.cost,
        )
        chat_length = 1 if isinstance(prompt, str) else len(prompt)
        self.token_stats_str = self._get_response_stats(
            chat_length, self.total_llm_token_cost, response
        )
        if print_response_stats:
            print(self.indent + self.token_stats_str)
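As a rough sketch of the stream-mode bookkeeping above (token counts and cost are made-up numbers, and we assume total_tokens is the sum of prompt and completion tokens):

from langroid.language_models.base import LLMTokenUsage

# what the `stream` branch reconstructs for a non-cached response
usage = LLMTokenUsage(prompt_tokens=120, completion_tokens=30, cost=0.0006)

# what the "update total counters" step then accumulates
total_llm_token_cost = 0.0 + usage.cost
total_llm_token_usage = 0 + usage.total_tokens  # 150, assuming prompt + completion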

ask_agent(agent, request, no_answer=NO_ANSWER, user_confirm=True)

Send a request to another agent, possibly after confirming with the user. This is not currently used, since we rely on the task loop and RecipientTool to address requests to other agents. It is generally best to avoid using this method.

Parameters:

- agent (Agent): agent to ask. Required.
- request (str): request to send. Required.
- no_answer (str): expected response when agent does not know the answer. Default: NO_ANSWER.
- user_confirm (bool): whether to gate the request with a human confirmation. Default: True.

Returns:

- Optional[str]: response from agent, or None.

Source code in langroid/agent/base.py
def ask_agent(
    self,
    agent: "Agent",
    request: str,
    no_answer: str = NO_ANSWER,
    user_confirm: bool = True,
) -> Optional[str]:
    """
    Send a request to another agent, possibly after confirming with the user.
    This is not currently used, since we rely on the task loop and
    `RecipientTool` to address requests to other agents. It is generally best to
    avoid using this method.

    Args:
        agent (Agent): agent to ask
        request (str): request to send
        no_answer (str): expected response when agent does not know the answer
        user_confirm (bool): whether to gate the request with a human confirmation

    Returns:
        str: response from agent
    """
    agent_type = type(agent).__name__
    if user_confirm:
        user_response = Prompt.ask(
            f"""[magenta]Here is the request or message:
            {request}
            Should I forward this to {agent_type}?""",
            default="y",
            choices=["y", "n"],
        )
        if user_response not in ["y", "yes"]:
            return None
    answer = agent.llm_response(request)
    if answer != no_answer:
        return (f"{agent_type} says: " + str(answer)).strip()
    return None

AgentConfig

Bases: BaseSettings

General config settings for an LLM agent. This is nested, combining configs of various components.
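Since this config is nested, a typical construction (a sketch; the model name is illustrative) fills in the LLM sub-config and leaves optional components unset:

from langroid.agent.base import AgentConfig
from langroid.language_models.openai_gpt import OpenAIGPTConfig

config = AgentConfig(
    llm=OpenAIGPTConfig(chat_model="gpt-4o"),  # LLM sub-config
    vecdb=None,  # no vector store for this agent
)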

ChatDocument(**data)

Bases: Document

Represents a message in a conversation among agents. All responders of an agent have signature ChatDocument -> ChatDocument (modulo None, str, etc), and so does the Task.run() method.

Attributes:

- oai_tool_calls (Optional[List[OpenAIToolCall]]): Tool-calls from an OpenAI-compatible API.
- oai_tool_id2results (Optional[OrderedDict[str, str]]): Results of tool-calls from OpenAI (dict is a map of tool_id -> result).
- oai_tool_choice (ToolChoiceTypes | Dict[str, Dict[str, str] | str]): Param controlling how the LLM should choose tool-use in its response (auto, none, required, or a specific tool).
- function_call (Optional[LLMFunctionCall]): Function-call from an OpenAI-compatible API (deprecated by OpenAI, in favor of tool-calls).
- tool_messages (List[ToolMessage]): Langroid ToolMessages extracted from the content field (via JSON parsing), oai_tool_calls, or function_call.
- metadata (ChatDocMetaData): Metadata for the message, e.g. sender, recipient.
- attachment (None | ChatDocAttachment): Any additional data attached.

Source code in langroid/agent/chat_document.py
def __init__(self, **data: Any):
    super().__init__(**data)
    ObjectRegistry.register_object(self)

delete_id(id) staticmethod

Remove ChatDocument with given id from ObjectRegistry, and all its descendants.

Source code in langroid/agent/chat_document.py
@staticmethod
def delete_id(id: str) -> None:
    """Remove ChatDocument with given id from ObjectRegistry,
    and all its descendants.
    """
    chat_doc = ChatDocument.from_id(id)
    # first delete all descendants
    while chat_doc is not None:
        next_chat_doc = chat_doc.child
        ObjectRegistry.remove(chat_doc.id())
        chat_doc = next_chat_doc

get_tool_names()

Get names of attempted tool usages (JSON or non-JSON) in the content of the message. Returns: List[str]: list of attempted tool names (We say "attempted" since we ONLY look at the request component of the tool-call representation, and we're not fully parsing it into the corresponding tool message class)

Source code in langroid/agent/chat_document.py
def get_tool_names(self) -> List[str]:
    """
    Get names of attempted tool usages (JSON or non-JSON) in the content
        of the message.
    Returns:
        List[str]: list of *attempted* tool names
        (We say "attempted" since we ONLY look at the `request` component of the
        tool-call representation, and we're not fully parsing it into the
        corresponding tool message class)

    """
    tool_candidates = XMLToolMessage.find_candidates(self.content)
    if len(tool_candidates) == 0:
        tool_candidates = extract_top_level_json(self.content)
        if len(tool_candidates) == 0:
            return []
        tools = [json.loads(tc).get("request") for tc in tool_candidates]
    else:
        tool_dicts = [
            XMLToolMessage.extract_field_values(tc) for tc in tool_candidates
        ]
        tools = [td.get("request") for td in tool_dicts if td is not None]
    return [str(tool) for tool in tools if tool is not None]
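A quick sketch of the "attempted" semantics (the tool name is hypothetical): only the request field is read, so a tool call whose other arguments would fail validation is still reported:

from langroid.agent.chat_document import ChatDocMetaData, ChatDocument
from langroid.mytypes import Entity

doc = ChatDocument(
    content='{"request": "square", "number": "not-a-number"}',
    metadata=ChatDocMetaData(sender=Entity.LLM),
)
print(doc.get_tool_names())  # ['square']: the args are never validated here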

log_fields()

Fields for logging in csv/tsv logger. Returns: ChatDocLoggerFields: the fields to log.

Source code in langroid/agent/chat_document.py
def log_fields(self) -> ChatDocLoggerFields:
    """
    Fields for logging in csv/tsv logger
    Returns:
        ChatDocLoggerFields: the fields to log
    """
    tool_type = ""  # FUNC or TOOL
    tool = ""  # tool name or function name
    if self.function_call is not None:
        tool_type = "FUNC"
        tool = self.function_call.name
    else:
        try:
            json_tools = self.get_tool_names()
        except Exception:
            json_tools = []
        if json_tools != []:
            tool_type = "TOOL"
            tool = json_tools[0]
    recipient = self.metadata.recipient
    content = self.content
    sender_entity = self.metadata.sender
    sender_name = self.metadata.sender_name
    if tool_type == "FUNC":
        content += str(self.function_call)
    return ChatDocLoggerFields(
        sender_entity=sender_entity,
        sender_name=sender_name,
        recipient=recipient,
        block=self.metadata.block,
        tool_type=tool_type,
        tool=tool,
        content=content,
    )

pop_tool_ids()

Pop the last tool_id from the stack of tool_ids.

Source code in langroid/agent/chat_document.py
def pop_tool_ids(self) -> None:
    """
    Pop the last tool_id from the stack of tool_ids.
    """
    if len(self.metadata.tool_ids) > 0:
        self.metadata.tool_ids.pop()

from_LLMResponse(response, displayed=False) staticmethod

Convert LLMResponse to ChatDocument. Args: response (LLMResponse): LLMResponse to convert. displayed (bool): Whether this response was displayed to the user. Returns: ChatDocument: ChatDocument representation of this LLMResponse.

Source code in langroid/agent/chat_document.py
@staticmethod
def from_LLMResponse(
    response: LLMResponse,
    displayed: bool = False,
) -> "ChatDocument":
    """
    Convert LLMResponse to ChatDocument.
    Args:
        response (LLMResponse): LLMResponse to convert.
        displayed (bool): Whether this response was displayed to the user.
    Returns:
        ChatDocument: ChatDocument representation of this LLMResponse.
    """
    recipient, message = response.get_recipient_and_message()
    message = message.strip()
    if message in ["''", '""']:
        message = ""
    if response.function_call is not None:
        ChatDocument._clean_fn_call(response.function_call)
    if response.oai_tool_calls is not None:
        # there must be at least one if it's not None
        for oai_tc in response.oai_tool_calls:
            ChatDocument._clean_fn_call(oai_tc.function)
    return ChatDocument(
        content=message,
        content_any=message,
        oai_tool_calls=response.oai_tool_calls,
        function_call=response.function_call,
        metadata=ChatDocMetaData(
            source=Entity.LLM,
            sender=Entity.LLM,
            usage=response.usage,
            displayed=displayed,
            cached=response.cached,
            recipient=recipient,
        ),
    )

to_LLMMessage(message, oai_tools=None) staticmethod

Convert to list of LLMMessage, to incorporate into msg-history sent to LLM API. Usually there will be just a single LLMMessage, but when the ChatDocument contains results from multiple OpenAI tool-calls, we would have a sequence of LLMMessages, one per tool-call result.

Parameters:

- message (str | ChatDocument): Message to convert. Required.
- oai_tools (Optional[List[OpenAIToolCall]]): Tool-calls currently awaiting response, from the ChatAgent's latest message. Default: None.

Returns: List[LLMMessage]: list of LLMMessages corresponding to this ChatDocument.

Source code in langroid/agent/chat_document.py
@staticmethod
def to_LLMMessage(
    message: Union[str, "ChatDocument"],
    oai_tools: Optional[List[OpenAIToolCall]] = None,
) -> List[LLMMessage]:
    """
    Convert to list of LLMMessage, to incorporate into msg-history sent to LLM API.
    Usually there will be just a single LLMMessage, but when the ChatDocument
    contains results from multiple OpenAI tool-calls, we would have a sequence
    of LLMMessages, one per tool-call result.

    Args:
        message (str|ChatDocument): Message to convert.
        oai_tools (Optional[List[OpenAIToolCall]]): Tool-calls currently awaiting
            response, from the ChatAgent's latest message.
    Returns:
        List[LLMMessage]: list of LLMMessages corresponding to this ChatDocument.
    """
    sender_name = None
    sender_role = Role.USER
    fun_call = None
    oai_tool_calls = None
    tool_id = ""  # for OpenAI Assistant
    chat_document_id: str = ""
    if isinstance(message, ChatDocument):
        content = message.content or to_string(message.content_any) or ""
        fun_call = message.function_call
        oai_tool_calls = message.oai_tool_calls
        if message.metadata.sender == Entity.USER and fun_call is not None:
            # This may happen when a (parent agent's) LLM generates a
            # Function-call, and it ends up being sent to the current task's
            # LLM (possibly because the function-call is mis-named or has other
            # issues and couldn't be handled by handler methods).
            # But a function-call can only be generated by an entity with
            # Role.ASSISTANT, so we instead put the content of the function-call
            # in the content of the message.
            content += " " + str(fun_call)
            fun_call = None
        if message.metadata.sender == Entity.USER and oai_tool_calls is not None:
            # same reasoning as for function-call above
            content += " " + "\n\n".join(str(tc) for tc in oai_tool_calls)
            oai_tool_calls = None
        sender_name = message.metadata.sender_name
        tool_ids = message.metadata.tool_ids
        tool_id = tool_ids[-1] if len(tool_ids) > 0 else ""
        chat_document_id = message.id()
        if message.metadata.sender == Entity.SYSTEM:
            sender_role = Role.SYSTEM
        if (
            message.metadata.parent is not None
            and message.metadata.parent.function_call is not None
        ):
            # This is a response to a function call, so set the role to FUNCTION.
            sender_role = Role.FUNCTION
            sender_name = message.metadata.parent.function_call.name
        elif oai_tools is not None and len(oai_tools) > 0:
            pending_tool_ids = [tc.id for tc in oai_tools]
            # The ChatAgent has pending OpenAI tool-call(s),
            # so the current ChatDocument contains
            # results for some/all/none of them.

            if len(oai_tools) == 1:
                # Case 1:
                # There was exactly 1 pending tool-call, and in this case
                # the result would be a plain string in `content`
                return [
                    LLMMessage(
                        role=Role.TOOL,
                        tool_call_id=oai_tools[0].id,
                        content=content,
                        chat_document_id=chat_document_id,
                    )
                ]

            elif (
                message.metadata.oai_tool_id is not None
                and message.metadata.oai_tool_id in pending_tool_ids
            ):
                # Case 2:
                # ChatDocument.content has result of a single tool-call
                return [
                    LLMMessage(
                        role=Role.TOOL,
                        tool_call_id=message.metadata.oai_tool_id,
                        content=content,
                        chat_document_id=chat_document_id,
                    )
                ]
            elif message.oai_tool_id2result is not None:
                # Case 3:
                # There were > 1 tool-calls awaiting response, with
                # results in `oai_tool_id2result`.
                assert (
                    len(message.oai_tool_id2result) > 1
                ), "oai_tool_id2result must have more than 1 item."
                return [
                    LLMMessage(
                        role=Role.TOOL,
                        tool_call_id=tool_id,
                        content=result,
                        chat_document_id=chat_document_id,
                    )
                    for tool_id, result in message.oai_tool_id2result.items()
                ]
        elif message.metadata.sender == Entity.LLM:
            sender_role = Role.ASSISTANT
    else:
        # LLM can only respond to text content, so extract it
        content = message

    return [
        LLMMessage(
            role=sender_role,
            tool_id=tool_id,  # for OpenAI Assistant
            content=content,
            function_call=fun_call,
            tool_calls=oai_tool_calls,
            name=sender_name,
            chat_document_id=chat_document_id,
        )
    ]
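In the simplest case (a plain string, no pending tool-calls) the result is a single USER-role message; a small sketch:

from langroid.agent.chat_document import ChatDocument
from langroid.language_models.base import Role

msgs = ChatDocument.to_LLMMessage("What is 2 + 2?")
assert len(msgs) == 1
assert msgs[0].role == Role.USER
assert msgs[0].content == "What is 2 + 2?"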

ChatAgentConfig

Bases: AgentConfig

Configuration for ChatAgent. Attributes:

- system_message: system message to include in message sequence (typically defines role and task of agent). Used only if task is not specified in the constructor.
- user_message: user message to include in message sequence. Used only if task is not specified in the constructor.
- use_tools: whether to use our own ToolMessages mechanism.
- use_functions_api: whether to use functions/tools native to the LLM API (e.g. OpenAI's function_call or tool_call mechanism).
- use_tools_api: When use_functions_api is True, if this is also True, the OpenAI tool-call API is used, rather than the older/deprecated function-call API. However, the tool-call API has some tricky aspects, hence we set this to False by default.
- strict_recovery: whether to enable strict schema recovery when there is a tool-generation error.
- enable_orchestration_tool_handling: whether to enable handling of orchestration tools, e.g. ForwardTool, DoneTool, PassTool, etc.
- output_format: When supported by the LLM (certain OpenAI LLMs and local LLMs served by providers such as vLLM), ensures that the output is a JSON matching the corresponding schema via grammar-based decoding.
- handle_output_format: When output_format is a ToolMessage T, controls whether T is "enabled for handling".
- use_output_format: When output_format is a ToolMessage T, controls whether T is "enabled for use" (by LLM) and instructions on using T are added to the system message.
- instructions_output_format: Controls whether we generate instructions for output_format in the system message.
- use_tools_on_output_format: Controls whether to automatically switch to the Langroid-native tools mechanism when output_format is set. Note that LLMs may generate tool calls which do not belong to output_format even when strict JSON mode is enabled, so this should be enabled when such tool calls are not desired.
- output_format_include_defaults: Whether to include fields with default arguments in the output schema.
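A typical configuration (a sketch; the messages are illustrative) picks one tool mechanism, here the prompt-based Langroid-native tools:

from langroid.agent.chat_agent import ChatAgentConfig

config = ChatAgentConfig(
    system_message="You are a concise assistant.",
    use_tools=True,  # Langroid-native, prompt-based tools
    use_functions_api=False,  # don't use the LLM-native function/tool API
)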

ChatAgent(config=ChatAgentConfig(), task=None)

Bases: Agent

Chat Agent interacting with external env (could be human, or external tools). The agent (the LLM, actually) is provided with an optional "Task Spec", which is a sequence of LLMMessages. These are used to initialize the task_messages of the agent. In most applications we will use a ChatAgent rather than a bare Agent. The Agent class mainly exists to hold various common methods and attributes.

One difference between ChatAgent and Agent is that ChatAgent's llm_response method uses "chat mode" API (i.e. one that takes a message sequence rather than a single message), whereas the same method in the Agent class uses "completion mode" API (i.e. one that takes a single message).

Parameters:

- config (ChatAgentConfig): settings for the agent. Default: ChatAgentConfig().
- task (Optional[List[LLMMessage]]): optional initial message sequence ("task spec"); if given, its system and/or user messages override those in the config. Default: None.
Source code in langroid/agent/chat_agent.py
def __init__(
    self,
    config: ChatAgentConfig = ChatAgentConfig(),
    task: Optional[List[LLMMessage]] = None,
):
    """
    Chat-mode agent initialized with task spec as the initial message sequence
    Args:
        config: settings for the agent

    """
    super().__init__(config)
    self.config: ChatAgentConfig = config
    self.config._set_fn_or_tools(self._fn_call_available())
    self.message_history: List[LLMMessage] = []
    self.init_state()
    # An agent's "task" is defined by a system msg and an optional user msg;
    # These are "priming" messages that kick off the agent's conversation.
    self.system_message: str = self.config.system_message
    self.user_message: str | None = self.config.user_message

    if task is not None:
        # if task contains a system msg, we override the config system msg
        if len(task) > 0 and task[0].role == Role.SYSTEM:
            self.system_message = task[0].content
        # if task contains a user msg, we override the config user msg
        if len(task) > 1 and task[1].role == Role.USER:
            self.user_message = task[1].content

    # system-level instructions for using tools/functions:
    # We maintain these as tools/functions are enabled/disabled,
    # and whenever an LLM response is sought, these are used to
    # recreate the system message (via `_create_system_and_tools_message`)
    # each time, so it reflects the current set of enabled tools/functions.
    # (a) these are general instructions on using certain tools/functions,
    #   if they are specified in a ToolMessage class as a classmethod `instructions`
    self.system_tool_instructions: str = ""
    # (b) these are only for the built-in Langroid TOOLS mechanism:
    self.system_tool_format_instructions: str = ""

    self.llm_functions_map: Dict[str, LLMFunctionSpec] = {}
    self.llm_functions_handled: Set[str] = set()
    self.llm_functions_usable: Set[str] = set()
    self.llm_function_force: Optional[Dict[str, str]] = None

    self.output_format: Optional[type[ToolMessage | BaseModel]] = None

    self.saved_requests_and_tool_setings = self._requests_and_tool_settings()
    # This variable is not None and equals a `ToolMessage` T, if and only if:
    # (a) T has been set as the output_format of this agent, AND
    # (b) T has been "enabled for use" ONLY for enforcing this output format, AND
    # (c) T has NOT been explicitly "enabled for use" by this Agent.
    self.enabled_use_output_format: Optional[type[ToolMessage]] = None
    # As above but deals with "enabled for handling" instead of "enabled for use".
    self.enabled_handling_output_format: Optional[type[ToolMessage]] = None
    if config.output_format is not None:
        self.set_output_format(config.output_format)
    # instructions specifically related to enforcing `output_format`
    self.output_format_instructions = ""

    # controls whether to disable strict schemas for this agent if
    # strict mode causes exception
    self.disable_strict = False
    # Tracks whether any strict tool is enabled; used to determine whether to set
    # `self.disable_strict` on an exception
    self.any_strict = False
    # Tracks the set of tools on which we force-disable strict decoding
    self.disable_strict_tools_set: set[str] = set()

    if self.config.enable_orchestration_tool_handling:
        # Only enable HANDLING by `agent_response`, NOT LLM generation of these.
        # This is useful where tool-handlers or agent_response generate these
        # tools, and need to be handled.
        # We don't want to enable orch tool GENERATION by default, since that
        # might clutter-up the LLM system message unnecessarily.
        from langroid.agent.tools.orchestration import (
            AgentDoneTool,
            AgentSendTool,
            DonePassTool,
            DoneTool,
            ForwardTool,
            PassTool,
            ResultTool,
            SendTool,
        )

        self.enable_message(ForwardTool, use=False, handle=True)
        self.enable_message(DoneTool, use=False, handle=True)
        self.enable_message(AgentDoneTool, use=False, handle=True)
        self.enable_message(PassTool, use=False, handle=True)
        self.enable_message(DonePassTool, use=False, handle=True)
        self.enable_message(SendTool, use=False, handle=True)
        self.enable_message(AgentSendTool, use=False, handle=True)
        self.enable_message(ResultTool, use=False, handle=True)
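Putting the constructor together, a minimal chat-mode interaction might look like this (a sketch; assumes a configured LLM and the corresponding API key):

import langroid as lr

agent = lr.ChatAgent(
    lr.ChatAgentConfig(system_message="You are a helpful assistant.")
)
response = agent.llm_response("Say hello in French.")  # ChatDocument | None
if response is not None:
    print(response.content)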

task_messages: List[LLMMessage] property

The task messages are the initial messages that define the task of the agent. There will be at least a system message plus possibly a user msg. Returns: List[LLMMessage]: the task messages

all_llm_tools_known: set[str] property

All known tools; we include output_format if it is a ToolMessage.

init_state()

Initialize the state of the agent. Just conversation state here, but subclasses can override this to initialize other state.

Source code in langroid/agent/chat_agent.py
def init_state(self) -> None:
    """
    Initialize the state of the agent. Just conversation state here,
    but subclasses can override this to initialize other state.
    """
    super().init_state()
    self.clear_history(0)
    self.clear_dialog()

from_id(id) staticmethod

Get an agent from its ID. Args: id (str): ID of the agent. Returns: ChatAgent: The agent with the given ID.

Source code in langroid/agent/chat_agent.py
@staticmethod
def from_id(id: str) -> "ChatAgent":
    """
    Get an agent from its ID
    Args:
        id (str): ID of the agent
    Returns:
        ChatAgent: The agent with the given ID
    """
    return cast(ChatAgent, Agent.from_id(id))

clone(i=0)

Create i'th clone of this agent, ensuring tool use/handling is cloned. Important: We assume all member variables are in the init method here and in the Agent class. TODO: We are attempting to clone an agent after its state has been changed in possibly many ways. Below is an imperfect solution. Caution advised. Revisit later.

Source code in langroid/agent/chat_agent.py
def clone(self, i: int = 0) -> "ChatAgent":
    """Create i'th clone of this agent, ensuring tool use/handling is cloned.
    Important: We assume all member variables are in the __init__ method here
    and in the Agent class.
    TODO: We are attempting to clone an agent after its state has been
    changed in possibly many ways. Below is an imperfect solution. Caution advised.
    Revisit later.
    """
    agent_cls = type(self)
    config_copy = copy.deepcopy(self.config)
    config_copy.name = f"{config_copy.name}-{i}"
    new_agent = agent_cls(config_copy)
    new_agent.system_tool_instructions = self.system_tool_instructions
    new_agent.system_tool_format_instructions = self.system_tool_format_instructions
    new_agent.llm_tools_map = self.llm_tools_map
    new_agent.llm_functions_map = self.llm_functions_map
    new_agent.llm_functions_handled = self.llm_functions_handled
    new_agent.llm_functions_usable = self.llm_functions_usable
    new_agent.llm_function_force = self.llm_function_force
    # Caution - we are copying the vector-db, maybe we don't always want this?
    new_agent.vecdb = self.vecdb
    new_agent.id = ObjectRegistry.new_id()
    if self.config.add_to_registry:
        ObjectRegistry.register_object(new_agent)
    return new_agent

clear_history(start=-2)

Clear the message history, starting at the index start

Parameters:

- start (int): index of first message to delete. Default: -2, i.e. delete the last 2 messages (typically the last user and assistant messages).
Source code in langroid/agent/chat_agent.py
def clear_history(self, start: int = -2) -> None:
    """
    Clear the message history, starting at the index `start`

    Args:
        start (int): index of first message to delete; default = -2
                (i.e. delete last 2 messages, typically these
                are the last user and assistant messages)
    """
    if start < 0:
        n = len(self.message_history)
        start = max(0, n + start)
    dropped = self.message_history[start:]
    # consider the dropped msgs in REVERSE order, so we are
    # carefully updating self.oai_tool_calls
    for msg in reversed(dropped):
        self._drop_msg_update_tool_calls(msg)
        # clear out the chat document from the ObjectRegistry
        ChatDocument.delete_id(msg.chat_document_id)
    self.message_history = self.message_history[:start]

update_history(message, response)

Update the message history with the latest user message and LLM response. Args: message (str): user message response (str): LLM response

Source code in langroid/agent/chat_agent.py
def update_history(self, message: str, response: str) -> None:
    """
    Update the message history with the latest user message and LLM response.
    Args:
        message (str): user message
        response (str): LLM response
    """
    self.message_history.extend(
        [
            LLMMessage(role=Role.USER, content=message),
            LLMMessage(role=Role.ASSISTANT, content=response),
        ]
    )

tool_format_rules()

Specification of tool formatting rules (typically JSON-based but can be non-JSON, e.g. XMLToolMessage), based on the currently enabled usable ToolMessages

Returns:

- str: formatting rules

Source code in langroid/agent/chat_agent.py
def tool_format_rules(self) -> str:
    """
    Specification of tool formatting rules
    (typically JSON-based but can be non-JSON, e.g. XMLToolMessage),
    based on the currently enabled usable `ToolMessage`s

    Returns:
        str: formatting rules
    """
    # ONLY Usable tools (i.e. LLM-generation allowed),
    usable_tool_classes: List[Type[ToolMessage]] = [
        t
        for t in list(self.llm_tools_map.values())
        if t.default_value("request") in self.llm_tools_usable
    ]

    if len(usable_tool_classes) == 0:
        return "You can ask questions in natural language."
    format_instructions = "\n\n".join(
        [
            msg_cls.format_instructions(tool=self.config.use_tools)
            for msg_cls in usable_tool_classes
        ]
    )
    # if any of the enabled classes has `json_group_instructions`, use its
    # `group_format_instructions()`, else fall back to
    # ToolMessage.group_format_instructions()
    for msg_cls in usable_tool_classes:
        if hasattr(msg_cls, "json_group_instructions") and callable(
            getattr(msg_cls, "json_group_instructions")
        ):
            return msg_cls.group_format_instructions().format(
                format_instructions=format_instructions
            )
    return ToolMessage.group_format_instructions().format(
        format_instructions=format_instructions
    )

tool_instructions()

Instructions for tools or function-calls, for enabled and usable Tools. These are inserted into system prompt regardless of whether we are using our own ToolMessage mechanism or the LLM's function-call mechanism.

Returns:

- str: concatenation of instructions for all usable tools

Source code in langroid/agent/chat_agent.py
def tool_instructions(self) -> str:
    """
    Instructions for tools or function-calls, for enabled and usable Tools.
    These are inserted into system prompt regardless of whether we are using
    our own ToolMessage mechanism or the LLM's function-call mechanism.

    Returns:
        str: concatenation of instructions for all usable tools
    """
    enabled_classes: List[Type[ToolMessage]] = list(self.llm_tools_map.values())
    if len(enabled_classes) == 0:
        return ""
    instructions = []
    for msg_cls in enabled_classes:
        if (
            hasattr(msg_cls, "instructions")
            and inspect.ismethod(msg_cls.instructions)
            and msg_cls.default_value("request") in self.llm_tools_usable
        ):
            # example will be shown in tool_format_rules() when using TOOLs,
            # so we don't need to show it here.
            example = "" if self.config.use_tools else (msg_cls.usage_examples())
            if example != "":
                example = "EXAMPLES:\n" + example
            class_instructions = msg_cls.instructions()
            guidance = (
                ""
                if class_instructions == ""
                else ("GUIDANCE: " + class_instructions)
            )
            if guidance == "" and example == "":
                continue
            instructions.append(
                textwrap.dedent(
                    f"""
                    TOOL: {msg_cls.default_value("request")}:
                    {guidance}
                    {example}
                    """.lstrip()
                )
            )
    if len(instructions) == 0:
        return ""
    instructions_str = "\n\n".join(instructions)
    return textwrap.dedent(
        f"""
        === GUIDELINES ON SOME TOOLS/FUNCTIONS USAGE ===
        {instructions_str}
        """.lstrip()
    )

augment_system_message(message)

Augment the system message with the given message. Args: message (str): system message

Source code in langroid/agent/chat_agent.py
def augment_system_message(self, message: str) -> None:
    """
    Augment the system message with the given message.
    Args:
        message (str): system message
    """
    self.system_message += "\n\n" + message

last_message_with_role(role)

from message_history, return the last message with role role

Source code in langroid/agent/chat_agent.py
def last_message_with_role(self, role: Role) -> LLMMessage | None:
    """from `message_history`, return the last message with role `role`"""
    n_role_msgs = len([m for m in self.message_history if m.role == role])
    if n_role_msgs == 0:
        return None
    idx = self.nth_message_idx_with_role(role, n_role_msgs)
    return self.message_history[idx]

nth_message_idx_with_role(role, n)

Index of nth message in message_history, with specified role. (n is assumed to be 1-based, i.e. 1 is the first message with that role). Return -1 if not found. Index = 0 is the first message in the history.

Source code in langroid/agent/chat_agent.py
def nth_message_idx_with_role(self, role: Role, n: int) -> int:
    """Index of `n`th message in message_history, with specified role.
    (n is assumed to be 1-based, i.e. 1 is the first message with that role).
    Return -1 if not found. Index = 0 is the first message in the history.
    """
    indices_with_role = [
        i for i, m in enumerate(self.message_history) if m.role == role
    ]

    if len(indices_with_role) < n:
        return -1
    return indices_with_role[n - 1]

update_last_message(message, role=Role.USER)

Update the last message that has role role in the message history. Useful when we want to replace a long user prompt, that may contain context documents plus a question, with just the question. Args: message (str): new message to replace with role (str): role of message to replace

Source code in langroid/agent/chat_agent.py
def update_last_message(self, message: str, role: str = Role.USER) -> None:
    """
    Update the last message that has role `role` in the message history.
    Useful when we want to replace a long user prompt, that may contain context
    documents plus a question, with just the question.
    Args:
        message (str): new message to replace with
        role (str): role of message to replace
    """
    if len(self.message_history) == 0:
        return
    # find last message in self.message_history with role `role`
    for i in range(len(self.message_history) - 1, -1, -1):
        if self.message_history[i].role == role:
            self.message_history[i].content = message
            break

unhandled_tools()

The set of tools that are known but not handled. Useful in task flow: an agent can refuse to accept an incoming msg when it only has unhandled tools.

Source code in langroid/agent/chat_agent.py
def unhandled_tools(self) -> set[str]:
    """The set of tools that are known but not handled.
    Useful in task flow: an agent can refuse to accept an incoming msg
    when it only has unhandled tools.
    """
    return self.llm_tools_known - self.llm_tools_handled

enable_message(message_class, use=True, handle=True, force=False, require_recipient=False, include_defaults=True)

Add the tool (message class) to the agent, and enable either:

- tool USE (i.e. the LLM can generate JSON to use this tool),
- tool HANDLING (i.e. the agent can handle JSON from this tool),

or both.

Parameters:

- message_class (Optional[Type[ToolMessage] | List[Type[ToolMessage]]]): The ToolMessage class OR list of such classes to enable, for USE, HANDLING, or both. If this is a list of ToolMessage classes, then the remaining args are applied to all classes. Optional; if None, then apply the enabling to all tools in the agent's toolset that have been enabled so far. Required.
- use (bool): if True, allow the agent (LLM) to use this tool (or all tools), else disallow. Default: True.
- handle (bool): if True, allow the agent (LLM) to handle (i.e. respond to) this tool (or all tools). Default: True.
- force (bool): whether to FORCE the agent (LLM) to USE the specific tool represented by message_class; force is ignored if message_class is None. Default: False.
- require_recipient (bool): whether to require that recipient be specified when using the tool message (only applies if use is True). Default: False.
- include_defaults (bool): whether to include fields that have default values in the "properties" section of the JSON format instructions. (Normally the OpenAI completion API ignores these fields, but the Assistant fn-calling seems to pay attention to these, and if we don't want this, we should set this to False.) Default: True.
Source code in langroid/agent/chat_agent.py
def enable_message(
    self,
    message_class: Optional[Type[ToolMessage] | List[Type[ToolMessage]]],
    use: bool = True,
    handle: bool = True,
    force: bool = False,
    require_recipient: bool = False,
    include_defaults: bool = True,
) -> None:
    """
    Add the tool (message class) to the agent, and enable either
    - tool USE (i.e. the LLM can generate JSON to use this tool),
    - tool HANDLING (i.e. the agent can handle JSON from this tool),

    Args:
        message_class: The ToolMessage class OR List of such classes to enable,
            for USE, or HANDLING, or both.
            If this is a list of ToolMessage classes, then the remaining args are
            applied to all classes.
            Optional; if None, then apply the enabling to all tools in the
            agent's toolset that have been enabled so far.
        use: IF True, allow the agent (LLM) to use this tool (or all tools),
            else disallow
        handle: if True, allow the agent (LLM) to handle (i.e. respond to) this
            tool (or all tools)
        force: whether to FORCE the agent (LLM) to USE the specific
             tool represented by `message_class`.
             `force` is ignored if `message_class` is None.
        require_recipient: whether to require that recipient be specified
            when using the tool message (only applies if `use` is True).
        include_defaults: whether to include fields that have default values,
            in the "properties" section of the JSON format instructions.
            (Normally the OpenAI completion API ignores these fields,
            but the Assistant fn-calling seems to pay attention to these,
            and if we don't want this, we should set this to False.)
    """
    if message_class is not None and isinstance(message_class, list):
        for mc in message_class:
            self.enable_message(
                mc,
                use=use,
                handle=handle,
                force=force,
                require_recipient=require_recipient,
                include_defaults=include_defaults,
            )
        return None
    if require_recipient and message_class is not None:
        message_class = message_class.require_recipient()
    if message_class is not None and issubclass(message_class, XMLToolMessage):
        # XMLToolMessage is not compatible with OpenAI's Tools/functions API,
        # so we disable use of functions API, enable langroid-native Tools,
        # which are prompt-based.
        self.config.use_functions_api = False
        self.config.use_tools = True
    super().enable_message_handling(message_class)  # enables handling only
    tools = self._get_tool_list(message_class)
    if message_class is not None:
        request = message_class.default_value("request")
        if request == "":
            raise ValueError(
                f"""
                ToolMessage class {message_class} must have a non-empty 
                'request' field if it is to be enabled as a tool.
                """
            )
        llm_function = message_class.llm_function_schema(defaults=include_defaults)
        self.llm_functions_map[request] = llm_function
        if force:
            self.llm_function_force = dict(name=request)
        else:
            self.llm_function_force = None

    for t in tools:
        self.llm_tools_known.add(t)

        if handle:
            self.llm_tools_handled.add(t)
            self.llm_functions_handled.add(t)

            if (
                self.enabled_handling_output_format is not None
                and self.enabled_handling_output_format.name() == t
            ):
                # `t` was designated as "enabled for handling" ONLY for
                # output_format enforcement, but we are explicitly
                # enabling it for handling here, so we set the variable to None.
                self.enabled_handling_output_format = None
        else:
            self.llm_tools_handled.discard(t)
            self.llm_functions_handled.discard(t)

        if use:
            tool_class = self.llm_tools_map[t]
            if tool_class._allow_llm_use:
                self.llm_tools_usable.add(t)
                self.llm_functions_usable.add(t)
            else:
                logger.warning(
                    f"""
                    ToolMessage class {tool_class} does not allow LLM use,
                    because `_allow_llm_use=False` either in the Tool or a 
                    parent class of this tool;
                    so not enabling LLM use for this tool!
                    If you intended an LLM to use this tool, 
                    set `_allow_llm_use=True` when you define the tool.
                    """
                )
            if (
                self.enabled_use_output_format is not None
                and self.enabled_use_output_format.default_value("request") == t
            ):
                # `t` was designated as "enabled for use" ONLY for output_format
                # enforcement, but we are explicitly enabling it for use here,
                # so we set the variable to None.
                self.enabled_use_output_format = None
        else:
            self.llm_tools_usable.discard(t)
            self.llm_functions_usable.discard(t)

    # Set tool instructions and JSON format instructions
    if self.config.use_tools:
        self.system_tool_format_instructions = self.tool_format_rules()
    self.system_tool_instructions = self.tool_instructions()

set_output_format(output_type, force_tools=None, use=None, handle=None, instructions=None, is_copy=False)

Sets output_format to output_type and, if force_tools is enabled, switches to the native Langroid tools mechanism to ensure that no tool calls not of output_type are generated. By default, force_tools follows the use_tools_on_output_format parameter in the config.

If output_type is None, restores to the state prior to setting output_format.

If use is True, we enable use of output_type when it is a subclass of ToolMessage. Note that this primarily controls instruction generation: the model will always generate output_type regardless of whether use is set. Defaults to the use_output_format parameter in the config. Similarly, handling of output_type is controlled by handle, which defaults to the handle_output_format parameter in the config.

instructions controls whether we generate instructions specifying the output format schema. Defaults to the instructions_output_format parameter in the config.

is_copy is set when called via __getitem__. In that case, we must copy certain fields to ensure that we do not overwrite the main agent's settings.

Source code in langroid/agent/chat_agent.py
def set_output_format(
    self,
    output_type: Optional[type],
    force_tools: Optional[bool] = None,
    use: Optional[bool] = None,
    handle: Optional[bool] = None,
    instructions: Optional[bool] = None,
    is_copy: bool = False,
) -> None:
    """
    Sets `output_format` to `output_type` and, if `force_tools` is enabled,
    switches to the native Langroid tools mechanism to ensure that no tool
    calls not of `output_type` are generated. By default, `force_tools`
    follows the `use_tools_on_output_format` parameter in the config.

    If `output_type` is None, restores to the state prior to setting
    `output_format`.

    If `use` is True, we enable use of `output_type` when it is a subclass
    of `ToolMessage`. Note that this primarily controls instruction
    generation: the model will always generate `output_type` regardless
    of whether `use` is set. Defaults to the `use_output_format`
    parameter in the config. Similarly, handling of `output_type` is
    controlled by `handle`, which defaults to the
    `handle_output_format` parameter in the config.

    `instructions` controls whether we generate instructions specifying
    the output format schema. Defaults to the `instructions_output_format`
    parameter in the config.

    `is_copy` is set when called via `__getitem__`. In that case, we must
    copy certain fields to ensure that we do not overwrite the main agent's
    settings.
    """
    # Disable usage of an output format which was not specifically enabled
    # by `enable_message`
    if self.enabled_use_output_format is not None:
        self.disable_message_use(self.enabled_use_output_format)
        self.enabled_use_output_format = None

    # Disable handling of an output format which did not specifically have
    # handling enabled via `enable_message`
    if self.enabled_handling_output_format is not None:
        self.disable_message_handling(self.enabled_handling_output_format)
        self.enabled_handling_output_format = None

    # Reset any previous instructions
    self.output_format_instructions = ""

    if output_type is None:
        self.output_format = None
        (
            requests_for_inference,
            use_functions_api,
            use_tools,
        ) = self.saved_requests_and_tool_setings
        self.config = self.config.copy()
        self.enabled_requests_for_inference = requests_for_inference
        self.config.use_functions_api = use_functions_api
        self.config.use_tools = use_tools
    else:
        if force_tools is None:
            force_tools = self.config.use_tools_on_output_format

        if not any(
            (isclass(output_type) and issubclass(output_type, t))
            for t in [ToolMessage, BaseModel]
        ):
            output_type = get_pydantic_wrapper(output_type)

        if self.output_format is None and force_tools:
            self.saved_requests_and_tool_setings = (
                self._requests_and_tool_settings()
            )

        self.output_format = output_type
        if issubclass(output_type, ToolMessage):
            name = output_type.default_value("request")
            if use is None:
                use = self.config.use_output_format

            if handle is None:
                handle = self.config.handle_output_format

            if use or handle:
                is_usable = name in self.llm_tools_usable.union(
                    self.llm_functions_usable
                )
                is_handled = name in self.llm_tools_handled.union(
                    self.llm_functions_handled
                )

                if is_copy:
                    if use:
                        # We must copy `llm_tools_usable` so the base agent
                        # is unmodified
                        self.llm_tools_usable = copy.copy(self.llm_tools_usable)
                        self.llm_functions_usable = copy.copy(
                            self.llm_functions_usable
                        )
                    if handle:
                        # If handling the tool, do the same for `llm_tools_handled`
                        self.llm_tools_handled = copy.copy(self.llm_tools_handled)
                        self.llm_functions_handled = copy.copy(
                            self.llm_functions_handled
                        )
                # Enable `output_type`
                self.enable_message(
                    output_type,
                    # Do not override existing settings
                    use=use or is_usable,
                    handle=handle or is_handled,
                )

                # If the `output_type` ToolMessage was not already enabled for
                # use, this means we are ONLY enabling it for use specifically
                # for enforcing this output format, so we set
                # `enabled_use_output_format` to this output_type, to
                # record that it should be disabled when `output_format` is changed
                if not is_usable:
                    self.enabled_use_output_format = output_type

                # (same reasoning as for use-enabling)
                if not is_handled:
                    self.enabled_handling_output_format = output_type

            generated_tool_instructions = name in self.llm_tools_usable.union(
                self.llm_functions_usable
            )
        else:
            generated_tool_instructions = False

        if instructions is None:
            instructions = self.config.instructions_output_format
        if issubclass(output_type, BaseModel) and instructions:
            if generated_tool_instructions:
                # Already generated tool instructions as part of "enabling for use",
                # so only need to generate a reminder to use this tool.
                name = cast(ToolMessage, output_type).default_value("request")
                self.output_format_instructions = textwrap.dedent(
                    f"""
                    === OUTPUT FORMAT INSTRUCTIONS ===

                    Please provide output using the `{name}` tool/function.
                    """
                )
            else:
                if issubclass(output_type, ToolMessage):
                    output_format_schema = output_type.llm_function_schema(
                        request=True,
                        defaults=self.config.output_format_include_defaults,
                    ).parameters
                else:
                    output_format_schema = output_type.schema()

                format_schema_for_strict(output_format_schema)

                self.output_format_instructions = textwrap.dedent(
                    f"""
                    === OUTPUT FORMAT INSTRUCTIONS ===
                    Please provide output as JSON with the following schema:

                    {output_format_schema}
                    """
                )

        if force_tools:
            if issubclass(output_type, ToolMessage):
                self.enabled_requests_for_inference = {
                    output_type.default_value("request")
                }
            if self.config.use_functions_api:
                self.config = self.config.copy()
                self.config.use_functions_api = False
                self.config.use_tools = True

disable_message_handling(message_class=None)

Disable this agent from RESPONDING to a message_class (Tool). If message_class is None, then disable this agent from responding to ALL tools. Args: message_class: The ToolMessage class to disable; Optional.

Source code in langroid/agent/chat_agent.py
def disable_message_handling(
    self,
    message_class: Optional[Type[ToolMessage]] = None,
) -> None:
    """
    Disable this agent from RESPONDING to a `message_class` (Tool). If
        `message_class` is None, then disable this agent from responding to ALL tools.
    Args:
        message_class: The ToolMessage class to disable; Optional.
    """
    super().disable_message_handling(message_class)
    for t in self._get_tool_list(message_class):
        self.llm_tools_handled.discard(t)
        self.llm_functions_handled.discard(t)

disable_message_use(message_class)

Disable this agent from USING a message class (Tool). If message_class is None, then disable this agent from USING ALL tools. Args: message_class: The ToolMessage class to disable. If None, disable all.

Source code in langroid/agent/chat_agent.py
def disable_message_use(
    self,
    message_class: Optional[Type[ToolMessage]],
) -> None:
    """
    Disable this agent from USING a message class (Tool).
    If `message_class` is None, then disable this agent from USING ALL tools.
    Args:
        message_class: The ToolMessage class to disable.
            If None, disable all.
    """
    for t in self._get_tool_list(message_class):
        self.llm_tools_usable.discard(t)
        self.llm_functions_usable.discard(t)

disable_message_use_except(message_class)

Disable this agent from USING ALL messages EXCEPT a message class (Tool) Args: message_class: The only ToolMessage class to allow

Source code in langroid/agent/chat_agent.py
def disable_message_use_except(self, message_class: Type[ToolMessage]) -> None:
    """
    Disable this agent from USING ALL messages EXCEPT a message class (Tool)
    Args:
        message_class: The only ToolMessage class to allow
    """
    request = message_class.__fields__["request"].default
    to_remove = [r for r in self.llm_tools_usable if r != request]
    for r in to_remove:
        self.llm_tools_usable.discard(r)
        self.llm_functions_usable.discard(r)

get_tool_messages(msg, all_tools=False)

Extracts messages and tracks whether any errors occurred. If strict mode was enabled, disables it for the tool, else triggers strict recovery.

Source code in langroid/agent/chat_agent.py
def get_tool_messages(
    self,
    msg: str | ChatDocument | None,
    all_tools: bool = False,
) -> List[ToolMessage]:
    """
    Extracts messages and tracks whether any errors occurred. If strict mode
    was enabled, disables it for the tool, else triggers strict recovery.
    """
    self.tool_error = False
    try:
        tools = super().get_tool_messages(msg, all_tools)
    except ValidationError as ve:
        tool_class = ve.model
        if issubclass(tool_class, ToolMessage):
            was_strict = (
                self.config.use_functions_api
                and self.config.use_tools_api
                and self._strict_mode_for_tool(tool_class)
            )
            # If the result of strict output for a tool using the
            # OpenAI tools API fails to parse, we infer that the
            # schema edits necessary for compatibility prevented
            # adherence to the underlying `ToolMessage` schema and
            # disable strict output for the tool
            if was_strict:
                name = tool_class.default_value("request")
                self.disable_strict_tools_set.add(name)
                logging.warning(
                    f"""
                    Validation error occurred with strict tool format.
                    Disabling strict mode for the {name} tool.
                    """
                )
            else:
                # We will trigger the strict recovery mechanism to force
                # the LLM to correct its output, allowing us to parse
                self.tool_error = True

        raise ve

    return tools

truncate_message(idx, tokens=5, warning='...[Contents truncated!]')

Truncate message at idx in msg history to tokens tokens

Source code in langroid/agent/chat_agent.py
def truncate_message(
    self,
    idx: int,
    tokens: int = 5,
    warning: str = "...[Contents truncated!]",
) -> LLMMessage:
    """Truncate message at idx in msg history to `tokens` tokens"""
    llm_msg = self.message_history[idx]
    orig_content = llm_msg.content
    new_content = (
        self.parser.truncate_tokens(orig_content, tokens)
        if self.parser is not None
        else orig_content[: tokens * 4]  # approx truncation
    )
    llm_msg.content = new_content + "\n" + warning
    return llm_msg

llm_response(message=None)

Respond to a single user message, appended to the message history, in "chat" mode. Args: message (str|ChatDocument): message or ChatDocument object to respond to; if None, use self.task_messages. Returns: LLM response as a ChatDocument object.

Source code in langroid/agent/chat_agent.py
def llm_response(
    self, message: Optional[str | ChatDocument] = None
) -> Optional[ChatDocument]:
    """
    Respond to a single user message, appended to the message history,
    in "chat" mode
    Args:
        message (str|ChatDocument): message or ChatDocument object to respond to.
            If None, use self.task_messages.
    Returns:
        LLM response as a ChatDocument object
    """
    if self.llm is None:
        return None

    # If enabled and a tool error occurred, we recover by generating the tool in
    # strict json mode
    if (
        self.tool_error
        and self.output_format is None
        and self._json_schema_available()
        and self.config.strict_recovery
    ):
        AnyTool = self._get_any_tool_message()
        self.set_output_format(
            AnyTool,
            force_tools=True,
            use=True,
            handle=True,
            instructions=True,
        )
        recovery_message = self._strict_recovery_instructions(AnyTool)

        if message is None:
            message = recovery_message
        elif isinstance(message, str):
            message = message + recovery_message
        else:
            message.content = message.content + recovery_message

        return self.llm_response(message)

    hist, output_len = self._prep_llm_messages(message)
    if len(hist) == 0:
        return None
    tool_choice = (
        "auto"
        if isinstance(message, str)
        else (message.oai_tool_choice if message is not None else "auto")
    )
    with StreamingIfAllowed(self.llm, self.llm.get_stream()):
        try:
            response = self.llm_response_messages(hist, output_len, tool_choice)
        except openai.BadRequestError as e:
            if self.any_strict:
                self.disable_strict = True
                self.set_output_format(None)
                logging.warning(
                    f"""
                    OpenAI BadRequestError raised with strict mode enabled.
                    Message: {e.message}
                    Disabling strict mode and retrying.
                    """
                )
                return self.llm_response(message)
            else:
                raise e
    self.message_history.extend(ChatDocument.to_LLMMessage(response))
    response.metadata.msg_idx = len(self.message_history) - 1
    response.metadata.agent_id = self.id
    if isinstance(message, ChatDocument):
        self._reduce_raw_tool_results(message)
    # Preserve trail of tool_ids for OpenAI Assistant fn-calls
    response.metadata.tool_ids = (
        []
        if isinstance(message, str)
        else message.metadata.tool_ids if message is not None else []
    )

    return response
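
A minimal usage sketch (assumes a valid LLM API key is configured; the config values are illustrative):

import langroid as lr

agent = lr.ChatAgent(lr.ChatAgentConfig(name="Helper"))
response = agent.llm_response("What is the capital of France?")
print(response.content)
# The exchange is appended to message_history, so follow-ups have context:
follow_up = agent.llm_response("And what is its population?")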

llm_response_async(message=None) async

Async version of llm_response. See there for details.

Source code in langroid/agent/chat_agent.py
async def llm_response_async(
    self, message: Optional[str | ChatDocument] = None
) -> Optional[ChatDocument]:
    """
    Async version of `llm_response`. See there for details.
    """
    if self.llm is None:
        return None

    # If enabled and a tool error occurred, we recover by generating the tool in
    # strict json mode
    if (
        self.tool_error
        and self.output_format is None
        and self._json_schema_available()
        and self.config.strict_recovery
    ):
        AnyTool = self._get_any_tool_message()
        self.set_output_format(
            AnyTool,
            force_tools=True,
            use=True,
            handle=True,
            instructions=True,
        )
        recovery_message = self._strict_recovery_instructions(AnyTool)

        if message is None:
            message = recovery_message
        elif isinstance(message, str):
            message = message + recovery_message
        else:
            message.content = message.content + recovery_message

        return self.llm_response(message)

    hist, output_len = self._prep_llm_messages(message)
    if len(hist) == 0:
        return None
    tool_choice = (
        "auto"
        if isinstance(message, str)
        else (message.oai_tool_choice if message is not None else "auto")
    )
    with StreamingIfAllowed(self.llm, self.llm.get_stream()):
        try:
            response = await self.llm_response_messages_async(
                hist, output_len, tool_choice
            )
        except openai.BadRequestError as e:
            if self.any_strict:
                self.disable_strict = True
                self.set_output_format(None)
                logging.warning(
                    f"""
                    OpenAI BadRequestError raised with strict mode enabled.
                    Message: {e.message}
                    Disabling strict mode and retrying.
                    """
                )
                return await self.llm_response_async(message)
            else:
                raise e
    self.message_history.extend(ChatDocument.to_LLMMessage(response))
    response.metadata.msg_idx = len(self.message_history) - 1
    response.metadata.agent_id = self.id
    if isinstance(message, ChatDocument):
        self._reduce_raw_tool_results(message)
    # Preserve trail of tool_ids for OpenAI Assistant fn-calls
    response.metadata.tool_ids = (
        []
        if isinstance(message, str)
        else message.metadata.tool_ids if message is not None else []
    )

    return response
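
A hedged async usage sketch (same assumptions as the sync example above):

import asyncio

import langroid as lr

async def main() -> None:
    agent = lr.ChatAgent(lr.ChatAgentConfig())
    response = await agent.llm_response_async("Name three prime numbers.")
    if response is not None:
        print(response.content)

asyncio.run(main())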

init_message_history()

Initialize the message history with the system message and user message

Source code in langroid/agent/chat_agent.py
def init_message_history(self) -> None:
    """
    Initialize the message history with the system message and user message
    """
    self.message_history = [self._create_system_and_tools_message()]
    if self.user_message:
        self.message_history.append(
            LLMMessage(role=Role.USER, content=self.user_message)
        )

llm_response_messages(messages, output_len=None, tool_choice='auto')

Respond to a series of messages, e.g. with OpenAI ChatCompletion.

Parameters:

Name Type Description Default
messages List[LLMMessage]

seq of messages (with role, content fields) sent to LLM

required
output_len Optional[int]

max number of tokens expected in response. If None, use the LLM's default max_output_tokens.

None
tool_choice ToolChoiceTypes | Dict[str, str | Dict[str, str]]

tool-choice directive forwarded to the LLM's chat call (overridden when a tool is forced via the output format).

'auto'

Returns:

Type Description
ChatDocument

Document (i.e. with fields "content", "metadata")

Source code in langroid/agent/chat_agent.py
def llm_response_messages(
    self,
    messages: List[LLMMessage],
    output_len: Optional[int] = None,
    tool_choice: ToolChoiceTypes | Dict[str, str | Dict[str, str]] = "auto",
) -> ChatDocument:
    """
    Respond to a series of messages, e.g. with OpenAI ChatCompletion
    Args:
        messages: seq of messages (with role, content fields) sent to LLM
        output_len: max number of tokens expected in response.
                If None, use the LLM's default max_output_tokens.
    Returns:
        Document (i.e. with fields "content", "metadata")
    """
    assert self.config.llm is not None and self.llm is not None
    output_len = output_len or self.config.llm.max_output_tokens
    streamer = noop_fn
    if self.llm.get_stream():
        streamer = self.callbacks.start_llm_stream()
    self.llm.config.streamer = streamer
    with ExitStack() as stack:  # for conditionally using rich spinner
        if not self.llm.get_stream() and not settings.quiet:
            # show rich spinner only if not streaming!
            # (Why? b/c the intent of showing a spinner is to "show progress",
            # and we don't need to do that when streaming, since
            # streaming output already shows progress.)
            cm = status(
                "LLM responding to messages...",
                log_if_quiet=False,
            )
            stack.enter_context(cm)
        if self.llm.get_stream() and not settings.quiet:
            console.print(f"[green]{self.indent}", end="")
        functions, fun_call, tools, force_tool, output_format = (
            self._function_args()
        )
        assert self.llm is not None
        response = self.llm.chat(
            messages,
            output_len,
            tools=tools,
            tool_choice=force_tool or tool_choice,
            functions=functions,
            function_call=fun_call,
            response_format=output_format,
        )
    if self.llm.get_stream():
        self.callbacks.finish_llm_stream(
            content=str(response),
            is_tool=self.has_tool_message_attempt(
                ChatDocument.from_LLMResponse(response, displayed=True),
            ),
        )
    self.llm.config.streamer = noop_fn
    if response.cached:
        self.callbacks.cancel_llm_stream()
    self._render_llm_response(response)
    self.update_token_usage(
        response,  # .usage attrib is updated!
        messages,
        self.llm.get_stream(),
        chat=True,
        print_response_stats=self.config.show_stats and not settings.quiet,
    )
    chat_doc = ChatDocument.from_LLMResponse(response, displayed=True)
    self.oai_tool_calls = response.oai_tool_calls or []
    self.oai_tool_id2call.update(
        {t.id: t for t in self.oai_tool_calls if t.id is not None}
    )

    # If using strict output format, parse the output JSON
    self._load_output_format(chat_doc)

    return chat_doc
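
A hedged sketch of calling this directly with an explicit message list, bypassing the agent's own history (the langroid.language_models import path is an assumption):

from langroid.language_models import LLMMessage, Role

msgs = [
    LLMMessage(role=Role.SYSTEM, content="You answer in one word."),
    LLMMessage(role=Role.USER, content="Color of a clear daytime sky?"),
]
chat_doc = agent.llm_response_messages(msgs, output_len=16)
print(chat_doc.content)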

llm_response_messages_async(messages, output_len=None, tool_choice='auto') async

Async version of llm_response_messages. See there for details.

Source code in langroid/agent/chat_agent.py
async def llm_response_messages_async(
    self,
    messages: List[LLMMessage],
    output_len: Optional[int] = None,
    tool_choice: ToolChoiceTypes | Dict[str, str | Dict[str, str]] = "auto",
) -> ChatDocument:
    """
    Async version of `llm_response_messages`. See there for details.
    """
    assert self.config.llm is not None and self.llm is not None
    output_len = output_len or self.config.llm.max_output_tokens
    functions, fun_call, tools, force_tool, output_format = self._function_args()
    assert self.llm is not None

    streamer_async = async_noop_fn
    if self.llm.get_stream():
        streamer_async = await self.callbacks.start_llm_stream_async()
    self.llm.config.streamer_async = streamer_async

    response = await self.llm.achat(
        messages,
        output_len,
        tools=tools,
        tool_choice=force_tool or tool_choice,
        functions=functions,
        function_call=fun_call,
        response_format=output_format,
    )
    if self.llm.get_stream():
        self.callbacks.finish_llm_stream(
            content=str(response),
            is_tool=self.has_tool_message_attempt(
                ChatDocument.from_LLMResponse(response, displayed=True),
            ),
        )
    self.llm.config.streamer_async = async_noop_fn
    if response.cached:
        self.callbacks.cancel_llm_stream()
    self._render_llm_response(response)
    self.update_token_usage(
        response,  # .usage attrib is updated!
        messages,
        self.llm.get_stream(),
        chat=True,
        print_response_stats=self.config.show_stats and not settings.quiet,
    )
    chat_doc = ChatDocument.from_LLMResponse(response, displayed=True)
    self.oai_tool_calls = response.oai_tool_calls or []
    self.oai_tool_id2call.update(
        {t.id: t for t in self.oai_tool_calls if t.id is not None}
    )

    # If using strict output format, parse the output JSON
    self._load_output_format(chat_doc)

    return chat_doc

llm_response_forget(message)

LLM Response to single message, and restore message_history. In effect a "one-off" message & response that leaves agent message history state intact.

Parameters:

Name Type Description Default
message str

user message

required

Returns:

Type Description
ChatDocument

A Document object with the response.

Source code in langroid/agent/chat_agent.py
def llm_response_forget(self, message: str) -> ChatDocument:
    """
    LLM Response to single message, and restore message_history.
    In effect a "one-off" message & response that leaves agent
    message history state intact.

    Args:
        message (str): user message

    Returns:
        A Document object with the response.

    """
    # explicitly call THIS class's respond method,
    # not a derived class's (or else there would be infinite recursion!)
    n_msgs = len(self.message_history)
    with StreamingIfAllowed(self.llm, self.llm.get_stream()):  # type: ignore
        response = cast(ChatDocument, ChatAgent.llm_response(self, message))
    # If there is a response, then we will have two additional
    # messages in the message history, i.e. the user message and the
    # assistant response. We want to (carefully) remove these two messages.
    if len(self.message_history) > n_msgs:
        msg = self.message_history.pop()
        self._drop_msg_update_tool_calls(msg)

    if len(self.message_history) > n_msgs:
        msg = self.message_history.pop()
        self._drop_msg_update_tool_calls(msg)

    # If using strict output format, parse the output JSON
    self._load_output_format(response)

    return response
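
A hedged usage sketch, assuming agent is a ChatAgent with some prior history:

n = len(agent.message_history)
# One-off query: the user message and assistant reply are popped
# afterwards, so the history is left exactly as it was.
answer = agent.llm_response_forget("Give a one-line summary so far.")
assert len(agent.message_history) == n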

llm_response_forget_async(message) async

Async version of llm_response_forget. See there for details.

Source code in langroid/agent/chat_agent.py
async def llm_response_forget_async(self, message: str) -> ChatDocument:
    """
    Async version of `llm_response_forget`. See there for details.
    """
    # explicitly call THIS class's respond method,
    # not a derived class's (or else there would be infinite recursion!)
    n_msgs = len(self.message_history)
    with StreamingIfAllowed(self.llm, self.llm.get_stream()):  # type: ignore
        response = cast(
            ChatDocument, await ChatAgent.llm_response_async(self, message)
        )
    # If there is a response, then we will have two additional
    # messages in the message history, i.e. the user message and the
    # assistant response. We want to (carefully) remove these two messages.
    if len(self.message_history) > n_msgs:
        msg = self.message_history.pop()
        self._drop_msg_update_tool_calls(msg)

    if len(self.message_history) > n_msgs:
        msg = self.message_history.pop()
        self._drop_msg_update_tool_calls(msg)
    return response

chat_num_tokens(messages=None)

Total number of tokens in the message history so far.

Parameters:

Name Type Description Default
messages Optional[List[LLMMessage]]

if provided, compute the number of tokens in this list of messages, rather than the current message history.

None

Returns: int: number of tokens in message history

Source code in langroid/agent/chat_agent.py
def chat_num_tokens(self, messages: Optional[List[LLMMessage]] = None) -> int:
    """
    Total number of tokens in the message history so far.

    Args:
        messages: if provided, compute the number of tokens in this list of
            messages, rather than the current message history.
    Returns:
        int: number of tokens in message history
    """
    if self.parser is None:
        raise ValueError(
            "ChatAgent.parser is None. "
            "You must set ChatAgent.parser "
            "before calling chat_num_tokens()."
        )
    hist = messages if messages is not None else self.message_history
    return sum([self.parser.num_tokens(m.content) for m in hist])
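
A hedged sketch of using the token count to manage the context budget (the 3000-token threshold is illustrative):

used = agent.chat_num_tokens()  # requires agent.parser to be set
if used > 3000:
    # e.g. truncate an early verbose message to reclaim tokens
    agent.truncate_message(idx=1, tokens=5)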

message_history_str(i=None)

Return a string representation of the message history.

Parameters:

Name Type Description Default
i Optional[int]

if provided, return only the i-th message when i is positive, or the last k messages when i = -k.

None

Returns:

Type Description
str

string representation of the selected message(s)

Source code in langroid/agent/chat_agent.py
def message_history_str(self, i: Optional[int] = None) -> str:
    """
    Return a string representation of the message history
    Args:
        i: if provided, return only the i-th message when i is positive,
            or last k messages when i = -k.
    Returns:
    """
    if i is None:
        return "\n".join([str(m) for m in self.message_history])
    elif i > 0:
        return str(self.message_history[i])
    else:
        return "\n".join([str(m) for m in self.message_history[i:]])

ToolMessage

Bases: ABC, BaseModel

Abstract class that defines the structure of a "Tool" message from an LLM. Depending on context, "tools" are also referred to as "plugins", or "function calls" (in the context of OpenAI LLMs). Essentially, they are a way for the LLM to express its intent to run a special function or method; currently these "tools" are handled by methods of the agent.

Attributes:

Name Type Description
request str

name of agent method to map to.

purpose str

purpose of agent method, expressed in general terms. (This is used when auto-generating the tool instruction to the LLM)

examples() classmethod

Examples to use in few-shot demos with formatting instructions. Each example can be either:

  • just a ToolMessage instance, e.g. MyTool(param1=1, param2="hello"), or
  • a tuple (description, ToolMessage instance), where the description is a natural language "thought" that leads to the tool usage, e.g. ("I want to find the square of 5", SquareTool(num=5)). In some scenarios, including such a description can significantly enhance the reliability of tool use.

Returns:

Type Description
List[ToolMessage | Tuple[str, ToolMessage]]

list of examples; empty by default, meant to be overridden in subclasses.

Source code in langroid/agent/tool_message.py
@classmethod
def examples(cls) -> List["ToolMessage" | Tuple[str, "ToolMessage"]]:
    """
    Examples to use in few-shot demos with formatting instructions.
    Each example can be either:
    - just a ToolMessage instance, e.g. MyTool(param1=1, param2="hello"), or
    - a tuple (description, ToolMessage instance), where the description is
        a natural language "thought" that leads to the tool usage,
        e.g. ("I want to find the square of 5",  SquareTool(num=5))
        In some scenarios, including such a description can significantly
        enhance reliability of tool use.
    Returns:
    """
    return []
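
A hedged sketch of a ToolMessage subclass overriding examples() (SquareTool and its fields are illustrative, not part of the library):

import langroid as lr

class SquareTool(lr.ToolMessage):
    request: str = "square"
    purpose: str = "To compute the square of a given <num>"
    num: int

    @classmethod
    def examples(cls):
        return [
            cls(num=3),  # a bare instance
            # a (thought, instance) tuple, often more reliable:
            ("I want to find the square of 5", cls(num=5)),
        ]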

usage_examples(random=False) classmethod

Instruction to the LLM showing examples of how to use the tool-message.

Parameters:

Name Type Description Default
random bool

whether to pick a random example from the list of examples. Set to true when using this to illustrate a dialog between LLM and user. (if false, use ALL examples)

False

Returns: str: examples of how to use the tool/function-call

Source code in langroid/agent/tool_message.py
@classmethod
def usage_examples(cls, random: bool = False) -> str:
    """
    Instruction to the LLM showing examples of how to use the tool-message.

    Args:
        random (bool): whether to pick a random example from the list of examples.
            Set to `true` when using this to illustrate a dialog between LLM and
            user.
            (if false, use ALL examples)
    Returns:
        str: examples of how to use the tool/function-call
    """
    # pick a random example of the fields
    if len(cls.examples()) == 0:
        return ""
    if random:
        examples = [choice(cls.examples())]
    else:
        examples = cls.examples()
    formatted_examples = [
        (
            f"EXAMPLE {i}: (THOUGHT: {ex[0]}) => \n{ex[1].format_example()}"
            if isinstance(ex, tuple)
            else f"EXAMPLE {i}:\n {ex.format_example()}"
        )
        for i, ex in enumerate(examples, 1)
    ]
    return "\n\n".join(formatted_examples)

get_value_of_type(target_type)

Try to find a value of a desired type in the fields of the ToolMessage.

Source code in langroid/agent/tool_message.py
def get_value_of_type(self, target_type: Type[Any]) -> Any:
    """Try to find a value of a desired type in the fields of the ToolMessage."""
    ignore_fields = self.Config.schema_extra["exclude"].union(["request"])
    for field_name in set(self.dict().keys()) - ignore_fields:
        value = getattr(self, field_name)
        if is_instance_of(value, target_type):
            return value
    return None

default_value(f) classmethod

Returns the default value of the given field, for the message-class.

Parameters:

Name Type Description Default
f str

field name

required

Returns:

Name Type Description
Any Any

default value of the field, or None if not set or if the field does not exist.

Source code in langroid/agent/tool_message.py
@classmethod
def default_value(cls, f: str) -> Any:
    """
    Returns the default value of the given field, for the message-class
    Args:
        f (str): field name

    Returns:
        Any: default value of the field, or None if not set or if the
            field does not exist.
    """
    schema = cls.schema()
    properties = schema["properties"]
    return properties.get(f, {}).get("default", None)
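
With the hypothetical SquareTool above:

SquareTool.default_value("request")  # -> "square"
SquareTool.default_value("num")      # -> None (no default set)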

format_instructions(tool=False) classmethod

Default Instructions to the LLM showing how to use the tool/function-call. Works for GPT4 but override this for weaker LLMs if needed.

Parameters:

Name Type Description Default
tool bool

instructions for Langroid-native tool use? (e.g. for non-OpenAI LLM) (or else it would be for OpenAI Function calls). Ignored in the default implementation, but can be used in subclasses.

False

Returns: str: instructions on how to use the message

Source code in langroid/agent/tool_message.py
@classmethod
def format_instructions(cls, tool: bool = False) -> str:
    """
    Default Instructions to the LLM showing how to use the tool/function-call.
    Works for GPT4 but override this for weaker LLMs if needed.

    Args:
        tool: instructions for Langroid-native tool use? (e.g. for non-OpenAI LLM)
            (or else it would be for OpenAI Function calls).
            Ignored in the default implementation, but can be used in subclasses.
    Returns:
        str: instructions on how to use the message
    """
    # TODO: when we attempt to use a "simpler schema"
    # (i.e. all nested fields explicit without definitions),
    # we seem to get worse results, so we turn it off for now
    param_dict = (
        # cls.simple_schema() if tool else
        cls.llm_function_schema(request=True).parameters
    )
    examples_str = ""
    if cls.examples():
        examples_str = "EXAMPLES:\n" + cls.usage_examples()
    return textwrap.dedent(
        f"""
        TOOL: {cls.default_value("request")}
        PURPOSE: {cls.default_value("purpose")} 
        JSON FORMAT: {
            json.dumps(param_dict, indent=4)
        }
        {examples_str}
        """.lstrip()
    )

group_format_instructions() staticmethod

Template for instructions for a group of tools. Works with GPT4 but override this for weaker LLMs if needed.

Source code in langroid/agent/tool_message.py
@staticmethod
def group_format_instructions() -> str:
    """Template for instructions for a group of tools.
    Works with GPT4 but override this for weaker LLMs if needed.
    """
    return textwrap.dedent(
        """
        === ALL AVAILABLE TOOLS and THEIR FORMAT INSTRUCTIONS ===
        You have access to the following TOOLS to accomplish your task:

        {format_instructions}

        When one of the above TOOLs is applicable, you must express your 
        request as "TOOL:" followed by the request in the above format.
        """
    )

llm_function_schema(request=False, defaults=True) classmethod

Clean up the schema of the Pydantic class (which can recursively contain other Pydantic classes), to create a version compatible with OpenAI Function-call API.

Adapted from this excellent library: https://github.com/jxnl/instructor/blob/main/instructor/function_calls.py

Parameters:

Name Type Description Default
request bool

whether to include the "request" field in the schema. (we set this to True when using Langroid-native TOOLs as opposed to OpenAI Function calls)

False
defaults bool

whether to include fields with default values in the schema, in the "properties" section.

True

Returns:

Name Type Description
LLMFunctionSpec LLMFunctionSpec

the schema as an LLMFunctionSpec

Source code in langroid/agent/tool_message.py
@classmethod
def llm_function_schema(
    cls,
    request: bool = False,
    defaults: bool = True,
) -> LLMFunctionSpec:
    """
    Clean up the schema of the Pydantic class (which can recursively contain
    other Pydantic classes), to create a version compatible with OpenAI
    Function-call API.

    Adapted from this excellent library:
    https://github.com/jxnl/instructor/blob/main/instructor/function_calls.py

    Args:
        request: whether to include the "request" field in the schema.
            (we set this to True when using Langroid-native TOOLs as opposed to
            OpenAI Function calls)
        defaults: whether to include fields with default values in the schema,
                in the "properties" section.

    Returns:
        LLMFunctionSpec: the schema as an LLMFunctionSpec

    """
    schema = copy.deepcopy(cls.schema())
    docstring = parse(cls.__doc__ or "")
    parameters = {
        k: v for k, v in schema.items() if k not in ("title", "description")
    }
    for param in docstring.params:
        if (name := param.arg_name) in parameters["properties"] and (
            description := param.description
        ):
            if "description" not in parameters["properties"][name]:
                parameters["properties"][name]["description"] = description

    excludes = cls.Config.schema_extra["exclude"]
    if not request:
        excludes = excludes.union({"request"})
    # exclude 'excludes' from parameters["properties"]:
    parameters["properties"] = {
        field: details
        for field, details in parameters["properties"].items()
        if field not in excludes and (defaults or details.get("default") is None)
    }
    parameters["required"] = sorted(
        k
        for k, v in parameters["properties"].items()
        if ("default" not in v and k not in excludes)
    )
    if request:
        parameters["required"].append("request")

        # If request is present it must match the default value
        # Similar to defining request as a literal type
        parameters["request"] = {
            "enum": [cls.default_value("request")],
            "type": "string",
        }

    if "description" not in schema:
        if docstring.short_description:
            schema["description"] = docstring.short_description
        else:
            schema["description"] = (
                f"Correctly extracted `{cls.__name__}` with all "
                f"the required parameters with correct types"
            )

    # Handle nested ToolMessage fields
    if "definitions" in parameters:
        for v in parameters["definitions"].values():
            if "exclude" in v:
                v.pop("exclude")

                remove_if_exists("purpose", v["properties"])
                remove_if_exists("id", v["properties"])
                if (
                    "request" in v["properties"]
                    and "default" in v["properties"]["request"]
                ):
                    if "required" not in v:
                        v["required"] = []
                    v["required"].append("request")
                    v["properties"]["request"] = {
                        "type": "string",
                        "enum": [v["properties"]["request"]["default"]],
                    }

    parameters.pop("exclude")
    _recursive_purge_dict_key(parameters, "title")
    _recursive_purge_dict_key(parameters, "additionalProperties")
    return LLMFunctionSpec(
        name=cls.default_value("request"),
        description=cls.default_value("purpose"),
        parameters=parameters,
    )
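
With the hypothetical SquareTool above, a hedged sketch:

spec = SquareTool.llm_function_schema(request=True)
print(spec.name)         # "square"
print(spec.description)  # the tool's purpose
print(spec.parameters)   # cleaned JSON-schema dict, incl. the "request" enum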

simple_schema() classmethod

Return a simplified schema for the message, with only the request and required fields.

Returns:

Type Description
Dict[str, Any]

simplified schema

Source code in langroid/agent/tool_message.py
@classmethod
def simple_schema(cls) -> Dict[str, Any]:
    """
    Return a simplified schema for the message, with only the request and
    required fields.
    Returns:
        Dict[str, Any]: simplified schema
    """
    schema = generate_simple_schema(
        cls,
        exclude=list(cls.Config.schema_extra["exclude"]),
    )
    return schema

Task(agent=None, name='', llm_delegate=False, single_round=False, system_message='', user_message='', restart=True, default_human_response=None, interactive=True, only_user_quits_root=True, erase_substeps=False, allow_null_result=False, max_stalled_steps=5, default_return_type=None, done_if_no_response=[], done_if_response=[], config=TaskConfig(), **kwargs)

A Task wraps an Agent object, and sets up the Agent's goals and instructions. A Task maintains two key variables:

  • self.pending_message, which is the message awaiting a response, and
  • self.pending_sender, which is the entity that sent the pending message.

The possible responders to self.pending_message are the Agent's own "native" responders (agent_response, llm_response, and user_response), and the run() methods of any sub-tasks. All responders have the same type-signature (somewhat simplified):

str | ChatDocument -> ChatDocument
Responders may or may not specify an intended recipient of their generated response.

The main top-level method in the Task class is run(), which repeatedly calls step() until done() returns true. The step() represents a "turn" in the conversation: this method sequentially (in round-robin fashion) calls the responders until it finds one that generates a valid response to the pending_message (as determined by the valid() method). Once a valid response is found, step() updates the pending_message and pending_sender variables, and on the next iteration, step() re-starts its search for a valid response from the beginning of the list of responders (the exception being that the human user always gets a chance to respond after each non-human valid response). This process repeats until done() returns true, at which point run() returns the value of result(), which is the final result of the task.

Parameters:

Name Type Description Default
agent Agent

agent associated with the task

None
name str

name of the task

''
llm_delegate bool

Whether to delegate "control" to LLM; conceptually, the "controlling entity" is the one "seeking" responses to its queries, and has a goal it is aiming to achieve, and decides when a task is done. The "controlling entity" is either the LLM or the USER. (Note within a Task there is just one LLM, and all other entities are proxies of the "User" entity). See also: done_if_response, done_if_no_response for more granular control of task termination.

False
single_round bool

If true, task runs until one message by "controller" (i.e. LLM if llm_delegate is true, otherwise USER) and subsequent response by non-controller. [When a tool is involved, this will not give intended results; see done_if_response, done_if_no_response below.] If false, runs for the specified number of turns in run, or until done() is true. One run of step() is considered a "turn". See also: done_if_response, done_if_no_response for more granular control of task termination.

False
system_message str

if not empty, overrides agent's system_message

''
user_message str

if not empty, overrides agent's user_message

''
restart bool

if true, resets the agent's message history at every run.

True
default_human_response str | None

default response from user; useful for testing, to avoid interactive input from user. [Instead of this, setting interactive usually suffices]

None
default_return_type Optional[type]

if not None, extracts a value of this type from the result of self.run()

None
interactive bool

if true, wait for human input after each non-human response (prevents infinite loop of non-human responses). Default is true. If false, then default_human_response is set to "" Note: When interactive = False, the one exception is when the user is explicitly addressed, via "@user" or using RecipientTool, in which case the system will wait for a user response. In other words, use interactive=False when you want a "largely non-interactive" run, with the exception of explicit user addressing.

True
only_user_quits_root bool

if true, when interactive=True, only user can quit the root task (Ignored when interactive=False).

True
erase_substeps bool

if true, when task completes, erase intermediate conversation with subtasks from this agent's message_history, and also erase all subtask agents' message_history. Note: erasing can reduce prompt sizes, but results in repetitive sub-task delegation.

False
allow_null_result bool

If true, create dummy NO_ANSWER response when no valid response is found in a step. Optional, default is False. Note: In non-interactive mode, when this is set to True, you can have a situation where an LLM generates (non-tool) text, and no other responders have valid responses, and a "Null result" is inserted as a dummy response from the User entity, so the LLM will now respond to this Null result, and this will continue until the LLM emits a DONE signal (if instructed to do so), otherwise langroid detects a potential infinite loop after a certain number of such steps (= TaskConfig.inf_loop_wait_factor) and will raise an InfiniteLoopException.

False
max_stalled_steps int

task considered done after this many consecutive steps with no progress. Default is 5.

5
done_if_no_response List[Responder]

consider task done if NULL response from any of these responders. Default is empty list.

[]
done_if_response List[Responder]

consider task done if NON-NULL response from any of these responders. Default is empty list.

[]
Source code in langroid/agent/task.py
def __init__(
    self,
    agent: Optional[Agent] = None,
    name: str = "",
    llm_delegate: bool = False,
    single_round: bool = False,
    system_message: str = "",
    user_message: str | None = "",
    restart: bool = True,
    default_human_response: Optional[str] = None,
    interactive: bool = True,
    only_user_quits_root: bool = True,
    erase_substeps: bool = False,
    allow_null_result: bool = False,
    max_stalled_steps: int = 5,
    default_return_type: Optional[type] = None,
    done_if_no_response: List[Responder] = [],
    done_if_response: List[Responder] = [],
    config: TaskConfig = TaskConfig(),
    **kwargs: Any,  # catch-all for any legacy params, for backwards compatibility
):
    """
    A task to be performed by an agent.

    Args:
        agent (Agent): agent associated with the task
        name (str): name of the task
        llm_delegate (bool):
            Whether to delegate "control" to LLM; conceptually,
            the "controlling entity" is the one "seeking" responses to its queries,
            and has a goal it is aiming to achieve, and decides when a task is done.
            The "controlling entity" is either the LLM or the USER.
            (Note within a Task there is just one
            LLM, and all other entities are proxies of the "User" entity).
            See also: `done_if_response`, `done_if_no_response` for more granular
            control of task termination.
        single_round (bool):
            If true, task runs until one message by "controller"
            (i.e. LLM if `llm_delegate` is true, otherwise USER)
            and subsequent response by non-controller [When a tool is involved,
            this will not give intended results. See `done_if_response`,
            `done_if_no_response` below].
            If false, runs for the specified number of turns in
            `run`, or until `done()` is true.
            One run of step() is considered a "turn".
            See also: `done_if_response`, `done_if_no_response` for more granular
            control of task termination.
        system_message (str): if not empty, overrides agent's system_message
        user_message (str): if not empty, overrides agent's user_message
        restart (bool): if true, resets the agent's message history *at every run*.
        default_human_response (str|None): default response from user; useful for
            testing, to avoid interactive input from user.
            [Instead of this, setting `interactive` usually suffices]
        default_return_type: if not None, extracts a value of this type from the
            result of self.run()
        interactive (bool): if true, wait for human input after each non-human
            response (prevents infinite loop of non-human responses).
            Default is true. If false, then `default_human_response` is set to ""
            Note: When interactive = False, the one exception is when the user
            is explicitly addressed, via "@user" or using RecipientTool, in which
            case the system will wait for a user response. In other words, use
            `interactive=False` when you want a "largely non-interactive"
            run, with the exception of explicit user addressing.
        only_user_quits_root (bool): if true, when interactive=True, only user can
            quit the root task (Ignored when interactive=False).
        erase_substeps (bool): if true, when task completes, erase intermediate
            conversation with subtasks from this agent's `message_history`, and also
            erase all subtask agents' `message_history`.
            Note: erasing can reduce prompt sizes, but results in repetitive
            sub-task delegation.
        allow_null_result (bool):
            If true, create dummy NO_ANSWER response when no valid response is found
            in a step.
            Optional, default is False.
            *Note:* In non-interactive mode, when this is set to True,
            you can have a situation where an LLM generates (non-tool) text,
            and no other responders have valid responses, and a "Null result"
            is inserted as a dummy response from the User entity, so the LLM
            will now respond to this Null result, and this will continue
            until the LLM emits a DONE signal (if instructed to do so),
            otherwise langroid detects a potential infinite loop after
            a certain number of such steps (= `TaskConfig.inf_loop_wait_factor`)
            and will raise an InfiniteLoopException.
        max_stalled_steps (int): task considered done after this many consecutive
            steps with no progress. Default is 5.
        done_if_no_response (List[Responder]): consider task done if NULL
            response from any of these responders. Default is empty list.
        done_if_response (List[Responder]): consider task done if NON-NULL
            response from any of these responders. Default is empty list.
    """
    if agent is None:
        agent = ChatAgent()
    self.callbacks = SimpleNamespace(
        show_subtask_response=noop_fn,
        set_parent_agent=noop_fn,
    )
    self.config = config
    # how to behave as a sub-task; can be overridden by `add_sub_task()`
    self.config_sub_task = copy.deepcopy(config)
    # counts of distinct pending messages in history,
    # to help detect (exact) infinite loops
    self.message_counter: Counter[str] = Counter()
    self._init_message_counter()

    self.history: Deque[str] = deque(
        maxlen=self.config.inf_loop_cycle_len * self.config.inf_loop_wait_factor
    )
    # copy the agent's config, so that we don't modify the original agent's config,
    # which may be shared by other agents.
    try:
        config_copy = copy.deepcopy(agent.config)
        agent.config = config_copy
    except Exception:
        logger.warning(
            """
            Failed to deep-copy Agent config during task creation, 
            proceeding with original config. Be aware that changes to 
            the config may affect other agents using the same config.
            """
        )
    self.restart = restart
    agent = cast(ChatAgent, agent)
    self.agent: ChatAgent = agent
    if isinstance(agent, ChatAgent) and len(agent.message_history) == 0 or restart:
        self.agent.init_state()
        # possibly change the system and user messages
        if system_message:
            # we always have at least 1 task_message
            self.agent.set_system_message(system_message)
        if user_message:
            self.agent.set_user_message(user_message)
    self.max_cost: float = 0
    self.max_tokens: int = 0
    self.session_id: str = ""
    self.logger: None | RichFileLogger = None
    self.tsv_logger: None | logging.Logger = None
    self.color_log: bool = False if settings.notebook else True

    self.n_stalled_steps = 0  # how many consecutive steps with no progress?
    # how many 2-step-apart alternations of no_answer step-result have we had,
    # i.e. x1, N/A, x2, N/A, x3, N/A ...
    self.n_no_answer_alternations = 0
    self._no_answer_step: int = -5
    self._step_idx = -1  # current step index
    self.max_stalled_steps = max_stalled_steps
    self.done_if_response = [r.value for r in done_if_response]
    self.done_if_no_response = [r.value for r in done_if_no_response]
    self.is_done = False  # is task done (based on response)?
    self.is_pass_thru = False  # is current response a pass-thru?
    if name:
        # task name overrides name in agent config
        agent.config.name = name
    self.name = name or agent.config.name
    self.value: str = self.name

    self.default_human_response = default_human_response
    if default_human_response is not None:
        # only override agent's default_human_response if it is explicitly set
        self.agent.default_human_response = default_human_response
    self.interactive = interactive
    self.agent.interactive = interactive
    self.only_user_quits_root = only_user_quits_root
    self.message_history_idx = -1
    self.default_return_type = default_return_type

    # set to True if we want to collapse multi-turn conversation with sub-tasks into
    # just the first outgoing message and last incoming message.
    # Note this also completely erases sub-task agents' message_history.
    self.erase_substeps = erase_substeps
    self.allow_null_result = allow_null_result

    agent_entity_responders = agent.entity_responders()
    agent_entity_responders_async = agent.entity_responders_async()
    self.responders: List[Responder] = [e for e, _ in agent_entity_responders]
    self.responders_async: List[Responder] = [
        e for e, _ in agent_entity_responders_async
    ]
    self.non_human_responders: List[Responder] = [
        r for r in self.responders if r != Entity.USER
    ]
    self.non_human_responders_async: List[Responder] = [
        r for r in self.responders_async if r != Entity.USER
    ]

    self.human_tried = False  # did human get a chance to respond in last step?
    self._entity_responder_map: Dict[
        Entity, Callable[..., Optional[ChatDocument]]
    ] = dict(agent_entity_responders)

    self._entity_responder_async_map: Dict[
        Entity, Callable[..., Coroutine[Any, Any, Optional[ChatDocument]]]
    ] = dict(agent_entity_responders_async)

    self.name_sub_task_map: Dict[str, Task] = {}
    # latest message in a conversation among entities and agents.
    self.pending_message: Optional[ChatDocument] = None
    self.pending_sender: Responder = Entity.USER
    self.single_round = single_round
    self.turns = -1  # no limit
    self.llm_delegate = llm_delegate
    if llm_delegate:
        if self.single_round:
            # 0: User instructs (delegating to LLM);
            # 1: LLM (as the Controller) asks;
            # 2: user replies.
            self.turns = 2
    else:
        if self.single_round:
            # 0: User (as Controller) asks,
            # 1: LLM replies.
            self.turns = 1
    # other sub_tasks this task can delegate to
    self.sub_tasks: List[Task] = []
    self.caller: Task | None = None  # which task called this task's `run` method
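
A minimal usage sketch (config values are illustrative):

import langroid as lr

agent = lr.ChatAgent(lr.ChatAgentConfig(name="Assistant"))
# Non-interactive, single question-and-answer round
task = lr.Task(agent, interactive=False, single_round=True)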

clone(i)

Returns a copy of this task, with a new agent.

Source code in langroid/agent/task.py
def clone(self, i: int) -> "Task":
    """
    Returns a copy of this task, with a new agent.
    """
    assert isinstance(self.agent, ChatAgent), "Task clone only works for ChatAgent"
    agent: ChatAgent = self.agent.clone(i)
    return Task(
        agent,
        name=self.name + f"-{i}",
        llm_delegate=self.llm_delegate,
        single_round=self.single_round,
        system_message=self.agent.system_message,
        user_message=self.agent.user_message,
        restart=self.restart,
        default_human_response=self.default_human_response,
        interactive=self.interactive,
        erase_substeps=self.erase_substeps,
        allow_null_result=self.allow_null_result,
        max_stalled_steps=self.max_stalled_steps,
        done_if_no_response=[Entity(s) for s in self.done_if_no_response],
        done_if_response=[Entity(s) for s in self.done_if_response],
        config=self.config,
    )

kill_session(session_id='') classmethod

Kill the session with the given session_id.

Source code in langroid/agent/task.py
@classmethod
def kill_session(cls, session_id: str = "") -> None:
    """
    Kill the session with the given session_id.
    """
    session_id_kill_key = f"{session_id}:kill"
    cls.cache().store(session_id_kill_key, "1")

kill()

Kill the task run associated with the current session.

Source code in langroid/agent/task.py
def kill(self) -> None:
    """
    Kill the task run associated with the current session.
    """
    self._cache_session_store("kill", "1")

add_sub_task(task)

Add a sub-task (or list of subtasks) that this task can delegate (or fail-over) to. Note that the sequence of sub-tasks is important, since these are tried in order, as the parent task searches for a valid response (unless a sub-task is explicitly addressed).

Parameters:

Name Type Description Default
task Task | List[Task] | Tuple[Task, TaskConfig] | List[Tuple[Task, TaskConfig]]

A task, or list of tasks, or a tuple of task and task config, or a list of tuples of task and task config. These tasks are added as sub-tasks of the current task. The task configs (if any) dictate how the tasks are run when invoked as sub-tasks of other tasks. This allows users to specify behavior applicable only in the context of a particular task-subtask combination.

required
Source code in langroid/agent/task.py
def add_sub_task(
    self,
    task: (
        Task | List[Task] | Tuple[Task, TaskConfig] | List[Tuple[Task, TaskConfig]]
    ),
) -> None:
    """
    Add a sub-task (or list of subtasks) that this task can delegate
    (or fail-over) to. Note that the sequence of sub-tasks is important,
    since these are tried in order, as the parent task searches for a valid
    response (unless a sub-task is explicitly addressed).

    Args:
        task: A task, or list of tasks, or a tuple of task and task config,
            or a list of tuples of task and task config.
            These tasks are added as sub-tasks of the current task.
            The task configs (if any) dictate how the tasks are run when
            invoked as sub-tasks of other tasks. This allows users to specify
            behavior applicable only in the context of a particular task-subtask
            combination.
    """
    if isinstance(task, list):
        for t in task:
            self.add_sub_task(t)
        return

    if isinstance(task, tuple):
        task, config = task
    else:
        config = TaskConfig()
    task.config_sub_task = config
    self.sub_tasks.append(task)
    self.name_sub_task_map[task.name] = task
    self.responders.append(cast(Responder, task))
    self.responders_async.append(cast(Responder, task))
    self.non_human_responders.append(cast(Responder, task))
    self.non_human_responders_async.append(cast(Responder, task))
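
A hedged sketch of sub-task wiring (teacher_agent and student_agent are hypothetical ChatAgents):

teacher_task = lr.Task(teacher_agent, name="Teacher", interactive=False)
student_task = lr.Task(student_agent, name="Student", interactive=False)
# Teacher may delegate to Student; order matters when there are several
teacher_task.add_sub_task(student_task)
# Equivalently, with a per-combination config:
# teacher_task.add_sub_task((student_task, lr.TaskConfig()))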

init(msg=None)

Initialize the task, with an optional message to start the conversation. Initializes self.pending_message and self.pending_sender. Args: msg (str|ChatDocument): optional message to start the conversation.

Returns:

Type Description
ChatDocument | None

the initialized self.pending_message.

ChatDocument | None

Currently not used in the code, but provided for convenience.

Source code in langroid/agent/task.py
def init(self, msg: None | str | ChatDocument = None) -> ChatDocument | None:
    """
    Initialize the task, with an optional message to start the conversation.
    Initializes `self.pending_message` and `self.pending_sender`.
    Args:
        msg (str|ChatDocument): optional message to start the conversation.

    Returns:
        (ChatDocument|None): the initialized `self.pending_message`.
        Currently not used in the code, but provided for convenience.
    """
    self.pending_sender = Entity.USER
    if isinstance(msg, str):
        self.pending_message = ChatDocument(
            content=msg,
            metadata=ChatDocMetaData(
                sender=Entity.USER,
            ),
        )
    elif msg is None and len(self.agent.message_history) > 1:
        # if agent has a history beyond system msg, set the
        # pending message to the ChatDocument linked from
        # last message in the history
        last_agent_msg = self.agent.message_history[-1]
        self.pending_message = ChatDocument.from_id(last_agent_msg.chat_document_id)
        if self.pending_message is not None:
            self.pending_sender = self.pending_message.metadata.sender
    else:
        if isinstance(msg, ChatDocument):
            # carefully deep-copy: fresh metadata.id, register
            # as new obj in registry
            self.pending_message = ChatDocument.deepcopy(msg)
        if self.pending_message is not None and self.caller is not None:
            # msg may have come from `caller`, so we pretend this is from
            # the CURRENT task's USER entity
            self.pending_message.metadata.sender = Entity.USER
            # update parent, child, agent pointers
            if msg is not None:
                msg.metadata.child_id = self.pending_message.metadata.id
                self.pending_message.metadata.parent_id = msg.metadata.id
            self.pending_message.metadata.agent_id = self.agent.id

    self._show_pending_message_if_debug()

    if self.caller is not None and self.caller.logger is not None:
        self.logger = self.caller.logger
    elif self.logger is None:
        self.logger = RichFileLogger(
            str(Path(self.config.logs_dir) / f"{self.name}.log"),
            color=self.color_log,
        )

    if self.caller is not None and self.caller.tsv_logger is not None:
        self.tsv_logger = self.caller.tsv_logger
    elif self.tsv_logger is None:
        self.tsv_logger = setup_file_logger(
            "tsv_logger",
            str(Path(self.config.logs_dir) / f"{self.name}.tsv"),
        )
        header = ChatDocLoggerFields().tsv_header()
        self.tsv_logger.info(f" \tTask\tResponder\t{header}")

    self.log_message(Entity.USER, self.pending_message)
    return self.pending_message

reset_all_sub_tasks()

Recursively reset message history & state of own agent and those of all sub-tasks.

Source code in langroid/agent/task.py
def reset_all_sub_tasks(self) -> None:
    """
    Recursively reset message history & state of own agent and
    those of all sub-tasks.
    """
    self.agent.init_state()
    for t in self.sub_tasks:
        t.reset_all_sub_tasks()

run(msg=None, turns=-1, caller=None, max_cost=0, max_tokens=0, session_id='', allow_restart=True, return_type=None)

Synchronous version of run_async(). See run_async() for details.

Source code in langroid/agent/task.py
def run(
    self,
    msg: Any = None,
    turns: int = -1,
    caller: None | Task = None,
    max_cost: float = 0,
    max_tokens: int = 0,
    session_id: str = "",
    allow_restart: bool = True,
    return_type: Optional[Type[T]] = None,
) -> Optional[ChatDocument | T]:
    """Synchronous version of `run_async()`.
    See `run_async()` for details."""
    if allow_restart and (
        (self.restart and caller is None)
        or (self.config_sub_task.restart_as_subtask and caller is not None)
    ):
        # We are either at top level, with restart = True, OR
        # we are a sub-task with restart_as_subtask = True,
        # so reset own agent and recursively for all sub-tasks
        self.reset_all_sub_tasks()

    self.n_stalled_steps = 0
    self._no_answer_step = -5  # last step where the best explicit response was N/A
    # how many N/A alternations have we had so far? (for Inf loop detection)
    self.n_no_answer_alternations = 0
    self.max_cost = max_cost
    self.max_tokens = max_tokens
    self.session_id = session_id
    self._set_alive()
    self._init_message_counter()
    self.history.clear()

    msg_input = self.agent.to_ChatDocument(msg, author_entity=Entity.USER)

    if (
        isinstance(msg_input, ChatDocument)
        and msg_input.metadata.recipient != ""
        and msg_input.metadata.recipient != self.name
    ):
        # this task is not the intended recipient so return None
        return None

    self._pre_run_loop(
        msg=msg_input,
        caller=caller,
        is_async=False,
    )
    # self.turns overrides if it is > 0 and turns not set (i.e. = -1)
    turns = self.turns if turns < 0 else turns
    i = 0
    while True:
        self._step_idx = i  # used in step() below
        self.step()
        done, status = self.done()
        if done:
            if self._level == 0 and not settings.quiet:
                print("[magenta]Bye, hope this was useful!")
            break
        i += 1
        max_turns = (
            min(turns, settings.max_turns)
            if turns > 0 and settings.max_turns > 0
            else max(turns, settings.max_turns)
        )
        if max_turns > 0 and i >= max_turns:
            # Important to distinguish between:
            # (a) intentional run for a
            #     fixed number of turns, where we expect the pending message
            #     at that stage to be the desired result, and
            # (b) hitting max_turns limit, which is not intentional, and is an
            #     exception, resulting in a None task result
            status = (
                StatusCode.MAX_TURNS
                if i == settings.max_turns
                else StatusCode.FIXED_TURNS
            )
            break
        if (
            self.config.inf_loop_cycle_len > 0
            and i % self.config.inf_loop_cycle_len == 0
            and self._maybe_infinite_loop()
            or self.n_no_answer_alternations > self.config.inf_loop_wait_factor
        ):
            raise InfiniteLoopException(
                """Possible infinite loop detected!
                You can adjust infinite loop detection (or turn it off)
                by changing the params in the TaskConfig passed to the Task 
                constructor; see here:
                https://langroid.github.io/langroid/reference/agent/task/#langroid.agent.task.TaskConfig
                """
            )

    final_result = self.result(status)
    self._post_run_loop()
    if final_result is None:
        return None

    if return_type is None:
        return_type = self.default_return_type

    # If possible, take a final strict decoding step
    # when the output does not match `return_type`
    if return_type is not None and return_type != ChatDocument:
        parsed_result = self.agent.from_ChatDocument(final_result, return_type)

        if (
            parsed_result is None
            and isinstance(self.agent, ChatAgent)
            and self.agent._json_schema_available()
        ):
            strict_agent = self.agent[return_type]
            output_args = strict_agent._function_args()[-1]
            if output_args is not None:
                schema = output_args.function.parameters
                strict_result = strict_agent.llm_response(
                    f"""
                    A response adhering to the following JSON schema was expected:
                    {schema}

                    Please resubmit with the correct schema. 
                    """
                )

                if strict_result is not None:
                    return cast(
                        Optional[T],
                        strict_agent.from_ChatDocument(strict_result, return_type),
                    )

        return parsed_result

    return final_result
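
A hedged usage sketch, continuing the task created above:

result = task.run("Find the square of 12.", turns=6)
if result is not None:
    print(result.content)
# With return_type, run() attempts to extract a value of that type:
# num = task.run("Find the square of 12.", return_type=int)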

run_async(msg=None, turns=-1, caller=None, max_cost=0, max_tokens=0, session_id='', allow_restart=True, return_type=None) async

Loop over step() until task is considered done or turns is reached. Runs asynchronously.

Parameters:

    msg (Any, default None): initial user-role message to process; if None, the LLM will respond to its initial self.task_messages, which set up and kick off the overall task. The agent tries to achieve this goal by looping over self.step() until the task is considered done; this can involve a series of messages produced by Agent, LLM, or Human (User). Note that msg, if passed, is treated as a message with role user; a "system"-role message should not be passed here.
    turns (int, default -1): number of turns to run the task for; -1 means run until the task is done.
    caller (Task | None, default None): the calling task, if any.
    max_cost (float, default 0): max cost allowed for the task (0 -> no limit).
    max_tokens (int, default 0): max tokens allowed for the task (0 -> no limit).
    session_id (str, default ''): session id for the task.
    allow_restart (bool, default True): whether to allow restarting the task.
    return_type (Optional[Type[T]], default None): desired final result type.

Returns:

    Optional[ChatDocument | T]: valid result of the task, or None.

Source code in langroid/agent/task.py
async def run_async(
    self,
    msg: Any = None,
    turns: int = -1,
    caller: None | Task = None,
    max_cost: float = 0,
    max_tokens: int = 0,
    session_id: str = "",
    allow_restart: bool = True,
    return_type: Optional[Type[T]] = None,
) -> Optional[ChatDocument | T]:
    """
    Loop over `step()` until task is considered done or `turns` is reached.
    Runs asynchronously.

    Args:
        msg (Any): initial *user-role* message to process; if None,
            the LLM will respond to its initial `self.task_messages`
            which set up and kick off the overall task.
            The agent tries to achieve this goal by looping
            over `self.step()` until the task is considered
            done; this can involve a series of messages produced by Agent,
            LLM or Human (User). Note that `msg`, if passed, is treated as
            message with role `user`; a "system" role message should not be
            passed here.
        turns (int): number of turns to run the task for;
            default is -1, which means run until task is done.
        caller (Task|None): the calling task, if any
        max_cost (float): max cost allowed for the task (default 0 -> no limit)
        max_tokens (int): max tokens allowed for the task (default 0 -> no limit)
        session_id (str): session id for the task
        allow_restart (bool): whether to allow restarting the task
        return_type (Optional[Type[T]]): desired final result type

    Returns:
        Optional[ChatDocument | T]: valid result of the task.
    """

    # Even if the initial "sender" is not literally the USER (since the task could
    # have come from another LLM), as far as this agent is concerned, the initial
    # message can be considered to be from the USER
    # (from the POV of this agent's LLM).

    if allow_restart and (
        (self.restart and caller is None)
        or (self.config_sub_task.restart_as_subtask and caller is not None)
    ):
        # We are either at top level, with restart = True, OR
        # we are a sub-task with restart_as_subtask = True,
        # so reset own agent and recursively for all sub-tasks
        self.reset_all_sub_tasks()

    self.n_stalled_steps = 0
    self._no_answer_step = -5  # last step where the best explicit response was N/A
    # how many N/A alternations have we had so far? (for Inf loop detection)
    self.n_no_answer_alternations = 0
    self.max_cost = max_cost
    self.max_tokens = max_tokens
    self.session_id = session_id
    self._set_alive()
    self._init_message_counter()
    self.history.clear()

    msg_input = self.agent.to_ChatDocument(msg, author_entity=Entity.USER)

    if (
        isinstance(msg_input, ChatDocument)
        and msg_input.metadata.recipient != ""
        and msg_input.metadata.recipient != self.name
    ):
        # this task is not the intended recipient so return None
        return None

    self._pre_run_loop(
        msg=msg_input,
        caller=caller,
        is_async=False,
    )
    # self.turns overrides if it is > 0 and turns not set (i.e. = -1)
    turns = self.turns if turns < 0 else turns
    i = 0
    while True:
        self._step_idx = i  # used in step() below
        await self.step_async()
        await asyncio.sleep(0.01)  # temp yield to avoid blocking
        done, status = self.done()
        if done:
            if self._level == 0 and not settings.quiet:
                print("[magenta]Bye, hope this was useful!")
            break
        i += 1
        max_turns = (
            min(turns, settings.max_turns)
            if turns > 0 and settings.max_turns > 0
            else max(turns, settings.max_turns)
        )
        if max_turns > 0 and i >= max_turns:
            # Important to distinguish between:
            # (a) intentional run for a
            #     fixed number of turns, where we expect the pending message
            #     at that stage to be the desired result, and
            # (b) hitting max_turns limit, which is not intentional, and is an
            #     exception, resulting in a None task result
            status = (
                StatusCode.MAX_TURNS
                if i == settings.max_turns
                else StatusCode.FIXED_TURNS
            )
            break
        if (
            self.config.inf_loop_cycle_len > 0
            and i % self.config.inf_loop_cycle_len == 0
            and self._maybe_infinite_loop()
            or self.n_no_answer_alternations > self.config.inf_loop_wait_factor
        ):
            raise InfiniteLoopException(
                """Possible infinite loop detected!
                You can adjust infinite loop detection (or turn it off)
                by changing the params in the TaskConfig passed to the Task 
                constructor; see here:
                https://langroid.github.io/langroid/reference/agent/task/#langroid.agent.task.TaskConfig
                """
            )

    final_result = self.result(status)
    self._post_run_loop()
    if final_result is None:
        return None

    if return_type is None:
        return_type = self.default_return_type

    # If possible, take a final strict decoding step
    # when the output does not match `return_type`
    if return_type is not None and return_type != ChatDocument:
        parsed_result = self.agent.from_ChatDocument(final_result, return_type)

        if (
            parsed_result is None
            and isinstance(self.agent, ChatAgent)
            and self.agent._json_schema_available()
        ):
            strict_agent = self.agent[return_type]
            output_args = strict_agent._function_args()[-1]
            if output_args is not None:
                schema = output_args.function.parameters
                strict_result = await strict_agent.llm_response_async(
                    f"""
                    A response adhering to the following JSON schema was expected:
                    {schema}

                    Please resubmit with the correct schema. 
                    """
                )

                if strict_result is not None:
                    return cast(
                        Optional[T],
                        strict_agent.from_ChatDocument(strict_result, return_type),
                    )

        return parsed_result

    return final_result
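
For reference, a minimal sketch of invoking run_async from an asyncio program (the agent and prompt are illustrative):

import asyncio
import langroid as lr

async def main() -> None:
    agent = lr.ChatAgent(lr.ChatAgentConfig(name="Helper"))
    task = lr.Task(agent, interactive=False)
    # cap the run at 3 turns and $0.10 of LLM cost
    result = await task.run_async(
        "Summarize the benefits of unit tests", turns=3, max_cost=0.10
    )
    if result is not None:
        print(result.content)

asyncio.run(main())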

step(turns=-1)

Synchronous version of step_async(). See step_async() for details. TODO: Except for the self.response() calls, this fn should be identical to step_async(). Consider refactoring to avoid duplication.

Source code in langroid/agent/task.py
def step(self, turns: int = -1) -> ChatDocument | None:
    """
    Synchronous version of `step_async()`. See `step_async()` for details.
    TODO: Except for the self.response() calls, this fn should be identical to
    `step_async()`. Consider refactoring to avoid duplication.
    """
    self.is_done = False
    parent = self.pending_message
    recipient = (
        ""
        if self.pending_message is None
        else self.pending_message.metadata.recipient
    )
    if not self._valid_recipient(recipient):
        logger.warning(f"Invalid recipient: {recipient}")
        error_doc = ChatDocument(
            content=f"Invalid recipient: {recipient}",
            metadata=ChatDocMetaData(
                sender=Entity.AGENT,
                sender_name=Entity.AGENT,
            ),
        )
        self._process_valid_responder_result(Entity.AGENT, parent, error_doc)
        return error_doc

    responders: List[Responder] = self.non_human_responders.copy()

    if (
        Entity.USER in self.responders
        and not self.human_tried
        and not self.agent.has_tool_message_attempt(self.pending_message)
    ):
        # Give human first chance if they haven't been tried in last step,
        # and the msg is not a tool-call attempt;
        # (When `interactive=False`, the human is allowed to respond
        #  only if explicitly addressed)
        # This ensures human gets a chance to respond,
        #   other than to a LLM tool-call.
        # When there's a tool msg attempt we want the
        #  Agent to be the next responder; this only makes a difference in an
        #  interactive setting: LLM generates tool, then we don't want user to
        #  have to respond, and instead let the agent_response handle the tool.

        responders.insert(0, Entity.USER)

    found_response = False
    # (responder, result) from a responder who explicitly said NO_ANSWER
    no_answer_response: None | Tuple[Responder, ChatDocument] = None
    n_non_responders = 0
    for r in responders:
        self.is_pass_thru = False
        if not self._can_respond(r):
            n_non_responders += 1
            # create dummy msg for logging
            log_doc = ChatDocument(
                content="[CANNOT RESPOND]",
                metadata=ChatDocMetaData(
                    sender=r if isinstance(r, Entity) else Entity.USER,
                    sender_name=str(r),
                    recipient=recipient,
                ),
            )
            # no need to register this dummy msg in ObjectRegistry
            ChatDocument.delete_id(log_doc.id())
            self.log_message(r, log_doc)
            if n_non_responders == len(responders):
                # don't stay in this "non-response" loop forever
                break
            continue
        self.human_tried = r == Entity.USER
        result = self.response(r, turns)
        if result and NO_ANSWER in result.content:
            no_answer_response = (r, result)
        self.is_done = self._is_done_response(result, r)
        self.is_pass_thru = PASS in result.content if result else False
        if self.valid(result, r):
            found_response = True
            assert result is not None
            self._process_valid_responder_result(r, parent, result)
            break
        else:
            self.log_message(r, result)
        if self.is_done:
            # skip trying other responders in this step
            break
    if not found_response:  # did not find a valid response
        if no_answer_response:
            # even though there was no valid response from anyone in this step,
            # if there was at least one who EXPLICITLY said NO_ANSWER, then
            # we process that as a valid response.
            r, result = no_answer_response
            self._process_valid_responder_result(r, parent, result)
        else:
            self._process_invalid_step_result(parent)
    self._show_pending_message_if_debug()
    return self.pending_message
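
The pending message returned here makes it possible to drive a task step by step instead of calling run(); a sketch (assuming task.init() seeds the pending message, per the langroid API, with an illustrative agent):

import langroid as lr

task = lr.Task(lr.ChatAgent(lr.ChatAgentConfig(name="Solver")), interactive=False)
task.init("What is 2 + 3?")            # seed the pending message
for _ in range(20):                     # safety cap on number of steps
    task.step()
    done, status = task.done()
    if done:
        break
print(task.pending_message.content if task.pending_message else "(no result)")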

step_async(turns=-1) async

A single "turn" in the task conversation: The "allowed" responders in this turn (which can be either the 3 "entities", or one of the sub-tasks) are tried in sequence, until a valid response is obtained; a valid response is one that contributes to the task, either by ending it, or producing a response to be further acted on. Update self.pending_message to the latest valid response (or NO_ANSWER if no valid response was obtained from any responder).

Parameters:

    turns (int, default -1): number of turns to process. Typically used in testing, where there is no human to "quit out" of the current level, or when we want to limit the number of turns of a delegated agent.

Returns:

    ChatDocument | None: updated self.pending_message. Currently the return value is not used by the task.run() method, but it is returned as a convenience for other use-cases, e.g. where we want to run a task step by step in a different context.

Source code in langroid/agent/task.py
async def step_async(self, turns: int = -1) -> ChatDocument | None:
    """
    A single "turn" in the task conversation: The "allowed" responders in this
    turn (which can be either the 3 "entities", or one of the sub-tasks) are
    tried in sequence, until a _valid_ response is obtained; a _valid_
    response is one that contributes to the task, either by ending it,
    or producing a response to be further acted on.
    Update `self.pending_message` to the latest valid response (or NO_ANSWER
    if no valid response was obtained from any responder).

    Args:
        turns (int): number of turns to process. Typically used in testing
            where there is no human to "quit out" of current level, or in cases
            where we want to limit the number of turns of a delegated agent.

    Returns (ChatDocument|None):
        Updated `self.pending_message`. Currently the return value is not used
            by the `task.run()` method, but we return this as a convenience for
            other use-cases, e.g. where we want to run a task step by step in a
            different context.
    """
    self.is_done = False
    parent = self.pending_message
    recipient = (
        ""
        if self.pending_message is None
        else self.pending_message.metadata.recipient
    )
    if not self._valid_recipient(recipient):
        logger.warning(f"Invalid recipient: {recipient}")
        error_doc = ChatDocument(
            content=f"Invalid recipient: {recipient}",
            metadata=ChatDocMetaData(
                sender=Entity.AGENT,
                sender_name=Entity.AGENT,
            ),
        )
        self._process_valid_responder_result(Entity.AGENT, parent, error_doc)
        return error_doc

    responders: List[Responder] = self.non_human_responders_async.copy()

    if (
        Entity.USER in self.responders
        and not self.human_tried
        and not self.agent.has_tool_message_attempt(self.pending_message)
    ):
        # Give human first chance if they haven't been tried in last step,
        # and the msg is not a tool-call attempt;
        # This ensures human gets a chance to respond,
        #   other than to a LLM tool-call.
        # When there's a tool msg attempt we want the
        #  Agent to be the next responder; this only makes a difference in an
        #  interactive setting: LLM generates tool, then we don't want user to
        #  have to respond, and instead let the agent_response handle the tool.
        responders.insert(0, Entity.USER)

    found_response = False
    # (responder, result) from a responder who explicitly said NO_ANSWER
    no_answer_response: None | Tuple[Responder, ChatDocument] = None
    for r in responders:
        self.is_pass_thru = False
        if not self._can_respond(r):
            # create dummy msg for logging
            log_doc = ChatDocument(
                content="[CANNOT RESPOND]",
                metadata=ChatDocMetaData(
                    sender=r if isinstance(r, Entity) else Entity.USER,
                    sender_name=str(r),
                    recipient=recipient,
                ),
            )
            # no need to register this dummy msg in ObjectRegistry
            ChatDocument.delete_id(log_doc.id())
            self.log_message(r, log_doc)
            continue
        self.human_tried = r == Entity.USER
        result = await self.response_async(r, turns)
        if result and NO_ANSWER in result.content:
            no_answer_response = (r, result)
        self.is_done = self._is_done_response(result, r)
        self.is_pass_thru = PASS in result.content if result else False
        if self.valid(result, r):
            found_response = True
            assert result is not None
            self._process_valid_responder_result(r, parent, result)
            break
        else:
            self.log_message(r, result)
        if self.is_done:
            # skip trying other responders in this step
            break
    if not found_response:
        if no_answer_response:
            # even though there was no valid response from anyone in this step,
            # if there was at least one who EXPLICITLY said NO_ANSWER, then
            # we process that as a valid response.
            r, result = no_answer_response
            self._process_valid_responder_result(r, parent, result)
        else:
            self._process_invalid_step_result(parent)
    self._show_pending_message_if_debug()
    return self.pending_message
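
The async variant can be driven the same way; a sketch, under the same assumptions as the sync version above:

from typing import Optional
import langroid as lr

async def drive(task: lr.Task, msg: str, max_steps: int = 20) -> Optional[lr.ChatDocument]:
    task.init(msg)                      # seed the pending message
    for _ in range(max_steps):          # safety cap on number of steps
        await task.step_async()
        done, _ = task.done()
        if done:
            break
    return task.pending_message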

response(e, turns=-1)

Sync version of response_async(). See response_async() for details.

Source code in langroid/agent/task.py
def response(
    self,
    e: Responder,
    turns: int = -1,
) -> Optional[ChatDocument]:
    """
    Sync version of `response_async()`. See `response_async()` for details.
    """
    if isinstance(e, Task):
        actual_turns = e.turns if e.turns > 0 else turns
        e.agent.callbacks.set_parent_agent(self.agent)
        # e.callbacks.set_parent_agent(self.agent)
        pending_tools = self.agent.try_get_tool_messages(self.pending_message)
        # TODO disable this
        if (
            len(pending_tools) > 1
            and len(self.agent.oai_tool_calls) > 1
            and not self.config.allow_subtask_multi_oai_tools
        ):
            result = self._forbid_multi_oai_tools(e)
        else:
            result = e.run(
                self.pending_message,
                turns=actual_turns,
                caller=self,
                max_cost=self.max_cost,
                max_tokens=self.max_tokens,
            )
            # update result.tool_messages if any
            if isinstance(result, ChatDocument):
                self.agent.try_get_tool_messages(result)
            if result is not None:
                content, id2result, oai_tool_id = self.agent.process_tool_results(
                    result.content,
                    result.oai_tool_id2result,
                    (
                        self.pending_message.oai_tool_calls
                        if isinstance(self.pending_message, ChatDocument)
                        else None
                    ),
                )
                result.content = content
                result.oai_tool_id2result = id2result
                result.metadata.oai_tool_id = oai_tool_id

        result_str = (  # only used by callback to display content and possible tool
            "NONE"
            if result is None
            else "\n\n".join(str(m) for m in ChatDocument.to_LLMMessage(result))
        )
        maybe_tool = len(extract_top_level_json(result_str)) > 0
        self.callbacks.show_subtask_response(
            task=e,
            content=result_str,
            is_tool=maybe_tool,
        )
    else:
        response_fn = self._entity_responder_map[cast(Entity, e)]
        result = response_fn(self.pending_message)
        # update result.tool_messages if any
        if isinstance(result, ChatDocument):
            self.agent.try_get_tool_messages(result)

    result_chat_doc = self.agent.to_ChatDocument(
        result,
        chat_doc=self.pending_message,
        author_entity=e if isinstance(e, Entity) else Entity.USER,
    )
    return self._process_result_routing(result_chat_doc, e)

response_async(e, turns=-1) async

Get response to self.pending_message from a responder. If the response is valid (i.e. it ends the current turn of seeking responses), return it as a ChatDocument object; otherwise return None.

Parameters:

    e (Responder): responder to get response from.
    turns (int, default -1): number of turns to run the task for; -1 means run until the task is done.

Returns:

    Optional[ChatDocument]: response to self.pending_message from the entity if valid, None otherwise.

Source code in langroid/agent/task.py
async def response_async(
    self,
    e: Responder,
    turns: int = -1,
) -> Optional[ChatDocument]:
    """
    Get response to `self.pending_message` from a responder.
    If response is __valid__ (i.e. it ends the current turn of seeking
    responses):
        -then return the response as a ChatDocument object,
        -otherwise return None.
    Args:
        e (Responder): responder to get response from.
        turns (int): number of turns to run the task for.
            Default is -1, which means run until task is done.

    Returns:
        Optional[ChatDocument]: response to `self.pending_message` from entity if
        valid, None otherwise
    """
    if isinstance(e, Task):
        actual_turns = e.turns if e.turns > 0 else turns
        e.agent.callbacks.set_parent_agent(self.agent)
        pending_tools = self.agent.try_get_tool_messages(self.pending_message)
        # TODO disable this
        if (
            len(pending_tools) > 1
            and len(self.agent.oai_tool_calls) > 1
            and not self.config.allow_subtask_multi_oai_tools
        ):
            result = self._forbid_multi_oai_tools(e)
        else:
            # e.callbacks.set_parent_agent(self.agent)
            result = await e.run_async(
                self.pending_message,
                turns=actual_turns,
                caller=self,
                max_cost=self.max_cost,
                max_tokens=self.max_tokens,
            )
            # update result.tool_messages if any
            if isinstance(result, ChatDocument):
                self.agent.try_get_tool_messages(result)
            if result is not None:
                content, id2result, oai_tool_id = self.agent.process_tool_results(
                    result.content,
                    result.oai_tool_id2result,
                    (
                        self.pending_message.oai_tool_calls
                        if isinstance(self.pending_message, ChatDocument)
                        else None
                    ),
                )
                result.content = content
                result.oai_tool_id2result = id2result
                result.metadata.oai_tool_id = oai_tool_id

        result_str = (  # only used by callback to display content and possible tool
            "NONE"
            if result is None
            else "\n\n".join(str(m) for m in ChatDocument.to_LLMMessage(result))
        )
        maybe_tool = len(extract_top_level_json(result_str)) > 0
        self.callbacks.show_subtask_response(
            task=e,
            content=result_str,
            is_tool=maybe_tool,
        )
    else:
        response_fn = self._entity_responder_async_map[cast(Entity, e)]
        result = await response_fn(self.pending_message)
        # update result.tool_messages if any
        if isinstance(result, ChatDocument):
            self.agent.try_get_tool_messages(result)

    result_chat_doc = self.agent.to_ChatDocument(
        result,
        chat_doc=self.pending_message,
        author_entity=e if isinstance(e, Entity) else Entity.USER,
    )
    return self._process_result_routing(result_chat_doc, e)
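
Since a Responder can be a sub-task, the Task branch above is what runs when tasks are composed; a minimal sketch of that wiring (agents illustrative; add_sub_task per the langroid API):

import langroid as lr

main_task = lr.Task(lr.ChatAgent(lr.ChatAgentConfig(name="Main")), interactive=False)
helper_task = lr.Task(lr.ChatAgent(lr.ChatAgentConfig(name="Helper")), interactive=False)
main_task.add_sub_task(helper_task)  # helper_task joins main_task's responder sequence
# During main_task.run(), response()/response_async() invoke
# helper_task.run()/run_async() when the helper's turn comes up.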

result(status=None)

Get result of task. This is the default behavior. Derived classes can override this.

Note the result of a task is returned as if it is from the User entity.

Parameters:

    status (StatusCode, default None): status of the task when it ended.

Returns:

    ChatDocument | None: result of the task.

Source code in langroid/agent/task.py
def result(self, status: StatusCode | None = None) -> ChatDocument | None:
    """
    Get result of task. This is the default behavior.
    Derived classes can override this.

    Note the result of a task is returned as if it is from the User entity.

    Args:
        status (StatusCode): status of the task when it ended
    Returns:
        ChatDocument: result of task
    """
    if status in [StatusCode.STALLED, StatusCode.MAX_TURNS, StatusCode.INF_LOOP]:
        # In these cases we don't know (and don't want to try to guess)
        # what the task result should be, so we return None
        return None

    result_msg = self.pending_message

    content = result_msg.content if result_msg else ""
    content_any = result_msg.content_any if result_msg else None
    if DONE in content and self.config.recognize_string_signals:
        # assuming it is of the form "DONE: <content>"
        content = content.replace(DONE, "").strip()
    oai_tool_calls = result_msg.oai_tool_calls if result_msg else None
    oai_tool_id2result = result_msg.oai_tool_id2result if result_msg else None
    fun_call = result_msg.function_call if result_msg else None
    tool_messages = result_msg.tool_messages if result_msg else []
    # if there is a FinalResultTool, DoneTool, or AgentDoneTool among these,
    # we extract content and tools from it, and ignore all others
    for t in tool_messages:
        if isinstance(t, FinalResultTool):
            content = ""
            content_any = None
            tool_messages = [t]  # pass it on to parent so it also quits
            break
        elif isinstance(t, (AgentDoneTool, DoneTool)):
            # there shouldn't be multiple tools like this; just take the first
            content = to_string(t.content)
            content_any = t.content
            fun_call = None
            oai_tool_calls = None
            if isinstance(t, AgentDoneTool):
                # AgentDoneTool may have tools, unlike DoneTool
                tool_messages = t.tools
            break
    # drop the "Done" tools since they should not be part of the task result,
    # or else they would cause the parent task to get unintentionally done!
    tool_messages = [
        t for t in tool_messages if not isinstance(t, (DoneTool, AgentDoneTool))
    ]
    block = result_msg.metadata.block if result_msg else None
    recipient = result_msg.metadata.recipient if result_msg else ""
    tool_ids = result_msg.metadata.tool_ids if result_msg else []

    # regardless of which entity actually produced the result,
    # when we return the result, we set entity to USER
    # since to the "parent" task, this result is equivalent to a response from USER
    result_doc = ChatDocument(
        content=content,
        content_any=content_any,
        oai_tool_calls=oai_tool_calls,
        oai_tool_id2result=oai_tool_id2result,
        function_call=fun_call,
        tool_messages=tool_messages,
        metadata=ChatDocMetaData(
            source=Entity.USER,
            sender=Entity.USER,
            block=block,
            status=status or (result_msg.metadata.status if result_msg else None),
            sender_name=self.name,
            recipient=recipient,
            tool_ids=tool_ids,
            parent_id=result_msg.id() if result_msg else "",
            agent_id=str(self.agent.id),
        ),
    )
    if self.pending_message is not None:
        self.pending_message.metadata.child_id = result_doc.id()

    return result_doc
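
The Done-tool handling above can be triggered from an agent's tool handler; a sketch (SquareTool and the agent are hypothetical; the orchestration-tool import path and handler-naming convention follow langroid):

import langroid as lr
from langroid.agent.tools.orchestration import DoneTool

class SquareTool(lr.ToolMessage):       # hypothetical tool for illustration
    request: str = "square"
    purpose: str = "Compute the square of a given <number>"
    number: int

class SquaringAgent(lr.ChatAgent):
    # Handler name matches the tool's `request` field, per langroid convention.
    def square(self, msg: SquareTool) -> DoneTool:
        # Returning DoneTool ends the task; result() above extracts its
        # content as the task result (and drops the tool itself).
        return DoneTool(content=str(msg.number ** 2))

agent = SquaringAgent(lr.ChatAgentConfig(name="Squarer"))
agent.enable_message(SquareTool)        # let the LLM use, and the agent handle, the tool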

done(result=None, r=None)

Check if task is done. This is the default behavior. Derived classes can override this.

Parameters:

    result (ChatDocument | None): result from a responder.
    r (Responder | None): responder that produced the result. Not used here, but could be used by derived classes.

Returns:

    Tuple[bool, StatusCode]: True if the task is done (False otherwise), and a status code indicating why.

Source code in langroid/agent/task.py
def done(
    self, result: ChatDocument | None = None, r: Responder | None = None
) -> Tuple[bool, StatusCode]:
    """
    Check if task is done. This is the default behavior.
    Derived classes can override this.
    Args:
        result (ChatDocument|None): result from a responder
        r (Responder|None): responder that produced the result
            Not used here, but could be used by derived classes.
    Returns:
        bool: True if task is done, False otherwise
        StatusCode: status code indicating why task is done
    """
    if self._is_kill():
        return (True, StatusCode.KILL)
    result = result or self.pending_message
    allow_done_string = self.config.recognize_string_signals
    # An entity decided task is done, either via DoneTool,
    # or by explicitly saying DONE
    done_result = result is not None and (
        (
            DONE in result.content
            and allow_done_string
        )
        or any(
            isinstance(t, (DoneTool, AgentDoneTool, FinalResultTool))
            for t in result.tool_messages
        )
    )

    user_quit = (
        result is not None
        and (result.content in USER_QUIT_STRINGS or done_result)
        and result.metadata.sender == Entity.USER
    )
    if self._level == 0 and self._user_can_respond() and self.only_user_quits_root:
        # for top-level task, only user can quit out
        return (user_quit, StatusCode.USER_QUIT if user_quit else StatusCode.OK)

    if self.is_done:
        return (True, StatusCode.DONE)

    if self.n_stalled_steps >= self.max_stalled_steps:
        # we are stuck, so bail to avoid infinite loop
        logger.warning(
            f"Task {self.name} stuck for {self.max_stalled_steps} steps; exiting."
        )
        return (True, StatusCode.STALLED)

    if self.max_cost > 0 and self.agent.llm is not None:
        try:
            if self.agent.llm.tot_tokens_cost()[1] > self.max_cost:
                logger.warning(
                    f"Task {self.name} cost exceeded {self.max_cost}; exiting."
                )
                return (True, StatusCode.MAX_COST)
        except Exception:
            pass

    if self.max_tokens > 0 and self.agent.llm is not None:
        try:
            if self.agent.llm.tot_tokens_cost()[0] > self.max_tokens:
                logger.warning(
                    f"Task {self.name} uses > {self.max_tokens} tokens; exiting."
                )
                return (True, StatusCode.MAX_TOKENS)
        except Exception:
            pass
    final = (
        # no valid response from any entity/agent in current turn
        result is None
        or done_result
        or (  # current task is addressing message to caller task
            self.caller is not None
            and self.caller.name != ""
            and result.metadata.recipient == self.caller.name
        )
        or user_quit
    )
    return (final, StatusCode.OK)
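
Because done() is designed to be overridden, a custom termination condition can be layered on top of the default logic; a sketch (the "FINISHED" sentinel is illustrative, and the StatusCode import location is assumed):

from langroid.agent.task import Task, StatusCode  # StatusCode location assumed

class SentinelTask(Task):
    # Illustrative override: also treat a "FINISHED" sentinel in the pending
    # message as a done-condition, deferring to the default logic otherwise.
    def done(self, result=None, r=None):
        result = result or self.pending_message
        if result is not None and "FINISHED" in result.content:
            return (True, StatusCode.DONE)
        return super().done(result, r)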

valid(result, r)

Is the result from a Responder (i.e. an entity or sub-task) such that we can stop searching for responses in this step?

Source code in langroid/agent/task.py
def valid(
    self,
    result: Optional[ChatDocument],
    r: Responder,
) -> bool:
    """
    Is the result from a Responder (i.e. an entity or sub-task)
    such that we can stop searching for responses in this step?
    """
    # TODO caution we should ensure that no handler method (tool) returns simply
    # an empty string (e.g when showing contents of an empty file), since that
    # would be considered an invalid response, and other responders will wrongly
    # be given a chance to respond.

    # if task would be considered done given responder r's `result`,
    # then consider the result valid.
    if result is not None and self.done(result, r)[0]:
        return True
    return (
        result is not None
        and not self._is_empty_message(result)
        # some weaker LLMs, including even GPT-4o, may say "DO-NOT-KNOW."
        # (with a punctuation at the end), so need to strip out punctuation
        and re.sub(r"[,.!?:]", "", result.content.strip()) != NO_ANSWER
    )

log_message(resp, msg=None, mark=False)

Log current pending message, and related state, for lineage/debugging purposes.

Parameters:

    resp (Responder, required): Responder that generated the msg.
    msg (ChatDocument, optional): Message to log. Defaults to None.
    mark (bool, optional): Whether to mark the message as the final result of a task.step() call. Defaults to False.

Source code in langroid/agent/task.py
def log_message(
    self,
    resp: Responder,
    msg: ChatDocument | None = None,
    mark: bool = False,
) -> None:
    """
    Log current pending message, and related state, for lineage/debugging purposes.

    Args:
        resp (Responder): Responder that generated the `msg`
        msg (ChatDocument, optional): Message to log. Defaults to None.
        mark (bool, optional): Whether to mark the message as the final result of
            a `task.step()` call. Defaults to False.
    """
    default_values = ChatDocLoggerFields().dict().values()
    msg_str_tsv = "\t".join(str(v) for v in default_values)
    if msg is not None:
        msg_str_tsv = msg.tsv_str()

    mark_str = "*" if mark else " "
    task_name = self.name if self.name != "" else "root"
    resp_color = "white" if mark else "red"
    resp_str = f"[{resp_color}] {resp} [/{resp_color}]"

    if msg is None:
        msg_str = f"{mark_str}({task_name}) {resp_str}"
    else:
        color = {
            Entity.LLM: "green",
            Entity.USER: "blue",
            Entity.AGENT: "red",
            Entity.SYSTEM: "magenta",
        }[msg.metadata.sender]
        f = msg.log_fields()
        tool_type = f.tool_type.rjust(6)
        tool_name = f.tool.rjust(10)
        tool_str = f"{tool_type}({tool_name})" if tool_name != "" else ""
        sender = f"[{color}]" + str(f.sender_entity).rjust(10) + f"[/{color}]"
        sender_name = f.sender_name.rjust(10)
        recipient = "=>" + str(f.recipient).rjust(10)
        block = "X " + str(f.block or "").rjust(10)
        content = f"[{color}]{f.content}[/{color}]"
        msg_str = (
            f"{mark_str}({task_name}) "
            f"{resp_str} {sender}({sender_name}) "
            f"({recipient}) ({block}) {tool_str} {content}"
        )

    if self.logger is not None:
        self.logger.log(msg_str)
    if self.tsv_logger is not None:
        resp_str = str(resp)
        self.tsv_logger.info(f"{mark_str}\t{task_name}\t{resp_str}\t{msg_str_tsv}")

set_color_log(enable=True)

Flag to enable/disable color logging using rich.console. In some contexts, such as Colab notebooks, we may want to disable color logging using rich.console, since those logs show up in the cell output rather than in the log file. Turning off this feature will still create logs, but without the color formatting from rich.console.

Parameters:

    enable (bool, default True): value to set self.color_log to, which will enable/disable rich logging.

Source code in langroid/agent/task.py
def set_color_log(self, enable: bool = True) -> None:
    """
    Flag to enable/disable color logging using rich.console.
    In some contexts, such as Colab notebooks, we may want to disable color logging
    using rich.console, since those logs show up in the cell output rather than
    in the log file. Turning off this feature will still create logs, but without
    the color formatting from rich.console
    Args:
        enable (bool): value of `self.color_log` to set to,
            which will enable/disable rich logging

    """
    self.color_log = enable
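
For example, to keep Colab cell output clean while still writing logs:

task.set_color_log(enable=False)   # logs are still written, just without rich color markup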