Skip to content

MesopUI

fastagency.ui.mesop.MesopUI #

MesopUI(
    super_conversation: Optional[MesopUI] = None,
    *,
    security_policy: Optional[SecurityPolicy] = None,
    styles: Optional[MesopHomePageStyles] = None,
    keep_alive: Optional[bool] = False,
    auth: Optional[AuthProtocol] = None
)

Bases: MessageProcessorMixin, CreateWorkflowUIMixin

Initialize the console UI object.

PARAMETER DESCRIPTION
super_conversation

The super conversation. Defaults to None.

TYPE: Optional[MesopUI] DEFAULT: None

security_policy

The security policy. Defaults to None.

TYPE: Optional[SecurityPolicy] DEFAULT: None

styles

The styles. Defaults to None.

TYPE: Optional[MesopHomePageStyles] DEFAULT: None

keep_alive

If keep alive messages should be inserted, defaults to False`

TYPE: Optional[bool] DEFAULT: False

auth

The auth settings to use. Defaults to None.

TYPE: Optional[AuthProtocol] DEFAULT: None

Source code in fastagency/ui/mesop/mesop.py
def __init__(
    self,
    super_conversation: "Optional[MesopUI]" = None,
    *,
    security_policy: Optional[me.SecurityPolicy] = None,
    styles: Optional[MesopHomePageStyles] = None,
    keep_alive: Optional[bool] = False,
    auth: Optional[AuthProtocol] = None,
) -> None:
    """Initialize the console UI object.

    Args:
        super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
        security_policy (Optional[me.SecurityPolicy], optional): The security policy. Defaults to None.
        styles (Optional[MesopHomePageStyles], optional): The styles. Defaults to None.
        keep_alive (Optional[bool]): If keep alive messages should be inserted, defaults to False`
        auth (Optional[AuthProtocol]): The auth settings to use. Defaults to None.
    """
    logger.info(f"Initializing MesopUI: {self}")
    try:
        self.id: str = uuid4().hex
        self.super_conversation: Optional[MesopUI] = super_conversation
        self.sub_conversations: list[MesopUI] = []
        self._in_queue: Optional[Queue[str]] = None
        self._out_queue: Optional[Queue[MesopMessage]] = None

        self._keep_me_alive = keep_alive
        self._keep_alive_thread: Optional[threading.Thread] = None
        if super_conversation is None:
            self._in_queue = Queue()
            self._out_queue = Queue()
            self.keep_me_alive()

        MesopUI.register(self)

        if MesopUI._me is None:
            from .main import create_home_page, me

            create_home_page(
                self, security_policy=security_policy, styles=styles, auth=auth
            )
            MesopUI._me = me

    except Exception as e:
        logger.error(e, exc_info=True)
        raise
    logger.info(f"Initialized MesopUI: {self}")

app property #

app: Runnable

id instance-attribute #

id: str = hex

in_queue property #

in_queue: Queue[str]

is_root_conversation property #

is_root_conversation: bool

level property #

level: int

out_queue property #

out_queue: Queue[MesopMessage]

root_conversation property #

root_conversation: MesopUI

sub_conversations instance-attribute #

sub_conversations: list[MesopUI] = []

super_conversation instance-attribute #

super_conversation: Optional[MesopUI] = super_conversation

create #

create(app: Runnable, import_string: str) -> Iterator[None]
Source code in fastagency/ui/mesop/mesop.py
@contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
    logger.info(f"Creating MesopUI with import string: {import_string}")
    MesopUI._app = app
    MesopUI._import_string = import_string

    start_script = """import fastagency.ui.mesop.main"""

    with TemporaryDirectory() as temp_dir:
        main_path = Path(temp_dir) / "main.py"
        with main_path.open("w") as f:
            f.write(start_script)

        MESOP_FLAGS.mark_as_parsed()
        MesopUI._main_path = str(main_path)
        MesopUI._created_instance = self

        yield

create_subconversation #

create_subconversation() -> MesopUI
Source code in fastagency/ui/mesop/mesop.py
def create_subconversation(self) -> "MesopUI":
    sub_conversation = MesopUI(self)
    self.sub_conversations.append(sub_conversation)

    return sub_conversation

create_workflow_ui #

create_workflow_ui(workflow_uuid: str) -> UI
Source code in fastagency/base.py
def create_workflow_ui(self: UIBase, workflow_uuid: str) -> "UI":
    return UI(uibase=self, workflow_uuid=workflow_uuid)

do_not_keep_me_alive #

do_not_keep_me_alive() -> None
Source code in fastagency/ui/mesop/mesop.py
def do_not_keep_me_alive(self) -> None:
    self._keep_me_alive = False

error #

error(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def error(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        Error(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            short=short,
            long=long,
        )
    )

function_call_execution #

function_call_execution(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
def function_call_execution(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        FunctionCallExecution(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            function_name=function_name,
            call_id=call_id,
            retval=retval,
        )
    )

get_conversation classmethod #

get_conversation(id: str) -> MesopUI
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def get_conversation(cls, id: str) -> "MesopUI":
    return cls._registry[id]

get_created_instance classmethod #

get_created_instance() -> MesopUI
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def get_created_instance(cls) -> "MesopUI":
    created_instance = cls._created_instance
    if created_instance is None:
        raise RuntimeError("MesopUI has not been created yet.")

    return created_instance

get_message_stream #

get_message_stream() -> Generator[MesopMessage, None, None]
Source code in fastagency/ui/mesop/mesop.py
def get_message_stream(self) -> Generator[MesopMessage, None, None]:
    while True:
        message = self.out_queue.get()
        if self._is_stream_braker(message.io_message):
            yield message
            break
        yield message

handle_wsgi #

handle_wsgi(
    app: Runnable,
    environ: dict[str, Any],
    start_response: Callable[..., Any],
) -> list[bytes]
Source code in fastagency/ui/mesop/mesop.py
def handle_wsgi(
    self,
    app: "Runnable",
    environ: dict[str, Any],
    start_response: Callable[..., Any],
) -> list[bytes]:
    logger.debug(f"Starting MesopUI using WSGI interface with app: {app}")
    MesopUI._created_instance = self
    MesopUI._app = app

    if configure_static_file_serving is None:  # pragme: no cover
        logger.error("configure_static_file_serving is None")

    if MesopUI._me is None:
        logger.error("MesopUI._me is None")
        raise RuntimeError("MesopUI._me is None")

    return MesopUI._me(environ, start_response)  # type: ignore[no-any-return]

keep_alive #

keep_alive(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def keep_alive(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        KeepAlive(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
        )
    )

keep_me_alive #

keep_me_alive() -> None
Source code in fastagency/ui/mesop/mesop.py
def keep_me_alive(self) -> None:
    def keep_alive_worker() -> None:
        while self._keep_me_alive:
            time.sleep(3)
            if self._out_queue:
                # todo: do something more elegant
                msg = KeepAlive(workflow_uuid="")
                mesop_msg = self._mesop_message(msg)
                logger.debug(f"putting keepalive {msg.uuid}")
                self._out_queue.put(mesop_msg)

    if self._keep_me_alive and self._keep_alive_thread is None:
        self._keep_alive_thread = threading.Thread(target=keep_alive_worker)
        self._keep_alive_thread.start()

multiple_choice #

multiple_choice(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
def multiple_choice(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    choices = choices or []
    return self.process_message(
        MultipleChoice(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            prompt=prompt,
            choices=choices,
            default=default,
            single=single,
        )
    )

process_message #

process_message(message: IOMessage) -> Optional[str]
Source code in fastagency/ui/mesop/mesop.py
def process_message(self, message: IOMessage) -> Optional[str]:
    return self.visit(message)

register classmethod #

register(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def register(cls, conversation: "MesopUI") -> None:
    cls._registry[conversation.id] = conversation

respond #

respond(message: str) -> None
Source code in fastagency/ui/mesop/mesop.py
def respond(self, message: str) -> None:
    self.in_queue.put(message)

respond_to classmethod #

respond_to(
    conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def respond_to(
    cls, conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]:
    conversation = cls.get_conversation(conversation_id)
    conversation.respond(message)
    return conversation.get_message_stream()

start #

start(
    *,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False
) -> None
Source code in fastagency/ui/mesop/mesop.py
def start(
    self,
    *,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False,
) -> None:
    logger.info(
        f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
    )
    if single_run:
        logger.warning("single_run parameter is currently not supported in MesopUI")

    MesopUI._app = app

    mesop_main(["mesop", self._main_path])

suggested_function_call #

suggested_function_call(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def suggested_function_call(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    arguments = arguments or {}
    return self.process_message(
        SuggestedFunctionCall(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            function_name=function_name,
            call_id=call_id,
            arguments=arguments,
        )
    )

system_message #

system_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def system_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    message = message or {}
    return self.process_message(
        SystemMessage(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            message=message,
        )
    )

text_input #

text_input(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
def text_input(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    suggestions = suggestions or []
    return self.process_message(
        TextInput(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            prompt=prompt,
            suggestions=suggestions,
            password=password,
        )
    )

text_message #

text_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def text_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        TextMessage(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            body=body,
        )
    )

unregister classmethod #

unregister(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def unregister(cls, conversation: "MesopUI") -> None:
    del cls._registry[conversation.id]

visit #

visit(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def visit(self, message: IOMessage) -> Optional[str]:
    method_name = f"visit_{message.type}"
    method = getattr(self, method_name, self.visit_default)
    return method(message)

visit_default #

visit_default(message: IOMessage) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_default(self, message: IOMessage) -> None:
    mesop_msg = self._mesop_message(message)
    self._publish(mesop_msg)

visit_error #

visit_error(message: Error) -> Optional[str]
Source code in fastagency/messages.py
def visit_error(self, message: Error) -> Optional[str]:
    return self.visit_default(message)

visit_function_call_execution #

visit_function_call_execution(
    message: FunctionCallExecution,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    return self.visit_default(message)

visit_keep_alive #

visit_keep_alive(message: KeepAlive) -> Optional[str]
Source code in fastagency/messages.py
def visit_keep_alive(self, message: KeepAlive) -> Optional[str]:
    return self.visit_default(message)

visit_multiple_choice #

visit_multiple_choice(message: MultipleChoice) -> str
Source code in fastagency/ui/mesop/mesop.py
def visit_multiple_choice(self, message: MultipleChoice) -> str:
    mesop_msg = self._mesop_message(message)
    self._publish(mesop_msg)
    return self.in_queue.get()

visit_suggested_function_call #

visit_suggested_function_call(
    message: SuggestedFunctionCall,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    return self.visit_default(message)

visit_system_message #

visit_system_message(
    message: SystemMessage,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    return self.visit_default(message)

visit_text_input #

visit_text_input(message: TextInput) -> str
Source code in fastagency/ui/mesop/mesop.py
def visit_text_input(self, message: TextInput) -> str:
    mesop_msg = self._mesop_message(message)
    self._publish(mesop_msg)
    return self.in_queue.get()

visit_text_message #

visit_text_message(message: TextMessage) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_text_message(self, message: TextMessage) -> None:
    mesop_msg = self._mesop_message(message)
    self._publish(mesop_msg)

visit_workflow_completed #

visit_workflow_completed(
    message: WorkflowCompleted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    return self.visit_default(message)

visit_workflow_started #

visit_workflow_started(
    message: WorkflowStarted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]:
    return self.visit_default(message)

workflow_completed #

workflow_completed(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_completed(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    return self.process_message(
        WorkflowCompleted(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            result=result,
        )
    )

workflow_started #

workflow_started(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_started(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    uuid = uuid or str(uuid4().hex)
    params = params or {}
    return self.process_message(
        WorkflowStarted(
            sender=sender,
            recipient=recipient,
            auto_reply=auto_reply,
            uuid=uuid,
            workflow_uuid=workflow_uuid,
            name=name,
            description=description,
            params=params,
        )
    )