17 Commits

Author SHA1 Message Date
e308a07dab fix(backend_server.py): call _initialize_handlers() on init 2026-03-03 20:38:55 +01:00
dafe418916 feat(backend_server.py): add warning if received an unknown message (no handler) 2026-03-03 20:18:24 +01:00
c64a258243 build(uv.lock): update judas_protocol to 0.8.0 2026-03-03 19:34:32 +01:00
ead2224066 fix(backend_server.py): do not disconnect a client if Exception raised on msg handling 2026-03-03 18:56:34 +01:00
ee381414a9 chore(backend_server.py): remove redundant HELLO msg handling 2026-03-03 18:55:45 +01:00
0ed478a88e feat(backend_server.py): implement message handling 2026-03-03 18:55:24 +01:00
6446fe883c fix(backend_server.py): check if client to disconnect has an open socket 2026-03-03 18:54:45 +01:00
ec58a5257a chore(handler/__init__.py): add module init 2026-03-03 18:53:05 +01:00
c952413d91 feat(hello_handler.py): add HELLO message handler 2026-03-03 18:52:39 +01:00
882c8780e1 feat(base_handler.py): add BaseHandler class for message handling 2026-03-03 18:52:13 +01:00
d3f68d3baf chore(backend/__init__.py): add Client and ClientStatus to __all__ 2026-03-03 17:46:51 +01:00
62acc4b181 style(client.py): correct property typing 2026-03-03 17:46:06 +01:00
faecc38261 feat(client_status.py): move ClientStatus enum to own module 2026-03-03 17:45:01 +01:00
3eb681e233 refactor(backend_server.py): move loading known clients to its own method 2026-03-03 17:43:19 +01:00
bda10a6248 chore(cache/): add cache/ directory 2026-03-03 17:41:48 +01:00
fa2da207a9 refactor(backend_server.py): refactor calls to Message class constructors after protocol changes 2026-03-01 20:17:05 +01:00
f41a7774ec build(uv.lock): update judas_protocol to 0.7.0 2026-03-01 20:14:03 +01:00
9 changed files with 246 additions and 89 deletions

0
cache/.gitkeep vendored Normal file
View File

View File

@@ -1,3 +1,5 @@
from .backend_server import BackendServer from .backend_server import BackendServer
from .client import Client
from .client_status import ClientStatus
__all__ = ["BackendServer"] __all__ = ["BackendServer", "Client", "ClientStatus"]

View File

