Skip to content

neo4j_chat_agent

langroid/agent/special/neo4j/neo4j_chat_agent.py

Neo4jChatAgent(config)

Bases: ChatAgent

Raises:

Type Description
ValueError

If database information is not provided in the config.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def __init__(self, config: Neo4jChatAgentConfig):
    """Initialize the Neo4jChatAgent.

    Stores ``config`` on the instance and then runs four setup helpers in a
    fixed order. Judging by their names they validate the config, import the
    neo4j package, open the database connection and set up the tool
    messages; their bodies are not shown here, so confirm against them.

    Args:
        config (Neo4jChatAgentConfig): Settings for this agent.

    Raises:
        ValueError: If database information is not provided in the config.
    """
    # Stored before any helper runs; presumably the helpers read self.config
    # -- TODO confirm.
    self.config: Neo4jChatAgentConfig = config
    # NOTE(review): the ValueError documented above is presumably raised
    # here -- confirm.
    self._validate_config()
    self._import_neo4j()
    self._initialize_connection()
    # NOTE(review): no super().__init__() call is visible in this method;
    # presumably one of the helpers performs it -- confirm.
    self._init_tool_messages()

handle_message_fallback(msg)

When LLM sends a no-tool msg, assume user is the intended recipient.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def handle_message_fallback(
    self, msg: str | ChatDocument
) -> str | ChatDocument | None:
    """Route a tool-less LLM message to the user.

    Args:
        msg (str | ChatDocument): The message that matched no tool handler.

    Returns:
        str | ChatDocument | None: The same ``ChatDocument`` with its
        recipient set to ``Entity.USER`` when it was sent by the LLM;
        ``None`` for plain strings and for documents from any other sender.
    """
    # Plain strings carry no metadata, so there is nothing to re-address.
    if not isinstance(msg, ChatDocument):
        return None

    sent_by_llm = msg.metadata.sender == Entity.LLM
    if not sent_by_llm:
        return None

    # The document is modified in place and handed back to the caller.
    msg.metadata.recipient = Entity.USER
    return msg

close()

Close the connection.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def close(self) -> None:
    """Close the database driver if one is set; otherwise do nothing."""
    if not self.driver:
        # No driver was set (or it is falsy): nothing to release.
        return
    self.driver.close()

retry_query(e, query)

Generate an error message for a failed Cypher query and return it.

Parameters:

Name Type Description Default
e Exception

The exception raised during the Cypher query execution.

required
query str

The Cypher query that failed.

required

Returns:

Name Type Description
str str

The error message.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def retry_query(self, e: Exception, query: str) -> str:
    """
    Log a failed Cypher query and build the message asking for a corrected one.

    Despite its name, this method does not re-run anything: it only logs the
    failure and returns text describing it.

    Args:
        e (Exception): The exception raised during the Cypher query execution.
        query (str): The Cypher query that failed.

    Returns:
        str: The error message, made of ``NEO4J_ERROR_MSG``, the failed query,
            the exception text and an instruction to run a corrected query.
    """
    logger.error(f"Cypher Query failed: {query}\nException: {e}")

    # The literal's own indentation is part of the returned text.
    return f"""\
    {NEO4J_ERROR_MSG}: '{query}'
    {str(e)}
    Run a new query, correcting the errors.
    """

read_query(query, parameters=None)

Executes a given Cypher query with parameters on the Neo4j database.

Parameters:

Name Type Description Default
query str

The Cypher query string to be executed.

required
parameters Optional[Dict[Any, Any]]

A dictionary of parameters for the query.

None

Returns:

Name Type Description
QueryResult QueryResult

