Skip to content

chainlit

langroid/agent/callbacks/chainlit.py

Callbacks for Chainlit integration.

ChainlitAgentCallbacks(agent, config=ChainlitCallbackConfig())

Inject Chainlit callbacks into a Langroid Agent

Source code in langroid/agent/callbacks/chainlit.py
def __init__(
    self,
    agent: lr.Agent,
    config: ChainlitCallbackConfig = ChainlitCallbackConfig(),
):
    """Wire this object's Chainlit handlers into ``agent.callbacks``.

    Every hook named below is overwritten on ``agent.callbacks`` with the
    same-named bound method of this instance. The config and the agent are
    then stored on the instance.

    Args:
        agent: the Langroid agent whose callbacks are replaced.
        config: callback configuration, stored as ``self.config``.
    """
    hook_names = (
        "start_llm_stream",
        "start_llm_stream_async",
        "cancel_llm_stream",
        "finish_llm_stream",
        "show_llm_response",
        "show_agent_response",
        "get_user_response",
        "get_user_response_async",
        "get_last_step",
        "set_parent_agent",
        "show_error_message",
        "show_start_response",
    )
    for hook in hook_names:
        setattr(agent.callbacks, hook, getattr(self, hook))
    self.config = config
    self.agent: lr.Agent = agent
    llm = self.agent.llm
    if llm is not None:
        # Keep LLM output visible in async + streaming mode: the Chainlit
        # async callbacks are often what displays that output.
        llm.config.async_stream_quiet = False

start_llm_stream()

Returns a streaming fn that can be passed to the LLM class

Source code in langroid/agent/callbacks/chainlit.py
def start_llm_stream(self) -> Callable[[str, StreamEventType], None]:
    """Create an empty Chainlit message to stream into and return a
    synchronous function that appends tokens to it.

    The new message takes the id of ``self.curr_step`` when one is pending.
    It becomes both ``self.stream`` and ``self.last_step``, and
    ``self.curr_step`` is cleared.

    Returns:
        A function ``(token, event_type) -> None`` that forwards the token
        to the stream message. The event type is accepted but not used.
    """
    pending = self.curr_step
    message = cl.Message(
        content="",
        id=pending.id if pending is not None else None,
        author=self._entity_name("llm"),
        type="assistant_message",
        parent_id=self._get_parent_id(),
    )
    self.stream = message
    self.last_step = message
    self.curr_step = None
    logger.info(
        f"""
        Starting LLM stream for {self.agent.config.name}
        id = {self.stream.id} 
        under parent {self._get_parent_id()}
    """
    )

    def stream_token(t: str, e: StreamEventType) -> None:
        target = self.stream
        if target is None:
            raise ValueError("Stream not initialized")
        # Chainlit's stream_token is a coroutine; run it to completion here.
        run_sync(target.stream_token(t))

    return stream_token

start_llm_stream_async() async

Returns a streaming fn that can be passed to the LLM class

Source code in langroid/agent/callbacks/chainlit.py
async def start_llm_stream_async(self) -> Callable[[str, StreamEventType], None]:
    """Create an empty Chainlit message to stream into and return an
    async function that appends tokens to it.

    The new message takes the id of ``self.curr_step`` when one is pending.
    It becomes both ``self.stream`` and ``self.last_step``, and
    ``self.curr_step`` is cleared.

    Returns:
        A coroutine function ``(token, event_type)`` that awaits the stream
        message's ``stream_token``. The event type is accepted but not used.
    """
    pending = self.curr_step
    message = cl.Message(
        content="",
        id=pending.id if pending is not None else None,
        author=self._entity_name("llm"),
        type="assistant_message",
        parent_id=self._get_parent_id(),
    )
    self.stream = message
    self.last_step = message
    self.curr_step = None
    logger.info(
        f"""
        Starting LLM stream for {self.agent.config.name}
        id = {self.stream.id} 
        under parent {self._get_parent_id()}
        """
    )

    async def stream_token(t: str, e: StreamEventType) -> None:
        target = self.stream
        if target is None:
            raise ValueError("Stream not initialized")
        await target.stream_token(t)

    return stream_token

cancel_llm_stream()

Called when a cached response is found: clears the last step and removes the pending stream message, if any.

Source code in langroid/agent/callbacks/chainlit.py
def cancel_llm_stream(self) -> None:
    """Abandon the current stream (used when a cached response is found).

    Clears ``self.last_step`` and, if a stream message exists, removes it.
    """
    self.last_step = None
    if self.stream is None:
        return
    run_sync(self.stream.remove())  # type: ignore

finish_llm_stream(content, is_tool=False)

Update the stream, and display entire response in the right language.

