Skip to content

Workflow

fastagency.runtimes.ag2.Workflow #

Workflow()

Bases: WorkflowsProtocol

Initialize the workflows.

Source code in fastagency/runtimes/ag2/ag2.py
def __init__(self) -> None:
    """Create an empty workflow registry.

    The registry is keyed by workflow name; each value is a
    ``(workflow callable, description)`` pair.
    """
    registry: dict[str, tuple[Callable[[UI, dict[str, Any]], str], str]] = {}
    self._workflows = registry

names property #

names: list[str]

get_description #

get_description(name: str) -> str
Source code in fastagency/runtimes/ag2/ag2.py
def get_description(self, name: str) -> str:
    """Return the description stored for the workflow ``name``.

    Args:
        name: Name the workflow was registered under.

    Returns:
        The registered description, or ``"Description not available!"``
        when no workflow with that name is in the registry.
    """
    fallback = (None, "Description not available!")
    entry = self._workflows.get(name, fallback)
    # Entries are (workflow, description) pairs; only the text is needed.
    _workflow, text = entry
    return text

register #

register(
    name: str,
    description: str,
    *,
    fail_on_redefintion: bool = False
) -> Callable[[WorkflowTypeVar], WorkflowTypeVar]
Source code in fastagency/runtimes/ag2/ag2.py
def register(
    self, name: str, description: str, *, fail_on_redefintion: bool = False
) -> Callable[[WorkflowTypeVar], WorkflowTypeVar]:
    """Build a decorator that stores a workflow under ``name``.

    Args:
        name: Key under which the decorated function is stored.
        description: Text stored alongside the function.
        fail_on_redefintion: When true, registering a name that already
            exists raises instead of overwriting.

    Returns:
        A decorator that records the function in the registry and returns
        it unchanged.

    Raises:
        ValueError: From the decorator, when ``name`` is already registered
            and ``fail_on_redefintion`` is true.
    """

    def decorator(func: WorkflowTypeVar) -> WorkflowTypeVar:
        # Validation runs first, before the registry is consulted.
        check_register_decorator(func)

        already_registered = name in self._workflows
        if already_registered and fail_on_redefintion:
            raise ValueError(f"A workflow with name '{name}' already exists.")
        if already_registered:
            logger.warning(f"Overwriting workflow with name '{name}'")

        self._workflows[name] = (func, description)
        return func

    return decorator

register_api #

register_api(
    api: OpenAPI,
    callers: Union[
        ConversableAgent, Iterable[ConversableAgent]
    ],
    executors: Union[
        ConversableAgent, Iterable[ConversableAgent]
    ],
    functions: Optional[
        Union[
            str,
            Iterable[
                Union[str, Mapping[str, Mapping[str, str]]]
            ],
        ]
    ] = None,
) -> None
Source code in fastagency/runtimes/ag2/ag2.py
def register_api(
    self,
    api: "OpenAPI",
    callers: Union[ConversableAgent, Iterable[ConversableAgent]],
    executors: Union[ConversableAgent, Iterable[ConversableAgent]],
    functions: Optional[
        Union[str, Iterable[Union[str, Mapping[str, Mapping[str, str]]]]]
    ] = None,
) -> None:
    """Register an API's functions with caller and executor agents.

    Args:
        api: Object whose ``_register_for_llm`` and
            ``_register_for_execution`` methods perform the registration.
        callers: One agent or an iterable of agents; each is passed to
            ``api._register_for_llm``.
        executors: One agent or an iterable of agents; each is passed to
            ``api._register_for_execution``.
        functions: Optional selection forwarded as the ``functions``
            keyword. A single string is wrapped in a one-element list;
            ``None`` is forwarded as-is.
    """
    from collections.abc import Iterator

    if not isinstance(callers, Iterable):
        callers = [callers]
    if not isinstance(executors, Iterable):
        executors = [executors]

    if isinstance(functions, str):
        functions = [functions]
    elif isinstance(functions, Iterator):
        # The same object is handed to every caller and executor below. A
        # one-shot iterator (e.g. a generator) would be drained by the first
        # consumer, so it is materialized once here. Re-iterable containers
        # are forwarded untouched.
        functions = list(functions)

    for caller in callers:
        api._register_for_llm(caller, functions=functions)

    for executor in executors:
        api._register_for_execution(executor, functions=functions)

run #

run(
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any
) -> str
Source code in fastagency/runtimes/ag2/ag2.py
def run(
    self,
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    """Execute the workflow registered as ``name`` and report through ``ui``.

    Args:
        name: Name of a registered workflow.
        ui: Receives the started, error and completed notifications, and is
            passed to the workflow as its first argument.
        user_id: Accepted but not used by this method yet (see the todo
            below).
        **kwargs: Passed to the workflow as a single dict and reported as
            ``params`` in the started notification.

    Returns:
        The workflow's return value, or a message describing the exception
        if the workflow (or the started notification) raised.

    Raises:
        KeyError: If ``name`` is not registered; the lookup happens before
            the exception handler is entered.
    """
    workflow = self._workflows[name][0]

    # todo: inject user_id into call (and other stuff)
    try:
        ui.workflow_started(
            sender="Workflow",
            recipient="User",
            name=name,
            description=self.get_description(name),
            params=kwargs,
        )
        # Coroutine workflows are driven to completion on a fresh event
        # loop; plain callables are invoked directly.
        if asyncio.iscoroutinefunction(workflow):
            result = asyncio.run(workflow(ui, kwargs))
        else:
            result = workflow(ui, kwargs)

    except Exception as exc:
        logger.error(
            f"Unhandled exception occurred when executing the workflow: {exc}",
            exc_info=True,
        )
        ui.error(
            sender="Workflow",
            recipient="User",
            short="Unhandled exception occurred when executing the workflow.",
            long=str(exc),
        )
        # The failure text doubles as the result reported below.
        result = f"Unhandled exception occurred when executing the workflow: {exc}"

    ui.workflow_completed(
        sender="Workflow",
        recipient="User",
        result=result,
    )
    logger.info(f"Workflow '{name}' completed with result: {result}")

    return result