Skip to content

arangodb_agent

langroid/agent/special/arangodb/arangodb_agent.py

ArangoChatAgent(config)

Bases: ChatAgent

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def __init__(self, config: ArangoChatAgentConfig) -> None:
    """Initialize the ArangoDB chat agent from its configuration.

    Runs the base `ChatAgent` initializer, then a fixed sequence of setup
    helpers (defined elsewhere in the class): state init, config validation,
    arango import, DB initialization, and tool/system-message setup.

    Args:
        config: agent configuration; stored on `self.config`.
    """
    super().__init__(config)
    # Re-annotate so type checkers see the ArangoDB-specific config type.
    self.config: ArangoChatAgentConfig = config
    # NOTE(review): these helpers are called in this exact order; presumably
    # validation/import must precede _initialize_db — confirm before reordering.
    self.init_state()
    self._validate_config()
    self._import_arango()
    self._initialize_db()
    self._init_tools_sys_message()

with_retry(func, max_retries=3, delay=1.0)

Execute a function, retrying on connection errors.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def with_retry(
    self, func: Callable[[], T], max_retries: int = 3, delay: float = 1.0
) -> T:
    """Execute a function with retries on connection error.

    `func` is attempted up to `max_retries` times in total (at least once).
    After each failed attempt except the last, this logs a warning, sleeps
    `delay` seconds and calls `self._initialize_db()` to reconnect. The
    final attempt is made outside the retry loop, so its error propagates.

    Args:
        func: zero-argument callable to run.
        max_retries: total number of attempts; values below 1 mean one attempt.
        delay: seconds to wait between attempts.

    Returns:
        The value returned by the first successful call of `func`.

    Raises:
        ArangoError: re-raised from the final attempt when every attempt fails.
    """
    for attempt in range(1, max_retries):
        try:
            return func()
        except ArangoError:
            logger.warning(
                f"Connection failed (attempt {attempt}/{max_retries}). "
                f"Retrying in {delay} seconds..."
            )
            time.sleep(delay)
            # Reconnect if needed. A failed reconnect must not abort the
            # remaining attempts: the next call of `func` is retried (or, if
            # it is the last one, re-raised) on its own.
            # NOTE(review): assumes _initialize_db signals failure via
            # ArangoError — confirm; other exception types still propagate.
            try:
                self._initialize_db()
            except ArangoError as reconnect_error:
                logger.warning(f"Reconnect failed: {reconnect_error}")
    # Final attempt: any error here propagates to the caller.
    return func()

read_query(query, bind_vars=None)

Execute a read query with connection retry.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def read_query(
    self, query: str, bind_vars: Optional[Dict[Any, Any]] = None
) -> QueryResult:
    """Execute a read query with connection retry.

    Args:
        query: AQL query string.
        bind_vars: optional AQL bind parameters passed to the driver.

    Returns:
        `QueryResult(success=True, data=[...])` with at most
        `config.max_num_results` documents. Otherwise `success=False` with a
        message: no DB connection, a query error (formatted by
        `retry_query`), or retries exhausted.
    """
    from itertools import islice

    if not self.db:
        return QueryResult(
            success=False, data="No database connection is established."
        )

    def execute_read() -> QueryResult:
        try:
            cursor = self.db.aql.execute(query, bind_vars=bind_vars)
            # Stop pulling from the cursor once the cap is reached instead of
            # materializing the whole result set and slicing it afterwards.
            # NOTE(review): assumes max_num_results is a non-negative int or
            # None; an unexhausted server-side cursor is left to expire.
            records = list(
                islice(cursor, self.config.max_num_results)  # type: ignore
            )
            logger.debug(f"Records retrieved: {records}")
            return QueryResult(success=True, data=records)
        except ServerConnectionError:
            # Connection problems propagate so `with_retry` can reconnect.
            raise
        except Exception as e:
            # Query errors become LLM-facing feedback, not retries.
            logger.error(f"Failed to execute query: {query}\n{e}")
            error_message = self.retry_query(e, query)
            return QueryResult(success=False, data=error_message)

    try:
        return self.with_retry(execute_read)  # type: ignore
    except Exception as e:
        return QueryResult(
            success=False, data=f"Failed after max retries: {str(e)}"
        )

write_query(query, bind_vars=None)

