Bases: IOMessageVisitor
Initialize the MesopGUIMessageVisitor object.
PARAMETER | DESCRIPTION |
level | The level of the message. TYPE: int |
conversation_id | The ID of the conversation. TYPE: str |
conversation_message | Conversation message that wraps the visited io_message TYPE: ConversationMessage |
styles | TYPE: MesopHomePageStyles |
read_only | Input messages are disabled in read only mode TYPE: bool DEFAULT: False |
Source code in fastagency/ui/mesop/message.py
| def __init__(
self,
level: int,
conversation_id: str,
conversation_message: ConversationMessage,
styles: MesopHomePageStyles,
read_only: bool = False,
) -> None:
"""Initialize the MesopGUIMessageVisitor object.
Args:
level (int): The level of the message.
conversation_id (str): The ID of the conversation.
conversation_message (ConversationMessage): Conversation message that wraps the visited io_message
styles (MesopHomePageStyles): Styles for the message
read_only (bool): Input messages are disabled in read only mode
"""
self._level = level
self._conversation_id = conversation_id
self._readonly = read_only
self._conversation_message = conversation_message
self._styles = styles
|
process_message
Source code in fastagency/ui/mesop/message.py
| def process_message(self, message: IOMessage) -> Optional[str]:
try:
return self.visit(message)
except Exception as e:
logger.warning(f"Failed to render message: {e}")
self.render_error_message(e, message)
return None
|
render_error_message
Source code in fastagency/ui/mesop/message.py
| def render_error_message(
self,
e: Exception,
message: IOMessage,
*,
content: Optional[str] = None,
style: Optional[MesopMessageStyles] = None,
) -> None:
style = self._styles.message.error or self._styles.message.default
title = "[Error] " + message.type.replace("_", " ").capitalize()
with me.box(style=style.box or self._styles.message.default.box):
self._header(
message,
title=title,
box_style=style.header_box or self._styles.message.default.header_box,
md_style=style.header_md or self._styles.message.default.header_md,
)
content = (
"Failed to render message:"
+ json.dumps(message.model_dump(), indent=2)
+ f"<br>Error: {e}"
)
logger.warning(f"render_error_message: {content=}")
logger.warning(e, exc_info=True)
# me.markdown(content, style=style.md or self._styles.message.default.md)
self._render_content(content, style.md or self._styles.message.default.md)
|
visit
Source code in fastagency/base.py
| def visit(self, message: IOMessage) -> Optional[str]:
method_name = f"visit_{message.type}"
method = getattr(self, method_name, self.visit_default)
return method(message)
|
visit_default
Source code in fastagency/ui/mesop/message.py
| def visit_default(
self,
message: IOMessage,
*,
content: Optional[str] = None,
style: Optional[MesopMessageStyles] = None,
error: Optional[bool] = False,
inner_callback: Optional[Callable[..., None]] = None,
scrollable: Optional[bool] = False,
) -> None:
# logger.info(f"visit_default: {message=}")
style = style or self._styles.message.default
title = message.type.replace("_", " ").capitalize()
title = "[Error] " + title if error else title
with me.box(style=style.box or self._styles.message.default.box):
self._header(
message,
title=title,
box_style=style.header_box,
md_style=style.header_md,
)
content = content or json.dumps(message.model_dump()["content"])
self._render_content(
content,
msg_md_style=style.scrollable_md
or self._styles.message.default.scrollable_md
if scrollable
else style.md or self._styles.message.default.md,
)
if inner_callback:
inner_callback()
|
visit_error
visit_error(message: Error) -> None
Source code in fastagency/ui/mesop/message.py
| def visit_error(self, message: Error) -> None:
self.visit_default(
message,
content=f"### {message.short}\n{message.long}",
style=self._styles.message.error,
scrollable=True,
)
|
visit_function_call_execution
Source code in fastagency/ui/mesop/message.py
| def visit_function_call_execution(self, message: FunctionCallExecution) -> None:
content = f"""**function_name**: `{message.function_name}`<br>
**call_id**: `{message.call_id}`<br>
**retval**: {message.retval}"""
return self.visit_default(
message,
content=content,
style=self._styles.message.function_call_execution,
scrollable=True,
)
|
visit_multiple_choice
Source code in fastagency/ui/mesop/message.py
| def visit_multiple_choice(self, message: MultipleChoice) -> str:
if message.single:
return self._visit_single_choice(message)
else:
return self._visit_many_choices(message)
|
visit_suggested_function_call
Source code in fastagency/ui/mesop/message.py
| def visit_suggested_function_call(self, message: SuggestedFunctionCall) -> None:
content = f"""**function_name**: `{message.function_name}`<br>
**call_id**: `{message.call_id}`<br>
**arguments**: {json.dumps(message.arguments)}"""
self.visit_default(
message,
content=content,
style=self._styles.message.suggested_function_call,
scrollable=True,
)
|
visit_system_message
Source code in fastagency/ui/mesop/message.py
| def visit_system_message(self, message: SystemMessage) -> None:
content = (
f"""#### **{message.message['heading']}**
{message.message['body']}
"""
if "heading" in message.message and "body" in message.message
else json.dumps(message.message)
)
self.visit_default(
message,
content=content,
style=self._styles.message.system,
scrollable=True,
)
|
visit_text_input
Source code in fastagency/ui/mesop/message.py
| def visit_text_input(self, message: TextInput) -> str:
def on_input(feedback: str) -> Iterator[None]:
self._conversation_message.feedback = [feedback]
self._conversation_message.feedback_completed = True
yield from self._provide_feedback(feedback)
def value_if_completed() -> Optional[str]:
message = self._conversation_message
return message.feedback[0] if message.feedback_completed else None
# base_color = "#dff"
prompt = message.prompt if message.prompt else "Please enter a value"
if message.suggestions:
suggestions = ",".join(suggestion for suggestion in message.suggestions)
prompt += "\n Suggestions: " + suggestions
self.visit_default(
message,
content=prompt,
style=self._styles.message.text_input,
inner_callback=lambda: input_text(
on_input,
key="prompt",
disabled=self._readonly or self._has_feedback(),
value=value_if_completed(),
style=self._styles.message.text_input_inner,
),
)
return ""
|
visit_text_message
Source code in fastagency/ui/mesop/message.py
| def visit_text_message(self, message: TextMessage) -> None:
content = message.body if message.body else ""
content = content if content.strip() != "" else "*(empty message)*"
self.visit_default(
message,
content=content,
style=self._styles.message.text,
)
|
visit_workflow_completed
Source code in fastagency/base.py
| def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
return self.visit_default(message)
|