Skip to content

vector_store

langroid/vector_store/__init__.py

VectorStore(config)

Bases: ABC

Abstract base class for a vector store.

Source code in langroid/vector_store/base.py
def __init__(self, config: VectorStoreConfig):
    """Keep the config and build the embedding model it specifies.

    Args:
        config (VectorStoreConfig): Settings for this vector store; its
            `embedding` field is handed to `EmbeddingModel.create`.
    """
    self.config = config
    embedding_settings = config.embedding
    self.embedding_model = EmbeddingModel.create(embedding_settings)

clear_empty_collections() abstractmethod

Clear all empty collections in the vector store. Returns the number of collections deleted.

Source code in langroid/vector_store/base.py
@abstractmethod
def clear_empty_collections(self) -> int:
    """Delete every empty collection in the vector store.

    Returns:
        int: Number of collections deleted.
    """

clear_all_collections(really=False, prefix='') abstractmethod

Clear all collections in the vector store.

Parameters:

Name Type Description Default
really bool

Whether to really clear all collections. Defaults to False.

False
prefix str

Prefix of collections to clear.

''

Returns: int: Number of collections deleted.

Source code in langroid/vector_store/base.py
@abstractmethod
def clear_all_collections(self, really: bool = False, prefix: str = "") -> int:
    """Delete all collections in the vector store.

    Args:
        really (bool, optional): Confirmation flag: whether to really
            clear all collections. Defaults to False.
        prefix (str, optional): Prefix of the collections to clear.
            Defaults to "".

    Returns:
        int: Number of collections deleted.
    """

list_collections(empty=False) abstractmethod

List all collections in the vector store (only non empty collections if empty=False).

Source code in langroid/vector_store/base.py
@abstractmethod
def list_collections(self, empty: bool = False) -> List[str]:
    """Return the names of the collections in the vector store.

    Args:
        empty (bool, optional): If False (the default), only non-empty
            collections are listed.

    Returns:
        List[str]: Collection names.
    """

set_collection(collection_name, replace=False)

Set the current collection to the given collection name.

Args:
- collection_name (str): Name of the collection.
- replace (bool, optional): Whether to replace the collection if it already exists. Defaults to False.

Source code in langroid/vector_store/base.py
def set_collection(self, collection_name: str, replace: bool = False) -> None:
    """
    Make `collection_name` the current collection by recording it, together
    with the `replace` flag, on this store's config.

    Args:
        collection_name (str): Name of the collection.
        replace (bool, optional): Whether to replace the collection if it
            already exists. Defaults to False.
    """
    cfg = self.config
    cfg.collection_name = collection_name
    cfg.replace_collection = replace

create_collection(collection_name, replace=False) abstractmethod

Create a collection with the given name.

Args:
- collection_name (str): Name of the collection.
- replace (bool, optional): Whether to replace the collection if it already exists. Defaults to False.

Source code in langroid/vector_store/base.py
@abstractmethod
def create_collection(self, collection_name: str, replace: bool = False) -> None:
    """Create a collection named `collection_name`.

    Args:
        collection_name (str): Name of the collection.
        replace (bool, optional): Whether to replace the collection if
            one with this name already exists. Defaults to False.
    """

compute_from_docs(docs, calc)

Compute a result on a set of documents, using a dataframe calc string like df.groupby('state')['income'].mean().

Source code in langroid/vector_store/base.py
def compute_from_docs(self, docs: List[Document], calc: str) -> str:
    """Evaluate a dataframe expression over a set of documents.

    The documents are loaded into a dataframe that the expression can refer
    to as `df`, e.g. `df.groupby('state')['income'].mean()`.

    Args:
        docs (List[Document]): Documents to compute over.
        calc (str): A single pandas expression over `df`.

    Returns:
        str: The stringified result, or an error message if evaluation
            failed (returned rather than raised, so that an LLM can fix
            the calc string).
    """
    # one row per doc; nested fields become dotted-path columns
    rows = [flatten_dict(doc.dict(by_alias=True)) for doc in docs]
    df = pd.DataFrame(rows)

    try:
        # pd.eval is safer than eval, but limited to a single expression
        outcome = pd.eval(
            calc,
            engine="python",
            parser="pandas",
            local_dict={"df": df},
        )
    except Exception as e:
        message = f"""
        Error encountered in pandas eval: {str(e)}
        """
        # pd.eval sometimes fails with a KeyError on a perfectly valid
        # expression like df.loc[..., 'column']; suggest a workaround.
        index_key_error = isinstance(e, KeyError) and "not in index" in str(e)
        if index_key_error:
            message += """
            Maybe try a different way, e.g. 
            instead of df.loc[..., 'column'], try df.loc[...]['column']
            """
        return message
    else:
        return stringify(outcome)

