Skip to content

chainlit

langroid/agent/callbacks/chainlit.py

Callbacks for Chainlit integration.

ChainlitAgentCallbacks(agent, msg=None, config=ChainlitCallbackConfig())

Inject Chainlit callbacks into a Langroid Agent

Source code in langroid/agent/callbacks/chainlit.py
def __init__(
    self,
    agent: lr.Agent,
    msg: cl.Message = None,
    config: ChainlitCallbackConfig = ChainlitCallbackConfig(),
):
    """Wire this object's Chainlit handlers into ``agent.callbacks``.

    Args:
        agent: Langroid agent whose callback hooks are overwritten with the
            same-named bound methods of this object.
        msg: Optional initial Chainlit message; when given, it is passed to
            ``show_first_user_message`` so its display can be altered.
        config: Callback configuration, stored as ``self.config``.
    """
    # Each hook on agent.callbacks is replaced by the method of the same name.
    hook_names = (
        "start_llm_stream",
        "start_llm_stream_async",
        "cancel_llm_stream",
        "finish_llm_stream",
        "show_llm_response",
        "show_agent_response",
        "get_user_response",
        "get_last_step",
        "set_parent_agent",
        "show_error_message",
        "show_start_response",
    )
    for hook in hook_names:
        setattr(agent.callbacks, hook, getattr(self, hook))

    self.config = config
    self.agent: lr.Agent = agent

    if self.agent.llm is not None:
        # Async + streaming output must stay visible: the chainlit async
        # callbacks are frequently what displays the LLM output.
        self.agent.llm.config.async_stream_quiet = False

    if msg is not None:
        self.show_first_user_message(msg)

start_llm_stream()

Returns a streaming fn that can be passed to the LLM class

Source code in langroid/agent/callbacks/chainlit.py
def start_llm_stream(self) -> Callable[[str], None]:
    """Open a new LLM step and return a synchronous token sink for it.

    The step is created with the id of ``self.curr_step`` when one is
    pending (otherwise no explicit id), is stored as both ``self.stream``
    and ``self.last_step``, and is sent before this method returns.
    ``self.curr_step`` is reset to ``None``.

    Returns:
        A function taking one token string and forwarding it to the step.
        It raises ``ValueError`` if ``self.stream`` has been cleared.
    """
    pending = self.curr_step
    self.stream = cl.Step(
        id=None if pending is None else pending.id,
        name=self._entity_name("llm"),
        type="llm",
        parent_id=self._get_parent_id(),
    )
    self.last_step = self.stream
    self.curr_step = None
    logger.info(
        f"""
        Starting LLM stream for {self.agent.config.name}
        id = {self.stream.id} 
        under parent {self._get_parent_id()}
    """
    )
    run_sync(self.stream.send())  # type: ignore

    def stream_token(t: str) -> None:
        # Look up self.stream on every call rather than capturing it once.
        active = self.stream
        if active is None:
            raise ValueError("Stream not initialized")
        run_sync(active.stream_token(t))

    return stream_token

start_llm_stream_async() async

Returns a streaming fn that can be passed to the LLM class

Source code in langroid/agent/callbacks/chainlit.py
async def start_llm_stream_async(self) -> Callable[[str], None]:
    """Open a new LLM step and return an asynchronous token sink for it.

    Async counterpart of ``start_llm_stream``: the step is created with the
    id of ``self.curr_step`` when one is pending, is stored as both
    ``self.stream`` and ``self.last_step``, and is awaited-sent before
    returning. ``self.curr_step`` is reset to ``None``.

    Returns:
        A coroutine function taking one token string; callers must await it.
        It raises ``ValueError`` if ``self.stream`` has been cleared.
    """
    pending = self.curr_step
    self.stream = cl.Step(
        id=None if pending is None else pending.id,
        name=self._entity_name("llm"),
        type="llm",
        parent_id=self._get_parent_id(),
    )
    self.last_step = self.stream
    self.curr_step = None
    logger.info(
        f"""
        Starting LLM stream for {self.agent.config.name}
        id = {self.stream.id} 
        under parent {self._get_parent_id()}
    """
    )
    await self.stream.send()  # type: ignore

    async def stream_token(t: str) -> None:
        # Look up self.stream on every call rather than capturing it once.
        active = self.stream
        if active is None:
            raise ValueError("Stream not initialized")
        await active.stream_token(t)

    return stream_token