Execute a write query with connection retry.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def write_query(
    self, query: str, bind_vars: Optional[Dict[Any, Any]] = None
) -> QueryResult:
    """Run a data-modifying AQL query, retrying on connection failures.

    Connection errors (`ServerConnectionError`) are re-raised inside the
    attempt so `with_retry` can reconnect. Any other error is turned into a
    failed `QueryResult` whose data is the `retry_query` feedback message.

    Args:
        query: AQL query string.
        bind_vars: optional AQL bind parameters passed to the driver.

    Returns:
        `QueryResult(success=True)` on success; otherwise `success=False`
        with an explanatory message.
    """
    if self.db:

        def run_write() -> QueryResult:
            try:
                self.db.aql.execute(query, bind_vars=bind_vars)
                return QueryResult(success=True)
            except ServerConnectionError:
                raise
            except Exception as exc:
                logger.error(f"Failed to execute query: {query}\n{exc}")
                return QueryResult(
                    success=False, data=self.retry_query(exc, query)
                )

        try:
            return self.with_retry(run_write)  # type: ignore
        except Exception as exc:
            return QueryResult(
                success=False, data=f"Failed after max retries: {str(exc)}"
            )

    return QueryResult(
        success=False, data="No database connection is established."
    )

aql_retrieval_tool(msg)

Handle an AQL query for data retrieval.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def aql_retrieval_tool(self, msg: AQLRetrievalTool) -> str:
    """Handle AQL query for data retrieval.

    The query only runs once the schema has been fetched
    (`self.tried_schema`) and the database exists
    (`self.config.database_created`). An exact repeat of the previous
    retrieval query is refused, so the LLM is pushed to modify it.

    Args:
        msg: tool message carrying the AQL query in `aql_query`.

    Returns:
        One of:
        - guidance text for the LLM;
        - a "no results" hint when the query returns an empty list;
        - `str(response.data)`: the records, or an error message on failure.
    """
    if not self.tried_schema:
        return f"""
        You need to use `{arango_schema_tool_name}` first to get the 
        database schema before using `{aql_retrieval_tool_name}`. This ensures
        you know the correct collection names and edge definitions.
        """
    if not self.config.database_created:
        # Must be an f-string: without the `f` prefix the LLM received the
        # literal text "{aql_creation_tool_name}" instead of the tool name.
        return f"""
        You need to create the database first using `{aql_creation_tool_name}`.
        """
    self.num_tries += 1
    query = msg.aql_query
    if query == self.current_retrieval_aql_query:
        return """
        You have already tried this query, so you will get the same results again!
        If you need to retry, please MODIFY the query to get different results.
        """
    # Recorded before execution, so even a failing query counts as "tried".
    self.current_retrieval_aql_query = query
    logger.info(f"Executing AQL query: {query}")
    response = self.read_query(query)

    if isinstance(response.data, list) and len(response.data) == 0:
        return """
        No results found. Check if your collection names are correct - 
        they are case-sensitive. Use exact names from the schema.
        Try modifying your query based on the RETRY-SUGGESTIONS 
        in your instructions.
        """
    return str(response.data)

aql_creation_tool(msg)

Handle an AQL query for creating data.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def aql_creation_tool(self, msg: AQLCreationTool) -> str:
    """Run the LLM-supplied AQL write query and report the outcome.

    Every call counts toward `self.num_tries`. A successful write marks the
    database as created (`self.config.database_created = True`).

    Args:
        msg: tool message carrying the AQL query in `aql_query`.

    Returns:
        A success message, or `str(...)` of the failure data from
        `write_query`.
    """
    self.num_tries += 1
    creation_query = msg.aql_query
    logger.info(f"Executing AQL query: {creation_query}")
    outcome = self.write_query(creation_query)

    if not outcome.success:
        return str(outcome.data)
    self.config.database_created = True
    return "AQL query executed successfully"

arango_schema_tool(msg)