An object representing the outcome of the query execution.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def read_query(
    self, query: str, parameters: Optional[Dict[Any, Any]] = None
) -> QueryResult:
    """
    Run a Cypher query against the configured Neo4j database and collect its rows.

    Args:
        query (str): The Cypher query string to be executed.
        parameters (Optional[Dict[Any, Any]]): A dictionary of parameters for
                                                the query.

    Returns:
        QueryResult: On success, ``success=True`` with ``data`` set to a list
            of per-record dicts (an empty list when the query yields nothing).
            On failure, ``success=False`` with ``data`` set to an error
            message; exceptions are caught here rather than propagated.
    """
    if not self.driver:
        return QueryResult(
            success=False, data="No database connection is established."
        )

    try:
        assert isinstance(self.config, Neo4jChatAgentConfig)
        with self.driver.session(
            database=self.config.neo4j_settings.database
        ) as session:
            result = session.run(query, parameters)
            # A falsy peek() is treated as "no rows"; otherwise every record
            # is converted to a plain dict.
            rows = [rec.data() for rec in result] if result.peek() else []
            return QueryResult(success=True, data=rows)
    except Exception as exc:
        logger.error(f"Failed to execute query: {query}\n{exc}")
        # retry_query only formats (and logs) the failure; it does not re-run.
        return QueryResult(success=False, data=self.retry_query(exc, query))
    finally:
        # NOTE(review): this closes the driver after every query while
        # self.driver stays set -- confirm that later queries on this agent
        # are expected to work with an already-closed driver.
        self.close()

write_query(query, parameters=None)

Executes a write transaction using a given Cypher query on the Neo4j database. This method should be used for queries that modify the database.

Parameters:

Name Type Description Default
query str

The Cypher query string to be executed.

required
parameters dict

A dict of parameters for the Cypher query.

None

Returns:

Name Type Description
QueryResult QueryResult

An object representing the outcome of the query execution. It contains a success flag and an optional error message.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def write_query(
    self, query: str, parameters: Optional[Dict[Any, Any]] = None
) -> QueryResult:
    """
    Executes a write transaction using a given Cypher query on the Neo4j database.
    This method should be used for queries that modify the database.

    Args:
        query (str): The Cypher query string to be executed.
        parameters (dict, optional): A dict of parameters for the Cypher query.

    Returns:
        QueryResult: An object representing the outcome of the query execution.
                     ``success=True`` (no data) when the transaction ran;
                     ``success=False`` with an error message in ``data``
                     otherwise. Exceptions are caught here, not propagated.
    """
    if not self.driver:
        return QueryResult(
            success=False, data="No database connection is established."
        )

    try:
        assert isinstance(self.config, Neo4jChatAgentConfig)
        with self.driver.session(
            database=self.config.neo4j_settings.database
        ) as session:
            # `write_transaction` is deprecated in the neo4j 5.x driver in
            # favour of `execute_write`. Prefer the new name when the
            # installed driver has it and fall back for older drivers.
            run_in_write_tx = getattr(session, "execute_write", None)
            if run_in_write_tx is None:
                run_in_write_tx = session.write_transaction
            run_in_write_tx(lambda tx: tx.run(query, parameters))
            return QueryResult(success=True)
    except Exception as e:
        # Use the module logger (as the other methods here do) rather than
        # the root `logging` module, so handler/level configuration applies.
        logger.warning(f"An error occurred: {e}")
        # retry_query only formats (and logs) the failure; it does not re-run.
        error_message = self.retry_query(e, query)
        return QueryResult(success=False, data=error_message)
    finally:
        # NOTE(review): this closes the driver after every query while
        # self.driver stays set -- confirm that later queries on this agent
        # are expected to work with an already-closed driver.
        self.close()

remove_database()

Deletes all nodes and relationships from the current Neo4j database.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def remove_database(self) -> None:
    """Delete every node (and its relationships) and print the outcome.

    Runs a ``MATCH (n) DETACH DELETE n`` through ``write_query`` and prints a
    green success line or a red failure line depending on the result.
    """
    wipe_all = """
            MATCH (n)
            DETACH DELETE n
        """
    outcome = self.write_query(wipe_all)

    # The bracketed prefixes look like colour markup for the print function
    # in scope -- presumably rich's; confirm against the file's imports.
    status_line = (
        "[green]Database is deleted!"
        if outcome.success
        else "[red]Database is not deleted!"
    )
    print(status_line)