maybe_add_ids(documents)

Add ids to metadata if absent, since some vecdbs don't like having blank ids.

Source code in langroid/vector_store/base.py
def maybe_add_ids(self, documents: Sequence[Document]) -> None:
    """Give each document whose metadata id is None or "" a new id from
    `ObjectRegistry.new_id()`, since some vecdbs don't like blank ids.
    Documents are modified in place."""
    for doc in documents:
        has_blank_id = doc.metadata.id in (None, "")
        if has_blank_id:
            doc.metadata.id = ObjectRegistry.new_id()

similar_texts_with_scores(text, k=1, where=None) abstractmethod

Find k most similar texts to the given text, in terms of vector distance metric (e.g., cosine similarity).

Parameters:

Name Type Description Default
text str

The text to find similar texts for.

required
k int

Number of similar texts to retrieve. Defaults to 1.

1
where Optional[str]

Where clause to filter the search.

None

Returns:

Type Description
List[Tuple[Document, float]]

List[Tuple[Document,float]]: List of (Document, score) tuples.

Source code in langroid/vector_store/base.py
@abstractmethod
def similar_texts_with_scores(
    self,
    text: str,
    k: int = 1,
    where: Optional[str] = None,
) -> List[Tuple[Document, float]]:
    """
    Retrieve the k texts most similar to `text`, as measured by a vector
    distance metric (e.g., cosine similarity).

    Args:
        text (str): The text to find similar texts for.
        k (int, optional): Number of similar texts to retrieve.
            Defaults to 1.
        where (Optional[str], optional): Where clause to filter the search.
            Defaults to None.

    Returns:
        List[Tuple[Document,float]]: List of (Document, score) tuples.
    """

add_context_window(docs_scores, neighbors=0)

In each doc's metadata, there may be a window_ids field indicating the ids of the chunks around the current chunk. These window_ids may overlap, so we (a) coalesce each group of overlapping windows into a single window (maintaining ordering), and (b) create a new document for each resulting window, preserving metadata.

We may have stored a longer set of window_ids than we need during chunking. Now, we just want neighbors on each side of the center of the window_ids list.

Parameters:

Name Type Description Default
docs_scores List[Tuple[Document, float]]

List of pairs of documents to add context windows to together with their match scores.

required
neighbors int

Number of neighbors on "each side" of match to retrieve. Defaults to 0. "Each side" here means before and after the match, in the original text.

0

Returns:

Type Description
List[Tuple[Document, float]]

List[Tuple[Document, float]]: List of (Document, score) tuples.

Source code in langroid/vector_store/base.py
def add_context_window(
    self, docs_scores: List[Tuple[Document, float]], neighbors: int = 0
) -> List[Tuple[Document, float]]:
    """
    Expand each matched chunk into a larger window of surrounding chunks.

    In each doc's metadata, there may be a window_ids field indicating
    the ids of the chunks around the current chunk.
    These window_ids may overlap, so we
    - coalesce each group of overlapping windows into a single window
      (maintaining ordering),
    - create a new document for each resulting window, preserving metadata.

    We may have stored a longer set of window_ids than we need during chunking.
    Now, we just want `neighbors` on each side of the center of the window_ids list.

    Args:
        docs_scores (List[Tuple[Document, float]]): List of pairs of documents
            to add context windows to together with their match scores.
        neighbors (int, optional): Number of neighbors on "each side" of match to
            retrieve. Defaults to 0.
            "Each side" here means before and after the match,
            in the original text.

    Returns:
        List[Tuple[Document, float]]: List of (Document, score) tuples.
            `docs_scores` is returned unchanged when `neighbors` is 0 or when
            no document is marked as a chunk. Otherwise each returned document
            is a new document (with a fresh id) whose content joins the
            window's chunks, and whose score is the highest match score
            recorded for any id in that window.
    """
    if neighbors == 0:
        return docs_scores
    if not any(d.metadata.is_chunk for d, _ in docs_scores):
        return docs_scores

    window_ids_list = []
    id2metadata = {}
    # id -> highest score of a doc it appears in
    id2max_score: Dict[int | str, float] = {}
    for d, score in docs_scores:
        window_ids = d.metadata.window_ids
        if len(window_ids) == 0:
            # no stored window: the doc is its own (single-element) window
            window_ids = [d.id()]
        for wid in window_ids:
            id2metadata[wid] = d.metadata
            # Default to this doc's own score rather than 0, so that
            # negative scores are not silently floored at 0.
            id2max_score[wid] = max(id2max_score.get(wid, score), score)

        # keep only `neighbors` ids on each side of the matched chunk
        n = len(window_ids)
        chunk_idx = window_ids.index(d.id())
        neighbor_ids = window_ids[
            max(0, chunk_idx - neighbors) : min(n, chunk_idx + neighbors + 1)
        ]
        window_ids_list.append(neighbor_ids)

    # window_ids could be from different docs,
    # and they may overlap, so we coalesce overlapping groups into
    # separate windows.
    window_ids_list = self.remove_overlaps(window_ids_list)
    final_docs = []
    final_scores = []
    for w in window_ids_list:
        # start from (a copy of) the metadata recorded for the window's first id
        metadata = copy.deepcopy(id2metadata[w[0]])
        metadata.window_ids = w
        document = Document(
            content=" ".join([d.content for d in self.get_documents_by_ids(w)]),
            metadata=metadata,
        )
        # make a fresh id since content is in general different
        document.metadata.id = ObjectRegistry.new_id()
        final_docs.append(document)
        final_scores.append(max(id2max_score[wid] for wid in w))
    return list(zip(final_docs, final_scores))