Source code in langroid/agent/callbacks/chainlit.py
def finish_llm_stream(self, content: str, is_tool: bool = False) -> None:
    """Finalize the streamed message and show the complete response.

    The stream message is removed when ``content`` is empty and updated
    otherwise. A message holding the dedented content (or ``NO_ANSWER``) is
    then built and updated. It reuses the stream's id when there is content,
    and is marked as JSON when ``is_tool`` is true.

    Args:
        content: the full LLM response text.
        is_tool: whether the response is a tool message.

    Raises:
        ValueError: if the agent has no LLM or no stream was started.
    """
    if self.agent.llm is None or self.stream is None:
        raise ValueError("LLM or stream not initialized")
    stream_op = self.stream.remove() if content == "" else self.stream.update()
    run_sync(stream_op)  # type: ignore
    if content:
        stream_id = self.stream.id
    else:
        stream_id = None
    final_msg = cl.Message(
        content=textwrap.dedent(content) or NO_ANSWER,
        id=stream_id,
        author=self._entity_name("llm", tool=is_tool),
        type="assistant_message",
        parent_id=self._get_parent_id(),
        language="json" if is_tool else None,
    )
    logger.info(
        f"""
        Finish STREAM LLM response for {self.agent.config.name}
        id = {final_msg.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(final_msg.update())  # type: ignore

show_llm_response(content, is_tool=False, cached=False, language=None)

Show non-streaming LLM response.

Source code in langroid/agent/callbacks/chainlit.py
def show_llm_response(
    self,
    content: str,
    is_tool: bool = False,
    cached: bool = False,
    language: str | None = None,
) -> None:
    """Send a complete (non-streamed) LLM response as a Chainlit message.

    The message takes the id of ``self.curr_step`` when one is pending,
    becomes ``self.last_step``, and ``self.curr_step`` is cleared.

    Args:
        content: response text; dedented, with ``NO_ANSWER`` shown if empty.
        is_tool: whether the response is a tool message (defaults the
            display language to JSON).
        cached: passed on to ``_entity_name`` when building the author label.
        language: explicit display language; takes precedence when truthy.
    """
    body = textwrap.dedent(content) or NO_ANSWER
    pending = self.curr_step
    msg_id = None if pending is None else pending.id
    author = self._entity_name("llm", tool=is_tool, cached=cached)
    if language:
        shown_language = language
    elif is_tool:
        shown_language = "json"
    else:
        shown_language = None
    msg = cl.Message(
        content=body,
        id=msg_id,
        author=author,
        type="assistant_message",
        language=shown_language,
        parent_id=self._get_parent_id(),
    )
    self.last_step = msg
    self.curr_step = None
    logger.info(
        f"""
        Showing NON-STREAM LLM response for {self.agent.config.name}
        id = {msg.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(msg.send())  # type: ignore

show_error_message(error)

Show error message.

Source code in langroid/agent/callbacks/chainlit.py
def show_error_message(self, error: str) -> None:
    """Send ``error`` as a plain-text Chainlit message.

    The author label is the agent's name followed by ``(ERROR)``, where
    ``ERROR`` is the module-level constant. The message becomes
    ``self.last_step``.
    """
    author = self.agent.config.name + f"({ERROR})"
    msg = cl.Message(
        content=error,
        author=author,
        type="run",
        language="text",
        parent_id=self._get_parent_id(),
    )
    self.last_step = msg
    run_sync(msg.send())

show_agent_response(content, language='text')

Show message from agent (typically tool handler).

Source code in langroid/agent/callbacks/chainlit.py
def show_agent_response(self, content: str, language="text") -> None:
    """Send an agent message (typically tool-handler output) to the UI.

    Plain-text content is first wrapped to 90 columns. The message takes the
    id of ``self.curr_step`` when one is pending, becomes ``self.last_step``,
    and ``self.curr_step`` is cleared.

    Args:
        content: the text to show.
        language: display language; only ``"text"`` triggers wrapping.
    """
    body = content
    if language == "text":
        body = wrap_text_preserving_structure(content, width=90)
    pending = self.curr_step
    msg = cl.Message(
        content=body,
        id=None if pending is None else pending.id,
        author=self._entity_name("agent"),
        type="tool",
        language=language,
        parent_id=self._get_parent_id(),
    )
    self.last_step = msg
    self.curr_step = None
    logger.info(
        f"""
        Showing AGENT response for {self.agent.config.name}
        id = {msg.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(msg.send())  # type: ignore

show_start_response(entity)

When there's a potentially long-running process, start a step, so that the UI displays a spinner while the process is running.

Source code in langroid/agent/callbacks/chainlit.py
def show_start_response(self, entity: str) -> None:
    """Send an empty placeholder message before a potentially slow operation.

    The placeholder is meant to let the UI show a spinner while the work
    runs. Any earlier pending placeholder is removed first. The new one is
    stored as both ``self.last_step`` and ``self.curr_step``.

    Args:
        entity: passed to ``_entity_name`` to build the author label.
    """
    previous = self.curr_step
    if previous is not None:
        run_sync(previous.remove())  # type: ignore
    placeholder = cl.Message(
        content="",
        author=self._entity_name(entity),
        type="run",
        parent_id=self._get_parent_id(),
        language="text",
    )
    self.last_step = self.curr_step = placeholder
    logger.info(
        f"""
        Showing START response for {self.agent.config.name} ({entity})
        id = {placeholder.id} 
        under parent {self._get_parent_id()}
        """
    )
    run_sync(placeholder.send())  # type: ignore

get_user_response(prompt)

Ask for user response, wait for it, and return it

Source code in langroid/agent/callbacks/chainlit.py
def get_user_response(self, prompt: str) -> str:
    """Synchronously prompt the user via ``ask_user`` and return the reply.

    A reply of ``"c"`` is suppressed by ``ask_user`` (returned as ``""``).
    """
    pending_reply = self.ask_user(prompt=prompt, suppress_values=["c"])
    return run_sync(pending_reply)

get_user_response_async(prompt) async

Ask for user response, wait for it, and return it

Source code in langroid/agent/callbacks/chainlit.py
async def get_user_response_async(self, prompt: str) -> str:
    """Prompt the user via ``ask_user``, await the reply, and return it.

    A reply of ``"c"`` is suppressed by ``ask_user`` (returned as ``""``).
    """
    reply = await self.ask_user(prompt=prompt, suppress_values=["c"])
    return reply

ask_user(prompt, timeout=USER_TIMEOUT, suppress_values=['c']) async

Ask user for input.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| prompt | str | Prompt to display to user | required |
| timeout | int | Timeout in seconds | USER_TIMEOUT |
| suppress_values | List[str] | List of values to suppress from display (e.g. "c" for continue) | ['c'] |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | User response |

Source code in langroid/agent/callbacks/chainlit.py
async def ask_user(
    self,
    prompt: str,
    timeout: int = USER_TIMEOUT,
    suppress_values: List[str] = ["c"],
) -> str:
    """
    Ask user for input.

    Args:
        prompt (str): Prompt to display to user
        timeout (int): Timeout in seconds
        suppress_values (List[str]): List of values to suppress from display
            (e.g. "c" for continue); such a response is returned as "".
            The list is only read, never mutated.

    Returns:
        str: User response; "" if the response is in `suppress_values`;
            "x" if no response was received (reported to the user as a
            timeout).
    """
    ask_msg = cl.AskUserMessage(
        content=prompt,
        author=f"{self.agent.config.name}(Awaiting user input...)",
        type="assistant_message",
        timeout=timeout,
    )
    res = await ask_msg.send()
    if prompt == "":
        # if there was no actual prompt, clear the row from the UI for clarity.
        await ask_msg.remove()

    if res is None:
        # Report the timeout that was actually in effect (the `timeout`
        # argument), not the module default. We are already inside a
        # coroutine, so await the send directly, as with `ask_msg` above.
        await cl.Message(
            content=f"Timed out after {timeout} seconds. Exiting."
        ).send()
        return "x"

    output = res["output"]
    return "" if output in suppress_values else output

ChainlitTaskCallbacks(task, config=ChainlitCallbackConfig())

Bases: ChainlitAgentCallbacks

Recursively inject ChainlitAgentCallbacks into a Langroid Task's agent and agents of sub-tasks.

Source code in langroid/agent/callbacks/chainlit.py
def __init__(
    self,
    task: lr.Task,
    config: ChainlitCallbackConfig = ChainlitCallbackConfig(),
):
    """Attach Chainlit callbacks to `task`.

    Initializes the base class with `task.agent`, calls
    `self._inject_callbacks(task)`, stores the task on the instance, and,
    when `config.show_subtask_response` is truthy, registers
    `self.show_subtask_response` on `task.callbacks`.

    Args:
        task: the Langroid task to instrument.
        config: callback configuration, forwarded to the base class.
    """

    # Base-class init installs the agent-level callbacks on task.agent.
    super().__init__(task.agent, config)
    # NOTE(review): presumably recurses into sub-tasks and injects callbacks
    # into their agents; the helper is not shown here, confirm.
    self._inject_callbacks(task)
    self.task = task
    if config.show_subtask_response:
        self.task.callbacks.show_subtask_response = self.show_subtask_response

show_subtask_response(task, content, is_tool=False)

Show sub-task response as a step, nested at the right level.

Source code in langroid/agent/callbacks/chainlit.py
def show_subtask_response(
    self, task: lr.Task, content: str, is_tool: bool = False
) -> None:
    """Send a sub-task's response as a Chainlit message.

    The author label combines this task's agent name with the name of the
    sub-task's agent. The parent id comes from ``self._get_parent_id()``, and
    the message becomes ``self.last_step``.

    Args:
        task: the sub-task that produced the response.
        content: response text; ``NO_ANSWER`` is shown if it is empty.
        is_tool: whether to display the content as JSON.
    """
    body = content or NO_ANSWER
    author = self.task.agent.config.name + f"( ⏎ From {task.agent.config.name})"
    msg = cl.Message(
        content=body,
        author=author,
        type="run",
        parent_id=self._get_parent_id(),
        language="json" if is_tool else None,
    )
    self.last_step = msg
    run_sync(msg.send())

setup_llm() async

From the session llm_settings, create new LLMConfig and LLM objects, save them in session state.

Source code in langroid/agent/callbacks/chainlit.py
@no_type_check
async def setup_llm() -> None:
    """Build the LLM config and LLM object from the session's `llm_settings`.

    Reads `chat_model`, `context_length`, `temperature` and `timeout` from
    the `llm_settings` session entry, with fallbacks of GPT4o, 16_000, 0.2
    and 90 respectively. Stores the resulting config under `"llm_config"`
    and the LLM under `"llm"` in the Chainlit user session.
    """
    settings = cl.user_session.get("llm_settings", {})
    chosen_model = settings.get("chat_model")
    max_context = settings.get("context_length", 16_000)
    sampling_temperature = settings.get("temperature", 0.2)
    request_timeout = settings.get("timeout", 90)
    logger.info(f"Using model: {chosen_model}")
    # Other possible chat_model values, for example:
    #   "litellm/ollama_chat/mistral"
    #   "litellm/ollama_chat/mistral:7b-instruct-v0.2-q8_0"
    #   "litellm/ollama/llama2"
    #   "local/localhost:8000/v1"
    #   "local/localhost:8000"
    config = lm.OpenAIGPTConfig(
        chat_model=chosen_model or lm.OpenAIChatModel.GPT4o,
        chat_context_length=max_context,  # adjust based on model
        temperature=sampling_temperature,
        timeout=request_timeout,
    )
    model_client = lm.OpenAIGPT(config)
    cl.user_session.set("llm_config", config)
    cl.user_session.set("llm", model_client)

update_llm(new_settings) async

Update LLMConfig and LLM from settings, and save in session state.

Source code in langroid/agent/callbacks/chainlit.py
@no_type_check
async def update_llm(new_settings: Dict[str, Any]) -> None:
    """Store new LLM settings in the session and rebuild the LLM from them.

    Args:
        new_settings: settings dict saved under the `"llm_settings"`
            session key.
    """
    # Save first: setup_llm() reads "llm_settings" back from the session.
    cl.user_session.set("llm_settings", new_settings)
    # NOTE(review): presumably reports the new settings to the user; the
    # helper is defined outside this view, confirm.
    await inform_llm_settings()
    await setup_llm()

get_text_files(message, extensions=['.txt', '.pdf', '.doc', '.docx']) async

Get dict (file_name -> file_path) from files uploaded in chat msg

Source code in langroid/agent/callbacks/chainlit.py
async def get_text_files(
    message: cl.Message,
    extensions: List[str] = [".txt", ".pdf", ".doc", ".docx"],
) -> Dict[str, str]:
    """Map file name to file path for the message's attached files.

    Only elements whose path ends with one of `extensions` are included.
    The `extensions` list is only read, never mutated.

    Args:
        message: chat message whose `elements` are inspected.
        extensions: accepted file-name suffixes.

    Returns:
        Dict of element name -> element path for the matching elements.
    """
    suffixes = tuple(extensions)
    return {
        element.name: element.path
        for element in message.elements
        if element.path.endswith(suffixes)
    }

wrap_text_preserving_structure(text, width=90)

Wrap text preserving paragraph breaks. Typically used to format an agent_response output, which may have long lines with no newlines or paragraph breaks.

Source code in langroid/agent/callbacks/chainlit.py
def wrap_text_preserving_structure(text: str, width: int = 90) -> str:
    """Re-wrap `text` to `width` columns, one paragraph at a time.

    Paragraphs are the pieces separated by blank lines ("\\n\\n"). Each
    non-blank paragraph is re-filled with `textwrap.fill`. Whitespace-only
    paragraphs become empty strings, so the paragraph breaks survive the
    re-join. Typically used on agent-response output that has very long lines.
    """
    return "\n\n".join(
        textwrap.fill(chunk, width=width) if chunk.strip() else ""
        for chunk in text.split("\n\n")
    )