Skip to content

neo4j_chat_agent

langroid/agent/special/neo4j/neo4j_chat_agent.py

Neo4jChatAgent(config)

Bases: ChatAgent

Raises:

Type Description
ValueError

If database information is not provided in the config.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def __init__(self, config: Neo4jChatAgentConfig) -> None:
    """Initialize the Neo4jChatAgent.

    Stores ``config`` on the instance and then runs the setup helpers in a
    fixed order. Going by their names, these cover config validation, the
    neo4j import, database initialization, tool/system-message setup and
    state initialization.

    Args:
        config (Neo4jChatAgentConfig): Settings for this agent.

    Raises:
        ValueError: If database information is not provided in the config.
    """
    # Set the config first: the helpers below take no arguments, so they
    # presumably read it from `self.config` -- verify in their definitions.
    self.config: Neo4jChatAgentConfig = config
    # NOTE(review): presumably the source of the documented ValueError.
    self._validate_config()
    self._import_neo4j()
    self._initialize_db()
    self._init_tools_sys_message()
    self.init_state()

handle_message_fallback(msg)

When LLM sends a no-tool msg, assume user is the intended recipient, and if in interactive mode, forward the msg to the user.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def handle_message_fallback(
    self, msg: str | ChatDocument
) -> str | ForwardTool | None:
    """
    Fallback for a message that carried no tool.

    Only a ``ChatDocument`` whose sender is the LLM is acted upon; anything
    else yields ``None``.

    Args:
        msg (str | ChatDocument): The message to handle.

    Returns:
        str | ForwardTool | None: In interactive mode, a ``ForwardTool``
        addressed to "User". Otherwise a reminder text asking the LLM to use
        the forward tool (when ``config.chat_mode`` is set) or the done tool
        (when it is not). ``None`` when the message is not an LLM
        ``ChatDocument``.
    """
    done_tool_name = DoneTool.default_value("request")
    forward_tool_name = ForwardTool.default_value("request")

    sent_by_llm = (
        isinstance(msg, ChatDocument) and msg.metadata.sender == Entity.LLM
    )
    if not sent_by_llm:
        return None

    # Interactive sessions simply hand the message over to the user.
    if self.interactive:
        return ForwardTool(agent="User")

    # Non-interactive: nudge the LLM towards an explicit tool.
    if self.config.chat_mode:
        return f"""
                Since you did not explicitly address the User, it is not clear
                whether:
                - you intend this to be the final response to the 
                  user's query/request, in which case you must use the 
                  `{forward_tool_name}` to indicate this.
                - OR, you FORGOT to use an Appropriate TOOL,
                  in which case you should use the available tools to
                  make progress on the user's query/request.
                """
    return f"""
            The intent of your response is not clear:
            - if you intended this to be the final answer to the user's query,
                then use the `{done_tool_name}` to indicate so,
                with the `content` set to the answer or result.
            - otherwise, use one of the available tools to make progress 
                to arrive at the final answer.
            """

close()

Close the connection to the Neo4j database, if one is open.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def close(self) -> None:
    """Close the Neo4j driver held in ``self.driver``.

    Does nothing when ``self.driver`` is falsy (e.g. ``None``).
    """
    driver = self.driver
    if not driver:
        return
    driver.close()

retry_query(e, query)

Generate an error message for a failed Cypher query and return it.

Parameters:

Name Type Description Default
e Exception

The exception raised during the Cypher query execution.

required
query str

The Cypher query that failed.

required

Returns:

Name Type Description
str str

The error message.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def retry_query(self, e: Exception, query: str) -> str:
    """
    Log a failed Cypher query and build the feedback text for it.

    Args:
        e (Exception): The exception raised while executing the query.
        query (str): The Cypher query that failed.

    Returns:
        str: A message combining ``NEO4J_ERROR_MSG``, the failed query, the
        exception text, and an instruction to run a corrected query.
    """
    logger.error(f"Cypher Query failed: {query}\nException: {e}")

    # The text is returned as-is; the 4-space indentation is part of it.
    return f"""\
    {NEO4J_ERROR_MSG}: '{query}'
    {str(e)}
    Run a new query, correcting the errors.
    """

read_query(query, parameters=None)

Executes a given Cypher query with parameters on the Neo4j database.

Parameters:

Name Type Description Default
query str

The Cypher query string to be executed.

required
parameters Optional[Dict[Any, Any]]

A dictionary of parameters for the query.

None

Returns:

Name Type Description
QueryResult QueryResult

