parsing

langroid/parsing/__init__.py

Parser(config)

Source code in langroid/parsing/parser.py
def __init__(self, config: ParsingConfig):
    self.config = config
    try:
        self.tokenizer = tiktoken.encoding_for_model(config.token_encoding_model)
    except Exception:
        self.tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")
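A minimal usage sketch, assuming Parser and ParsingConfig are importable from langroid.parsing.parser; the field names below are those referenced by the source on this page, and the values are illustrative:

from langroid.parsing.parser import Parser, ParsingConfig

# Illustrative configuration; chunk_size, n_neighbor_ids and
# token_encoding_model are fields referenced in the source shown on this page.
config = ParsingConfig(
    chunk_size=200,            # target roughly 200 tokens per chunk
    n_neighbor_ids=2,          # keep 2 neighbor chunk ids on each side
    token_encoding_model="text-embedding-ada-002",
)
parser = Parser(config)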

add_window_ids(chunks)

Chunks may belong to multiple docs, but within each doc they appear consecutively. Adds window_ids to each chunk's metadata.

Source code in langroid/parsing/parser.py
def add_window_ids(self, chunks: List[Document]) -> None:
    """Chunks may belong to multiple docs, but for each doc,
    they appear consecutively. Add window_ids in metadata"""

    # discard empty chunks
    chunks = [c for c in chunks if c.content.strip() != ""]
    if len(chunks) == 0:
        return
    # The original metadata.id (if any) is ignored since it will be same for all
    # chunks and is useless. We want a distinct id for each chunk.
    # ASSUMPTION: all chunks c of a doc have same c.metadata.id !
    orig_ids = [c.metadata.id for c in chunks]
    ids = [ObjectRegistry.new_id() for c in chunks]
    id2chunk = {id: c for id, c in zip(ids, chunks)}

    # group the ids by orig_id
    # (each distinct orig_id refers to a different document)
    orig_id_to_ids: Dict[str, List[str]] = {}
    for orig_id, id in zip(orig_ids, ids):
        if orig_id not in orig_id_to_ids:
            orig_id_to_ids[orig_id] = []
        orig_id_to_ids[orig_id].append(id)

    # now each orig_id maps to a sequence of ids within a single doc

    k = self.config.n_neighbor_ids
    for orig, ids in orig_id_to_ids.items():
        # ids are consecutive chunks in a single doc
        n = len(ids)
        window_ids = [ids[max(0, i - k) : min(n, i + k + 1)] for i in range(n)]
        for i, _ in enumerate(ids):
            c = id2chunk[ids[i]]
            c.metadata.window_ids = window_ids[i]
            c.metadata.id = ids[i]
            c.metadata.is_chunk = True
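
The windowing step near the end of the method can be seen in isolation. Below is a toy sketch with hypothetical chunk ids and k = 1 (i.e. n_neighbor_ids = 1); it reproduces only the list comprehension used above:

# Toy illustration of the window computation (hypothetical ids, k = 1).
ids = ["c0", "c1", "c2", "c3"]
k = 1
n = len(ids)
window_ids = [ids[max(0, i - k) : min(n, i + k + 1)] for i in range(n)]
# window_ids == [["c0", "c1"], ["c0", "c1", "c2"], ["c1", "c2", "c3"], ["c2", "c3"]]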

chunk_tokens(text)

Split a text into chunks of ~CHUNK_SIZE tokens, based on punctuation and newline boundaries. Adapted from https://github.com/openai/chatgpt-retrieval-plugin/blob/main/services/chunks.py

Parameters:

    text (str): The text to split into chunks. Required.

Returns:

    List[str]: A list of text chunks, each of which is a string of roughly self.config.chunk_size tokens.

Source code in langroid/parsing/parser.py
def chunk_tokens(
    self,
    text: str,
) -> List[str]:
    """
    Split a text into chunks of ~CHUNK_SIZE tokens,
    based on punctuation and newline boundaries.
    Adapted from
    https://github.com/openai/chatgpt-retrieval-plugin/blob/main/services/chunks.py

    Args:
        text: The text to split into chunks.

    Returns:
        A list of text chunks, each of which is a string of tokens
        roughly self.config.chunk_size tokens long.
    """
    # Return an empty list if the text is empty or whitespace
    if not text or text.isspace():
        return []

    # Tokenize the text
    tokens = self.tokenizer.encode(text, disallowed_special=())

    # Initialize an empty list of chunks
    chunks = []

    # Initialize a counter for the number of chunks
    num_chunks = 0

    # Loop until all tokens are consumed
    while tokens and num_chunks < self.config.max_chunks:
        # Take the first chunk_size tokens as a chunk
        chunk = tokens[: self.config.chunk_size]

        # Decode the chunk into text
        chunk_text = self.tokenizer.decode(chunk)

        # Skip the chunk if it is empty or whitespace
        if not chunk_text or chunk_text.isspace():
            # Remove the tokens corresponding to the chunk text
            # from remaining tokens
            tokens = tokens[len(chunk) :]
            # Continue to the next iteration of the loop
            continue

        # Find the last period or punctuation mark in the chunk
        last_punctuation = max(
            chunk_text.rfind("."),
            chunk_text.rfind("?"),
            chunk_text.rfind("!"),
            chunk_text.rfind("\n"),
        )

        # If there is a punctuation mark, and the last punctuation index is
        # after MIN_CHUNK_SIZE_CHARS
        if (
            last_punctuation != -1
            and last_punctuation > self.config.min_chunk_chars
        ):
            # Truncate the chunk text at the punctuation mark
            chunk_text = chunk_text[: last_punctuation + 1]

        # Remove any newline characters and strip any leading or
        # trailing whitespace
        chunk_text_to_append = chunk_text.replace("\n", " ").strip()

        if len(chunk_text_to_append) > self.config.discard_chunk_chars:
            # Append the chunk text to the list of chunks
            chunks.append(chunk_text_to_append)

        # Remove the tokens corresponding to the chunk text
        # from the remaining tokens
        tokens = tokens[
            len(self.tokenizer.encode(chunk_text, disallowed_special=())) :
        ]

        # Increment the number of chunks
        num_chunks += 1

    # There may be remaining tokens, but we discard them
    # since we have already reached the maximum number of chunks

    return chunks
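
A usage sketch, assuming a parser constructed as in the example near the top of this page; the text and chunk_size value are illustrative:

# Illustrative call; the text and chunk_size are made up for the example.
parser = Parser(ParsingConfig(chunk_size=50))
chunks = parser.chunk_tokens(
    "First sentence. Second sentence.\nA third line of text."
)
for i, chunk in enumerate(chunks):
    print(i, chunk)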