chore(release): 0.4.0 #12

Merged
pufereq merged 14 commits from release/0.4.0 into main 2025-11-30 17:37:18 +00:00
3 changed files with 123 additions and 64 deletions

View File

@@ -7,7 +7,9 @@ import socket
import threading import threading
import time import time
from judas_protocol import Message from typing import Any
from judas_protocol import Category, ControlAction, Message
from judas_server.backend.client import Client from judas_server.backend.client import Client
@@ -60,14 +62,15 @@ class BackendServer:
) )
time.sleep(1) time.sleep(1)
def _send_ack(self, client: Client) -> None: def _send_ack(self, client: Client, target_id: str) -> None:
"""Send an ACK message to a client. """Send an ACK message to a client.
Args: Args:
client (Client): The client to send the ACK to. client (Client): The client to send the ACK to.
target_id (str): The id of the ACK'd message.
""" """
ack: bytes = Message.ack().to_bytes() ack: bytes = Message.ack(target_id=target_id).to_bytes()
self.logger.debug(f"[>] Sending ACK to {client}") self.logger.info(f"[>] Sending ACK to {client}")
client.outbound += ack client.outbound += ack
def _accept_connection(self, sock: socket.socket) -> None: def _accept_connection(self, sock: socket.socket) -> None:
@@ -78,43 +81,13 @@ class BackendServer:
""" """
conn, addr = sock.accept() conn, addr = sock.accept()
self.logger.info(f"[+] Accepted connection from {addr}") self.logger.info(f"[+] Accepted connection from {addr}")
conn.setblocking(False)
# wait for hello message to get mac_id client = Client(mac_id=None, addr=addr, socket=conn)
conn.settimeout(5)
try:
message = conn.recv(1024)
if not message:
self.logger.error(f"[-] No data received from {addr}")
conn.close()
return
except socket.timeout:
self.logger.error(f"[-] Timeout waiting for hello from {addr}")
conn.close()
return
conn.settimeout(None)
message = message.split(b"\n")[0] # get first line only
message = Message.from_bytes(message)
mac_id = message.payload.get("mac", None)
if mac_id is None:
self.logger.error(
f"[-] No mac_id provided by {addr}, closing connection"
)
conn.close()
return
client = Client(id_=mac_id, addr=addr, socket=conn)
self.clients[mac_id] = client
self._send_ack(client)
events = selectors.EVENT_READ | selectors.EVENT_WRITE events = selectors.EVENT_READ | selectors.EVENT_WRITE
self.selector.register(conn, events, data=client) self.selector.register(conn, events, data=client)
self.logger.info(f"[+] Registered client {client}") self.logger.info(f"[+] Registered client {client}, HELLO pending...")
def _disconnect(self, client: Client) -> None: def _disconnect(self, client: Client) -> None:
"""Disconnect a client and clean up resources. """Disconnect a client and clean up resources.
@@ -122,10 +95,51 @@ class BackendServer:
Args: Args:
sock (socket.socket): The client socket to disconnect. sock (socket.socket): The client socket to disconnect.
""" """
self.logger.info(f"[-] Disconnecting {client}") self.logger.info(f"[-] Disconnecting {client}...")
self.selector.unregister(client.socket) self.logger.debug("[*] Sending DNR message...")
try:
self.selector.unregister(client.socket)
except Exception as e:
self.logger.error(f"Error unregistering client {client}: {e}")
client.disconnect() client.disconnect()
def _send_outbound(self, sock: socket.socket, client: Client) -> None:
"""Queue data to be sent to a client.
Args:
client (Client): The client to send data to.
"""
self.logger.debug(f"[>] Sending data to {client}: {client.outbound!r}")
sent = sock.send(client.outbound)
client.outbound = client.outbound[sent:]
def _receive_inbound(
self, sock: socket.socket, client: Client, packet_size: int = 4096
) -> None:
"""Receive data from a client socket.
Args:
sock (socket.socket): The client socket to receive data from.
client (Client): The client object.
packet_size (int): The maximum amount of data to be received at once.
Returns:
bytes: The received data.
"""
recv_data = sock.recv(packet_size)
if recv_data:
self.logger.debug(
f"[<] Received data from {client}: {recv_data!r}"
)
client.inbound += recv_data
# set last seen
client.last_seen = time.time()
else:
self._disconnect(client)
def _handle_connection( def _handle_connection(
self, key: selectors.SelectorKey, mask: int self, key: selectors.SelectorKey, mask: int
) -> None: ) -> None:
@@ -135,42 +149,84 @@ class BackendServer:
key (selectors.SelectorKey): The selector key for the client. key (selectors.SelectorKey): The selector key for the client.
mask (int): The event mask. mask (int): The event mask.
""" """
sock: socket.socket = key.fileobj sock: socket.socket = key.fileobj # type: ignore
client = key.data client = key.data
try: try:
if mask & selectors.EVENT_READ: if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) self._receive_inbound(sock, client)
if recv_data: if client.inbound:
self.logger.debug( if client.mac_id is None:
f"[<] Received data from {client}: {recv_data!r}" # expect HELLO message
) try:
client.inbound += recv_data msg = Message.from_bytes(client.inbound)
if (
msg.category == Category.CONTROL
and msg.action == ControlAction.HELLO
and msg.payload.get("mac") is not None
):
client.mac_id = msg.payload["mac"]
if (
client.mac_id in self.clients
and self.clients[client.mac_id].status
== "connected"
):
old_client: Client = self.clients[
client.mac_id
]
self.logger.warning(
f"Client {client.mac_id} is already connected from {old_client.addr}, disconnecting old client..."
)
self._disconnect(old_client)
# TODO: tell client not to reconnect
self.clients[client.mac_id] = client
self.logger.info(
f"[+] Registered new client {client}"
)
else:
self.logger.error(
f"Expected HELLO message from {client}, got {msg}"
)
self._disconnect(client)
return
except Exception as e:
self.logger.error(
f"Failed to parse HELLO message from {client}: {e}"
)
self._disconnect(client)
return
while b"\n" in client.inbound: while b"\n" in client.inbound:
line, client.inbound = client.inbound.split(b"\n", 1) line, client.inbound = client.inbound.split(b"\n", 1)
self.logger.info( self.logger.debug(
f"[<] Complete message from {client}: {line!r}" f"[<] Complete message from {client}: {line!r}"
) )
try:
msg = Message.from_bytes(line)
self.logger.info(f"[.] Parsed message {msg.id}")
if msg.ack_required:
self._send_ack(client, target_id=msg.id)
self._send_ack(client) except Exception as e:
self.logger.error(
f"Failed to parse message from {client}: {e}"
)
self._disconnect(client)
return
# set last seen
client.last_seen = time.time()
else: else:
self._disconnect(client) self._disconnect(client)
if mask & selectors.EVENT_WRITE: if mask & selectors.EVENT_WRITE:
if client.outbound: if client.outbound:
self.logger.debug( self._send_outbound(sock, client)
f"[>] Sending data to {client}: {client.outbound!r}"
)
sent = sock.send(client.outbound)
client.outbound = client.outbound[sent:]
# TODO: wait for ACK from client
except ConnectionResetError as e: except ConnectionResetError as e:
self.logger.error(f"Connection reset by {client}, disconnect: {e}") self.logger.error(f"Connection reset by {client}, disconnect: {e}")
self._disconnect(client) self._disconnect(client)
except Exception as e:
self.logger.error(f"Connection error for {client}: {e}")
self._disconnect(client)
def run(self) -> None: def run(self) -> None:
"""Start the backend server.""" """Start the backend server."""
@@ -187,9 +243,11 @@ class BackendServer:
events = self.selector.select(timeout=1) events = self.selector.select(timeout=1)
for key, mask in events: for key, mask in events:
if key.data is None: if key.data is None:
self._accept_connection(key.fileobj) self._accept_connection(key.fileobj) # type: ignore
else: else:
self._handle_connection(key, mask) self._handle_connection(key, mask)
time.sleep(0.001) # prevent 100% CPU usage
except Exception as e: except Exception as e:
self.logger.error(f"Server error: {e}") self.logger.error(f"Server error: {e}")
raise e raise e
@@ -204,7 +262,7 @@ class BackendServer:
self.logger.warning(f"Client {client_id} not found") self.logger.warning(f"Client {client_id} not found")
return None return None
return { return {
"id": client.id, "id": client.mac_id,
"addr": client.addr, "addr": client.addr,
"last_seen": client.last_seen, "last_seen": client.last_seen,
"status": client.status, "status": client.status,

View File

@@ -17,12 +17,13 @@ class Client:
"""Represents a client.""" """Represents a client."""
def __init__( def __init__(
self, id_: str, addr: tuple[str, int], socket: socket.socket self, mac_id: str | None, addr: tuple[str, int], socket: socket.socket
) -> None: ) -> None:
"""Initialize the client. """Initialize the client.
Args: Args:
id_ (str): The unique identifier for the client. mac_id (str | None): The unique identifier for the client.
Can be None if not yet assigned.
addr (tuple[str, int]): The (IP, port) address of the client. addr (tuple[str, int]): The (IP, port) address of the client.
socket (socket.socket): The socket object for communication. socket (socket.socket): The socket object for communication.
""" """
@@ -31,7 +32,7 @@ class Client:
) )
self.logger.debug(f"Initializing Client {addr}...") self.logger.debug(f"Initializing Client {addr}...")
self.id: str = id_ self.mac_id: str | None = mac_id
self.last_seen: float = 0.0 # unix timestanp of last inbound message self.last_seen: float = 0.0 # unix timestanp of last inbound message
self.status: ClientStatus = ClientStatus.CONNECTED self.status: ClientStatus = ClientStatus.CONNECTED
@@ -41,10 +42,10 @@ class Client:
self.outbound: bytes = b"" self.outbound: bytes = b""
def __str__(self) -> str: def __str__(self) -> str:
return f"Client({self.id} ({self.addr[0]}:{self.addr[1]}))" return f"Client({self.mac_id} ({self.addr[0]}:{self.addr[1]}))"
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Client({self.id}, {self.addr})" return f"Client({self.mac_id}, {self.addr})"
def disconnect(self) -> None: def disconnect(self) -> None:
"""Disconnect the client and close the socket.""" """Disconnect the client and close the socket."""

4
uv.lock generated
View File

@@ -319,8 +319,8 @@ wheels = [
[[package]] [[package]]
name = "judas-protocol" name = "judas-protocol"
version = "0.1.0" version = "0.5.0"
source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#fd070b176347a0f7b81f937b189d8f50736f3514" } source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#c48b69ecee16f5824ffd8bce8921341d5fa326b7" }
[[package]] [[package]]
name = "judas-server" name = "judas-server"