Skip to content

batch

langroid/agent/batch.py

ExceptionHandling

Bases: str, Enum

Enum for exception handling options.

run_batched_tasks(inputs, do_task, batch_size, stop_on_first_result, sequential, handle_exceptions, output_map, message_template, message=None)

Common batch processing logic for both agent methods and tasks.

Parameters:

Name Type Description Default
inputs List[str | ChatDocument]

List of inputs to process

required
do_task Callable[[str | ChatDocument, int], Coroutine[Any, Any, Any]]

Task execution function

required
batch_size Optional[int]

Size of batches, if None process all at once

required
stop_on_first_result bool

Whether to stop after first valid result

required
sequential bool

Whether to process sequentially

required
handle_exceptions Union[bool, ExceptionHandling]

How to handle exceptions: - RAISE or False: Let exceptions propagate - RETURN_NONE or True: Convert exceptions to None in results - RETURN_EXCEPTION: Include exception objects in results Boolean values are deprecated and will be removed in a future version.

required
output_map Callable[[Any], Any]

Function to map results

required
message_template str

Template for status message

required
message Optional[str]

Optional override for status message

None
Source code in langroid/agent/batch.py
def run_batched_tasks(
    inputs: List[str | ChatDocument],
    do_task: Callable[[str | ChatDocument, int], Coroutine[Any, Any, Any]],
    batch_size: Optional[int],
    stop_on_first_result: bool,
    sequential: bool,
    handle_exceptions: Union[bool, ExceptionHandling],
    output_map: Callable[[Any], Any],
    message_template: str,
    message: Optional[str] = None,
) -> List[Any]:
    """
    Shared driver used by the batch helpers for both tasks and agent methods.

    All inputs are processed inside a single `asyncio.run` call, either in
    one go (`batch_size is None`) or batch by batch.

    Args:
        inputs: List of inputs to process
        do_task: Coroutine function run for each input; it is handed to
            `_process_batch_async` together with the inputs
        batch_size: Size of batches, if None process all at once
        stop_on_first_result: Whether to stop after first valid result.
            In batched mode, once any collected result is not None, the
            remaining batches are not run and are filled with None.
        sequential: Whether to process sequentially
        handle_exceptions: How to handle exceptions:
            - RAISE or False: Let exceptions propagate
            - RETURN_NONE or True: Convert exceptions to None in results
            - RETURN_EXCEPTION: Include exception objects in results
            Boolean values are deprecated and will be removed in a future version.
        output_map: Function to map results
        message_template: Template for status message; formatted with
            `total=len(inputs)`
        message: Optional override for status message. When given, it is
            shown as-is (no progress suffix is appended).

    Returns:
        List[Any]: the results produced by `_process_batch_async`; in batched
        mode these are concatenated in batch order, with None placeholders
        for batches skipped due to `stop_on_first_result`.
    """
    # Options forwarded unchanged to every `_process_batch_async` call.
    shared_opts: dict[str, Any] = dict(
        stop_on_first_result=stop_on_first_result,
        sequential=sequential,
        handle_exceptions=handle_exceptions,
        output_map=output_map,
    )

    async def _drive() -> List[Any]:
        """Process everything inside one coroutine, so that `asyncio.run`
        is invoked a single time rather than once per batch.

        Returns:
            List[Any]: results
        """
        total = len(inputs)

        # Unbatched: a single call covers all the inputs.
        if batch_size is None:
            banner = message or message_template.format(total=total)
            with status(banner), SuppressLoggerWarnings():
                return await _process_batch_async(inputs, do_task, **shared_opts)

        collected: List[Any] = []
        for chunk in batched(inputs, batch_size):
            # A valid result was already found: skip this batch, but keep the
            # output aligned with the inputs via None placeholders.
            if stop_on_first_result and not all(r is None for r in collected):
                collected.extend([None] * len(chunk))
                continue

            # Number of results so far == index of this batch's first input.
            offset = len(collected)
            suffix = f", {offset} complete" if offset > 0 else ""
            banner = message or message_template.format(total=total) + suffix
            with status(banner), SuppressLoggerWarnings():
                chunk_results = await _process_batch_async(
                    chunk,
                    do_task,
                    start_idx=offset,
                    **shared_opts,
                )
                collected.extend(chunk_results)
        return collected

    return asyncio.run(_drive())

run_batch_task_gen(gen_task, items, input_map=lambda x: str(x), output_map=lambda x: x, stop_on_first_result=False, sequential=True, batch_size=None, turns=-1, message=None, handle_exceptions=ExceptionHandling.RAISE, max_cost=0.0, max_tokens=0)

