Skip to content

IOStreamAdapter

fastagency.runtime.autogen.IOStreamAdapter #

IOStreamAdapter(ui: UI)

Initialize the adapter with a ChatableIO object.

PARAMETER DESCRIPTION
ui

The ChatableIO object to adapt

TYPE: ChatableIO

Source code in fastagency/runtime/autogen/base.py
def __init__(self, ui: UI) -> None:
    """Initialize the adapter with a ChatableIO object.

    Args:
        ui (ChatableIO): The ChatableIO object to adapt

    """
    self.ui = ui
    self.current_message = CurrentMessage()

    self.messages: list[IOMessage] = []
    if not isinstance(self.ui, UI):
        raise ValueError("The ui object must be an instance of UI.")

current_message instance-attribute #

current_message = CurrentMessage()

messages instance-attribute #

messages: list[IOMessage] = []

ui instance-attribute #

ui = ui

input #

input(prompt: str = '', *, password: bool = False) -> str
Source code in fastagency/runtime/autogen/base.py
def input(self, prompt: str = "", *, password: bool = False) -> str:
    # logger.info(f"input(): {prompt=}, {password=}")
    message: AskingMessage = self.current_message.process_input(
        prompt, password, self.messages
    )

    retval: str = self.ui.process_message(message)  # type: ignore[assignment]

    # in case of approving a suggested function call, we need to return an empty string to AutoGen
    if (
        message.type == "multiple_choice"
        and self.messages[-1].type == "suggested_function_call"
        and retval == "Approve"
    ):
        retval = ""
    if retval == "Exit":
        retval = "exit"

    # logger.info(f"input(): {retval=}")
    return retval

print #

print(
    *objects: Any,
    sep: str = " ",
    end: str = "\n",
    flush: bool = False
) -> None
Source code in fastagency/runtime/autogen/base.py
def print(
    self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False
) -> None:
    # logger.info(f"print(): {objects=}, {sep=}, {end=}, {flush=}")
    body = sep.join(map(str, objects)) + end
    ready_to_send = self._process_message_chunk(body)
    if ready_to_send:
        message = self.messages[-1]
        self.ui.process_message(message)