Skip to content

MesopUI

fastagency.ui.mesop.base.MesopUI #

MesopUI(super_conversation: Optional[MesopUI] = None)

Bases: IOMessageVisitor

Initialize the Mesop UI object.

PARAMETER DESCRIPTION
super_conversation

The super conversation. Defaults to None.

TYPE: Optional[MesopUI] DEFAULT: None

Source code in fastagency/ui/mesop/base.py
def __init__(self, super_conversation: "Optional[MesopUI]" = None) -> None:
    """Initialize the Mesop UI conversation object.

    A conversation created without a super conversation gets its own
    input and output queues; a sub-conversation leaves both set to None.
    Every new instance is registered with ``MesopUI.register``.

    Args:
        super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
    """
    has_no_parent = super_conversation is None

    self.id: str = uuid4().hex
    self.super_conversation: Optional[MesopUI] = super_conversation
    self.sub_conversations: list[MesopUI] = []

    # Queues are only allocated when there is no super conversation.
    self._in_queue: Optional[Queue[str]] = Queue() if has_no_parent else None
    self._out_queue: Optional[Queue[MesopMessage]] = (
        Queue() if has_no_parent else None
    )

    MesopUI.register(self)

app property #

app: Runnable

id instance-attribute #

id: str = uuid4().hex

in_queue property #

in_queue: Queue[str]

is_root_conversation property #

is_root_conversation: bool

level property #

level: int

out_queue property #

out_queue: Queue[MesopMessage]

root_conversation property #

root_conversation: MesopUI

sub_conversations instance-attribute #

sub_conversations: list[MesopUI] = []

super_conversation instance-attribute #

super_conversation: Optional[MesopUI] = super_conversation

create #

create(app: Runnable, import_string: str) -> Iterator[None]
Source code in fastagency/ui/mesop/base.py
@contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
    """Prepare a temporary Mesop entry script for the duration of the context.

    Stores ``app`` and ``import_string`` on the instance, writes a one-line
    ``main.py`` into a temporary directory, records that path and this
    instance on the ``MesopUI`` class, and then yields. The temporary
    directory is removed when the context exits.

    Args:
        app: The runnable application to attach to this UI.
        import_string: The import string of the application (logged and stored).

    Yields:
        None, while the temporary ``main.py`` exists.
    """
    logger.info(f"Creating MesopUI with import string: {import_string}")
    self._app = app
    self._import_string = import_string

    start_script = """import fastagency.ui.mesop.main"""

    with TemporaryDirectory() as scratch_dir:
        script_path = Path(scratch_dir) / "main.py"
        script_path.write_text(start_script)

        # NOTE(review): presumably tells the flag parser that parsing is
        # already done so Mesop does not parse the command line — confirm.
        MESOP_FLAGS.mark_as_parsed()
        MesopUI._main_path = str(script_path)
        MesopUI._created_instance = self

        yield

create_subconversation #

create_subconversation() -> MesopUI
Source code in fastagency/ui/mesop/base.py
def create_subconversation(self) -> "MesopUI":
    """Create a new conversation nested under this one.

    The new ``MesopUI`` is constructed with this instance as its super
    conversation and appended to ``self.sub_conversations``.

    Returns:
        The newly created sub-conversation.
    """
    child = MesopUI(self)
    self.sub_conversations.append(child)
    return child

get_conversation classmethod #

get_conversation(id: str) -> MesopUI
Source code in fastagency/ui/mesop/base.py
@classmethod
def get_conversation(cls, id: str) -> "MesopUI":
    """Look up a registered conversation by its id.

    Args:
        id: The id under which the conversation was registered.

    Returns:
        The conversation stored in the class registry for ``id``.

    Raises:
        KeyError: If the registry is a dict and holds no entry for ``id``.
    """
    registry = cls._registry
    return registry[id]

get_created_instance classmethod #

get_created_instance() -> MesopUI
Source code in fastagency/ui/mesop/base.py
@classmethod
def get_created_instance(cls) -> "MesopUI":
    """Return the instance stored in ``cls._created_instance``.

    Returns:
        The stored instance.

    Raises:
        RuntimeError: If ``cls._created_instance`` is None.
    """
    instance = cls._created_instance
    if instance is not None:
        return instance

    raise RuntimeError("MesopUI has not been created yet.")

get_message_stream #

get_message_stream() -> Generator[MesopMessage, None, None]
Source code in fastagency/ui/mesop/base.py
def get_message_stream(self) -> Generator[MesopMessage, None, None]:
    """Yield messages from the output queue until a stream breaker is seen.

    Each message is taken from ``self.out_queue`` and yielded. The message
    for which ``self._is_stream_braker`` returns true is still yielded,
    and the generator finishes right after it.

    Yields:
        Messages taken from the output queue, in order.
    """
    while True:
        outgoing = self.out_queue.get()
        # Decide before yielding, so the check runs even if the consumer
        # stops iterating after receiving this message.
        is_last = self._is_stream_braker(outgoing.io_message)
        yield outgoing
        if is_last:
            return

process_message #

process_message(message: IOMessage) -> Optional[str]
Source code in fastagency/ui/mesop/base.py
def process_message(self, message: IOMessage) -> Optional[str]:
    """Process a message by handing it to ``self.visit``.

    Args:
        message: The message to process.

    Returns:
        Whatever ``self.visit`` returns for the message.
    """
    outcome = self.visit(message)
    return outcome

register classmethod #