@@ -6,13 +6,18 @@ import selectors
import socket import socket
import threading import threading
import time import time
from typing import TYPE_CHECKING, Any
import yaml import yaml
from typing import Any
from judas_protocol import Category, ControlAction, Message from judas_protocol import Category, ControlAction, Message
from judas_server.backend.client import Client, ClientStatus from judas_server.backend.client import Client, ClientStatus
from judas_server.backend.handler.hello_handler import HelloHandler
if TYPE_CHECKING:
from typing import Callable
from judas_protocol import ActionType
class BackendServer: class BackendServer:
@@ -28,27 +33,6 @@ class BackendServer:
) )
self.logger.debug("Initializing Server...") self.logger.debug("Initializing Server...")
self.known_clients: dict[str, dict[str, str | float]] = {}
try:
with open("cache/known_clients.yaml", "r") as f:
self.known_clients = (
yaml.safe_load(f).get("known_clients", {}) or {}
)
self.logger.debug(
f"Loaded known clients: {self.known_clients}"
)
self.logger.info(
f"Loaded {len(self.known_clients)} known clients"
)
except FileNotFoundError:
self.logger.warning(
"known_clients.yaml not found, creating empty known clients list"
)
with open("cache/known_clients.yaml", "w") as f:
yaml.safe_dump({"known_clients": {}}, f)
except Exception as e:
self.logger.error(f"Error loading known clients: {e}")
self.selector = selectors.DefaultSelector() self.selector = selectors.DefaultSelector()
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt( self.server_socket.setsockopt(
@@ -62,17 +46,75 @@ class BackendServer:
) )
self.clients: dict[str, Client] = {} self.clients: dict[str, Client] = {}
self.known_clients: dict[str, dict[str, str | float]] = (
self._load_known_clients()
)
if self.known_clients: self.message_handlers: dict[
for client_id in self.known_clients: tuple[Category, ActionType], Callable[[Client, Message], None]
] = {}
self._initialize_handlers()
self.running: bool = False
def _initialize_handlers(self) -> None:
"""Initialize message handlers."""
hello_handler = HelloHandler(self)
self.message_handlers[(Category.CONTROL, ControlAction.HELLO)] = (
hello_handler.handle
)
def _load_known_clients(self) -> dict[str, dict[str, str | float]]:
"""Load the list of known clients from a YAML file and validate."""
known_clients: dict[str, dict[str, str | float]] = {}
try:
with open("cache/known_clients.yaml", "r") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict):
raise ValueError("YAML root must be a dict")
known_clients = data.get("known_clients", {}) or {}
if not isinstance(known_clients, dict):
raise ValueError("'known_clients' must be a dict")
for client_id, client_data in known_clients.items():
if not isinstance(client_data, dict):
raise ValueError(
f"Client {client_id} data must be a dict"
)
last_seen = client_data.get("last_seen", 0.0)
if not isinstance(last_seen, (float, int)):
raise ValueError(
f"Client {client_id} 'last_seen' must be a float or int"
)
self.logger.debug(f"Loaded known clients: {known_clients}")
self.logger.info(f"Loaded {len(known_clients)} known clients")
for client_id in known_clients:
client = Client(id=client_id, addr=None, socket=None) client = Client(id=client_id, addr=None, socket=None)
client.status = ClientStatus.OFFLINE client.status = ClientStatus.OFFLINE
client.last_seen = float( client.last_seen = float(
self.known_clients[client_id].get("last_seen", 0.0) known_clients[client_id].get("last_seen", 0.0)
) )
self.clients[client_id] = client self.clients[client_id] = client
self.running: bool = False except FileNotFoundError:
self.logger.warning(
"known_clients.yaml not found, creating empty known clients list"
)
with open("cache/known_clients.yaml", "w") as f:
yaml.safe_dump({"known_clients": {}}, f)
except Exception as e:
self.logger.error(f"Error loading known clients: {e}")
raise
return known_clients
def _save_known_clients(self) -> None: def _save_known_clients(self) -> None:
"""Save the list of known clients to a YAML file.""" """Save the list of known clients to a YAML file."""
@@ -106,7 +148,7 @@ class BackendServer:
client (Client): The client to send the ACK to. client (Client): The client to send the ACK to.
target_id (str): The id of the ACK'd message. target_id (str): The id of the ACK'd message.
""" """
ack: bytes = Message.ack(target_id=target_id).to_bytes() ack: bytes = Message.Control.ack(target_id=target_id).to_bytes()
self.logger.info(f"[>] Sending ACK to {client}") self.logger.info(f"[>] Sending ACK to {client}")
client.outbound += ack client.outbound += ack
@@ -116,7 +158,7 @@ class BackendServer:
Args: Args:
client (Client): The client to send the CLOSE message to. client (Client): The client to send the CLOSE message to.
""" """
close_msg: bytes = Message.close().to_bytes() close_msg: bytes = Message.Control.close().to_bytes()
self.logger.info(f"[>] Sending CLOSE to {client}") self.logger.info(f"[>] Sending CLOSE to {client}")
client.outbound += close_msg client.outbound += close_msg
@@ -144,6 +186,12 @@ class BackendServer:
""" """
self.logger.info(f"[-] Disconnecting {client}...") self.logger.info(f"[-] Disconnecting {client}...")
if client.socket is None:
self.logger.warning(
f"Client {client} has no socket, nothing to disconnect."
)
return
try: try:
self.selector.unregister(client.socket) self.selector.unregister(client.socket)
except Exception as e: except Exception as e:
@@ -206,48 +254,6 @@ class BackendServer:
self._disconnect(client) self._disconnect(client)
return return
if client.id is None:
# expect HELLO message
try:
msg = Message.from_bytes(client.inbound)
except Exception as e:
self.logger.error(
f"Failed to parse HELLO message from {client}: {e}"
)
self._disconnect(client)
return
if (
msg.category == Category.CONTROL
and msg.action == ControlAction.HELLO
and msg.payload.get("id") is not None
):
client.id = msg.payload["id"]
if (
client.id in self.clients
and self.clients[client.id].status == "connected"
):
old_client: Client = self.clients[client.id]
self.logger.warning(
f"Client {client.id} is already connected from {old_client.addr}, disconnecting old client..."
)
self.send_close(old_client)
self.clients[client.id] = client
self.known_clients[client.id] = {
"last_seen": client.last_seen
}
self._save_known_clients()
client.status = ClientStatus.ONLINE
self.logger.info(f"[+] Registered new client {client}")
else:
self.logger.error(
f"Expected HELLO message from {client}, got {msg}"
)
self._disconnect(client)
return
while b"\n" in client.inbound: while b"\n" in client.inbound:
line, client.inbound = client.inbound.split(b"\n", 1) line, client.inbound = client.inbound.split(b"\n", 1)
self.logger.debug( self.logger.debug(
@@ -256,13 +262,40 @@ class BackendServer:
try: try:
msg = Message.from_bytes(line) msg = Message.from_bytes(line)
self.logger.info(f"[.] Parsed message {msg.id}") self.logger.info(f"[.] Parsed message {msg.id}")
if client.id is None:
self.logger.debug(
f"Client {client} has no ID, expecting HELLO message..."
)
if (
msg.category != Category.CONTROL
or msg.action != ControlAction.HELLO
):
self.logger.warning(
f"First message from {client} must be HELLO, disconnecting..."
)
self._disconnect(client)
continue
handler: Callable[[Client, Message], None] | None = (
self.message_handlers.get(
(msg.category, msg.action), None
)
)
if handler is not None:
handler(client, msg)
else:
self.logger.warning(
f"No handler for message {msg.id} with category {msg.category} and action {msg.action}"
)
continue
if msg.ack_required: if msg.ack_required:
self.send_ack(client, target_id=msg.id) self.send_ack(client, target_id=msg.id)
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
f"Failed to parse message from {client}: {e}" f"Failed to parse message from {client}: {e}"
) )
self._disconnect(client)
return return
if mask & selectors.EVENT_WRITE and client.outbound: if mask & selectors.EVENT_WRITE and client.outbound:

View File

@@ -5,24 +5,19 @@ from __future__ import annotations
import logging as lg import logging as lg
import socket import socket
from enum import Enum
import time import time
from judas_server.backend.client_status import ClientStatus
class ClientStatus(str, Enum):
"""Enumeration of client connection statuses."""
ONLINE = "online"
PENDING = "pending"
OFFLINE = "offline"
STALE = "stale"
class Client: class Client:
"""Represents a client.""" """Represents a client."""
def __init__( def __init__(
self, id: str | None, addr: tuple[str, int], socket: socket.socket self,
id: str | None,
addr: tuple[str, int] | None,
socket: socket.socket | None,
) -> None: ) -> None:
"""Initialize the client. """Initialize the client.
@@ -41,13 +36,15 @@ class Client:
self.last_seen: float = 0.0 # unix timestanp of last inbound message self.last_seen: float = 0.0 # unix timestanp of last inbound message
self.status: ClientStatus = ClientStatus.PENDING self.status: ClientStatus = ClientStatus.PENDING
self.socket: socket.socket = socket self.socket: socket.socket | None = socket
self.addr: tuple[str, int] = addr self.addr: tuple[str, int] | None = addr
self.inbound: bytes = b"" self.inbound: bytes = b""
self.outbound: bytes = b"" self.outbound: bytes = b""
def __str__(self) -> str: def __str__(self) -> str:
if self.addr:
return f"Client({self.id} ({self.addr[0]}:{self.addr[1]}))" return f"Client({self.id} ({self.addr[0]}:{self.addr[1]}))"
return f"Client({self.id} (not connected))"
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Client({self.id}, {self.addr})" return f"Client({self.id}, {self.addr})"
@@ -55,6 +52,11 @@ class Client:
def disconnect(self) -> None: def disconnect(self) -> None:
"""Disconnect the client and close the socket.""" """Disconnect the client and close the socket."""
self.logger.debug(f"Disconnecting Client {self}...") self.logger.debug(f"Disconnecting Client {self}...")
if self.socket is None:
self.logger.warning(
f"Client {self} not connected, nothing to disconnect."
)
return
try: try:
self.socket.close() self.socket.close()
except Exception as e: except Exception as e:

View File

@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
from enum import Enum
class ClientStatus(str, Enum):
"""Enumeration of client connection statuses."""
ONLINE = "online"
PENDING = "pending"
OFFLINE = "offline"
STALE = "stale"

View File

@@ -0,0 +1,4 @@
from .base_handler import BaseHandler
from .hello_handler import HelloHandler
__all__ = ["BaseHandler", "HelloHandler"]

View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging as lg
from typing import TYPE_CHECKING
from judas_server.backend.client import Client
if TYPE_CHECKING:
from judas_protocol import Message
from judas_server.backend import BackendServer
class BaseHandler:
"""BaseHandler is the base class for all message handlers in the backend server.
It defines the interface for handling messages and provides common functionality for all handlers.
"""
def __init__(self, backend_server: BackendServer) -> None:
"""Initialize the BaseHandler with a reference to the backend server.
Args:
backend_server (BackendServer): The backend server instance that this handler belongs to.
"""
self.logger: lg.Logger = lg.getLogger(
f"{__name__}.{self.__class__.__name__}"
)
self.backend_server: BackendServer = backend_server
def handle(self, client: Client, message: Message) -> None:
"""Handle a message from a client.
This method must be implemented by subclasses to define the specific handling logic for different message types.
Args:
client (Client): The client that sent the message.
message (Message): The message to be handled.
"""
raise NotImplementedError("handle() must be implemented by subclasses")

View File

@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, override
from judas_protocol import Category, ControlAction, Message
from judas_server.backend.client import ClientStatus
from judas_server.backend.handler import BaseHandler
if TYPE_CHECKING:
from judas_server.backend.backend_server import BackendServer
from judas_server.backend.client import Client
class HelloHandler(BaseHandler):
def __init__(self, backend_server: BackendServer) -> None:
super().__init__(backend_server)
@override
def handle(self, client: Client, message: Message) -> None:
if client.id is not None:
return
if (
message.category != Category.CONTROL
or message.action != ControlAction.HELLO
):
self.logger.error(
f"Expected HELLO message from {client}, got {message}, disconnecting client..."
)
self.backend_server._disconnect(client)
return
if message.payload.get("id") is None:
self.logger.error(
f"HELLO message from {client} missing 'id' field, disconnecting client..."
)
self.backend_server._disconnect(client)
return
client.id = message.payload["id"]
# check if client already connected, if so disconnect old client and register new one
if (
client.id in self.backend_server.clients
and self.backend_server.clients[client.id].status == "connected"
):
old_client: Client = self.backend_server.clients[client.id]
self.backend_server.logger.warning(
f"Client {client.id} is already connected from {old_client.addr}, disconnecting old client..."
)
self.backend_server.send_close(old_client)
return
self.backend_server.clients[client.id] = client # type: ignore
self.backend_server.known_clients[client.id] = { # type: ignore
"last_seen": client.last_seen
}
self.backend_server._save_known_clients()
client.status = ClientStatus.ONLINE
self.logger.info(f"[+] Registered new client {client}")

4
uv.lock generated
View File

@@ -358,8 +358,8 @@ wheels = [
[[package]] [[package]]
name = "judas-protocol" name = "judas-protocol"
version = "0.6.0" version = "0.8.0"
source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#d16c1914ba343aed300f1c5fae0201370c3274de" } source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#a805ccf38edffadc1b8c8b276e60758c86516cd3" }
[[package]] [[package]]
name = "judas-server" name = "judas-server"