Get the database schema. If collections=None, include all collections. If properties=False, show only collection names, their types, and edge from/to collections; otherwise show all properties and example documents.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def arango_schema_tool(
    self,
    msg: ArangoSchemaTool | None,
) -> Dict[str, List[Dict[str, Any]]] | str:
    """Get database schema. If collections=None, include all collections.
    If properties=False, show only collection names/types and edge from/to
    collections, else show all properties and example-docs.

    When `msg` is None (internal pre-population before the agent runs), a
    non-empty cached `config.kg_schema` is returned as-is. Otherwise the
    schema is read from the database:
    - graph edge definitions;
    - for each non-system, non-empty collection, either sampled properties
      and an example document (`properties=True`), or name/type plus edge
      from/to collections.

    Args:
        msg: the LLM's tool request, or None for internal pre-population.

    Returns:
        One of:
        - the schema dict;
        - a trimmed schema string with guidance, when the schema exceeds
          `config.max_schema_fields`;
        - an instructional or error string.
    """
    import os

    if (
        msg is not None
        and msg.collections == self.current_schema_params.collections
        and msg.properties == self.current_schema_params.properties
    ):
        return """
        You have already tried this schema TOOL, so you will get the same results 
        again! Please MODIFY the tool params `collections` or `properties` to get
        different results.
        """

    if msg is not None:
        collections = msg.collections
        properties = msg.properties
        # Remember the params so an identical repeat request is caught above
        # (mirrors how `aql_retrieval_tool` tracks its last query).
        self.current_schema_params = msg
    else:
        collections = None
        properties = True
    self.tried_schema = True
    if (
        self.config.kg_schema is not None
        and len(self.config.kg_schema) > 0
        and msg is None
    ):
        # we are trying to pre-populate full schema before the agent runs,
        # so get it if it's already available
        # (Note of course that this "full schema" may actually be incomplete)
        return self.config.kg_schema

    # increment tries only if the LLM is asking for the schema,
    # in which case msg will not be None
    if msg is not None:
        self.num_tries += 1

    def simplify_doc(doc: Any) -> Any:
        """Shrink a document for display: keep only the first list element."""
        if isinstance(doc, list) and len(doc) > 0:
            return [simplify_doc(doc[0])]
        if isinstance(doc, dict):
            return {k: simplify_doc(v) for k, v in doc.items()}
        return doc

    try:
        # Get graph schemas (keeping full graph info)
        graph_schema = [
            {"graph_name": g["name"], "edge_definitions": g["edge_definitions"]}
            for g in self.db.graphs()  # type: ignore
        ]

        # Get collection schemas
        collection_schema = []
        for collection in self.db.collections():  # type: ignore
            col_name = collection["name"]
            if col_name.startswith("_"):
                continue  # skip system collections
            if collections and col_name not in collections:
                continue

            col_type = collection["type"]
            col_size = self.db.collection(col_name).count()
            if col_size == 0:
                continue

            if properties:
                # Full property collection with sampling
                lim = self.config.schema_sample_pct * col_size  # type: ignore
                limit_amount = ceil(lim / 100.0) or 1
                # Bind parameters (`@@col` is a collection bind parameter) so
                # collection names that are not valid bare AQL identifiers
                # (e.g. containing hyphens) still work.
                sample_query = "FOR doc IN @@col LIMIT @limit RETURN doc"

                properties_list = []
                seen_props: set[tuple[str, str]] = set()
                example_doc = None
                for doc in self.db.aql.execute(  # type: ignore
                    sample_query,
                    bind_vars={"@col": col_name, "limit": limit_amount},
                ):
                    if example_doc is None:
                        example_doc = simplify_doc(doc)
                    for key, value in doc.items():
                        type_name = type(value).__name__
                        # set lookup instead of scanning properties_list
                        if (key, type_name) not in seen_props:
                            seen_props.add((key, type_name))
                            properties_list.append(
                                {"name": key, "type": type_name}
                            )

                collection_schema.append(
                    {
                        "collection_name": col_name,
                        "collection_type": col_type,
                        f"{col_type}_properties": properties_list,
                        f"example_{col_type}": example_doc,
                    }
                )
            else:
                # Basic info + from/to for edges only
                collection_info = {
                    "collection_name": col_name,
                    "collection_type": col_type,
                }
                if col_type == "edge":
                    # Get a sample edge to extract from/to fields
                    sample_edge = next(
                        self.db.aql.execute(  # type: ignore
                            "FOR e IN @@col LIMIT 1 RETURN e",
                            bind_vars={"@col": col_name},
                        ),
                        None,
                    )
                    if sample_edge:
                        collection_info["from_collection"] = sample_edge[
                            "_from"
                        ].split("/")[0]
                        collection_info["to_collection"] = sample_edge["_to"].split(
                            "/"
                        )[0]

                collection_schema.append(collection_info)

        schema = {
            "Graph Schema": graph_schema,
            "Collection Schema": collection_schema,
        }
        schema_str = json.dumps(schema, indent=2)
        logger.warning(f"Schema retrieved:\n{schema_str}")
        # Best-effort debug dump: a missing or unwritable `logs` directory
        # must not turn a successful schema retrieval into a failure.
        try:
            os.makedirs("logs", exist_ok=True)
            with open("logs/arango-schema.json", "w", encoding="utf-8") as f:
                f.write(schema_str)
        except OSError as write_err:
            logger.warning(
                f"Could not write schema to logs/arango-schema.json: {write_err}"
            )
        if (n_fields := count_fields(schema)) > self.config.max_schema_fields:
            logger.warning(
                f"""
                Schema has {n_fields} fields, which exceeds the maximum of
                {self.config.max_schema_fields}. Showing a trimmed version
                that only includes edge info and no other properties.
                """
            )
            schema = trim_schema(schema)
            n_fields = count_fields(schema)
            logger.warning(f"Schema trimmed down to {n_fields} fields.")
            schema_str = (
                json.dumps(schema)
                + "\n"
                + f"""

                CAUTION: The requested schema was too large, so 
                the schema has been trimmed down to show only all collection names,
                their types, 
                and edge relationships (from/to collections) without any properties.
                To find out more about the schema, you can EITHER:
                - Use the `{arango_schema_tool_name}` tool again with the 
                  `properties` arg set to True, and `collections` arg set to
                    specific collections you want to know more about, OR
                - Use the `{aql_retrieval_tool_name}` tool to learn more about
                  the schema by querying the database.

                """
            )
            if msg is None:
                self.config.kg_schema = schema_str
            return schema_str
        self.config.kg_schema = schema
        return schema

    except Exception as e:
        logger.error(f"Schema retrieval failed: {str(e)}")
        return f"Failed to retrieve schema: {str(e)}"

