From 639e1f73a07286bce1a1ef227d216a722e3fa214 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Thu, 28 Aug 2025 20:04:32 +0200 Subject: [PATCH] feat(server.py): add socket-based communication --- src/judas_server/backend/server.py | 183 ++++++++++++++++++++++++----- 1 file changed, 155 insertions(+), 28 deletions(-) diff --git a/src/judas_server/backend/server.py b/src/judas_server/backend/server.py index 6324478..70ee238 100644 --- a/src/judas_server/backend/server.py +++ b/src/judas_server/backend/server.py @@ -1,54 +1,181 @@ # -*- coding: utf-8 -*- from __future__ import annotations -from typing import Any - import logging as lg -import random as rn import threading import time +import selectors +import socket +import types +from typing import Any + +from judas_protocol import Message + +from judas_server.backend.client import Client class BackendServer: - def __init__(self) -> None: + def __init__(self, host: str = "0.0.0.0", port: int = 3692) -> None: + """Initialize the backend server. + + Args: + host (str): The host IP address to bind the server to. + port (int): The port number to bind the server to. + """ self.logger: lg.Logger = lg.getLogger( f"{__name__}.{self.__class__.__name__}" ) self.logger.debug("Initializing Server...") - # TODO: add socket logic here + self.selector = selectors.DefaultSelector() + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._bind_socket(host, port) + self.server_socket.listen() + self.server_socket.setblocking(False) + self.selector.register( + self.server_socket, selectors.EVENT_READ, data=None + ) - self.clients: dict[str, dict[str, dict[str, Any]]] = { - "C_01": { - "one_time": { - "hostname": "mock-host", - "platform": "windows 11", - "cpu_info": "i7", - }, - "polled": {"cpu_usage": 0, "ram_usage": 0}, - "ondemand": {}, - } - } + self.clients: dict[str, Client] = {} self.running: bool = False + def _bind_socket(self, host: str, port: int) -> None: + """Bind the server socket to the specified host and port. + + Args: + host (str): The host IP address to bind the server to. + port (int): The port number to bind the server to. + """ + self.logger.debug(f"Binding socket to {host}:{port}") + while True: + try: + self.server_socket.bind((host, port)) + self.logger.debug(f"Socket bound to {host}:{port}") + break + except OSError as e: + self.logger.error( + f"Failed to bind socket to {host}:{port}, retrying...: {e}" + ) + time.sleep(1) + + def _accept_connection(self, sock: socket.socket) -> None: + """Accept a new client connection. + + Args: + sock (socket.socket): The selected socket. + """ + conn, addr = sock.accept() + self.logger.info(f"[+] Accepted connection from {addr}") + conn.setblocking(False) + + # wait for hello message to get mac_id + + conn.settimeout(5) + message = conn.recv(1024) + conn.settimeout(None) + + message = Message.from_bytes(message) + + mac_id = message.payload.get("mac", None) + if mac_id is None: + self.logger.error( + f"[-] No mac_id provided by {addr}, closing connection" + ) + conn.close() + return + + client = Client(id_=mac_id, addr=addr, socket=conn) + self.clients[mac_id] = client + + events = selectors.EVENT_READ | selectors.EVENT_WRITE + self.selector.register(conn, events, data=client) + + self.logger.info(f"[+] Registered client {client}") + + def _disconnect(self, client: Client) -> None: + """Disconnect a client and clean up resources. + + Args: + sock (socket.socket): The client socket to disconnect. + """ + self.logger.info(f"[-] Disconnecting {client}") + self.selector.unregister(client.socket) + client.disconnect() + + def _handle_connection( + self, key: selectors.SelectorKey, mask: int + ) -> None: + """Handle a client connection. + + Args: + key (selectors.SelectorKey): The selector key for the client. + mask (int): The event mask. + """ + sock: socket.socket = key.fileobj + client = key.data + + try: + if mask & selectors.EVENT_READ: + recv_data = sock.recv(1024) + if recv_data: + self.logger.debug( + f"[<] Received data from {client}: {recv_data!r}" + ) + client.inbound += recv_data + while b"\n" in client.inbound: + line, client.inbound = client.inbound.split(b"\n", 1) + self.logger.info( + f"[<] Complete message from {client}: {line!r}" + ) + + # send ACK + ack = Message.ack().to_bytes() + self.logger.debug(f"[>] Sending ACK to {client}") + client.outbound += ack + + else: + self._disconnect(client) + + if mask & selectors.EVENT_WRITE: + if client.outbound: + self.logger.debug( + f"[>] Sending data to {client}: {client.outbound!r}" + ) + sent = sock.send(client.outbound) + + client.outbound = client.outbound[sent:] + except ConnectionResetError as e: + self.logger.error(f"Connection reset by {client}, disconnect: {e}") + self._disconnect(client) + def run(self) -> None: + """Start the backend server.""" self.running = True threading.Thread( name="BackendServer thread", target=self._loop, daemon=True ).start() def _loop(self) -> None: - self.logger.info("Starting server loop...") - while self.running: - for client in self.clients.values(): - client["polled"]["cpu_usage"] = round(rn.uniform(0, 100), 1) - client["polled"]["ram_usage"] = round(rn.uniform(0, 100), 1) - time.sleep(1) + """Main server loop to handle incoming connections and data.""" + self.logger.info("Server is running...") + try: + while self.running: + events = self.selector.select(timeout=1) + for key, mask in events: + if key.data is None: + self._accept_connection(key.fileobj) + else: + self._handle_connection(key, mask) + except Exception as e: + self.logger.error(f"Server error: {e}") + raise e + finally: + self.selector.close() + self.server_socket.close() + self.logger.info("Server has stopped.") - self.logger.info("Server loop stopped.") - - def get_client_data( - self, client_id: str - ) -> dict[str, dict[str, Any]] | None: - return self.clients.get(client_id, None) + # def get_client_data( + # self, client_id: str + # ) -> dict[str, dict[str, Any]] | None: + # return self.clients.get(client_id, None)