register(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/base.py
@classmethod
def register(cls, conversation: "MesopUI") -> None:
    """Store a conversation in the class registry under its id.

    Args:
        conversation: The conversation to register; its ``id`` is the key.
    """
    key = conversation.id
    cls._registry[key] = conversation

respond #

respond(message: str) -> None
Source code in fastagency/ui/mesop/base.py
def respond(self, message: str) -> None:
    """Put a response message on this conversation's input queue.

    Args:
        message: The response text to enqueue.
    """
    inbox = self.in_queue
    inbox.put(message)

respond_to classmethod #

respond_to(
    conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]
Source code in fastagency/ui/mesop/base.py
@classmethod
def respond_to(
    cls, conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]:
    """Send a response to a registered conversation and return its stream.

    Args:
        conversation_id: The id of the conversation to respond to.
        message: The response text to deliver.

    Returns:
        The message stream returned by that conversation's
        ``get_message_stream``.
    """
    target = cls.get_conversation(conversation_id)
    target.respond(message)
    stream = target.get_message_stream()
    return stream

start #

start(
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    initial_message: Optional[str] = None,
) -> None
Source code in fastagency/ui/mesop/base.py
def start(
    self,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    initial_message: Optional[str] = None,
) -> None:
    """Start Mesop using the stored main script path.

    Logs the stored import string and main path, sets ``app`` on the
    ``MesopUI`` class, and calls ``mesop_main`` with the main script path.

    Args:
        app: The runnable application; stored on the ``MesopUI`` class.
        import_string: Not used in this body; the logged value is read
            from ``self._import_string``.
        name: Not used in this body.
        initial_message: Not used in this body.
    """
    logger.info(
        f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
    )

    MesopUI._app = app

    # Argument list handed to Mesop's entry point.
    mesop_argv = ["mesop", self._main_path]
    mesop_main(mesop_argv)

unregister classmethod #

unregister(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/base.py
@classmethod
def unregister(cls, conversation: "MesopUI") -> None:
    """Remove a conversation from the class registry.

    Args:
        conversation: The conversation to remove; its ``id`` is the key.

    Raises:
        KeyError: If the registry is a dict and has no entry for that id.
    """
    key = conversation.id
    del cls._registry[key]

visit #

visit(message: IOMessage) -> Optional[str]
Source code in fastagency/base.py
def visit(self, message: IOMessage) -> Optional[str]:
    """Dispatch a message to the handler matching its type.

    The handler is the attribute named ``visit_<message.type>`` on this
    object; if there is no such attribute, ``self.visit_default`` is used.

    Args:
        message: The message to dispatch.

    Returns:
        Whatever the selected handler returns.
    """
    handler = getattr(self, f"visit_{message.type}", self.visit_default)
    return handler(message)

visit_default #

visit_default(message: IOMessage) -> None
Source code in fastagency/ui/mesop/base.py
def visit_default(self, message: IOMessage) -> None:
    """Convert the message with ``_mesop_message`` and publish it.

    Args:
        message: The message to publish.
    """
    self._publish(self._mesop_message(message))

visit_function_call_execution #

visit_function_call_execution(
    message: FunctionCallExecution,
) -> Optional[str]
Source code in fastagency/base.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    """Handle a function call execution message via ``visit_default``.

    Args:
        message: The function call execution message.

    Returns:
        Whatever ``self.visit_default`` returns.
    """
    outcome = self.visit_default(message)
    return outcome

visit_multiple_choice #

visit_multiple_choice(message: MultipleChoice) -> str
Source code in fastagency/ui/mesop/base.py
def visit_multiple_choice(self, message: MultipleChoice) -> str:
    """Publish a multiple-choice message and return the reply.

    Args:
        message: The multiple-choice message to publish.

    Returns:
        The next item taken from ``self.in_queue``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply

visit_suggested_function_call #

visit_suggested_function_call(
    message: SuggestedFunctionCall,
) -> Optional[str]
Source code in fastagency/base.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    """Handle a suggested function call message via ``visit_default``.

    Args:
        message: The suggested function call message.

    Returns:
        Whatever ``self.visit_default`` returns.
    """
    outcome = self.visit_default(message)
    return outcome

visit_system_message #

visit_system_message(
    message: SystemMessage,
) -> Optional[str]
Source code in fastagency/base.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    """Handle a system message via ``visit_default``.

    Args:
        message: The system message.

    Returns:
        Whatever ``self.visit_default`` returns.
    """
    outcome = self.visit_default(message)
    return outcome

visit_text_input #

visit_text_input(message: TextInput) -> str
Source code in fastagency/ui/mesop/base.py
def visit_text_input(self, message: TextInput) -> str:
    """Publish a text input request and return the reply.

    Args:
        message: The text input message to publish.

    Returns:
        The next item taken from ``self.in_queue``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply

visit_text_message #

visit_text_message(message: TextMessage) -> None
Source code in fastagency/ui/mesop/base.py
def visit_text_message(self, message: TextMessage) -> None:
    """Convert a text message with ``_mesop_message`` and publish it.

    Args:
        message: The text message to publish.
    """
    self._publish(self._mesop_message(message))

visit_workflow_completed #

visit_workflow_completed(
    message: WorkflowCompleted,
) -> Optional[str]
Source code in fastagency/base.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    """Handle a workflow completed message via ``visit_default``.

    Args:
        message: The workflow completed message.

    Returns:
        Whatever ``self.visit_default`` returns.
    """
    outcome = self.visit_default(message)
    return outcome