Skip to content

qdrantdb

langroid/vector_store/qdrantdb.py

QdrantDB(config=QdrantDBConfig())

Bases: VectorStore

Source code in langroid/vector_store/qdrantdb.py
def __init__(self, config: QdrantDBConfig = QdrantDBConfig()):
    """
    Build the Qdrant client described by `config` and, when a collection
    name is configured, create that collection.

    Connection mode is resolved as follows (note that `config.cloud` is
    overwritten on the passed-in config object):
        - docker: needs the QDRANT_API_URL env variable; if it is set the
          client connects by URL, otherwise local storage is used.
        - cloud: needs both QDRANT_API_KEY and QDRANT_API_URL; if either is
          missing, local storage is used.
        - local: opens `config.storage_path`; if that fails, retries with
          `config.storage_path + ".new"`.

    Args:
        config (QdrantDBConfig): settings for the vector store.

    Raises:
        ImportError: if sparse embeddings are requested but `transformers`
            cannot be imported.
    """
    super().__init__(config)
    self.config: QdrantDBConfig = config
    self.host = config.host
    self.port = config.port
    from qdrant_client import QdrantClient

    if self.config.use_sparse_embeddings:
        try:
            from transformers import AutoModelForMaskedLM, AutoTokenizer
        except ImportError:
            raise ImportError(
                """
                To use sparse embeddings,
                you must install langroid with the [transformers] extra, e.g.:
                pip install "langroid[transformers]"
                """
            )

        sparse_model_name = self.config.sparse_embedding_model
        self.sparse_tokenizer = AutoTokenizer.from_pretrained(sparse_model_name)
        self.sparse_model = AutoModelForMaskedLM.from_pretrained(sparse_model_name)

    # Credentials / endpoint come from the environment (optionally via .env).
    load_dotenv()
    api_key = os.getenv("QDRANT_API_KEY")
    api_url = os.getenv("QDRANT_API_URL")
    if config.docker:
        if api_url is not None:
            config.cloud = True
        else:
            logger.warning(
                f"""The QDRANT_API_URL env variable must be set to use
                QdrantDB in local docker mode. Please set this
                value in your .env file.
                Switching to local storage at {config.storage_path}
                """
            )
            config.cloud = False
    elif config.cloud and (api_key is None or api_url is None):
        logger.warning(
            f"""QDRANT_API_KEY, QDRANT_API_URL env variable must be set to use
            QdrantDB in cloud mode. Please set these values
            in your .env file.
            Switching to local storage at {config.storage_path}
            """
        )
        config.cloud = False

    if not config.cloud:
        try:
            self.client = QdrantClient(path=config.storage_path)
        except Exception as err:
            # Could not open the configured path: fall back to a sibling path.
            fallback_path = config.storage_path + ".new"
            logger.warning(
                f"""
                Error connecting to local QdrantDB at {config.storage_path}:
                {err}
                Switching to {fallback_path}
                """
            )
            self.client = QdrantClient(path=fallback_path)
    else:
        self.client = QdrantClient(
            url=api_url,
            api_key=api_key,
            timeout=config.timeout,
        )

    # Note: Only create collection if a non-null collection name is provided.
    # This is useful to delay creation of vecdb until we have a suitable
    # collection name (e.g. we could get it from the url or folder path).
    if config.collection_name is None:
        return
    self.create_collection(
        config.collection_name, replace=config.replace_collection
    )

clone()

Create an independent Qdrant client when running against Qdrant Cloud.

Source code in langroid/vector_store/qdrantdb.py
def clone(self) -> "QdrantDB":
    """
    Create an independent Qdrant client when running against Qdrant Cloud.

    Returns:
        A clone produced by the parent class's `clone()` when
        `config.cloud` is set; otherwise this same instance (no copy).
    """
    if self.config.cloud:
        duplicate = super().clone()
        assert isinstance(duplicate, QdrantDB)
        return duplicate
    # Non-cloud stores are shared rather than duplicated.
    return self

close()

Close the QdrantDB client and release any resources (e.g., file locks). This is especially important for local storage to release the .lock file.

Source code in langroid/vector_store/qdrantdb.py
def close(self) -> None:
    """
    Shut down the underlying Qdrant client, freeing whatever it holds
    (e.g., file locks). For local storage this matters in particular,
    since closing is what releases the .lock file.

    Does nothing if the client object exposes no `close` attribute.
    """
    if not hasattr(self.client, "close"):
        return
    # QdrantLocal has a close method that releases the lock
    self.client.close()
    logger.info(f"Closed QdrantDB connection for {self.config.storage_path}")

clear_all_collections(really=False, prefix='')

Clear all collections with the given prefix.

Source code in langroid/vector_store/qdrantdb.py
def clear_all_collections(self, really: bool = False, prefix: str = "") -> int:
    """
    Delete every collection whose name starts with `prefix`.

    Args:
        really (bool): Safety switch; nothing is deleted unless this is True.
        prefix (str): Only collections whose names start with this string
            are deleted. The default "" matches all collections.

    Returns:
        int: Number of collections deleted (0 if `really` is False or no
            collection matches the prefix).
    """
    if not really:
        logger.warning("Not deleting all collections, set really=True to confirm")
        return 0

    # Include empty collections in the candidates so they get removed too.
    targets = [
        coll_name
        for coll_name in self.list_collections(empty=True)
        if coll_name.startswith(prefix)
    ]
    if not targets:
        logger.warning(f"No collections found with prefix {prefix}")
        return 0

    empty_count = 0
    non_empty_count = 0
    for coll_name in targets:
        coll_info = self.client.get_collection(collection_name=coll_name)
        n_points = from_optional(coll_info.points_count, 0)
        if n_points == 0:
            empty_count += 1
        elif n_points > 0:
            non_empty_count += 1
        self.client.delete_collection(collection_name=coll_name)

    logger.warning(
        f"""
        Deleted {empty_count} empty collections and
        {non_empty_count} non-empty collections.
        """
    )
    return empty_count + non_empty_count

