2 Commits

2 changed files with 74 additions and 2 deletions

View File

@@ -5,7 +5,12 @@ import json
import uuid import uuid
from typing import Any, override from typing import Any, override
from judas_protocol.types import ActionType, Category, ControlAction from judas_protocol.types import (
ActionType,
Category,
ControlAction,
TelemetryAction,
)
class Message: class Message:
@@ -108,6 +113,66 @@ class Message:
ack_required=True, ack_required=True,
) )
class Telemetry:
"""TELEMETRY category message factory methods."""
@staticmethod
def initial(payload: dict[str, Any]) -> Message:
"""Create an INITIAL telemetry message.
Args:
payload (dict[str, Any]): The payload of the message.
Returns:
Message: The created INITIAL telemetry message.
"""
return Message(
id_=None,
category=Category.TELEMETRY,
action=TelemetryAction.INITIAL,
payload=payload,
acknowledged=False,
ack_required=True,
)
@staticmethod
def update(payload: dict[str, Any]) -> Message:
"""Create an UPDATE telemetry message.
Args:
payload (dict[str, Any]): The payload of the message.
Returns:
Message: The created UPDATE telemetry message.
"""
return Message(
id_=None,
category=Category.TELEMETRY,
action=TelemetryAction.UPDATE,
payload=payload,
acknowledged=False,
ack_required=False,
)
@staticmethod
def on_demand(payload: dict[str, Any]) -> Message:
"""Create an ON_DEMAND telemetry message.
Args:
payload (dict[str, Any]): The payload of the message.
Returns:
Message: The created ON_DEMAND telemetry message.
"""
return Message(
id_=None,
category=Category.TELEMETRY,
action=TelemetryAction.ON_DEMAND,
payload=payload,
acknowledged=False,
ack_required=True,
)
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
"""Convert the message to a dictionary. """Convert the message to a dictionary.

View File

@@ -5,14 +5,21 @@ from __future__ import annotations
from enum import Enum from enum import Enum
type ActionType = ControlAction type ActionType = ControlAction | TelemetryAction
class Category(str, Enum): class Category(str, Enum):
CONTROL = "control" CONTROL = "control"
TELEMETRY = "telemetry"
class ControlAction(str, Enum): class ControlAction(str, Enum):
HELLO = "hello" HELLO = "hello"
ACK = "ack" ACK = "ack"
CLOSE = "close" CLOSE = "close"
class TelemetryAction(str, Enum):
INITIAL = "initial"
UPDATE = "update"
ON_DEMAND = "on_demand"