An object representing the outcome of the query execution.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def read_query(
    self, query: str, parameters: Optional[Dict[Any, Any]] = None
) -> QueryResult:
    """
    Run a Cypher query against the configured Neo4j database and collect
    the records it yields.

    Args:
        query (str): The Cypher query string to be executed.
        parameters (Optional[Dict[Any, Any]]): Parameters passed along with
            the query.

    Returns:
        QueryResult: On success, ``success=True`` with ``data`` holding a list
        of record dicts (empty when nothing matched). When there is no
        driver, or when execution raises, ``success=False`` with ``data``
        holding an explanatory message.
    """
    if not self.driver:
        return QueryResult(
            success=False, data="No database connection is established."
        )

    try:
        assert isinstance(self.config, Neo4jChatAgentConfig)
        db_name = self.config.neo4j_settings.database
        with self.driver.session(database=db_name) as session:
            result = session.run(query, parameters)
            # peek() is falsy when the result has no records to iterate.
            rows = [record.data() for record in result] if result.peek() else []
            return QueryResult(success=True, data=rows)
    except Exception as exc:
        logger.error(f"Failed to execute query: {query}\n{exc}")
        return QueryResult(success=False, data=self.retry_query(exc, query))
    finally:
        # NOTE(review): this closes the driver after every query that gets
        # past the connection check -- confirm the driver is expected to
        # remain usable for later queries.
        self.close()

write_query(query, parameters=None)

Executes a write transaction using a given Cypher query on the Neo4j database. This method should be used for queries that modify the database.

Parameters:

Name Type Description Default
query str

The Cypher query string to be executed.

required
parameters dict

A dict of parameters for the Cypher query.

None

Returns:

Name Type Description
QueryResult QueryResult

An object representing the outcome of the query execution. It contains a success flag and an optional error message.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def write_query(
    self, query: str, parameters: Optional[Dict[Any, Any]] = None
) -> QueryResult:
    """
    Executes a write transaction using a given Cypher query on the Neo4j database.
    This method should be used for queries that modify the database.

    When the query text contains ``CREATE`` or ``MERGE`` (case-insensitive)
    and the transaction completes without error,
    ``self.config.database_created`` is set to ``True``. A failed or
    unexecuted query leaves the flag untouched.

    Args:
        query (str): The Cypher query string to be executed.
        parameters (dict, optional): A dict of parameters for the Cypher query.

    Returns:
        QueryResult: An object representing the outcome of the query execution.
                     It contains a success flag and an optional error message.
    """
    # "CREATE CONSTRAINT" and "CREATE INDEX" both contain "CREATE", so these
    # two keywords cover every creation pattern.
    query_upper = query.upper()
    is_creation_query = "CREATE" in query_upper or "MERGE" in query_upper
    if is_creation_query:
        logger.info("Detected database/collection creation query")

    if not self.driver:
        return QueryResult(
            success=False, data="No database connection is established."
        )

    try:
        assert isinstance(self.config, Neo4jChatAgentConfig)
        with self.driver.session(
            database=self.config.neo4j_settings.database
        ) as session:
            # `write_transaction` is deprecated in the 5.x driver in favour
            # of `execute_write`; prefer the new name, fall back to the old.
            run_write = getattr(session, "execute_write", None)
            if run_write is None:
                run_write = session.write_transaction
            run_write(lambda tx: tx.run(query, parameters))
        # Record creation only once the transaction has actually succeeded.
        if is_creation_query:
            self.config.database_created = True
        return QueryResult(success=True)
    except Exception as e:
        # Use the module logger, like the other query helpers in this file.
        logger.warning(f"An error occurred: {e}")
        error_message = self.retry_query(e, query)
        return QueryResult(success=False, data=error_message)
    finally:
        # NOTE(review): this closes the driver after every write that gets
        # past the connection check -- confirm the driver is expected to
        # remain usable for later queries.
        self.close()

remove_database()

Deletes all nodes and relationships from the current Neo4j database.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def remove_database(self) -> None:
    """Delete every node, with its relationships, from the current database.

    Runs a ``MATCH (n) DETACH DELETE n`` query through ``write_query`` and
    prints a status line depending on whether it reported success.
    """
    outcome = self.write_query(
        """
            MATCH (n)
            DETACH DELETE n
        """
    )
    status = (
        "[green]Database is deleted!"
        if outcome.success
        else "[red]Database is not deleted!"
    )
    print(status)

cypher_retrieval_tool(msg)