list_collections(empty=False)

Returns:

Type Description
List[str]

List of collection names. Unless empty is True, only collections that have at least one vector are included.

Parameters:

Name Type Description Default
empty bool

Whether to include empty collections.

False
Source code in langroid/vector_store/qdrantdb.py
def list_collections(self, empty: bool = False) -> List[str]:
    """
    List the names of the collections known to the client.

    Args:
        empty (bool, optional): Whether to include empty collections.
            Defaults to False.

    Returns:
        List of collection names. Unless `empty` is True, only collections
        with a positive points count are returned; a collection whose info
        cannot be fetched is logged and left out.
    """
    # Read the `collections` field by name instead of relying on the
    # iteration order of the response object's fields.
    colls = self.client.get_collections().collections
    if empty:
        return [coll.name for coll in colls]

    non_empty: List[str] = []
    for coll in colls:
        try:
            info = self.client.get_collection(collection_name=coll.name)
            points_count = from_optional(info.points_count, 0)
        except Exception:
            # Best effort: an unreadable collection is treated as empty.
            logger.warning(f"Error getting collection {coll.name}")
            continue
        if points_count > 0:
            non_empty.append(coll.name)
    return non_empty

create_collection(collection_name, replace=False)

Create a collection with the given name, optionally replacing an existing collection if replace is True.

Args:

collection_name (str): Name of the collection to create.

replace (bool): Whether to replace an existing collection with the same name. Defaults to False.

Source code in langroid/vector_store/qdrantdb.py
def create_collection(self, collection_name: str, replace: bool = False) -> None:
    """
    Create the named collection and record it as the current collection
    name in the config.

    If a collection of that name exists, is GREEN and holds points, it is
    kept untouched unless `replace` is True. In every other case an existing
    collection of that name is deleted before the new one is created.

    Args:
        collection_name (str): Name of the collection to create.
        replace (bool): Whether to replace an existing non-empty collection
            with the same name. Defaults to False.
    """
    from qdrant_client.http.models import (
        CollectionStatus,
        Distance,
        SparseIndexParams,
        SparseVectorParams,
        VectorParams,
    )

    self.config.collection_name = collection_name
    if self.client.collection_exists(collection_name=collection_name):
        existing = self.client.get_collection(collection_name=collection_name)
        is_populated = (
            existing.status == CollectionStatus.GREEN
            and from_optional(existing.points_count, 0) > 0
        )
        if is_populated:
            logger.warning(f"Non-empty Collection {collection_name} already exists")
            if not replace:
                logger.warning("Not replacing collection")
                return
            logger.warning("Recreating fresh collection")
        # Reached for empty / non-green collections, or when replacing.
        self.client.delete_collection(collection_name=collection_name)

    # The dense vector is registered under the empty-string name.
    dense_params = VectorParams(
        size=self.embedding_dim,
        distance=Distance.COSINE,
    )
    sparse_vectors_config = (
        {"text-sparse": SparseVectorParams(index=SparseIndexParams())}
        if self.config.use_sparse_embeddings
        else None
    )
    self.client.create_collection(
        collection_name=collection_name,
        vectors_config={"": dense_params},
        sparse_vectors_config=sparse_vectors_config,
    )

    # Sanity-check the freshly created collection.
    created = self.client.get_collection(collection_name=collection_name)
    assert created.status == CollectionStatus.GREEN
    assert created.vectors_count in (0, None)
    if not settings.debug:
        return
    # In debug mode, force the collection info out at INFO level, then
    # restore the logger's previous level.
    previous_level = logger.getEffectiveLevel()
    logger.setLevel(logging.INFO)
    logger.info(created)
    logger.setLevel(previous_level)

is_valid_uuid(uuid_to_test)

Check if a given string is a valid UUID or an unsigned 64-bit integer.

Source code in langroid/vector_store/qdrantdb.py
def is_valid_uuid(uuid_to_test: str) -> bool:
    """
    Check if a given value is a valid UUID string or an unsigned 64-bit
    integer.

    Args:
        uuid_to_test (str): Candidate ID.

    Returns:
        bool: True if `uuid_to_test` is a UUID written exactly in the form
            that `str(uuid.UUID(...))` produces, or if it converts to an
            integer in the range [0, 2**64 - 1]. False otherwise, including
            for values such as None that are neither.
    """
    try:
        uuid_obj = uuid.UUID(uuid_to_test)
        # Parsable as a UUID: accept only the canonical spelling.
        return str(uuid_obj) == uuid_to_test
    except (AttributeError, TypeError, ValueError):
        # Not parsable as a UUID; fall through to the integer check.
        pass
    # Check for valid unsigned 64-bit integer
    try:
        int_value = int(uuid_to_test)
    except (OverflowError, TypeError, ValueError):
        # e.g. None, non-numeric text, or an infinite float.
        return False
    return 0 <= int_value <= 2**64 - 1