cancel_llm_stream()

Called when cached response found.

Source code in langroid/agent/callbacks/chainlit.py
def cancel_llm_stream(self) -> None:
    """Discard the in-progress LLM step; called when a cached response is found.

    Clears ``self.last_step`` and, if a stream step exists, removes it.
    """
    self.last_step = None
    active = self.stream
    if active is None:
        return
    run_sync(active.remove())  # type: ignore

finish_llm_stream(content, is_tool=False)

Update the stream, and display entire response in the right language.

Source code in langroid/agent/callbacks/chainlit.py
def finish_llm_stream(self, content: str, is_tool: bool = False) -> None:
    """Close out the streamed step and display the complete response.

    Args:
        content: Full LLM response text; an empty string means nothing
            was produced.
        is_tool: When true, the final step is rendered with language "json".

    Raises:
        ValueError: If the agent has no LLM or no stream step exists.
    """
    if self.agent.llm is None or self.stream is None:
        raise ValueError("LLM or stream not initialized")

    if content != "":
        run_sync(self.stream.update())  # type: ignore
    else:
        # Empty response: drop the streamed step.
        run_sync(self.stream.remove())  # type: ignore

    # The final step takes over the stream's id only when there is content.
    final_id = self.stream.id if content else None
    final_step = cl.Step(
        id=final_id,
        name=self._entity_name("llm", tool=is_tool),
        type="llm",
        parent_id=self._get_parent_id(),
        language="json" if is_tool else None,
    )
    final_step.output = textwrap.dedent(content) or NO_ANSWER
    logger.info(
        f"""
        Finish STREAM LLM response for {self.agent.config.name}
        id = {final_step.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(final_step.update())  # type: ignore

show_llm_response(content, is_tool=False, cached=False, language=None)

Show non-streaming LLM response.

Source code in langroid/agent/callbacks/chainlit.py
def show_llm_response(
    self,
    content: str,
    is_tool: bool = False,
    cached: bool = False,
    language: str | None = None,
) -> None:
    """Display a complete (non-streamed) LLM response as a step.

    Args:
        content: Response text; dedented before display, with ``NO_ANSWER``
            substituted when the result is empty.
        is_tool: Passed to ``_entity_name``; also selects "json" rendering
            when no ``language`` is given.
        cached: Passed to ``_entity_name`` for the step's display name.
        language: Explicit rendering language; takes precedence when truthy.
    """
    pending = self.curr_step
    step_id = None if pending is None else pending.id
    step_name = self._entity_name("llm", tool=is_tool, cached=cached)
    parent = self._get_parent_id()
    if language:
        render_lang = language
    else:
        render_lang = "json" if is_tool else None

    llm_step = cl.Step(
        id=step_id,
        name=step_name,
        type="llm",
        parent_id=parent,
        language=render_lang,
    )
    self.last_step = llm_step
    self.curr_step = None
    llm_step.output = textwrap.dedent(content) or NO_ANSWER
    logger.info(
        f"""
        Showing NON-STREAM LLM response for {self.agent.config.name}
        id = {llm_step.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(llm_step.send())  # type: ignore

show_error_message(error)

Show error message as a step.

Source code in langroid/agent/callbacks/chainlit.py
def show_error_message(self, error: str) -> None:
    """Display ``error`` as a plain-text step and record it as ``last_step``.

    Args:
        error: Error text placed verbatim in the step output.
    """
    error_step = cl.Step(
        name=self.agent.config.name + f"({ERROR})",
        type="run",
        parent_id=self._get_parent_id(),
        language="text",
    )
    error_step.output = error
    self.last_step = error_step
    run_sync(error_step.send())

show_agent_response(content, language='text')

Show message from agent (typically tool handler). Agent response can be considered as a "step" between LLM response and user response

Source code in langroid/agent/callbacks/chainlit.py
def show_agent_response(self, content: str, language="text") -> None:
    """Display an agent message (typically from a tool handler) as a step.

    The agent response is treated as a "step" sitting between the LLM
    response and the user response.

    Args:
        content: Message text; re-wrapped to width 90 when ``language``
            is "text".
        language: Rendering language for the step output.
    """
    pending = self.curr_step
    agent_step = cl.Step(
        id=None if pending is None else pending.id,
        name=self._entity_name("agent"),
        type="tool",
        parent_id=self._get_parent_id(),
        language=language,
    )
    # Only plain text is re-wrapped; other languages are shown as given.
    shown = (
        wrap_text_preserving_structure(content, width=90)
        if language == "text"
        else content
    )
    self.last_step = agent_step
    self.curr_step = None
    agent_step.output = shown
    logger.info(
        f"""
        Showing AGENT response for {self.agent.config.name}
        id = {agent_step.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(agent_step.send())  # type: ignore

show_start_response(entity)

When there's a potentially long-running process, start a step, so that the UI displays a spinner while the process is running.

Source code in langroid/agent/callbacks/chainlit.py
def show_start_response(self, entity: str) -> None:
    """Start an empty placeholder step for a potentially long-running process,
    so that the UI displays a spinner while the process is running.

    Any previous placeholder in ``self.curr_step`` is removed first. The new
    step becomes both ``self.last_step`` and ``self.curr_step``.

    Args:
        entity: Entity label passed to ``_entity_name`` for the step name.
    """
    previous = self.curr_step
    if previous is not None:
        run_sync(previous.remove())  # type: ignore

    placeholder = cl.Step(
        name=self._entity_name(entity),
        type="run",
        parent_id=self._get_parent_id(),
        language="text",
    )
    placeholder.output = ""
    self.last_step = self.curr_step = placeholder
    logger.info(
        f"""
        Showing START response for {self.agent.config.name} ({entity})
        id = {placeholder.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(placeholder.send())  # type: ignore

get_user_response(prompt)

Ask for user response, wait for it, and return it, as a cl.Step rather than as a cl.Message so we can nest it under the parent step.

Source code in langroid/agent/callbacks/chainlit.py
def get_user_response(self, prompt: str) -> str:
    """Prompt the user and block until a reply arrives.

    Synchronous wrapper around ``ask_user_step``: the question and answer
    are shown as cl.Step objects rather than a cl.Message, so they can be
    nested under the parent step. The value "c" is suppressed from display.

    Args:
        prompt: Text shown to the user.

    Returns:
        Whatever ``ask_user_step`` returns for this prompt.
    """
    pending_reply = self.ask_user_step(prompt=prompt, suppress_values=["c"])
    return run_sync(pending_reply)

show_user_response(message)

Show user response as a step.

Source code in langroid/agent/callbacks/chainlit.py
def show_user_response(self, message: str) -> None:
    """Display the user's reply as a step.

    The step is created with the id of Chainlit's current context step.

    Args:
        message: User text placed in the step output.
    """
    current_id = cl.context.current_step.id
    user_step = cl.Step(
        id=current_id,
        name=self._entity_name("user"),
        type="run",
        parent_id=self._get_parent_id(),
    )
    user_step.output = message
    logger.info(
        f"""
        Showing USER response for {self.agent.config.name}
        id = {user_step.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(user_step.send())

show_first_user_message(msg)

Show first user message as a step.

Source code in langroid/agent/callbacks/chainlit.py
def show_first_user_message(self, msg: cl.Message):
    """Re-render the first user message as a step.

    A step is built with the same id as ``msg`` and its content as output,
    then pushed with ``update()`` rather than ``send()``. The step is
    recorded as ``self.last_step``.

    Args:
        msg: The initial Chainlit message from the user.
    """
    first_step = cl.Step(
        id=msg.id,
        name=self._entity_name("user"),
        type="run",
        parent_id=self._get_parent_id(),
    )
    self.last_step = first_step
    first_step.output = msg.content
    run_sync(first_step.update())

ask_user_step(prompt, timeout=USER_TIMEOUT, suppress_values=['c']) async

Ask user for input, as a step nested under parent_id. Rather than rely entirely on AskUserMessage (which doesn't let us nest the question + answer under a step), we instead create fake steps for the question and answer, and only rely on AskUserMessage with an empty prompt to await user response.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| prompt | str | Prompt to display to user | required |
| timeout | int | Timeout in seconds | USER_TIMEOUT |
| suppress_values | List[str] | List of values to suppress from display (e.g. "c" for continue) | ['c'] |

Returns:

| Name | Type | Description |
|------|------|-------------|
| str | str | User response |

Source code in langroid/agent/callbacks/chainlit.py
async def ask_user_step(
    self,
    prompt: str,
    timeout: int = USER_TIMEOUT,
    suppress_values: List[str] = ["c"],
) -> str:
    """
    Ask user for input, as a step nested under parent_id.
    Rather than rely entirely on AskUserMessage (which doesn't let us
    nest the question + answer under a step), we instead create fake
    steps for the question and answer, and only rely on AskUserMessage
    with an empty prompt to await user response.

    ``config.ui.hide_cot`` is forced to False for the duration of the call
    and restored on every exit path (normal return, timeout, or exception).

    Args:
        prompt (str): Prompt to display to user
        timeout (int): Timeout in seconds
        suppress_values (List[str]): List of values to suppress from display
            (e.g. "c" for continue)

    Returns:
        str: User response; "" when the response is one of
            ``suppress_values``; "x" when the wait timed out.
    """

    # Save hide_cot so it can be restored in the `finally` below.
    hide_cot = config.ui.hide_cot

    # force hide_cot to False so that the user question + response is visible
    config.ui.hide_cot = False

    try:
        if prompt != "":
            # Create a question step to ask user
            question_step = cl.Step(
                name=f"{self.agent.config.name} (AskUser ❓)",
                type="run",
                parent_id=self._get_parent_id(),
            )
            question_step.output = prompt
            await question_step.send()  # type: ignore

        # Use AskUserMessage to await user response,
        # but with an empty prompt so the question is not visible,
        # but still pauses for user input in the input box.
        res = await cl.AskUserMessage(
            content="",
            timeout=timeout,
        ).send()

        if res is None:
            # We are already inside a coroutine, so await directly instead of
            # going through run_sync; report the timeout actually in effect.
            await cl.Message(
                content=f"Timed out after {timeout} seconds. Exiting."
            ).send()
            return "x"

        # The above will try to display user response in res
        # but we create fake step with same id as res and
        # erase it using empty output so it's not displayed
        step = cl.Step(
            id=res["id"],
            name="TempUserResponse",
            type="run",
            parent_id=self._get_parent_id(),
        )
        step.output = ""
        await step.update()  # type: ignore

        # Suppressed values (e.g. "c") are not echoed back into the UI.
        if res["output"] in suppress_values:
            return ""

        # Finally, reproduce the user response at right nesting level
        step = cl.Step(
            name=self._entity_name(entity="user"),
            type="run",
            parent_id=self._get_parent_id(),
        )
        step.output = res["output"]
        await step.send()  # type: ignore
        return res["output"]
    finally:
        # Restore the original value on every exit path.
        config.ui.hide_cot = hide_cot

ChainlitTaskCallbacks(task, msg=None, config=ChainlitCallbackConfig())

Bases: ChainlitAgentCallbacks

Recursively inject ChainlitAgentCallbacks into a Langroid Task's agent and agents of sub-tasks.

Source code in langroid/agent/callbacks/chainlit.py
def __init__(
    self,
    task: lr.Task,
    msg: cl.Message = None,
    config: ChainlitCallbackConfig = ChainlitCallbackConfig(),
):
    """Install callbacks on ``task``'s agent and, recursively, on sub-tasks.

    Args:
        task: Top-level Langroid task; its agent receives ``msg``.
        msg: Optional initial Chainlit message, forwarded to the base class.
        config: Callback configuration; when ``show_subtask_response`` is
            set, the task-level sub-task response hook is installed too.
    """

    # Base class wires the top-level agent (and handles msg) first.
    super().__init__(task.agent, msg, config)
    self._inject_callbacks(task)
    self.task = task

    if not config.show_subtask_response:
        return
    self.task.callbacks.show_subtask_response = self.show_subtask_response

show_subtask_response(task, content, is_tool=False)

Show sub-task response as a step, nested at the right level.

Source code in langroid/agent/callbacks/chainlit.py
def show_subtask_response(
    self, task: lr.Task, content: str, is_tool: bool = False
) -> None:
    """Display a sub-task's response as a step at the right nesting level.

    Args:
        task: The sub-task that produced the response; its agent's name
            appears in the step title.
        content: Response text; ``NO_ANSWER`` is shown when it is empty.
        is_tool: When true, the step is rendered with language "json".
    """

    # Title combines the calling agent's name with the sub-task's agent name.
    title = self.task.agent.config.name + f"( ⏎ From {task.agent.config.name})"
    # The step should nest under the calling agent's last step
    subtask_step = cl.Step(
        name=title,
        type="run",
        parent_id=self._get_parent_id(),
        language="json" if is_tool else None,
    )
    subtask_step.output = content or NO_ANSWER
    self.last_step = subtask_step
    run_sync(subtask_step.send())

setup_llm() async

From the session llm_settings, create new LLMConfig and LLM objects, save them in session state.

Source code in langroid/agent/callbacks/chainlit.py
@no_type_check
async def setup_llm() -> None:
    """Build an LLMConfig and LLM from the session's `llm_settings`.

    Missing settings fall back to: GPT4o chat model, 16_000 context length,
    temperature 0.2, timeout 90. The resulting objects are stored in the
    session under the keys "llm_config" and "llm".
    """
    settings = cl.user_session.get("llm_settings", {})
    model = settings.get("chat_model")
    context_length = settings.get("context_length", 16_000)
    temperature = settings.get("temperature", 0.2)
    timeout = settings.get("timeout", 90)
    logger.info(f"Using model: {model}")

    # Other possible chat_model values, for example:
    #   "litellm/ollama_chat/mistral"
    #   "litellm/ollama_chat/mistral:7b-instruct-v0.2-q8_0"
    #   "litellm/ollama/llama2"
    #   "local/localhost:8000/v1"
    #   "local/localhost:8000"
    chosen_model = model or lm.OpenAIChatModel.GPT4o
    llm_config = lm.OpenAIGPTConfig(
        chat_model=chosen_model,
        chat_context_length=context_length,  # adjust based on model
        temperature=temperature,
        timeout=timeout,
    )
    cl.user_session.set("llm_config", llm_config)
    cl.user_session.set("llm", lm.OpenAIGPT(llm_config))

update_llm(new_settings) async

Update LLMConfig and LLM from settings, and save in session state.

Source code in langroid/agent/callbacks/chainlit.py
@no_type_check
async def update_llm(new_settings: Dict[str, Any]) -> None:
    """Store new LLM settings in the session and rebuild the LLM from them.

    Args:
        new_settings: Settings dict saved under the session key
            "llm_settings".
    """
    # Persist first: the two calls below run after the new settings are stored.
    cl.user_session.set("llm_settings", new_settings)

    await inform_llm_settings()
    await setup_llm()

get_text_files(message, extensions=['.txt', '.pdf', '.doc', '.docx']) async

Get dict (file_name -> file_path) from files uploaded in chat msg

Source code in langroid/agent/callbacks/chainlit.py
async def get_text_files(
    message: cl.Message,
    extensions: List[str] = [".txt", ".pdf", ".doc", ".docx"],
) -> Dict[str, str]:
    """Get dict (file_name -> file_path) from files uploaded in chat msg.

    Args:
        message: Chat message whose ``elements`` are inspected.
        extensions: File-name suffixes to accept. Matching is
            case-insensitive, so ".pdf" also accepts "report.PDF".

    Returns:
        Mapping from element name to element path for every element whose
        path ends with one of ``extensions``. Elements without a path are
        skipped.
    """

    # Normalize once so the comparison below is case-insensitive.
    suffixes = tuple(ext.lower() for ext in extensions)
    return {
        element.name: element.path
        for element in (message.elements or [])
        # Skip elements with no path: there is nothing to match or return.
        if element.path and element.path.lower().endswith(suffixes)
    }

wrap_text_preserving_structure(text, width=90)

Wrap text preserving paragraph breaks. Typically used to format an agent_response output, which may have long lines with no newlines or paragraph breaks.

Source code in langroid/agent/callbacks/chainlit.py
def wrap_text_preserving_structure(text: str, width: int = 90) -> str:
    """Re-wrap ``text`` to ``width`` columns, keeping paragraph breaks.

    Paragraphs are the pieces separated by a blank line ("\\n\\n"). Each
    non-blank paragraph is re-flowed with ``textwrap.fill``; a paragraph
    that is only whitespace becomes an empty string so the break it
    represents is preserved. Typically used on agent_response output,
    which may contain very long lines.

    Args:
        text: Text to wrap.
        width: Maximum line width passed to ``textwrap.fill``.

    Returns:
        The wrapped paragraphs joined back together with blank lines.
    """

    def _reflow(paragraph: str) -> str:
        # Whitespace-only paragraphs collapse to "" to keep the break.
        if not paragraph.strip():
            return ""
        return textwrap.fill(paragraph, width=width)

    return "\n\n".join(_reflow(chunk) for chunk in text.split("\n\n"))