remove_overlaps(windows) staticmethod

Given a collection of windows, where each window is a sequence of ids, identify groups of overlapping windows, and for each overlapping group, order the chunk-ids using topological sort so they appear in the original order in the text.

Parameters:

Name Type Description Default
windows List[List[str]]

List of windows, where each window is a sequence of ids.

required

Returns:

Type Description
List[List[str]]

List[List[str]]: List of windows, where each window is a sequence of ids, and no two windows overlap.

Source code in langroid/vector_store/base.py
@staticmethod
def remove_overlaps(windows: List[List[str]]) -> List[List[str]]:
    """
    Merge overlapping windows of chunk-ids into non-overlapping windows.

    Windows that share ids are grouped together (like connected components
    of a graph). Within each group, the windows are ordered via topological
    sort so that the chunk-ids appear in their original order in the text,
    and are then concatenated with duplicate ids removed.

    Args:
        windows (List[List[str]]): List of windows, where each window is a
            sequence of ids.

    Returns:
        List[List[str]]: List of windows, where each window is a sequence
            of ids, and no two windows overlap.
    """
    # chunk id -> {window index -> position of that id within the window}
    positions: Dict[str, Dict[int, int]] = {}
    for win_idx, window in enumerate(windows):
        for pos, chunk_id in enumerate(window):
            positions.setdefault(chunk_id, {})[win_idx] = pos

    num_windows = len(windows)
    # pairwise relation between windows: 0 = disjoint, -1 / 1 = overlapping
    order = np.zeros((num_windows, num_windows), dtype=np.int8)
    for i, first in enumerate(windows):
        first_ids = set(first)
        for j, second in enumerate(windows):
            if i == j:
                continue
            shared = first_ids.intersection(second)
            if not shared:
                continue
            common_id = next(iter(shared))  # any common id
            where_in_i = positions[common_id][i]
            where_in_j = positions[common_id][j]
            # the common id sitting later in window i means
            # win i is before win j (-1); otherwise win i is after win j (1)
            order[i, j] = -1 if where_in_i > where_in_j else 1

    # find groups of windows that overlap, like connected components in a graph
    groups = components(np.abs(order))

    merged_windows = []
    for group in groups:
        # find a total ordering among the windows of this group based on
        # the order matrix (this is a topological sort)
        member_idx = np.array(group)
        sub_order = order[member_idx][:, member_idx]
        sorted_positions = topological_sort(sub_order)
        chained_ids = []
        for win_idx in member_idx[sorted_positions]:
            chained_ids.extend(windows[win_idx])
        # Note we do not split these again: each connected group becomes one
        # larger, de-duplicated window, so context is retained for LLM q/a.
        merged_windows.append(list(dict.fromkeys(chained_ids)))

    return merged_windows

get_all_documents(where='') abstractmethod

Get all documents in the current collection, possibly filtered by where.

Source code in langroid/vector_store/base.py
@abstractmethod
def get_all_documents(self, where: str = "") -> List[Document]:
    """
    Return all documents in the current collection, possibly filtered
    by `where`.

    Args:
        where (str, optional): Filter to apply. Defaults to "".

    Returns:
        List[Document]: The documents in the current collection.
    """

get_documents_by_ids(ids) abstractmethod

Get documents by their ids. Args: ids (List[str]): List of document ids.

Returns:

Type Description
List[Document]

List[Document]: List of documents

