special

langroid/agent/special/__init__.py

RelevanceExtractorAgent(config)

Bases: ChatAgent

Agent for extracting segments of text that are relevant to a given query.

Source code in langroid/agent/special/relevance_extractor_agent.py
def __init__(self, config: RelevanceExtractorAgentConfig):
    super().__init__(config)
    self.config: RelevanceExtractorAgentConfig = config
    self.enable_message(SegmentExtractTool)
    self.numbered_passage: Optional[str] = None

llm_response(message=None)

Compose a prompt asking to extract relevant segments from a passage. Steps:

- number the segments in the passage
- compose prompt
- send to LLM

Source code in langroid/agent/special/relevance_extractor_agent.py
@no_type_check
def llm_response(
    self, message: Optional[str | ChatDocument] = None
) -> Optional[ChatDocument]:
    """Compose a prompt asking to extract relevant segments from a passage.
    Steps:
    - number the segments in the passage
    - compose prompt
    - send to LLM
    """
    assert self.config.query is not None, "No query specified"
    assert message is not None, "No message specified"
    message_str = message.content if isinstance(message, ChatDocument) else message
    # number the segments in the passage
    self.numbered_passage = number_segments(message_str, self.config.segment_length)
    # compose prompt
    prompt = f"""
    PASSAGE:
    {self.numbered_passage}

    QUERY: {self.config.query}
    """
    # send to LLM
    return super().llm_response(prompt)

llm_response_async(message=None) async

Compose a prompt asking to extract relevant segments from a passage. Steps:

- number the segments in the passage
- compose prompt
- send to LLM

The LLM is expected to generate a structured msg according to the SegmentExtractTool schema, i.e. it should contain a segment_list field whose value is a list of segment numbers or ranges, like "10,12,14-17".

Source code in langroid/agent/special/relevance_extractor_agent.py
@no_type_check
async def llm_response_async(
    self, message: Optional[str | ChatDocument] = None
) -> Optional[ChatDocument]:
    """
    Compose a prompt asking to extract relevant segments from a passage.
    Steps:
    - number the segments in the passage
    - compose prompt
    - send to LLM
    The LLM is expected to generate a structured msg according to the
    SegmentExtractTool schema, i.e. it should contain a `segment_list` field
    whose value is a list of segment numbers or ranges, like "10,12,14-17".
    """

    assert self.config.query is not None, "No query specified"
    assert message is not None, "No message specified"
    message_str = message.content if isinstance(message, ChatDocument) else message
    # number the segments in the passage
    self.numbered_passage = number_segments(message_str, self.config.segment_length)
    # compose prompt
    prompt = f"""
    PASSAGE:
    {self.numbered_passage}

    QUERY: {self.config.query}
    """
    # send to LLM
    return await super().llm_response_async(prompt)

extract_segments(msg)

Method to handle a SegmentExtractTool message from the LLM

Source code in langroid/agent/special/relevance_extractor_agent.py
def extract_segments(self, msg: SegmentExtractTool) -> str:
    """Method to handle a segmentExtractTool message from LLM"""
    spec = msg.segment_list
    if len(self.message_history) == 0:
        return DONE + " " + NO_ANSWER
    if spec is None or spec.strip() in ["", NO_ANSWER]:
        return DONE + " " + NO_ANSWER
    assert self.numbered_passage is not None, "No numbered passage"
    # assume this has numbered segments
    try:
        extracts = extract_numbered_segments(self.numbered_passage, spec)
    except Exception:
        return DONE + " " + NO_ANSWER
    # this response ends the task by saying DONE
    return DONE + " " + extracts

handle_message_fallback(msg)

Handle case where LLM forgets to use SegmentExtractTool

Source code in langroid/agent/special/relevance_extractor_agent.py
def handle_message_fallback(
    self, msg: str | ChatDocument
) -> str | ChatDocument | None:
    """Handle case where LLM forgets to use SegmentExtractTool"""
    if isinstance(msg, ChatDocument) and msg.metadata.sender == Entity.LLM:
        return DONE + " " + NO_ANSWER
    else:
        return None
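
A usage sketch (illustrative, not from the library docs): the agent is wrapped in a Task and run on a passage. The query, passage, and segment_length below are hypothetical, and a default LLM configuration is assumed.

import langroid as lr
from langroid.agent.special.relevance_extractor_agent import (
    RelevanceExtractorAgent,
    RelevanceExtractorAgentConfig,
)

config = RelevanceExtractorAgentConfig(
    query="What is the capital of France?",  # hypothetical query
    segment_length=1,  # one sentence per numbered segment
)
agent = RelevanceExtractorAgent(config)
task = lr.Task(agent, interactive=False)
# the agent numbers the segments, the LLM replies with a SegmentExtractTool
# specifying relevant segment numbers, and extract_segments() returns them
result = task.run("Paris is the capital of France. The Seine flows through it.")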

DocChatAgent(config)

Bases: ChatAgent

Agent for chatting with a collection of documents.

Source code in langroid/agent/special/doc_chat_agent.py
def __init__(
    self,
    config: DocChatAgentConfig,
):
    super().__init__(config)
    self.config: DocChatAgentConfig = config
    self.original_docs: List[Document] = []
    self.original_docs_length = 0
    self.from_dataframe = False
    self.df_description = ""
    self.chunked_docs: List[Document] = []
    self.chunked_docs_clean: List[Document] = []
    self.response: None | Document = None
    if len(config.doc_paths) > 0:
        self.ingest()

clear()

Clear the document collection and the specific collection in vecdb

Source code in langroid/agent/special/doc_chat_agent.py
def clear(self) -> None:
    """Clear the document collection and the specific collection in vecdb"""
    self.original_docs = []
    self.original_docs_length = 0
    self.chunked_docs = []
    self.chunked_docs_clean = []
    if self.vecdb is None:
        logger.warning("Attempting to clear VecDB, but VecDB not set.")
        return
    collection_name = self.vecdb.config.collection_name
    if collection_name is None:
        return
    try:
        # Note we may have used a vecdb with a config.collection_name
        # different from the agent's config.vecdb.collection_name!!
        self.vecdb.delete_collection(collection_name)
        self.vecdb = VectorStore.create(self.vecdb.config)
    except Exception as e:
        logger.warning(
            f"""
            Error while deleting collection {collection_name}:
            {e}
            """
        )

ingest()

Chunk + embed + store docs specified by self.config.doc_paths

Source code in langroid/agent/special/doc_chat_agent.py
def ingest(self) -> None:
    """
    Chunk + embed + store docs specified by self.config.doc_paths
    """
    if len(self.config.doc_paths) == 0:
        # we must be using a previously defined collection
        # But let's get all the chunked docs so we can
        # do keyword and other non-vector searches
        if self.vecdb is None:
            raise ValueError("VecDB not set")
        self.setup_documents(filter=self.config.filter)
        return
    self.ingest_doc_paths(self.config.doc_paths)  # type: ignore

ingest_doc_paths(paths, metadata=[], doc_type=None)

Split, ingest docs from specified paths, do not add these to config.doc_paths.

Parameters:

- paths (str | bytes | List[str | bytes], required): document paths, urls, or byte-content of docs. The bytes option is intended to support cases where a document has already been read in as bytes (e.g. from an API or a database), and we want to avoid having to write it to a temporary file just to read it back in.
- metadata (List[Dict[str, Any]] | Dict[str, Any] | DocMetaData | List[DocMetaData], default []): List of metadata dicts, one for each path. If a single dict is passed in, it is used for all paths.
- doc_type (str | DocumentType | None, default None): DocumentType to use for parsing, if known. MUST apply to all docs if specified. This is especially useful when the paths are of bytes type, to help with document type detection.

Returns: List of Document objects

Source code in langroid/agent/special/doc_chat_agent.py
def ingest_doc_paths(
    self,
    paths: str | bytes | List[str | bytes],
    metadata: (
        List[Dict[str, Any]] | Dict[str, Any] | DocMetaData | List[DocMetaData]
    ) = [],
    doc_type: str | DocumentType | None = None,
) -> List[Document]:
    """Split, ingest docs from specified paths,
    do not add these to config.doc_paths.

    Args:
        paths: document paths, urls or byte-content of docs.
            The bytes option is intended to support cases where a document
            has already been read in as bytes (e.g. from an API or a database),
            and we want to avoid having to write it to a temporary file
            just to read it back in.
        metadata: List of metadata dicts, one for each path.
            If a single dict is passed in, it is used for all paths.
        doc_type: DocumentType to use for parsing, if known.
            MUST apply to all docs if specified.
            This is especially useful when the `paths` are of bytes type,
            to help with document type detection.
    Returns:
        List of Document objects
    """
    if isinstance(paths, str) or isinstance(paths, bytes):
        paths = [paths]
    all_paths = paths
    paths_meta: Dict[int, Any] = {}
    urls_meta: Dict[int, Any] = {}
    idxs = range(len(all_paths))
    url_idxs, path_idxs, bytes_idxs = get_urls_paths_bytes_indices(all_paths)
    urls = [all_paths[i] for i in url_idxs]
    paths = [all_paths[i] for i in path_idxs]
    bytes_list = [all_paths[i] for i in bytes_idxs]
    path_idxs.extend(bytes_idxs)
    paths.extend(bytes_list)
    if (isinstance(metadata, list) and len(metadata) > 0) or not isinstance(
        metadata, list
    ):
        if isinstance(metadata, list):
            idx2meta = {
                p: (
                    m
                    if isinstance(m, dict)
                    else (isinstance(m, DocMetaData) and m.dict())
                )  # appease mypy
                for p, m in zip(idxs, metadata)
            }
        elif isinstance(metadata, dict):
            idx2meta = {p: metadata for p in idxs}
        else:
            idx2meta = {p: metadata.dict() for p in idxs}
        urls_meta = {u: idx2meta[u] for u in url_idxs}
        paths_meta = {p: idx2meta[p] for p in path_idxs}
    docs: List[Document] = []
    parser = Parser(self.config.parsing)
    if len(urls) > 0:
        for ui in url_idxs:
            meta = urls_meta.get(ui, {})
            loader = URLLoader(urls=[all_paths[ui]], parser=parser)  # type: ignore
            url_docs = loader.load()
            # update metadata of each doc with meta
            for d in url_docs:
                d.metadata = d.metadata.copy(update=meta)
            docs.extend(url_docs)
    if len(paths) > 0:  # paths OR bytes are handled similarly
        for pi in path_idxs:
            meta = paths_meta.get(pi, {})
            p = all_paths[pi]
            path_docs = RepoLoader.get_documents(
                p,
                parser=parser,
                doc_type=doc_type,
            )
            # update metadata of each doc with meta
            for d in path_docs:
                d.metadata = d.metadata.copy(update=meta)
            docs.extend(path_docs)
    n_docs = len(docs)
    n_splits = self.ingest_docs(docs, split=self.config.split)
    if n_docs == 0:
        return []
    n_urls = len(urls)
    n_paths = len(paths)
    print(
        f"""
    [green]I have processed the following {n_urls} URLs
    and {n_paths} docs into {n_splits} parts:
    """.strip()
    )
    path_reps = [p if isinstance(p, str) else "bytes" for p in paths]
    print("\n".join([u for u in urls if isinstance(u, str)]))  # appease mypy
    print("\n".join(path_reps))
    return docs
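
For example (a hedged sketch; the paths and metadata are hypothetical), ingesting a mix of a URL and a local file, with a single metadata dict applied to both:

from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig

agent = DocChatAgent(DocChatAgentConfig())  # empty doc_paths; ingest explicitly
docs = agent.ingest_doc_paths(
    ["https://example.com/paper.pdf", "notes/summary.txt"],
    metadata={"project": "demo"},  # one dict, used for all paths
)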

ingest_docs(docs, split=True, metadata=[])

Chunk docs into pieces, map each chunk to vec-embedding, store in vec-db

Parameters:

- docs (List[Document], required): List of Document objects
- split (bool, default True): Whether to split docs into chunks. If False, docs are treated as "chunks" and are not split.
- metadata (List[Dict[str, Any]] | Dict[str, Any] | DocMetaData | List[DocMetaData], default []): List of metadata dicts, one for each doc, to augment whatever metadata is already in the doc. [ASSUME no conflicting keys between the two metadata dicts.] If a single dict is passed in, it is used for all docs.
Source code in langroid/agent/special/doc_chat_agent.py
def ingest_docs(
    self,
    docs: List[Document],
    split: bool = True,
    metadata: (
        List[Dict[str, Any]] | Dict[str, Any] | DocMetaData | List[DocMetaData]
    ) = [],
) -> int:
    """
    Chunk docs into pieces, map each chunk to vec-embedding, store in vec-db

    Args:
        docs: List of Document objects
        split: Whether to split docs into chunks. Default is True.
            If False, docs are treated as "chunks" and are not split.
        metadata: List of metadata dicts, one for each doc, to augment
            whatever metadata is already in the doc.
            [ASSUME no conflicting keys between the two metadata dicts.]
            If a single dict is passed in, it is used for all docs.
    """
    if isinstance(metadata, list) and len(metadata) > 0:
        for d, m in zip(docs, metadata):
            d.metadata = d.metadata.copy(
                update=m if isinstance(m, dict) else m.dict()  # type: ignore
            )
    elif isinstance(metadata, dict):
        for d in docs:
            d.metadata = d.metadata.copy(update=metadata)
    elif isinstance(metadata, DocMetaData):
        for d in docs:
            d.metadata = d.metadata.copy(update=metadata.dict())

    self.original_docs.extend(docs)
    if self.parser is None:
        raise ValueError("Parser not set")
    for d in docs:
        if d.metadata.id in [None, ""]:
            d.metadata.id = ObjectRegistry.new_id()
    if split:
        docs = self.parser.split(docs)
    else:
        if self.config.n_neighbor_chunks > 0:
            self.parser.add_window_ids(docs)
        # we're not splitting, so we mark each doc as a chunk
        for d in docs:
            d.metadata.is_chunk = True
    if self.vecdb is None:
        raise ValueError("VecDB not set")

    # If any additional fields need to be added to content,
    # add them as key=value pairs for all docs, before batching.
    # This helps retrieval for table-like data.
    # Note we need to do this at this stage so that the embeddings
    # are computed on the full content with these additional fields.
    if len(self.config.add_fields_to_content) > 0:
        fields = [
            f for f in extract_fields(docs[0], self.config.add_fields_to_content)
        ]
        if len(fields) > 0:
            for d in docs:
                key_vals = extract_fields(d, fields)
                d.content = (
                    ",".join(f"{k}={v}" for k, v in key_vals.items())
                    + ",content="
                    + d.content
                )
    docs = docs[: self.config.parsing.max_chunks]
    # vecdb should take care of adding docs in batches;
    # batching can be controlled via vecdb.config.batch_size
    self.vecdb.add_documents(docs)
    self.original_docs_length = self.doc_length(docs)
    self.setup_documents(docs, filter=self.config.filter)
    return len(docs)
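
A small sketch (illustrative) of ingesting pre-built Document objects with a shared metadata dict; Document and DocMetaData are assumed importable from langroid.mytypes.

from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig
from langroid.mytypes import Document, DocMetaData

agent = DocChatAgent(DocChatAgentConfig())
docs = [
    Document(
        content="Paris is the capital of France.",
        metadata=DocMetaData(source="geo-notes"),  # hypothetical source
    )
]
# split=False treats each doc as an already-formed chunk
n_chunks = agent.ingest_docs(docs, split=False, metadata={"topic": "geography"})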

retrieval_tool(msg)

Handle the RetrievalTool message

Source code in langroid/agent/special/doc_chat_agent.py
def retrieval_tool(self, msg: RetrievalTool) -> str:
    """Handle the RetrievalTool message"""
    self.config.retrieve_only = True
    self.config.parsing.n_similar_docs = msg.num_results
    content_doc = self.answer_from_docs(msg.query)
    return content_doc.content
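
Called directly, it looks like this (a sketch; normally the LLM emits the tool-message, and the RetrievalTool import location is an assumption):

from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig
from langroid.agent.tools.retrieval_tool import RetrievalTool  # assumed location

agent = DocChatAgent(DocChatAgentConfig())  # assume docs already ingested
msg = RetrievalTool(query="capital of France", num_results=3)
extracts = agent.retrieval_tool(msg)  # relevant extracts only, no LLM summary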

document_compatible_dataframe(df, content='content', metadata=[]) staticmethod

Convert dataframe so it is compatible with Document class:

- has "content" column
- has an "id" column to be used as Document.metadata.id

Parameters:

- df (DataFrame, required): dataframe to convert
- content (str, default 'content'): name of content column
- metadata (List[str], default []): list of metadata column names

Returns:

Tuple[pd.DataFrame, List[str]]: dataframe, metadata

- dataframe: dataframe with "content" column and "id" column
- metadata: list of metadata column names, including "id"

Source code in langroid/agent/special/doc_chat_agent.py
@staticmethod
def document_compatible_dataframe(
    df: pd.DataFrame,
    content: str = "content",
    metadata: List[str] = [],
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Convert dataframe so it is compatible with Document class:
    - has "content" column
    - has an "id" column to be used as Document.metadata.id

    Args:
        df: dataframe to convert
        content: name of content column
        metadata: list of metadata column names

    Returns:
        Tuple[pd.DataFrame, List[str]]: dataframe, metadata
            - dataframe: dataframe with "content" column and "id" column
            - metadata: list of metadata column names, including "id"
    """
    if content not in df.columns:
        raise ValueError(
            f"""
            Content column {content} not in dataframe,
            so we cannot ingest into the DocChatAgent.
            Please specify the `content` parameter as a suitable
            text-based column in the dataframe.
            """
        )
    if content != "content":
        # rename content column to "content", leave existing column intact
        df = df.rename(columns={content: "content"}, inplace=False)

    actual_metadata = metadata.copy()
    if "id" not in df.columns:
        docs = dataframe_to_documents(df, content="content", metadata=metadata)
        ids = [str(d.id()) for d in docs]
        df["id"] = ids

    if "id" not in actual_metadata:
        actual_metadata += ["id"]

    return df, actual_metadata

ingest_dataframe(df, content='content', metadata=[])

Ingest a dataframe into vecdb.

Source code in langroid/agent/special/doc_chat_agent.py
def ingest_dataframe(
    self,
    df: pd.DataFrame,
    content: str = "content",
    metadata: List[str] = [],
) -> int:
    """
    Ingest a dataframe into vecdb.
    """
    self.from_dataframe = True
    self.df_description = describe_dataframe(
        df, filter_fields=self.config.filter_fields, n_vals=5
    )
    df, metadata = DocChatAgent.document_compatible_dataframe(df, content, metadata)
    docs = dataframe_to_documents(df, content="content", metadata=metadata)
    # When ingesting a dataframe we will no longer do any chunking,
    # so we mark each doc as a chunk.
    # TODO - revisit this since we may still want to chunk large text columns
    for d in docs:
        d.metadata.is_chunk = True
    return self.ingest_docs(docs)
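
For instance (an illustrative sketch with made-up data):

import pandas as pd

from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig

agent = DocChatAgent(DocChatAgentConfig())
df = pd.DataFrame(
    {
        "content": ["A sci-fi horror film ...", "A crime drama ..."],
        "genre": ["sci-fi", "crime"],
    }
)
# "content" becomes Document.content; "genre" is carried along as metadata
n = agent.ingest_dataframe(df, content="content", metadata=["genre"])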

setup_documents(docs=[], filter=None)

Set up self.chunked_docs and self.chunked_docs_clean based on a possible filter. These will be used in various non-vector-based search functions, e.g. self.get_similar_chunks_bm25(), self.get_fuzzy_matches(), etc.

Parameters:

- docs (List[Document], default []): List of Document objects. This is empty when we are calling this method after initial doc ingestion.
- filter (str | None, default None): Filter condition for various lexical/semantic search fns.
Source code in langroid/agent/special/doc_chat_agent.py
def setup_documents(
    self,
    docs: List[Document] = [],
    filter: str | None = None,
) -> None:
    """
    Setup `self.chunked_docs` and `self.chunked_docs_clean`
    based on possible filter.
    These will be used in various non-vector-based search functions,
    e.g. self.get_similar_chunks_bm25(), self.get_fuzzy_matches(), etc.

    Args:
        docs: List of Document objects. This is empty when we are calling this
            method after initial doc ingestion.
        filter: Filter condition for various lexical/semantic search fns.
    """
    if filter is None and len(docs) > 0:
        # no filter, so just use the docs passed in
        self.chunked_docs.extend(docs)
    else:
        if self.vecdb is None:
            raise ValueError("VecDB not set")
        self.chunked_docs = self.vecdb.get_all_documents(where=filter or "")

    self.chunked_docs_clean = [
        Document(content=preprocess_text(d.content), metadata=d.metadata)
        for d in self.chunked_docs
    ]

get_field_values(fields)

Get string-listing of possible values of each field, e.g.

{
    "genre": "crime, drama, mystery, ... (10 more)",
    "certificate": "R, PG-13, PG, R",
}

The field names may have "metadata." prefix, e.g. "metadata.genre".

Source code in langroid/agent/special/doc_chat_agent.py
def get_field_values(self, fields: list[str]) -> Dict[str, str]:
    """Get string-listing of possible values of each field,
    e.g.
    {
        "genre": "crime, drama, mystery, ... (10 more)",
        "certificate": "R, PG-13, PG, R",
    }
    The field names may have "metadata." prefix, e.g. "metadata.genre".
    """
    field_values: Dict[str, Set[str]] = {}
    # make empty set for each field
    for f in fields:
        field_values[f] = set()
    if self.vecdb is None:
        raise ValueError("VecDB not set")
    # get all documents and accumulate possible values of each field
    docs = self.vecdb.get_all_documents()  # only works for vecdbs that support this
    for d in docs:
        # extract fields from d
        doc_field_vals = extract_fields(d, fields)
        # the `field` returned by extract_fields may contain only the last
        # part of the field name, e.g. "genre" instead of "metadata.genre",
        # so we use the orig_field name to fill in the values
        for (field, val), orig_field in zip(doc_field_vals.items(), fields):
            field_values[orig_field].add(val)
    # For each field make a string showing list of possible values,
    # truncate to 20 values, and if there are more, indicate how many
    # more there are, e.g. Genre: crime, drama, mystery, ... (20 more)
    field_values_list = {}
    for f in fields:
        vals = list(field_values[f])
        n = len(vals)
        remaining = n - 20
        vals = vals[:20]
        if n > 20:
            vals.append(f"(...{remaining} more)")
        # make a string of the values, ensure they are strings
        field_values_list[f] = ", ".join(str(v) for v in vals)
    return field_values_list

doc_length(docs)

Calc token-length of a list of docs.

Args:
    docs: list of Document objects

Returns:
    int: number of tokens

Source code in langroid/agent/special/doc_chat_agent.py
def doc_length(self, docs: List[Document]) -> int:
    """
    Calc token-length of a list of docs
    Args:
        docs: list of Document objects
    Returns:
        int: number of tokens
    """
    if self.parser is None:
        raise ValueError("Parser not set")
    return self.parser.num_tokens(self.doc_string(docs))

user_docs_ingest_dialog()

Ask user to select doc-collection, enter filenames/urls, and ingest into vecdb.

Source code in langroid/agent/special/doc_chat_agent.py
def user_docs_ingest_dialog(self) -> None:
    """
    Ask user to select doc-collection, enter filenames/urls, and ingest into vecdb.
    """
    if self.vecdb is None:
        raise ValueError("VecDB not set")
    n_deletes = self.vecdb.clear_empty_collections()
    collections = self.vecdb.list_collections()
    collection_name = "NEW"
    is_new_collection = False
    replace_collection = False
    if len(collections) > 0:
        n = len(collections)
        delete_str = (
            f"(deleted {n_deletes} empty collections)" if n_deletes > 0 else ""
        )
        print(f"Found {n} collections: {delete_str}")
        for i, option in enumerate(collections, start=1):
            print(f"{i}. {option}")
        while True:
            choice = Prompt.ask(
                f"Enter 1-{n} to select a collection, "
                "or hit ENTER to create a NEW collection, "
                "or -1 to DELETE ALL COLLECTIONS",
                default="0",
            )
            try:
                if -1 <= int(choice) <= n:
                    break
            except Exception:
                pass

        if choice == "-1":
            confirm = Prompt.ask(
                "Are you sure you want to delete all collections?",
                choices=["y", "n"],
                default="n",
            )
            if confirm == "y":
                self.vecdb.clear_all_collections(really=True)
                collection_name = "NEW"

        if int(choice) > 0:
            collection_name = collections[int(choice) - 1]
            print(f"Using collection {collection_name}")
            choice = Prompt.ask(
                "Would you like to replace this collection?",
                choices=["y", "n"],
                default="n",
            )
            replace_collection = choice == "y"

    if collection_name == "NEW":
        is_new_collection = True
        collection_name = Prompt.ask(
            "What would you like to name the NEW collection?",
            default="doc-chat",
        )

    self.vecdb.set_collection(collection_name, replace=replace_collection)

    default_urls_str = (
        " (or leave empty for default URLs)" if is_new_collection else ""
    )
    print(f"[blue]Enter some URLs or file/dir paths below {default_urls_str}")
    inputs = get_list_from_user()
    if len(inputs) == 0:
        if is_new_collection:
            inputs = self.config.default_paths
    self.config.doc_paths = inputs  # type: ignore
    self.ingest()

doc_string(docs) staticmethod

Generate a string representation of a list of docs.

Args:
    docs: list of Document objects

Returns:
    str: string representation

Source code in langroid/agent/special/doc_chat_agent.py
@staticmethod
def doc_string(docs: List[Document]) -> str:
    """
    Generate a string representation of a list of docs.
    Args:
        docs: list of Document objects
    Returns:
        str: string representation
    """
    contents = [f"Extract: {d.content}" for d in docs]
    sources = [d.metadata.source for d in docs]
    sources = [f"Source: {s}" if s is not None else "" for s in sources]
    return "\n".join(
        [
            f"""
            [{i+1}]
            {content}
            {source}
            """
            for i, (content, source) in enumerate(zip(contents, sources))
        ]
    )

get_summary_answer(question, passages)

Given a question and a list of (possibly relevant) doc snippets, generate an answer if possible.

Args:
    question: question to answer
    passages: list of Document objects, each containing a possibly relevant snippet, and metadata

Returns:
    a Document object containing the answer, and metadata containing source citations

Source code in langroid/agent/special/doc_chat_agent.py
def get_summary_answer(
    self, question: str, passages: List[Document]
) -> ChatDocument:
    """
    Given a question and a list of (possibly relevant) doc snippets,
    generate an answer if possible
    Args:
        question: question to answer
        passages: list of `Document` objects each containing a possibly relevant
            snippet, and metadata
    Returns:
        a `Document` object containing the answer,
        and metadata containing source citations

    """

    passages_str = self.doc_string(passages)
    # Substitute Q and P into the templatized prompt

    final_prompt = self.config.summarize_prompt.format(
        question=question, extracts=passages_str
    )
    show_if_debug(final_prompt, "SUMMARIZE_PROMPT= ")

    # Generate the final verbatim extract based on the final prompt.
    # Note this will send entire message history, plus this final_prompt
    # to the LLM, and self.message_history will be updated to include
    # 2 new LLMMessage objects:
    # one for `final_prompt`, and one for the LLM response

    if self.config.conversation_mode:
        # respond with temporary context
        answer_doc = super()._llm_response_temp_context(question, final_prompt)
    else:
        answer_doc = super().llm_response_forget(final_prompt)

    final_answer = answer_doc.content.strip()
    show_if_debug(final_answer, "SUMMARIZE_RESPONSE= ")

    citations = extract_markdown_references(final_answer)

    citations_str = ""
    if len(citations) > 0:
        # append [i] source, content for each citation
        citations_str = "\n".join(
            [
                f"[^{c}] {passages[c-1].metadata.source}"
                f"\n{format_footnote_text(passages[c-1].content)}"
                for c in citations
            ]
        )

    return ChatDocument(
        content=final_answer,  # does not contain citations
        metadata=ChatDocMetaData(
            source=citations_str,  # only the citations
            sender=Entity.LLM,
            has_citation=len(citations) > 0,
            cached=getattr(answer_doc.metadata, "cached", False),
        ),
    )

rerank_with_diversity(passages)

Rerank a list of items in such a way that each successive item is least similar (on average) to the earlier items.

Args:
    passages (List[Document]): A list of Documents to be reranked.

Returns:
    List[Document]: A reranked list of Documents.

Source code in langroid/agent/special/doc_chat_agent.py
def rerank_with_diversity(self, passages: List[Document]) -> List[Document]:
    """
    Rerank a list of items in such a way that each successive item is least similar
    (on average) to the earlier items.

    Args:
        passages (List[Document]): A list of Documents to be reranked.

    Returns:
        List[Document]: A reranked list of Documents.
    """

    if self.vecdb is None:
        logger.warning("No vecdb; cannot use rerank_with_diversity")
        return passages
    emb_model = self.vecdb.embedding_model
    emb_fn = emb_model.embedding_fn()
    embs = emb_fn([p.content for p in passages])
    embs_arr = [np.array(e) for e in embs]
    indices = list(range(len(passages)))

    # Helper function to compute average similarity to
    # items in the current result list.
    def avg_similarity_to_result(i: int, result: List[int]) -> float:
        return sum(  # type: ignore
            (embs_arr[i] @ embs_arr[j])
            / (np.linalg.norm(embs_arr[i]) * np.linalg.norm(embs_arr[j]))
            for j in result
        ) / len(result)

    # copy passages to items
    result = [indices.pop(0)]  # Start with the first item.

    while indices:
        # Find the item that has the least average similarity
        # to items in the result list.
        least_similar_item = min(
            indices, key=lambda i: avg_similarity_to_result(i, result)
        )
        result.append(least_similar_item)
        indices.remove(least_similar_item)

    # return passages in order of result list
    return [passages[i] for i in result]

rerank_to_periphery(passages)

Rerank to avoid Lost In the Middle (LIM) problem, where LLMs pay more attention to items at the ends of a list, rather than the middle. So we re-rank to make the best passages appear at the periphery of the list. https://arxiv.org/abs/2307.03172

Example reranking: 1 2 3 4 5 6 7 8 9 ==> 1 3 5 7 9 8 6 4 2

Parameters:

- passages (List[Document], required): A list of Documents to be reranked.

Returns:

List[Document]: A reranked list of Documents.

Source code in langroid/agent/special/doc_chat_agent.py
def rerank_to_periphery(self, passages: List[Document]) -> List[Document]:
    """
    Rerank to avoid Lost In the Middle (LIM) problem,
    where LLMs pay more attention to items at the ends of a list,
    rather than the middle. So we re-rank to make the best passages
    appear at the periphery of the list.
    https://arxiv.org/abs/2307.03172

    Example reranking:
    1 2 3 4 5 6 7 8 9 ==> 1 3 5 7 9 8 6 4 2

    Args:
        passages (List[Document]): A list of Documents to be reranked.

    Returns:
        List[Document]: A reranked list of Documents.

    """
    # Splitting items into odds and evens based on index, not value
    odds = passages[::2]
    evens = passages[1::2][::-1]

    # Merging them back together
    return odds + evens
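
A quick standalone check of the odd/even interleaving on the docstring's example:

passages = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
odds = passages[::2]          # ['1', '3', '5', '7', '9']
evens = passages[1::2][::-1]  # ['8', '6', '4', '2']
assert odds + evens == ["1", "3", "5", "7", "9", "8", "6", "4", "2"]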

add_context_window(docs_scores)

In each doc's metadata, there may be a window_ids field indicating the ids of the chunks around the current chunk. We use these stored window_ids to retrieve the desired number (self.config.n_neighbor_chunks) of neighbors on either side of the current chunk.

Parameters:

- docs_scores (List[Tuple[Document, float]], required): List of pairs of documents to add context windows to, together with their match scores.

Returns:

List[Tuple[Document, float]]: List of (Document, score) tuples.

Source code in langroid/agent/special/doc_chat_agent.py
def add_context_window(
    self,
    docs_scores: List[Tuple[Document, float]],
) -> List[Tuple[Document, float]]:
    """
    In each doc's metadata, there may be a window_ids field indicating
    the ids of the chunks around the current chunk. We use these stored
    window_ids to retrieve the desired number
    (self.config.n_neighbor_chunks) of neighbors
    on either side of the current chunk.

    Args:
        docs_scores (List[Tuple[Document, float]]): List of pairs of documents
            to add context windows to together with their match scores.

    Returns:
        List[Tuple[Document, float]]: List of (Document, score) tuples.
    """
    if self.vecdb is None or self.config.n_neighbor_chunks == 0:
        return docs_scores
    if len(docs_scores) == 0:
        return []
    if set(docs_scores[0][0].__fields__) != {"content", "metadata"}:
        # Do not add context window when there are other fields besides just
        # content and metadata, since we do not know how to set those other fields
        # for newly created docs with combined content.
        return docs_scores
    return self.vecdb.add_context_window(docs_scores, self.config.n_neighbor_chunks)

get_semantic_search_results(query, k=10)

Get semantic search results from vecdb.

Args:
    query (str): query to search for
    k (int): number of results to return

Returns:
    List[Tuple[Document, float]]: List of (Document, score) tuples.

Source code in langroid/agent/special/doc_chat_agent.py
def get_semantic_search_results(
    self,
    query: str,
    k: int = 10,
) -> List[Tuple[Document, float]]:
    """
    Get semantic search results from vecdb.
    Args:
        query (str): query to search for
        k (int): number of results to return
    Returns:
        List[Tuple[Document, float]]: List of (Document, score) tuples.
    """
    if self.vecdb is None:
        raise ValueError("VecDB not set")
    # Note: for dynamic filtering based on a query, users can
    # use the `temp_update` context-manager to pass in a `filter` to self.config,
    # e.g.:
    # with temp_update(self.config, {"filter": "metadata.source=='source1'"}):
    #     docs_scores = self.get_semantic_search_results(query, k=k)
    # This avoids having pass the `filter` argument to every function call
    # upstream of this one.
    # The `temp_update` context manager is defined in
    # `langroid/utils/pydantic_utils.py`
    return self.vecdb.similar_texts_with_scores(
        query,
        k=k,
        where=self.config.filter,
    )
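
The temp_update pattern mentioned in the comments above, spelled out (the filter expression is hypothetical):

from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig
from langroid.utils.pydantic_utils import temp_update

agent = DocChatAgent(DocChatAgentConfig())  # assume docs already ingested
with temp_update(agent.config, {"filter": "metadata.source=='source1'"}):
    docs_scores = agent.get_semantic_search_results("capital of France", k=5)
# outside the context manager, agent.config.filter reverts to its prior value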

get_relevant_chunks(query, query_proxies=[])

The retrieval stage in RAG: get doc-chunks that are most "relevant" to the query (and possibly any proxy queries), from the document-store, which currently is the vector store, but in theory could be any document store, or even web-search. This stage does NOT involve an LLM, and the retrieved chunks could either be pre-chunked text (from the initial pre-processing stage where chunks were stored in the vector store), or they could be dynamically retrieved based on a window around a lexical match.

These are the steps (some optional based on config):

- semantic search based on vector-embedding distance, from vecdb
- lexical search using bm25-ranking (keyword similarity)
- fuzzy matching (keyword similarity)
- re-ranking of doc-chunks by relevance to query, using cross-encoder, and pick top k

Parameters:

- query (str, required): original query (assumed to be in stand-alone form)
- query_proxies (List[str], default []): possible rephrases, or hypothetical answer to query (e.g. for HyDE-type retrieval)

Returns:

List[Document]: list of relevant doc-chunks (at most config.parsing.n_similar_docs).

Source code in langroid/agent/special/doc_chat_agent.py
def get_relevant_chunks(
    self, query: str, query_proxies: List[str] = []
) -> List[Document]:
    """
    The retrieval stage in RAG: get doc-chunks that are most "relevant"
    to the query (and possibly any proxy queries), from the document-store,
    which currently is the vector store,
    but in theory could be any document store, or even web-search.
    This stage does NOT involve an LLM, and the retrieved chunks
    could either be pre-chunked text (from the initial pre-processing stage
    where chunks were stored in the vector store), or they could be
    dynamically retrieved based on a window around a lexical match.

    These are the steps (some optional based on config):
    - semantic search based on vector-embedding distance, from vecdb
    - lexical search using bm25-ranking (keyword similarity)
    - fuzzy matching (keyword similarity)
    - re-ranking of doc-chunks by relevance to query, using cross-encoder,
       and pick top k

    Args:
        query: original query (assumed to be in stand-alone form)
        query_proxies: possible rephrases, or hypothetical answer to query
                (e.g. for HyDE-type retrieval)

    Returns:
        List[Document]: list of relevant doc-chunks
    """

    if (
        self.vecdb is None
        or self.vecdb.config.collection_name
        not in self.vecdb.list_collections(empty=False)
    ):
        return []

    # if we are using cross-encoder reranking or reciprocal rank fusion (RRF),
    # we can retrieve more docs during retrieval, and leave it to the cross-encoder
    # or RRF reranking to whittle down to self.config.parsing.n_similar_docs
    retrieval_multiple = (
        1
        if (
            self.config.cross_encoder_reranking_model == ""
            and not self.config.use_reciprocal_rank_fusion
        )
        else 3
    )

    if self.vecdb is None:
        raise ValueError("VecDB not set")

    with status("[cyan]Searching VecDB for relevant doc passages..."):
        docs_and_scores: List[Tuple[Document, float]] = []
        for q in [query] + query_proxies:
            docs_and_scores += self.get_semantic_search_results(
                q,
                k=self.config.parsing.n_similar_docs * retrieval_multiple,
            )
            # sort by score descending
            docs_and_scores = sorted(
                docs_and_scores, key=lambda x: x[1], reverse=True
            )

    # keep only docs with unique d.id()
    id2_rank_semantic = {d.id(): i for i, (d, _) in enumerate(docs_and_scores)}
    id2doc = {d.id(): d for d, _ in docs_and_scores}
    # make sure we get unique docs
    passages = [id2doc[id] for id, _ in id2_rank_semantic.items()]

    id2_rank_bm25 = {}
    if self.config.use_bm25_search:
        # TODO: Add score threshold in config
        docs_scores = self.get_similar_chunks_bm25(query, retrieval_multiple)
        if self.config.cross_encoder_reranking_model == "":
            # only if we're not re-ranking with a cross-encoder,
            # we collect these ranks for Reciprocal Rank Fusion down below.
            docs_scores = sorted(docs_scores, key=lambda x: x[1], reverse=True)
            id2_rank_bm25 = {d.id(): i for i, (d, _) in enumerate(docs_scores)}
            id2doc.update({d.id(): d for d, _ in docs_scores})
        else:
            passages += [d for (d, _) in docs_scores]

    id2_rank_fuzzy = {}
    if self.config.use_fuzzy_match:
        # TODO: Add score threshold in config
        fuzzy_match_doc_scores = self.get_fuzzy_matches(query, retrieval_multiple)
        if self.config.cross_encoder_reranking_model == "":
            # only if we're not re-ranking with a cross-encoder,
            # we collect these ranks for Reciprocal Rank Fusion down below.
            fuzzy_match_doc_scores = sorted(
                fuzzy_match_doc_scores, key=lambda x: x[1], reverse=True
            )
            id2_rank_fuzzy = {
                d.id(): i for i, (d, _) in enumerate(fuzzy_match_doc_scores)
            }
            id2doc.update({d.id(): d for d, _ in fuzzy_match_doc_scores})
        else:
            passages += [d for (d, _) in fuzzy_match_doc_scores]

    if (
        self.config.cross_encoder_reranking_model == ""
        and self.config.use_reciprocal_rank_fusion
        and (self.config.use_bm25_search or self.config.use_fuzzy_match)
    ):
        # Since we're not using cross-encoder re-ranking,
        # we need to re-order the retrieved chunks from potentially three
        # different retrieval methods (semantic, bm25, fuzzy), where the
        # similarity scores are on different scales.
        # We order the retrieved chunks using Reciprocal Rank Fusion (RRF) score.
        # Combine the ranks from each id2doc_rank_* dict into a single dict,
        # where the reciprocal rank score is the sum of
        # 1/(rank + self.config.reciprocal_rank_fusion_constant).
        # See https://learn.microsoft.com/en-us/azure/search/hybrid-search-ranking
        #
        # Note: diversity/periphery-reranking below may modify the final ranking.
        id2_reciprocal_score = {}
        for id_ in (
            set(id2_rank_semantic.keys())
            | set(id2_rank_bm25.keys())
            | set(id2_rank_fuzzy.keys())
        ):
            rank_semantic = id2_rank_semantic.get(id_, float("inf"))
            rank_bm25 = id2_rank_bm25.get(id_, float("inf"))
            rank_fuzzy = id2_rank_fuzzy.get(id_, float("inf"))
            c = self.config.reciprocal_rank_fusion_constant
            reciprocal_fusion_score = (
                1 / (rank_semantic + c) + 1 / (rank_bm25 + c) + 1 / (rank_fuzzy + c)
            )
            id2_reciprocal_score[id_] = reciprocal_fusion_score

        # sort the docs by the reciprocal score, in descending order
        id2_reciprocal_score = OrderedDict(
            sorted(
                id2_reciprocal_score.items(),
                key=lambda x: x[1],
                reverse=True,
            )
        )
        # each method retrieved up to retrieval_multiple * n_similar_docs,
        # so we need to take the top n_similar_docs from the combined list
        passages = [
            id2doc[id]
            for i, (id, _) in enumerate(id2_reciprocal_score.items())
            if i < self.config.parsing.n_similar_docs
        ]
        # passages must have distinct ids
        assert len(passages) == len(set([d.id() for d in passages])), (
            f"Duplicate passages in retrieved docs: {len(passages)} != "
            f"{len(set([d.id() for d in passages]))}"
        )

    if len(passages) == 0:
        return []

    if self.config.rerank_after_adding_context:
        passages_scores = [(p, 0.0) for p in passages]
        passages_scores = self.add_context_window(passages_scores)
        passages = [p for p, _ in passages_scores]
    # now passages can potentially have a lot of doc chunks,
    # so we re-rank them using a cross-encoder scoring model,
    # and pick top k where k = config.parsing.n_similar_docs
    # https://www.sbert.net/examples/applications/retrieve_rerank
    if self.config.cross_encoder_reranking_model != "":
        passages = self.rerank_with_cross_encoder(query, passages)

    if self.config.rerank_diversity:
        # reorder to increase diversity among top docs
        passages = self.rerank_with_diversity(passages)

    if self.config.rerank_periphery:
        # reorder so most important docs are at periphery
        # (see Lost In the Middle issue).
        passages = self.rerank_to_periphery(passages)

    if not self.config.rerank_after_adding_context:
        passages_scores = [(p, 0.0) for p in passages]
        passages_scores = self.add_context_window(passages_scores)
        passages = [p for p, _ in passages_scores]

    return passages[: self.config.parsing.n_similar_docs]
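
To make the RRF scoring above concrete, here is a tiny standalone sketch with hypothetical ranks for two docs across the three retrieval methods (rank 0 is best; inf means not retrieved by that method):

c = 60  # reciprocal_rank_fusion_constant
ranks = {
    "docA": (0, 2, float("inf")),  # (semantic, bm25, fuzzy) ranks
    "docB": (1, 0, 0),
}
rrf = {doc: sum(1 / (r + c) for r in rs) for doc, rs in ranks.items()}
# docB scores highest: it ranks near the top under all three methods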

get_relevant_extracts(query)

Get list of (verbatim) extracts from doc-chunks relevant to answering a query.

These are the stages (some optional based on config):

- use LLM to convert query to stand-alone query
- optionally use LLM to rephrase query to use below
- optionally use LLM to generate hypothetical answer (HyDE) to use below
- get_relevant_chunks(): get doc-chunks relevant to query and proxies
- use LLM to get relevant extracts from doc-chunks

Parameters:

- query (str, required): query to search for

Returns:

- query (str): stand-alone version of input query
- List[Document]: list of relevant extracts

Source code in langroid/agent/special/doc_chat_agent.py
@no_type_check
def get_relevant_extracts(self, query: str) -> Tuple[str, List[Document]]:
    """
    Get list of (verbatim) extracts from doc-chunks relevant to answering a query.

    These are the stages (some optional based on config):
    - use LLM to convert query to stand-alone query
    - optionally use LLM to rephrase query to use below
    - optionally use LLM to generate hypothetical answer (HyDE) to use below.
    - get_relevant_chunks(): get doc-chunks relevant to query and proxies
    - use LLM to get relevant extracts from doc-chunks

    Args:
        query (str): query to search for

    Returns:
        query (str): stand-alone version of input query
        List[Document]: list of relevant extracts

    """
    if (
        self.vecdb is None
        or self.vecdb.config.collection_name
        not in self.vecdb.list_collections(empty=False)
    ):
        return query, []

    if len(self.dialog) > 0 and not self.config.assistant_mode:
        # Regardless of whether we are in conversation mode or not,
        # for relevant doc/chunk extraction, we must convert the query
        # to a standalone query to get more relevant results.
        with status("[cyan]Converting to stand-alone query...[/cyan]"):
            with StreamingIfAllowed(self.llm, False):
                query = self.llm.followup_to_standalone(self.dialog, query)
        print(f"[orange2]New query: {query}")

    proxies = []
    if self.config.hypothetical_answer:
        answer = self.llm_hypothetical_answer(query)
        proxies = [answer]

    if self.config.n_query_rephrases > 0:
        rephrases = self.llm_rephrase_query(query)
        proxies += rephrases

    passages = self.get_relevant_chunks(query, proxies)  # no LLM involved

    if len(passages) == 0:
        return query, []

    with status("[cyan]LLM Extracting verbatim passages..."):
        with StreamingIfAllowed(self.llm, False):
            # these are async calls, one per passage; turn off streaming
            extracts = self.get_verbatim_extracts(query, passages)
            extracts = [e for e in extracts if e.content != NO_ANSWER]

    return query, extracts

get_verbatim_extracts(query, passages)

Run RelevanceExtractorAgent in async/concurrent mode on passages, to extract portions relevant to answering query, from each passage.

Args:
    query (str): query to answer
    passages (List[Document]): list of passages to extract from

Returns:

List[Document]: list of Documents containing extracts and metadata.

Source code in langroid/agent/special/doc_chat_agent.py
def get_verbatim_extracts(
    self,
    query: str,
    passages: List[Document],
) -> List[Document]:
    """
    Run RelevanceExtractorAgent in async/concurrent mode on passages,
    to extract portions relevant to answering query, from each passage.
    Args:
        query (str): query to answer
        passages (List[Document]): list of passages to extract from

    Returns:
        List[Document]: list of Documents containing extracts and metadata.
    """
    agent_cfg = self.config.relevance_extractor_config
    if agent_cfg is None:
        # no relevance extraction: simply return passages
        return passages
    if agent_cfg.llm is None:
        # Use main DocChatAgent's LLM if not provided explicitly:
        # this reduces setup burden on the user
        agent_cfg.llm = self.config.llm
    agent_cfg.query = query
    agent_cfg.segment_length = self.config.extraction_granularity
    agent_cfg.llm.stream = False  # disable streaming for concurrent calls

    agent = RelevanceExtractorAgent(agent_cfg)
    task = Task(
        agent,
        name="Relevance-Extractor",
        interactive=False,
    )

    extracts: list[str] = run_batch_tasks(
        task,
        passages,
        input_map=lambda msg: msg.content,
        output_map=lambda ans: ans.content if ans is not None else NO_ANSWER,
    )  # type: ignore

    # Caution: Retain ALL other fields in the Documents (which could be
    # other than just `content` and `metadata`), while simply replacing
    # `content` with the extracted portions
    passage_extracts = []
    for p, e in zip(passages, extracts):
        if e == NO_ANSWER or len(e) == 0:
            continue
        p_copy = p.copy()
        p_copy.content = e
        passage_extracts.append(p_copy)

    return passage_extracts

answer_from_docs(query)

Answer query based on relevant docs from the VecDB

Parameters:

- query (str, required): query to answer

Returns:

ChatDocument: the answer

Source code in langroid/agent/special/doc_chat_agent.py
def answer_from_docs(self, query: str) -> ChatDocument:
    """
    Answer query based on relevant docs from the VecDB

    Args:
        query (str): query to answer

    Returns:
        Document: answer
    """
    response = ChatDocument(
        content=NO_ANSWER,
        metadata=ChatDocMetaData(
            source="None",
            sender=Entity.LLM,
        ),
    )
    # query may be updated to a stand-alone version
    query, extracts = self.get_relevant_extracts(query)
    if len(extracts) == 0:
        return response
    if self.llm is None:
        raise ValueError("LLM not set")
    if self.config.retrieve_only:
        # only return extracts, skip LLM-based summary answer
        meta = dict(
            sender=Entity.LLM,
        )
        # copy metadata from first doc, unclear what to do here.
        meta.update(extracts[0].metadata)
        return ChatDocument(
            content="\n\n".join([e.content for e in extracts]),
            metadata=ChatDocMetaData(**meta),  # type: ignore
        )
    response = self.get_summary_answer(query, extracts)

    self.update_dialog(query, response.content)
    self.response = response  # save last response
    return response
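
End-to-end usage sketch (illustrative question and path):

from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig

config = DocChatAgentConfig(doc_paths=["https://example.com/report.pdf"])
agent = DocChatAgent(config)  # ingests at construction
response = agent.answer_from_docs("What is the main finding?")
print(response.content)          # the answer (or NO_ANSWER)
print(response.metadata.source)  # citations, when available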

summarize_docs(instruction='Give a concise summary of the following text:')

Summarize all docs

Source code in langroid/agent/special/doc_chat_agent.py
def summarize_docs(
    self,
    instruction: str = "Give a concise summary of the following text:",
) -> None | ChatDocument:
    """Summarize all docs"""
    if self.llm is None:
        raise ValueError("LLM not set")
    if len(self.original_docs) == 0:
        logger.warning(
            """
            No docs to summarize! Perhaps you are re-using a previously
            defined collection?
            In that case, we don't have access to the original docs.
            To create a summary, use a new collection, and specify a list of docs.
            """
        )
        return None
    full_text = "\n\n".join([d.content for d in self.original_docs])
    if self.parser is None:
        raise ValueError("No parser defined")
    tot_tokens = self.parser.num_tokens(full_text)
    MAX_INPUT_TOKENS = (
        self.llm.completion_context_length()
        - self.config.llm.max_output_tokens
        - 100
    )
    if tot_tokens > MAX_INPUT_TOKENS:
        # truncate
        full_text = self.parser.tokenizer.decode(
            self.parser.tokenizer.encode(full_text)[:MAX_INPUT_TOKENS]
        )
        logger.warning(
            f"Summarizing after truncating text to {MAX_INPUT_TOKENS} tokens"
        )
    prompt = f"""
    {instruction}

    FULL TEXT:
    {full_text}
    """.strip()
    with StreamingIfAllowed(self.llm):
        summary = ChatAgent.llm_response(self, prompt)
        return summary
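
For example (illustrative instruction; agent is a DocChatAgent whose docs were ingested in the current session, since summarization needs the original docs):

summary = agent.summarize_docs(
    instruction="Give a 3-bullet summary of the following text:"
)
if summary is not None:
    print(summary.content)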

justify_response()

Show evidence for last response

Source code in langroid/agent/special/doc_chat_agent.py
def justify_response(self) -> ChatDocument | None:
    """Show evidence for last response"""
    if self.response is None:
        print("[magenta]No response yet")
        return None
    source = self.response.metadata.source
    if len(source) > 0:
        print("[magenta]" + source)
    else:
        print("[magenta]No source found")
    return None

RetrieverAgent(config)

Bases: DocChatAgent

Agent for just retrieving chunks/docs/extracts matching a query

Source code in langroid/agent/special/retriever_agent.py
def __init__(self, config: DocChatAgentConfig):
    super().__init__(config)
    self.config: DocChatAgentConfig = config
    logger.warning(
        """
    `RetrieverAgent` is deprecated. Use `DocChatAgent` instead, with
    `DocChatAgentConfig.retrieve_only=True`, and if you want to retrieve
    FULL relevant doc-contents rather than just extracts, then set
    `DocChatAgentConfig.extraction_granularity=-1`
    """
    )

LanceDocChatAgent(cfg)

Bases: DocChatAgent

Source code in langroid/agent/special/lance_doc_chat_agent.py
def __init__(self, cfg: DocChatAgentConfig):
    super().__init__(cfg)
    self.config: DocChatAgentConfig = cfg
    self.enable_message(QueryPlanTool, use=False, handle=True)

query_plan(msg)

Handle the LLM's use of the QueryPlanTool. Temporarily set the config filter and either return the final answer in case there's a dataframe_calc, or return the rephrased query so the LLM can handle it.

Source code in langroid/agent/special/lance_doc_chat_agent.py
def query_plan(self, msg: QueryPlanTool) -> AgentDoneTool | str:
    """
    Handle the LLM's use of the QueryPlanTool.
    Temporarily set the config filter and either return the final answer
    in case there's a dataframe_calc, or return the rephrased query
    so the LLM can handle it.
    """
    # create document-subset based on this filter
    plan = msg.plan
    try:
        self.setup_documents(filter=plan.filter or None)
    except Exception as e:
        logger.error(f"Error setting up documents: {e}")
        # say DONE with err msg so it goes back to LanceFilterAgent
        return AgentDoneTool(
            content=f"""
            Possible Filter Error:\n {e}

            Note that only the following fields are allowed in the filter
            of a query plan: 
            {", ".join(self.config.filter_fields)}
            """
        )

    # update the filter so it is used in the DocChatAgent
    self.config.filter = plan.filter or None
    if plan.dataframe_calc:
        # we just get relevant docs then do the calculation
        # TODO if calc causes err, it is captured in result,
        # and LLM can correct the calc based on the err,
        # and this will cause retrieval all over again,
        # which may be wasteful if only the calc part is wrong.
        # The calc step can later be done with a separate Agent/Tool.
        if plan.query is None or plan.query.strip() == "":
            if plan.filter is None or plan.filter.strip() == "":
                return AgentDoneTool(
                    content="""
                    Cannot execute Query Plan since filter as well as 
                    rephrased query are empty.                    
                    """
                )
            else:
                # no query to match, so just get all docs matching filter
                docs = self.vecdb.get_all_documents(plan.filter)
        else:
            _, docs = self.get_relevant_extracts(plan.query)
        if len(docs) == 0:
            return AgentDoneTool(content=NO_ANSWER)
        answer = self.vecdb.compute_from_docs(docs, plan.dataframe_calc)
    else:
        # pass on the query so LLM can handle it
        response = self.llm_response(plan.query)
        answer = NO_ANSWER if response is None else response.content
    return AgentDoneTool(tools=[AnswerTool(answer=answer)])

ingest_dataframe(df, content='content', metadata=[])

Ingest from a dataframe. Assume we are doing this once, not incrementally

Source code in langroid/agent/special/lance_doc_chat_agent.py
def ingest_dataframe(
    self,
    df: pd.DataFrame,
    content: str = "content",
    metadata: List[str] = [],
) -> int:
    """Ingest from a dataframe. Assume we are doing this once, not incrementally"""

    self.from_dataframe = True
    if df.shape[0] == 0:
        raise ValueError(
            """
            LanceDocChatAgent.ingest_dataframe() received an empty dataframe.
            """
        )
    n = df.shape[0]

    # If any additional fields need to be added to content,
    # add them as key=value pairs into the `content` field for all rows.
    # This helps retrieval for table-like data.
    # Note we need to do this at this stage, so that the embeddings
    # are computed on the full content, including these additional fields.
    fields = [f for f in self.config.add_fields_to_content if f in df.columns]
    if len(fields) > 0:
        df[content] = df.apply(
            lambda row: (",".join(f"{f}={row[f]}" for f in fields))
            + ", content="
            + row[content],
            axis=1,
        )

    df, metadata = DocChatAgent.document_compatible_dataframe(df, content, metadata)
    self.df_description = describe_dataframe(
        df,
        filter_fields=self.config.filter_fields,
        n_vals=10,
    )
    self.vecdb.add_dataframe(df, content="content", metadata=metadata)

    tbl = self.vecdb.client.open_table(self.vecdb.config.collection_name)
    # We assume "content" is available as a top-level field
    if "content" in tbl.schema.names:
        tbl.create_fts_index("content", replace=True)
    # We still need the steps below so that
    # other types of searches in DocChatAgent
    # can work, since they require Document objects
    docs = dataframe_to_documents(df, content="content", metadata=metadata)
    self.setup_documents(docs)
    # mark each doc as already-chunked so we don't try to split them further
    # TODO later we may want to split large text-columns
    for d in docs:
        d.metadata.is_chunk = True
    return n  # type: ignore
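
A hedged usage sketch, assuming `agent` is a LanceDocChatAgent configured as above; only the dataframe's shape matters here (one text column plus metadata columns):

import pandas as pd

df = pd.DataFrame(
    {
        "content": ["Apple reported record revenue.", "Tesla opened a new factory."],
        "year": [2023, 2024],
        "company": ["Apple", "Tesla"],
    }
)
# n = agent.ingest_dataframe(df, content="content", metadata=["year", "company"])
# With config.add_fields_to_content = ["company"], the first row's content becomes
# "company=Apple, content=Apple reported record revenue." before embedding.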

get_similar_chunks_bm25(query, multiple)

Override the DocChatAgent.get_similar_chunks_bm25() to use LanceDB FTS (Full Text Search).

Source code in langroid/agent/special/lance_doc_chat_agent.py
def get_similar_chunks_bm25(
    self, query: str, multiple: int
) -> List[Tuple[Document, float]]:
    """
    Override the DocChatAgent.get_similar_chunks_bm25()
    to use LanceDB FTS (Full Text Search).
    """
    # Clean up the query: replace newlines with spaces, lower-case the
    # special search keywords (AND, OR, NOT), and strip quotes and colons,
    # so none of it is interpreted as FTS search syntax
    query_clean = (
        query.replace("\n", " ")
        .replace("AND", "and")
        .replace("OR", "or")
        .replace("NOT", "not")
        .replace("'", "")
        .replace('"', "")
        .replace(":", "--")
    )

    tbl = self.vecdb.client.open_table(self.vecdb.config.collection_name)
    result = (
        tbl.search(query_clean)
        .where(self.config.filter or None)
        .limit(self.config.parsing.n_similar_docs * multiple)
    )
    docs = self.vecdb._lance_result_to_docs(result)
    scores = [r["score"] for r in result.to_list()]
    return list(zip(docs, scores))
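
The cleaning step can be replayed in isolation. Note these are plain substring replacements, so an uppercase keyword embedded in a word (e.g. "NOTE") is rewritten too:

query = 'revenue AND "growth": 2023\nNOT losses'
query_clean = (
    query.replace("\n", " ")
    .replace("AND", "and")
    .replace("OR", "or")
    .replace("NOT", "not")
    .replace("'", "")
    .replace('"', "")
    .replace(":", "--")
)
print(query_clean)  # revenue and growth-- 2023 not losses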

TableChatAgent(config)

Bases: ChatAgent

Agent for chatting with tabular data: a file path, URL, or pandas dataframe.

Source code in langroid/agent/special/table_chat_agent.py
def __init__(self, config: TableChatAgentConfig):
    if isinstance(config.data, pd.DataFrame):
        df = config.data
    else:
        df = read_tabular_data(config.data, config.separator)

    df.columns = df.columns.str.strip().str.replace(" +", "_", regex=True)

    self.df = df
    summary = dataframe_summary(df)
    config.system_message = config.system_message.format(summary=summary)

    super().__init__(config)
    self.config: TableChatAgentConfig = config

    logger.info(
        f"""TableChatAgent initialized with dataframe of shape {self.df.shape}
        and columns: 
        {self.df.columns}
        """
    )
    # enable the agent to use and handle the PandasEvalTool
    self.enable_message(PandasEvalTool)
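
A hedged construction sketch; the import path follows the source path above, and no TableChatAgentConfig fields beyond `data` and `separator` (both referenced in __init__) are assumed:

import pandas as pd

from langroid.agent.special.table_chat_agent import (
    TableChatAgent,
    TableChatAgentConfig,
)

df = pd.DataFrame({"city": ["Paris", "Lyon"], "population_m": [2.1, 0.5]})
# `data` may also be a file path or URL to tabular data (see __init__ above).
agent = TableChatAgent(TableChatAgentConfig(data=df))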

pandas_eval(msg)

Handle a PandasEvalTool message by evaluating the `expression` field and returning the result.

Parameters:

    msg (PandasEvalTool): The tool-message to handle.

Returns:

    str: The result of running the code along with any print output.

Source code in langroid/agent/special/table_chat_agent.py
def pandas_eval(self, msg: PandasEvalTool) -> str:
    """
    Handle a PandasEvalTool message by evaluating the `expression` field
    and returning the result.
    Args:
        msg (PandasEvalTool): The tool-message to handle.

    Returns:
        str: The result of running the code along with any print output.
    """
    self.sent_expression = True
    exprn = msg.expression
    local_vars = {"df": self.df}
    # Create a string-based I/O stream
    code_out = io.StringIO()

    # Temporarily redirect standard output to our string-based I/O stream
    sys.stdout = code_out

    # Evaluate the last line and get the result
    try:
        eval_result = pd.eval(exprn, local_dict=local_vars)
    except Exception as e:
        eval_result = f"ERROR: {type(e)}: {e}"

    if eval_result is None:
        eval_result = ""

    # Always restore the original standard output
    sys.stdout = sys.__stdout__

    # If df has been modified in-place, save the changes back to self.df
    self.df = local_vars["df"]

    # Get the resulting string from the I/O stream
    print_result = code_out.getvalue() or ""
    sep = "\n" if print_result else ""
    # Combine the print and eval results
    result = f"{print_result}{sep}{eval_result}"
    if result == "":
        result = "No result"
    # Return the result
    return result
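
The core of the handler — pd.eval against a local `df`, with stdout temporarily captured — can be replayed standalone:

import io
import sys

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})
local_vars = {"df": df}

code_out = io.StringIO()
sys.stdout = code_out  # capture any print output, as the handler does
try:
    eval_result = pd.eval("df.a + df.b", local_dict=local_vars)
except Exception as e:
    eval_result = f"ERROR: {type(e)}: {e}"
finally:
    sys.stdout = sys.__stdout__  # always restore the real stdout

print(eval_result)  # a Series: 11, 22, 33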

handle_message_fallback(msg)

Handle various LLM deviations

Source code in langroid/agent/special/table_chat_agent.py
def handle_message_fallback(
    self, msg: str | ChatDocument
) -> str | ChatDocument | None:
    """Handle various LLM deviations"""
    if isinstance(msg, ChatDocument) and msg.metadata.sender == lr.Entity.LLM:
        if msg.content.strip() == DONE and self.sent_expression:
            # LLM sent an expression (i.e. used the `pandas_eval` tool)
            # but upon receiving the results, simply said DONE without
            # narrating the result as instructed.
            return """
                You forgot to PRESENT the answer to the user's query
                based on the results from the `pandas_eval` tool.
            """
        if self.sent_expression:
            # LLM forgot to say DONE
            self.sent_expression = False
            return DONE + " " + PASS
        else:
            # LLM forgot to use the `pandas_eval` tool
            return """
                You forgot to use the `pandas_eval` tool/function 
                to find the answer.
                Try again using the `pandas_eval` tool/function.
                """
    return None
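
The three branches amount to a small decision table over (sent_expression, message content); here is a stand-in restatement (DONE and PASS are Langroid's turn-control strings):

def fallback_outcome(sent_expression: bool, content: str) -> str:
    # Mirrors the branches above, for a message sent by the LLM.
    if content.strip() == "DONE" and sent_expression:
        return "remind the LLM to present the answer"
    if sent_expression:
        return "DONE PASS (end the turn, passing the result through)"
    return "remind the LLM to use the pandas_eval tool"

print(fallback_outcome(True, "DONE"))
print(fallback_outcome(False, "the answer is 42"))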

PandasEvalTool

Bases: ToolMessage

Tool/function to evaluate a pandas expression involving a dataframe df
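
The tool's fields are not shown on this page, but pandas_eval above reads msg.expression; a minimal sketch of such a ToolMessage subclass, where the `request` and `purpose` values are illustrative assumptions (by Langroid convention, `request` matches the handler method's name):

from langroid.agent.tool_message import ToolMessage  # import path is an assumption

class PandasEvalTool(ToolMessage):
    request: str = "pandas_eval"  # assumed to match the handler name above
    purpose: str = "To evaluate a pandas <expression> on the dataframe `df`."
    expression: str  # e.g. "df['price'].mean()"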

dataframe_summary(df)

Generate a structured summary for a pandas DataFrame containing numerical and categorical values.

Parameters:

    df (DataFrame): The input DataFrame to summarize. (required)

Returns:

    str: A nicely structured and formatted summary string.

Source code in langroid/agent/special/table_chat_agent.py
@no_type_check
def dataframe_summary(df: pd.DataFrame) -> str:
    """
    Generate a structured summary for a pandas DataFrame containing numerical
    and categorical values.

    Args:
        df (pd.DataFrame): The input DataFrame to summarize.

    Returns:
        str: A nicely structured and formatted summary string.
    """

    # Column names display
    col_names_str = (
        "COLUMN NAMES:\n" + " ".join([f"'{col}'" for col in df.columns]) + "\n\n"
    )

    # Numerical data summary
    num_summary = df.describe().map(lambda x: "{:.2f}".format(x))
    num_str = "Numerical Column Summary:\n" + num_summary.to_string() + "\n\n"

    # Categorical data summary
    cat_columns = df.select_dtypes(include=[np.object_]).columns
    cat_summary_list = []

    for col in cat_columns:
        unique_values = df[col].unique()
        if len(unique_values) < 10:
            cat_summary_list.append(f"'{col}': {', '.join(map(str, unique_values))}")
        else:
            cat_summary_list.append(f"'{col}': {df[col].nunique()} unique values")

    cat_str = "Categorical Column Summary:\n" + "\n".join(cat_summary_list) + "\n\n"

    # Missing values summary
    nan_summary = df.isnull().sum().rename("missing_values").to_frame()
    nan_str = "Missing Values Column Summary:\n" + nan_summary.to_string() + "\n"

    # Combine the summaries into one structured string
    summary_str = col_names_str + num_str + cat_str + nan_str

    return summary_str
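
A quick way to see all four sections (column names, numerical, categorical, missing values) is to run the function on a toy frame; the import path is taken from the source path above:

import pandas as pd

from langroid.agent.special.table_chat_agent import dataframe_summary

df = pd.DataFrame(
    {
        "city": ["Paris", "Lyon", "Paris", None],  # object dtype -> categorical summary
        "population_m": [2.1, 0.5, 2.1, 1.0],      # numeric -> describe() summary
    }
)
print(dataframe_summary(df))
# 'city' has fewer than 10 unique values, so they are listed inline;
# the missing-values section reports 1 for city and 0 for population_m.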