Handle a CypherRetrievalTool message by executing a Cypher query and returning the result. Args: msg (CypherRetrievalTool): The tool-message to handle.

Returns:

Name Type Description
str str

The result of executing the cypher_query.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def cypher_retrieval_tool(self, msg: CypherRetrievalTool) -> str:
    """
    Handle a CypherRetrievalTool message by running its Cypher query as a
    read query.

    The query is refused, with an explanatory message, until the schema
    tool has been used (``self.tried_schema``) and the database is marked
    as created (``self.config.database_created``).

    Args:
        msg (CypherRetrievalTool): The tool-message to handle.

    Returns:
        str: One of the refusal messages, a hint when the query returned an
        empty list, or otherwise the string form of the query result data.
    """
    # Guard 1: the schema must have been requested first.
    if not self.tried_schema:
        return f"""
        You did not yet use the `{graph_schema_tool_name}` tool to get the schema 
        of the neo4j knowledge-graph db. Use that tool first before using 
        the `{cypher_retrieval_tool_name}` tool, to ensure you know all the correct
        node labels, relationship types, and property keys available in
        the database.
        """
    # Guard 2: there must be a database to query.
    if not self.config.database_created:
        return f"""
        You have not yet created the Neo4j database. 
        Use the `{cypher_creation_tool_name}`
        tool to create the database first before using the 
        `{cypher_retrieval_tool_name}` tool.
        """

    query = msg.cypher_query
    # Remember the most recent retrieval query on the agent.
    self.current_retrieval_cypher_query = query
    logger.info(f"Executing Cypher query: {query}")

    result_data = self.read_query(query).data
    if isinstance(result_data, list) and not result_data:
        return """
        No results found; check if your query used the right label names -- 
        remember these are case sensitive, so you have to use the exact label
        names you found in the schema. 
        Or retry using one of the  RETRY-SUGGESTIONS in your instructions. 
        """
    return str(result_data)

cypher_creation_tool(msg)

Handle a CypherCreationTool message by executing a Cypher query and returning the result. Args: msg (CypherCreationTool): The tool-message to handle.

Returns:

Name Type Description
str str

The result of executing the cypher_query.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def cypher_creation_tool(self, msg: CypherCreationTool) -> str:
    """
    Handle a CypherCreationTool message by running its Cypher query as a
    write query.

    On success, ``self.config.database_created`` is set to ``True``.

    Args:
        msg (CypherCreationTool): The tool-message to handle.

    Returns:
        str: A fixed confirmation text on success, otherwise the string form
        of the failure data reported by ``write_query``.
    """
    query = msg.cypher_query
    logger.info(f"Executing Cypher query: {query}")

    outcome = self.write_query(query)
    if not outcome.success:
        return str(outcome.data)

    self.config.database_created = True
    return "Cypher query executed successfully"

graph_schema_tool(msg)

Retrieves the schema of a Neo4j graph database.

Parameters:

Name Type Description Default
msg GraphSchemaTool

An instance of GraphSchemaTool, typically containing information or parameters needed for the database query.

required

Returns:

Name Type Description
str str | Optional[Union[str, List[Dict[Any, Any]]]]

The visual representation of the database schema as a string, or a message stating that the database schema is empty or not valid.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def graph_schema_tool(
    self, msg: GraphSchemaTool | None
) -> str | Optional[Union[str, List[Dict[Any, Any]]]]:
    """
    Retrieves the schema of a Neo4j graph database.

    Marks the schema as requested (``self.tried_schema``). A non-empty
    ``self.config.kg_schema`` is returned as-is; otherwise the schema is
    fetched with ``CALL db.schema.visualization()`` and, on success, stored
    in ``self.config.kg_schema`` before being returned.

    Args:
        msg (GraphSchemaTool | None): The tool-message that triggered the
            call. Its contents are not read here.

    Returns:
        The cached or freshly fetched schema, or a
        ``"Failed to retrieve schema: ..."`` string when the read query
        reports failure.
    """
    self.tried_schema = True

    cached_schema = self.config.kg_schema
    if cached_schema is not None and len(cached_schema) > 0:
        return cached_schema

    schema_result = self.read_query("CALL db.schema.visualization()")
    if not schema_result.success:
        return f"Failed to retrieve schema: {schema_result.data}"

    # An empty schema is still a valid answer; its data then looks like
    # [{"nodes": [], "relationships": []}], and it gets cached like any other.
    self.config.kg_schema = schema_result.data  # type: ignore
    return schema_result.data