generated from pufereq/python-template
feat(message.py): add Message.Telemetry factory class
This commit is contained in:
@@ -5,7 +5,12 @@ import json
|
||||
import uuid
|
||||
from typing import Any, override
|
||||
|
||||
from judas_protocol.types import ActionType, Category, ControlAction
|
||||
from judas_protocol.types import (
|
||||
ActionType,
|
||||
Category,
|
||||
ControlAction,
|
||||
TelemetryAction,
|
||||
)
|
||||
|
||||
|
||||
class Message:
|
||||
@@ -108,6 +113,66 @@ class Message:
|
||||
ack_required=True,
|
||||
)
|
||||
|
||||
class Telemetry:
|
||||
"""TELEMETRY category message factory methods."""
|
||||
|
||||
@staticmethod
|
||||
def initial(payload: dict[str, Any]) -> Message:
|
||||
"""Create an INITIAL telemetry message.
|
||||
|
||||
Args:
|
||||
payload (dict[str, Any]): The payload of the message.
|
||||
|
||||
Returns:
|
||||
Message: The created INITIAL telemetry message.
|
||||
"""
|
||||
return Message(
|
||||
id_=None,
|
||||
category=Category.TELEMETRY,
|
||||
action=TelemetryAction.INITIAL,
|
||||
payload=payload,
|
||||
acknowledged=False,
|
||||
ack_required=True,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update(payload: dict[str, Any]) -> Message:
|
||||
"""Create an UPDATE telemetry message.
|
||||
|
||||
Args:
|
||||
payload (dict[str, Any]): The payload of the message.
|
||||
|
||||
Returns:
|
||||
Message: The created UPDATE telemetry message.
|
||||
"""
|
||||
return Message(
|
||||
id_=None,
|
||||
category=Category.TELEMETRY,
|
||||
action=TelemetryAction.UPDATE,
|
||||
payload=payload,
|
||||
acknowledged=False,
|
||||
ack_required=False,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def on_demand(payload: dict[str, Any]) -> Message:
|
||||
"""Create an ON_DEMAND telemetry message.
|
||||
|
||||
Args:
|
||||
payload (dict[str, Any]): The payload of the message.
|
||||
|
||||
Returns:
|
||||
Message: The created ON_DEMAND telemetry message.
|
||||
"""
|
||||
return Message(
|
||||
id_=None,
|
||||
category=Category.TELEMETRY,
|
||||
action=TelemetryAction.ON_DEMAND,
|
||||
payload=payload,
|
||||
acknowledged=False,
|
||||
ack_required=True,
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert the message to a dictionary.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user