Source code in langroid/vector_store/base.py
@abstractmethod
def get_documents_by_ids(self, ids: List[str]) -> List[Document]:
    """
    Look up documents by their ids.

    Args:
        ids (List[str]): List of document ids.

    Returns:
        List[Document]: List of documents
    """

QdrantDB(config=QdrantDBConfig())

Bases: VectorStore

Source code in langroid/vector_store/qdrantdb.py
def __init__(self, config: QdrantDBConfig = QdrantDBConfig()):
    """
    Set up the embedding function, the optional sparse-embedding models and
    the Qdrant client, then create the configured collection (if any).

    The client connects via `QDRANT_API_URL` / `QDRANT_API_KEY` (read from
    the environment after `load_dotenv()`) when `config.cloud` ends up True,
    and uses local storage at `config.storage_path` otherwise. If the local
    storage cannot be opened, `config.storage_path + ".new"` is used instead.

    Args:
        config (QdrantDBConfig): Settings for this store. Note that this
            object is modified: `config.cloud` is set according to
            `config.docker` and the available environment variables.

    Raises:
        ImportError: If `config.use_sparse_embeddings` is set but the
            `transformers` package cannot be imported.
    """
    # NOTE(review): the default `QdrantDBConfig()` is built once, when the
    # function is defined, and shared by every call that omits `config`;
    # since this method mutates `config`, such calls can affect each other.
    super().__init__(config)
    self.config: QdrantDBConfig = config
    # Reuse the embedding model already built by VectorStore.__init__
    # instead of creating a second one from the same config.
    emb_model = self.embedding_model
    self.embedding_fn: EmbeddingFunction = emb_model.embedding_fn()
    self.embedding_dim = emb_model.embedding_dims
    if self.config.use_sparse_embeddings:
        try:
            from transformers import AutoModelForMaskedLM, AutoTokenizer
        except ImportError:
            raise ImportError(
                """
                To use sparse embeddings, 
                you must install langroid with the [transformers] extra, e.g.:
                pip install "langroid[transformers]"
                """
            )

        self.sparse_tokenizer = AutoTokenizer.from_pretrained(
            self.config.sparse_embedding_model
        )
        self.sparse_model = AutoModelForMaskedLM.from_pretrained(
            self.config.sparse_embedding_model
        )
    self.host = config.host
    self.port = config.port
    load_dotenv()
    key = os.getenv("QDRANT_API_KEY")
    url = os.getenv("QDRANT_API_URL")
    if config.docker:
        # docker mode only needs the URL; without it, fall back to local storage
        if url is None:
            logger.warning(
                f"""The QDRANT_API_URL env variable must be set to use
                QdrantDB in local docker mode. Please set this
                value in your .env file.
                Switching to local storage at {config.storage_path}
                """
            )
            config.cloud = False
        else:
            config.cloud = True
    elif config.cloud and None in [key, url]:
        # cloud mode needs both the key and the URL
        logger.warning(
            f"""QDRANT_API_KEY, QDRANT_API_URL env variable must be set to use 
            QdrantDB in cloud mode. Please set these values 
            in your .env file. 
            Switching to local storage at {config.storage_path} 
            """
        )
        config.cloud = False

    if config.cloud:
        self.client = QdrantClient(
            url=url,
            api_key=key,
            timeout=config.timeout,
        )
    else:
        try:
            self.client = QdrantClient(
                path=config.storage_path,
            )
        except Exception as e:
            # could not open the local storage: retry at a sibling path
            new_storage_path = config.storage_path + ".new"
            logger.warning(
                f"""
                Error connecting to local QdrantDB at {config.storage_path}:
                {e}
                Switching to {new_storage_path}
                """
            )
            self.client = QdrantClient(
                path=new_storage_path,
            )

    # Note: Only create collection if a non-null collection name is provided.
    # This is useful to delay creation of vecdb until we have a suitable
    # collection name (e.g. we could get it from the url or folder path).
    if config.collection_name is not None:
        self.create_collection(
            config.collection_name, replace=config.replace_collection
        )

clear_all_collections(really=False, prefix='')

Clear all collections with the given prefix.

