Bases: IOMessageVisitor
 Initialize the MesopGUIMessageVisitor object.
    | PARAMETER |  DESCRIPTION |  
    level |    The level of the message.      TYPE: int     |  
  conversation_id |    The ID of the conversation.      TYPE: str     |  
  conversation_message |    Conversation message that wraps the visited io_message      TYPE: ConversationMessage     |  
  styles |    Styles for the message.      TYPE: MesopHomePageStyles     |  
  read_only |    Input messages are disabled in read only mode      TYPE: bool   DEFAULT: False     |  
  
  Source code in fastagency/ui/mesop/message.py
  | def __init__(
    self,
    level: int,
    conversation_id: str,
    conversation_message: ConversationMessage,
    styles: MesopHomePageStyles,
    read_only: bool = False,
) -> None:
    """Initialize the MesopGUIMessageVisitor object.
    Args:
        level (int): The level of the message.
        conversation_id (str): The ID of the conversation.
        conversation_message (ConversationMessage): Conversation message that wraps the visited io_message
        styles (MesopHomePageStyles): Styles for the message
        read_only (bool): Input messages are disabled in read only mode
    """
    self._level = level
    self._conversation_id = conversation_id
    self._readonly = read_only
    self._conversation_message = conversation_message
    self._styles = styles
  | 
        process_message 
    Source code in fastagency/ui/mesop/message.py
  | def process_message(self, message: IOMessage) -> Optional[str]:
    try:
        return self.visit(message)
    except Exception as e:
        logger.warning(f"Failed to render message: {e}")
        self.render_error_message(e, message)
        return None
  | 
           render_error_message 
    Source code in fastagency/ui/mesop/message.py
  | def render_error_message(
    self,
    e: Exception,
    message: IOMessage,
    *,
    content: Optional[str] = None,
    style: Optional[MesopMessageStyles] = None,
) -> None:
    style = self._styles.message.error or self._styles.message.default
    title = "[Error] " + message.type.replace("_", " ").capitalize()
    with me.box(style=style.box or self._styles.message.default.box):
        self._header(
            message,
            title=title,
            box_style=style.header_box or self._styles.message.default.header_box,
            md_style=style.header_md or self._styles.message.default.header_md,
        )
        content = (
            "Failed to render message:"
            + json.dumps(message.model_dump(), indent=2)
            + f"<br>Error: {e}"
        )
        logger.warning(f"render_error_message: {content=}")
        logger.warning(e, exc_info=True)
        # me.markdown(content, style=style.md or self._styles.message.default.md)
        self._render_content(content, style.md or self._styles.message.default.md)
  | 
           visit 
    Source code in fastagency/base.py
  | def visit(self, message: IOMessage) -> Optional[str]:
    method_name = f"visit_{message.type}"
    method = getattr(self, method_name, self.visit_default)
    return method(message)
  | 
           visit_default 
    Source code in fastagency/ui/mesop/message.py
  | def visit_default(
    self,
    message: IOMessage,
    *,
    content: Optional[str] = None,
    style: Optional[MesopMessageStyles] = None,
    error: Optional[bool] = False,
    inner_callback: Optional[Callable[..., None]] = None,
    scrollable: Optional[bool] = False,
) -> None:
    # logger.info(f"visit_default: {message=}")
    style = style or self._styles.message.default
    title = message.type.replace("_", " ").capitalize()
    title = "[Error] " + title if error else title
    with me.box(style=style.box or self._styles.message.default.box):
        self._header(
            message,
            title=title,
            box_style=style.header_box,
            md_style=style.header_md,
        )
        content = content or json.dumps(message.model_dump()["content"])
        self._render_content(
            content,
            msg_md_style=style.scrollable_md
            or self._styles.message.default.scrollable_md
            if scrollable
            else style.md or self._styles.message.default.md,
        )
        if inner_callback:
            inner_callback()
  | 
           visit_error 
 visit_error(message: Error) -> None
    Source code in fastagency/ui/mesop/message.py
  | def visit_error(self, message: Error) -> None:
    self.visit_default(
        message,
        content=f"### {message.short}\n{message.long}",
        style=self._styles.message.error,
        scrollable=True,
    )
  | 
           visit_function_call_execution 
    Source code in fastagency/ui/mesop/message.py
  |     def visit_function_call_execution(self, message: FunctionCallExecution) -> None:
        content = f"""**function_name**: `{message.function_name}`<br>
**call_id**: `{message.call_id}`<br>
**retval**: {message.retval}"""
        return self.visit_default(
            message,
            content=content,
            style=self._styles.message.function_call_execution,
            scrollable=True,
        )
  | 
           visit_multiple_choice 
    Source code in fastagency/ui/mesop/message.py
  | def visit_multiple_choice(self, message: MultipleChoice) -> str:
    if message.single:
        return self._visit_single_choice(message)
    else:
        return self._visit_many_choices(message)
  | 
           visit_suggested_function_call 
    Source code in fastagency/ui/mesop/message.py
  |     def visit_suggested_function_call(self, message: SuggestedFunctionCall) -> None:
        content = f"""**function_name**: `{message.function_name}`<br>
**call_id**: `{message.call_id}`<br>
**arguments**: {json.dumps(message.arguments)}"""
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.suggested_function_call,
            scrollable=True,
        )
  | 
           visit_system_message 
    Source code in fastagency/ui/mesop/message.py
  |     def visit_system_message(self, message: SystemMessage) -> None:
        content = (
            f"""#### **{message.message['heading']}**
{message.message['body']}
"""
            if "heading" in message.message and "body" in message.message
            else json.dumps(message.message)
        )
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.system,
            scrollable=True,
        )
  | 
           visit_text_input 
    Source code in fastagency/ui/mesop/message.py
  | def visit_text_input(self, message: TextInput) -> str:
    def on_input(feedback: str) -> Iterator[None]:
        self._conversation_message.feedback = [feedback]
        self._conversation_message.feedback_completed = True
        yield from self._provide_feedback(feedback)
    def value_if_completed() -> Optional[str]:
        message = self._conversation_message
        return message.feedback[0] if message.feedback_completed else None
    # base_color = "#dff"
    prompt = message.prompt if message.prompt else "Please enter a value"
    if message.suggestions:
        suggestions = ",".join(suggestion for suggestion in message.suggestions)
        prompt += "\n Suggestions: " + suggestions
    self.visit_default(
        message,
        content=prompt,
        style=self._styles.message.text_input,
        inner_callback=lambda: input_text(
            on_input,
            key="prompt",
            disabled=self._readonly or self._has_feedback(),
            value=value_if_completed(),
            style=self._styles.message.text_input_inner,
        ),
    )
    return ""
  | 
           visit_text_message 
    Source code in fastagency/ui/mesop/message.py
  | def visit_text_message(self, message: TextMessage) -> None:
    content = message.body if message.body else ""
    content = content if content.strip() != "" else "*(empty message)*"
    self.visit_default(
        message,
        content=content,
        style=self._styles.message.text,
    )
  | 
           visit_workflow_completed 
    Source code in fastagency/base.py
  | def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    return self.visit_default(message)
  |