Skip to content

IOMessage

fastagency.IOMessage dataclass #

IOMessage(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: str = lambda: str(uuid4().hex)(),
)

Bases: ABC

auto_reply class-attribute instance-attribute #

auto_reply: bool = False

recipient class-attribute instance-attribute #

recipient: Optional[str] = None

sender class-attribute instance-attribute #

sender: Optional[str] = None

type property #

type: MessageType

uuid class-attribute instance-attribute #

uuid: str = field(default_factory=lambda: str(uuid4().hex))

workflow_uuid instance-attribute #

workflow_uuid: str

create staticmethod #

create(
    type: Optional[MessageType] = None, **kwargs: Any
) -> IOMessage
Source code in fastagency/messages.py
@staticmethod
def create(type: Optional[MessageType] = None, **kwargs: Any) -> "IOMessage":
    """Instantiate the message class selected for ``type``.

    The class is looked up via ``IOMessage._get_message_class(type)``. If a
    ``content`` keyword is supplied, it is removed from ``kwargs`` and its
    entries are merged into the remaining keyword arguments (overriding any
    same-named keys) before they are passed to the class constructor.

    Args:
        type: Message type used to pick the class to instantiate.
        **kwargs: Constructor arguments, optionally including ``content``
            holding further constructor arguments.

    Returns:
        An instance of the class returned by ``_get_message_class``.
    """
    message_cls = IOMessage._get_message_class(type)

    # Flatten the nested "content" entries into the top-level arguments.
    kwargs.update(kwargs.pop("content", {}))

    return message_cls(**kwargs)

model_dump #

model_dump() -> dict[str, Any]
Source code in fastagency/messages.py
def model_dump(self) -> dict[str, Any]:
    """Return this message as a dict with non-envelope fields nested.

    Fields whose names appear in ``IOMessage._get_parameters_names()`` are
    kept at the top level of the result. Every other field produced by
    ``asdict(self)`` is collected under the ``"content"`` key. The
    ``"type"`` key is set from ``self.type``.

    Returns:
        The top-level fields plus ``"content"`` and ``"type"`` entries.
    """
    top_level_names = IOMessage._get_parameters_names()

    envelope = {}
    payload = {}
    # Single pass: route each field to the envelope or the nested payload.
    for name, value in asdict(self).items():
        bucket = envelope if name in top_level_names else payload
        bucket[name] = value

    envelope["content"] = payload
    envelope["type"] = self.type
    return envelope