Source code in langroid/vector_store/qdrantdb.py
def clear_all_collections(self, really: bool = False, prefix: str = "") -> int:
    """Delete all collections whose name starts with `prefix`.

    Args:
        really (bool, optional): Must be True for anything to be deleted.
            Defaults to False.
        prefix (str, optional): Only collections whose name starts with
            this prefix are deleted. Defaults to "".

    Returns:
        int: Number of collections deleted (empty and non-empty).
    """
    if not really:
        logger.warning("Not deleting all collections, set really=True to confirm")
        return 0

    matching = [
        name
        for name in self.list_collections(empty=True)
        if name.startswith(prefix)
    ]
    if not matching:
        logger.warning(f"No collections found with prefix {prefix}")
        return 0

    n_empty_deletes = 0
    n_non_empty_deletes = 0
    for coll_name in matching:
        coll_info = self.client.get_collection(collection_name=coll_name)
        n_points = from_optional(coll_info.points_count, 0)
        # tally before deleting, so the summary can report both kinds
        if n_points == 0:
            n_empty_deletes += 1
        elif n_points > 0:
            n_non_empty_deletes += 1
        self.client.delete_collection(collection_name=coll_name)

    logger.warning(
        f"""
        Deleted {n_empty_deletes} empty collections and 
        {n_non_empty_deletes} non-empty collections.
        """
    )
    return n_empty_deletes + n_non_empty_deletes

list_collections(empty=False)

Returns:

Type Description
List[str]

List of collection names. Unless empty is True, only collections that have at least one point are included.

Parameters:

Name Type Description Default
empty bool

Whether to include empty collections.

False
Source code in langroid/vector_store/qdrantdb.py
def list_collections(self, empty: bool = False) -> List[str]:
    """
    List the names of the collections known to the Qdrant client.

    Args:
        empty (bool, optional): Whether to include empty collections.
            Defaults to False, in which case only collections with a
            positive points count are returned.

    Returns:
        List[str]: Collection names. A collection whose info cannot be
            fetched is logged and treated as empty.
    """
    # Read the response's `collections` field by name rather than relying
    # on the order in which the response model yields (field, value) pairs.
    colls = self.client.get_collections().collections
    if empty:
        return [coll.name for coll in colls]

    non_empty = []
    for coll in colls:
        try:
            count = from_optional(
                self.client.get_collection(collection_name=coll.name).points_count,
                0,
            )
        except Exception:
            # best effort: a collection we cannot inspect counts as empty
            logger.warning(f"Error getting collection {coll.name}")
            count = 0
        if (count or 0) > 0:
            non_empty.append(coll.name)
    return non_empty

create_collection(collection_name, replace=False)

Create a collection with the given name, optionally replacing an existing collection if replace is True. Args: collection_name (str): Name of the collection to create. replace (bool): Whether to replace an existing collection with the same name. Defaults to False.

Source code in langroid/vector_store/qdrantdb.py
def create_collection(self, collection_name: str, replace: bool = False) -> None:
    """
    Create a collection with the given name, optionally replacing an existing
        collection if `replace` is True.

    `collection_name` is recorded on the config as the current collection in
    all cases. An existing collection that is non-empty (and has GREEN
    status) is left untouched unless `replace` is True; any other existing
    collection with this name is deleted and re-created.

    Args:
        collection_name (str): Name of the collection to create.
        replace (bool): Whether to replace an existing collection
            with the same name. Defaults to False.

    Raises:
        AssertionError: If the newly created collection is not GREEN, or
            reports a non-zero vectors count.
    """
    self.config.collection_name = collection_name
    if self.client.collection_exists(collection_name=collection_name):
        coll = self.client.get_collection(collection_name=collection_name)
        if (
            coll.status == CollectionStatus.GREEN
            and from_optional(coll.points_count, 0) > 0
        ):
            logger.warning(f"Non-empty Collection {collection_name} already exists")
            if not replace:
                logger.warning("Not replacing collection")
                return
            else:
                logger.warning("Recreating fresh collection")
        # reached for an empty (or non-GREEN) collection, or when replacing
        self.client.delete_collection(collection_name=collection_name)

    # a single unnamed ("") dense vector, compared by cosine distance
    vectors_config = {
        "": VectorParams(
            size=self.embedding_dim,
            distance=Distance.COSINE,
        )
    }
    sparse_vectors_config = None
    if self.config.use_sparse_embeddings:
        sparse_vectors_config = {
            "text-sparse": SparseVectorParams(index=SparseIndexParams())
        }
    self.client.create_collection(
        collection_name=collection_name,
        vectors_config=vectors_config,
        sparse_vectors_config=sparse_vectors_config,
    )
    collection_info = self.client.get_collection(collection_name=collection_name)
    assert collection_info.status == CollectionStatus.GREEN
    assert collection_info.vectors_count in [0, None]
    if settings.debug:
        # Temporarily lower the threshold to INFO to show the collection info.
        # Save the logger's own level (not its effective, possibly inherited,
        # level) so that restoring it does not pin an inherited level, and
        # restore it even if logging fails.
        level = logger.level
        logger.setLevel(logging.INFO)
        try:
            logger.info(collection_info)
        finally:
            logger.setLevel(level)