retrieval_query(msg)

Handle a CypherRetrievalTool message by executing a Cypher query and returning the result.

Args: msg (CypherRetrievalTool): The tool-message to handle.

Returns:

Name Type Description
str str

The result of executing the cypher_query.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def retrieval_query(self, msg: CypherRetrievalTool) -> str:
    """
    Handle a CypherRetrievalTool message by executing a Cypher query and
    returning the result.

    The query is only run once the schema has been requested
    (``self.tried_schema`` is truthy); before that, a reminder to call
    `get_schema` first is returned instead.

    Args:
        msg (CypherRetrievalTool): The tool-message to handle.

    Returns:
        str: The reminder text, a "no results" hint when the query returns an
            empty list, or otherwise ``str()`` of the query result data
            (which is the error message when the query failed).
    """
    if not self.tried_schema:
        schema_first_notice = """
        You did not yet use the `get_schema` tool to get the schema 
        of the neo4j knowledge-graph db. Use that tool first before using 
        the `retrieval_query` tool, to ensure you know all the correct
        node labels, relationship types, and property keys available in
        the database.
        """
        return schema_first_notice

    cypher = msg.cypher_query
    logger.info(f"Executing Cypher query: {cypher}")

    rows = self.read_query(cypher).data
    # Only an empty *list* means "no results"; a failed query carries a
    # string error message instead and falls through to the final return.
    if isinstance(rows, list) and not rows:
        no_results_hint = """
        No results found; check if your query used the right label names -- 
        remember these are case sensitive, so you have to use the exact label
        names you found in the schema. 
        Or retry using one of the  RETRY-SUGGESTIONS in your instructions. 
        """
        return no_results_hint
    return str(rows)

create_query(msg)

Handle a CypherCreationTool message by executing a Cypher query and returning the result.

Args: msg (CypherCreationTool): The tool-message to handle.

Returns:

Name Type Description
str str

The result of executing the cypher_query.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def create_query(self, msg: CypherCreationTool) -> str:
    """
    Handle a CypherCreationTool message by executing a Cypher query and
    returning the result.

    Args:
        msg (CypherCreationTool): The tool-message to handle.

    Returns:
        str: A fixed success message when the write succeeded, otherwise
            ``str()`` of the failure data reported by ``write_query``.
    """
    cypher = msg.cypher_query
    logger.info(f"Executing Cypher query: {cypher}")

    outcome = self.write_query(cypher)
    return (
        "Cypher query executed successfully"
        if outcome.success
        else str(outcome.data)
    )

get_schema(msg)

Retrieves the schema of a Neo4j graph database.

Parameters:

Name Type Description Default
msg GraphSchemaTool

An instance of GraphSchemaTool, typically containing information or parameters needed for the database query.

required

Returns:

Name Type Description
str str

The visual representation of the database schema as a string, or a message stating that the database schema is empty or not valid.

Source code in langroid/agent/special/neo4j/neo4j_chat_agent.py
def get_schema(self, msg: GraphSchemaTool | None) -> str:
    """
    Retrieves the schema of a Neo4j graph database.

    Also sets ``self.tried_schema`` to True, which `retrieval_query` checks
    before it agrees to run a query.

    Args:
        msg (GraphSchemaTool | None): The tool-message that triggered the
            call; it is not read by this method.

    Returns:
        str: The JSON-serialized result of ``CALL db.schema.visualization()``
        on success, or a "Failed to retrieve schema: ..." message when
        ``read_query`` reports a failure.
    """
    # Set before the query runs, so the attempt counts even if it fails.
    self.tried_schema = True

    outcome = self.read_query("CALL db.schema.visualization()")
    if not outcome.success:
        return f"Failed to retrieve schema: {outcome.data}"

    # An empty schema is still a valid response; in that case the data is
    # [{"nodes": [], "relationships": []}].
    return json.dumps(outcome.data)