handle_message_fallback(msg)

When the LLM sends a message without a tool, assume the user is the intended recipient and, in interactive mode, forward the message to the user.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def handle_message_fallback(
    self, msg: str | ChatDocument
) -> str | ForwardTool | None:
    """Decide what to do when the LLM replies without using any tool.

    Only `ChatDocument`s sent by the LLM are handled; anything else yields
    None. In interactive mode the message is forwarded to the user via
    `ForwardTool(agent="User")`. Otherwise a corrective prompt is returned:
    - in chat mode, asking the LLM to use the forward tool or a real tool;
    - else, asking it to use the done tool or a real tool.
    """
    done_tool_name = DoneTool.default_value("request")
    forward_tool_name = ForwardTool.default_value("request")
    retrieval_instructions = AQLRetrievalTool.instructions()
    # TODO the retrieval instructions may be empty/minimal
    # when using self.config.use_functions_api = True.
    tools_instruction = f"""
      For example you may want to use the TOOL
      `{aql_retrieval_tool_name}`  according to these instructions:
       {retrieval_instructions}
    """

    sent_by_llm = (
        isinstance(msg, ChatDocument) and msg.metadata.sender == Entity.LLM
    )
    if not sent_by_llm:
        return None
    if self.interactive:
        return ForwardTool(agent="User")
    if self.config.chat_mode:
        return f"""
                Since you did not explicitly address the User, it is not clear
                whether:
                - you intend this to be the final response to the 
                  user's query/request, in which case you must use the 
                  `{forward_tool_name}` to indicate this.
                - OR, you FORGOT to use an Appropriate TOOL,
                  in which case you should use the available tools to
                  make progress on the user's query/request.
                  {tools_instruction}
                """
    return f"""
            The intent of your response is not clear:
            - if you intended this to be the FINAL answer to the user's query,
                then use the `{done_tool_name}` to indicate so,
                with the `content` set to the answer or result.
            - otherwise, use one of the available tools to make progress 
                to arrive at the final answer.
                {tools_instruction}
            """

retry_query(e, query)

Generate an error message for a failed AQL query.

Source code in langroid/agent/special/arangodb/arangodb_agent.py
def retry_query(self, e: Exception, query: str) -> str:
    """Log a failed AQL query and build the feedback text for the LLM.

    Args:
        e: the exception raised while executing the query.
        query: the AQL query that failed.

    Returns:
        A message that quotes the query, includes the error text, and asks
        for a corrected query.
    """
    logger.error(f"AQL Query failed: {query}\nException: {e}")
    return f"""\
    {ARANGO_ERROR_MSG}: '{query}'
    {str(e)}
    Please try again with a corrected query.
    """