Generate and run copies of a task async/concurrently one per item in items list. For each item, apply input_map to get the initial message to process. For each result, apply output_map to get the final result. Args: gen_task (Callable[[int], Task]): generates the tasks to run items (list[T]): list of items to process input_map (Callable[[T], str|ChatDocument]): function to map item to initial message to process output_map (Callable[[ChatDocument|str], U]): function to map result to final result. If stop_on_first_result is enabled, then map any invalid output to None. We continue until some non-None result is obtained. stop_on_first_result (bool): whether to stop after the first valid (not-None) result. In this case all other tasks are cancelled, and their corresponding result is None in the returned list. sequential (bool): whether to run sequentially (e.g. some APIs such as ooba don't support concurrent requests) batch_size (Optional[int]): The number of tasks to run at a time, if None, unbatched turns (int): number of turns to run, -1 for infinite message (Optional[str]): optionally overrides the console status messages handle_exceptions: How to handle exceptions: - RAISE or False: Let exceptions propagate - RETURN_NONE or True: Convert exceptions to None in results - RETURN_EXCEPTION: Include exception objects in results Boolean values are deprecated and will be removed in a future version. max_cost: float: maximum cost to run the task (default 0.0 for unlimited) max_tokens: int: maximum token usage (in and out) (default 0 for unlimited)

Returns:

Type Description
list[Optional[U]]

list of final results. Always list[U] if stop_on_first_result is disabled

Source code in langroid/agent/batch.py
def run_batch_task_gen(
    gen_task: Callable[[int], Task],
    items: list[T],
    input_map: Callable[[T], str | ChatDocument] = lambda x: str(x),
    output_map: Callable[[ChatDocument | None], U] = lambda x: x,  # type: ignore
    stop_on_first_result: bool = False,
    sequential: bool = True,
    batch_size: Optional[int] = None,
    turns: int = -1,
    message: Optional[str] = None,
    handle_exceptions: Union[bool, ExceptionHandling] = ExceptionHandling.RAISE,
    max_cost: float = 0.0,
    max_tokens: int = 0,
) -> list[Optional[U]]:
    """
    Create one task per entry of `items` (via `gen_task`) and run them
    async/concurrently. Each item is first converted to the task's initial
    message with `input_map`; each task result is converted to the final
    result with `output_map`.

    Args:
        gen_task (Callable[[int], Task]): generates the tasks to run; called
            with the index of the item the task will process
        items (list[T]): list of items to process
        input_map (Callable[[T], str|ChatDocument]): function to map item to
            initial message to process
        output_map (Callable[[ChatDocument|str], U]): function to map result
            to final result. If stop_on_first_result is enabled, then
            map any invalid output to None. We continue until some non-None
            result is obtained.
        stop_on_first_result (bool): whether to stop after the first valid
            (not-None) result. In this case all other tasks are
            cancelled, and their corresponding result is None in the
            returned list.
        sequential (bool): whether to run sequentially
            (e.g. some APIs such as ooba don't support concurrent requests)
        batch_size (Optional[int]): The number of tasks to run at a time,
            if None, unbatched
        turns (int): number of turns to run, -1 for infinite
        message (Optional[str]): optionally overrides the console status messages
        handle_exceptions: How to handle exceptions:
            - RAISE or False: Let exceptions propagate
            - RETURN_NONE or True: Convert exceptions to None in results
            - RETURN_EXCEPTION: Include exception objects in results
            Boolean values are deprecated and will be removed in a future version.
        max_cost: float: maximum cost to run the task (default 0.0 for unlimited)
        max_tokens: int: maximum token usage (in and out) (default 0 for unlimited)

    Returns:
        list[Optional[U]]: list of final results. Always list[U] if
        `stop_on_first_result` is disabled
    """

    async def _run_single(
        msg: str | ChatDocument,
        idx: int,
    ) -> BaseException | Optional[ChatDocument] | tuple[int, Optional[ChatDocument]]:
        """Build the task for position `idx` and run it on `msg`."""
        worker = gen_task(idx)

        # Turn off streaming and stats display on each generated task.
        llm = worker.agent.llm
        if llm is not None:
            llm.set_stream(False)
        worker.agent.config.show_stats = False

        try:
            return await worker.run_async(
                msg, turns=turns, max_cost=max_cost, max_tokens=max_tokens
            )
        except asyncio.CancelledError:
            # Kill the task on cancellation, then let the caller handle
            # the exception.
            worker.kill()
            raise

    return run_batched_tasks(
        inputs=[input_map(item) for item in items],
        do_task=_run_single,
        batch_size=batch_size,
        stop_on_first_result=stop_on_first_result,
        sequential=sequential,
        handle_exceptions=handle_exceptions,
        output_map=output_map,
        message_template="[bold green]Running {total} tasks:",
        message=message,
    )

run_batch_tasks(task, items, input_map=lambda x: str(x), output_map=lambda x: x, stop_on_first_result=False, sequential=True, batch_size=None, turns=-1, max_cost=0.0, max_tokens=0)

Run copies of task async/concurrently one per item in items list. For each item, apply input_map to get the initial message to process. For each result, apply output_map to get the final result. Args: task (Task): task to run items (list[T]): list of items to process input_map (Callable[[T], str|ChatDocument]): function to map item to initial message to process output_map (Callable[[ChatDocument|str], U]): function to map result to final result sequential (bool): whether to run sequentially (e.g. some APIs such as ooba don't support concurrent requests) batch_size (Optional[int]): The number of tasks to run at a time, if None, unbatched turns (int): number of turns to run, -1 for infinite max_cost: float: maximum cost to run the task (default 0.0 for unlimited) max_tokens: int: maximum token usage (in and out) (default 0 for unlimited)

Returns:

Type Description
List[Optional[U]]

list of final results. Always list[U] if stop_on_first_result is disabled

Source code in langroid/agent/batch.py
def run_batch_tasks(
    task: Task,
    items: list[T],
    input_map: Callable[[T], str | ChatDocument] = lambda x: str(x),
    output_map: Callable[[ChatDocument | None], U] = lambda x: x,  # type: ignore
    stop_on_first_result: bool = False,
    sequential: bool = True,
    batch_size: Optional[int] = None,
    turns: int = -1,
    max_cost: float = 0.0,
    max_tokens: int = 0,
) -> List[Optional[U]]:
    """
    Run clones of `task` async/concurrently, one per entry of `items`.
    Each item is converted to the initial message with `input_map`, and each
    result is converted to the final result with `output_map`.

    This is a thin wrapper over `run_batch_task_gen`, using `task.clone(i)`
    as the task generator and a fixed console status message.

    Args:
        task (Task): task to run
        items (list[T]): list of items to process
        input_map (Callable[[T], str|ChatDocument]): function to map item to
            initial message to process
        output_map (Callable[[ChatDocument|str], U]): function to map result
            to final result
        stop_on_first_result (bool): whether to stop after the first valid
            (not-None) result
        sequential (bool): whether to run sequentially
            (e.g. some APIs such as ooba don't support concurrent requests)
        batch_size (Optional[int]): The number of tasks to run at a time,
            if None, unbatched
        turns (int): number of turns to run, -1 for infinite
        max_cost: float: maximum cost to run the task (default 0.0 for unlimited)
        max_tokens: int: maximum token usage (in and out) (default 0 for unlimited)

    Returns:
        list[Optional[U]]: list of final results. Always list[U] if
        `stop_on_first_result` is disabled
    """
    return run_batch_task_gen(
        gen_task=lambda idx: task.clone(idx),
        items=items,
        input_map=input_map,
        output_map=output_map,
        stop_on_first_result=stop_on_first_result,
        sequential=sequential,
        batch_size=batch_size,
        turns=turns,
        message=f"[bold green]Running {len(items)} copies of {task.name}...",
        max_cost=max_cost,
        max_tokens=max_tokens,
    )

run_batch_agent_method(agent, method, items, input_map=lambda x: str(x), output_map=lambda x: x, sequential=True, stop_on_first_result=False, handle_exceptions=ExceptionHandling.RAISE, batch_size=None)

Run the method on copies of agent, async/concurrently one per item in items list. ASSUMPTION: The method is an async method and has signature: method(self, input: str|ChatDocument|None) -> ChatDocument|None So this would typically be used for the agent's "responder" methods, e.g. llm_response_async or agent_responder_async.

For each item, apply input_map to get the initial message to process. For each result, apply output_map to get the final result.

Parameters:

Name Type Description Default
agent Agent

agent whose method to run

required
method Callable[[str | ChatDocument | None], Coroutine[Any, Any, ChatDocument | None]]

Async method to run on copies of agent. The method is assumed to have signature: method(self, input: str|ChatDocument|None) -> ChatDocument|None

required
input_map Callable[[Any], str | ChatDocument]

function to map item to initial message to process

lambda x: str(x)
output_map Callable[[ChatDocument | str], Any]

function to map result to final result

lambda x: x
sequential bool

whether to run sequentially (e.g. some APIs such as ooba don't support concurrent requests)

True
stop_on_first_result bool

whether to stop after the first valid (not-None) result

False
handle_exceptions Union[bool, ExceptionHandling]

How to handle exceptions: - RAISE or False: Let exceptions propagate - RETURN_NONE or True: Convert exceptions to None in results - RETURN_EXCEPTION: Include exception objects in results Boolean values are deprecated and will be removed in a future version.

RAISE
batch_size Optional[int]

The number of items to process in each batch. If None, process all items at once.

None

Returns: List[Any]: list of final results

Source code in langroid/agent/batch.py
def run_batch_agent_method(
    agent: Agent,
    method: Callable[
        [str | ChatDocument | None], Coroutine[Any, Any, ChatDocument | None]
    ],
    items: List[Any],
    input_map: Callable[[Any], str | ChatDocument] = lambda x: str(x),
    output_map: Callable[[ChatDocument | None], Any] = lambda x: x,
    sequential: bool = True,
    stop_on_first_result: bool = False,
    handle_exceptions: Union[bool, ExceptionHandling] = ExceptionHandling.RAISE,
    batch_size: Optional[int] = None,
) -> List[Any]:
    """
    Run the `method` on copies of `agent`, async/concurrently one per
    item in `items` list.
    ASSUMPTION: The `method` is an async method and has signature:
        method(self, input: str|ChatDocument|None) -> ChatDocument|None
    So this would typically be used for the agent's "responder" methods,
    e.g. `llm_response_async` or `agent_responder_async`.

    For each item, apply `input_map` to get the initial message to process.
    For each result, apply `output_map` to get the final result.

    Each copy is built from its own deep copy of the agent's config (with
    streaming and stats display turned off), and is named
    `<agent name>-<item index>`.

    Args:
        agent (Agent): agent whose method to run
        method (Callable): Async method to run on copies of `agent`; only its
            `__name__` is used, to look up the same method on each copy.
            The method is assumed to have signature:
            `method(self, input: str|ChatDocument|None) -> ChatDocument|None`
        items (List[Any]): list of items to process
        input_map (Callable[[Any], str|ChatDocument]): function to map item to
            initial message to process
        output_map (Callable[[ChatDocument|str], Any]): function to map result
            to final result
        sequential (bool): whether to run sequentially
            (e.g. some APIs such as ooba don't support concurrent requests)
        stop_on_first_result (bool): whether to stop after the first valid
            (not-None) result
        handle_exceptions: How to handle exceptions:
            - RAISE or False: Let exceptions propagate
            - RETURN_NONE or True: Convert exceptions to None in results
            - RETURN_EXCEPTION: Include exception objects in results
            Boolean values are deprecated and will be removed in a future version.
        batch_size (Optional[int]): The number of items to process in each batch.
            If None, process all items at once.

    Returns:
        List[Any]: list of final results

    Raises:
        ValueError: if `method` is not a coroutine function, or (when a copy
            is run) if the agent copy has no method of that name
        AssertionError: if the agent's config has no llm config
    """
    # Check if the method is async
    method_name = method.__name__
    if not inspect.iscoroutinefunction(method):
        raise ValueError(f"The method {method_name} is not async.")

    inputs = [input_map(item) for item in items]

    # Work on a private copy of the config so the caller's agent is untouched.
    agent_cfg = copy.deepcopy(agent.config)
    assert agent_cfg.llm is not None, "agent must have llm config"
    agent_cfg.llm.stream = False
    agent_cfg.show_stats = False
    agent_cls = type(agent)
    agent_name = agent_cfg.name

    async def _do_task(input: str | ChatDocument, i: int) -> Any:
        # Give every copy its OWN config, named from the original agent name.
        # Renaming the shared `agent_cfg` in place would accumulate suffixes
        # across items ("a-0", "a-0-1", ...) and let concurrently running
        # copies observe each other's config changes.
        cfg_i = copy.deepcopy(agent_cfg)
        cfg_i.name = f"{agent_name}-{i}"
        agent_i = agent_cls(cfg_i)
        method_i = getattr(agent_i, method_name, None)
        if method_i is None:
            raise ValueError(f"Agent {agent_name} has no method {method_name}")
        result = await method_i(input)
        return result

    return run_batched_tasks(
        inputs=inputs,
        do_task=_do_task,
        batch_size=batch_size,
        stop_on_first_result=stop_on_first_result,
        sequential=sequential,
        handle_exceptions=handle_exceptions,
        output_map=output_map,
        message_template=f"[bold green]Running {{total}} copies of {agent_name}...",
    )