feat(server.py): add socket-based communication
This commit is contained in:
@@ -1,54 +1,181 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import logging as lg
|
||||
import random as rn
|
||||
import threading
|
||||
import time
|
||||
import selectors
|
||||
import socket
|
||||
import types
|
||||
from typing import Any
|
||||
|
||||
from judas_protocol import Message
|
||||
|
||||
from judas_server.backend.client import Client
|
||||
|
||||
|
||||
class BackendServer:
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, host: str = "0.0.0.0", port: int = 3692) -> None:
|
||||
"""Initialize the backend server.
|
||||
|
||||
Args:
|
||||
host (str): The host IP address to bind the server to.
|
||||
port (int): The port number to bind the server to.
|
||||
"""
|
||||
self.logger: lg.Logger = lg.getLogger(
|
||||
f"{__name__}.{self.__class__.__name__}"
|
||||
)
|
||||
self.logger.debug("Initializing Server...")
|
||||
|
||||
# TODO: add socket logic here
|
||||
self.selector = selectors.DefaultSelector()
|
||||
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._bind_socket(host, port)
|
||||
self.server_socket.listen()
|
||||
self.server_socket.setblocking(False)
|
||||
self.selector.register(
|
||||
self.server_socket, selectors.EVENT_READ, data=None
|
||||
)
|
||||
|
||||
self.clients: dict[str, dict[str, dict[str, Any]]] = {
|
||||
"C_01": {
|
||||
"one_time": {
|
||||
"hostname": "mock-host",
|
||||
"platform": "windows 11",
|
||||
"cpu_info": "i7",
|
||||
},
|
||||
"polled": {"cpu_usage": 0, "ram_usage": 0},
|
||||
"ondemand": {},
|
||||
}
|
||||
}
|
||||
self.clients: dict[str, Client] = {}
|
||||
|
||||
self.running: bool = False
|
||||
|
||||
def _bind_socket(self, host: str, port: int) -> None:
|
||||
"""Bind the server socket to the specified host and port.
|
||||
|
||||
Args:
|
||||
host (str): The host IP address to bind the server to.
|
||||
port (int): The port number to bind the server to.
|
||||
"""
|
||||
self.logger.debug(f"Binding socket to {host}:{port}")
|
||||
while True:
|
||||
try:
|
||||
self.server_socket.bind((host, port))
|
||||
self.logger.debug(f"Socket bound to {host}:{port}")
|
||||
break
|
||||
except OSError as e:
|
||||
self.logger.error(
|
||||
f"Failed to bind socket to {host}:{port}, retrying...: {e}"
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
def _accept_connection(self, sock: socket.socket) -> None:
|
||||
"""Accept a new client connection.
|
||||
|
||||
Args:
|
||||
sock (socket.socket): The selected socket.
|
||||
"""
|
||||
conn, addr = sock.accept()
|
||||
self.logger.info(f"[+] Accepted connection from {addr}")
|
||||
conn.setblocking(False)
|
||||
|
||||
# wait for hello message to get mac_id
|
||||
|
||||
conn.settimeout(5)
|
||||
message = conn.recv(1024)
|
||||
conn.settimeout(None)
|
||||
|
||||
message = Message.from_bytes(message)
|
||||
|
||||
mac_id = message.payload.get("mac", None)
|
||||
if mac_id is None:
|
||||
self.logger.error(
|
||||
f"[-] No mac_id provided by {addr}, closing connection"
|
||||
)
|
||||
conn.close()
|
||||
return
|
||||
|
||||
client = Client(id_=mac_id, addr=addr, socket=conn)
|
||||
self.clients[mac_id] = client
|
||||
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
self.selector.register(conn, events, data=client)
|
||||
|
||||
self.logger.info(f"[+] Registered client {client}")
|
||||
|
||||
def _disconnect(self, client: Client) -> None:
|
||||
"""Disconnect a client and clean up resources.
|
||||
|
||||
Args:
|
||||
sock (socket.socket): The client socket to disconnect.
|
||||
"""
|
||||
self.logger.info(f"[-] Disconnecting {client}")
|
||||
self.selector.unregister(client.socket)
|
||||
client.disconnect()
|
||||
|
||||
def _handle_connection(
|
||||
self, key: selectors.SelectorKey, mask: int
|
||||
) -> None:
|
||||
"""Handle a client connection.
|
||||
|
||||
Args:
|
||||
key (selectors.SelectorKey): The selector key for the client.
|
||||
mask (int): The event mask.
|
||||
"""
|
||||
sock: socket.socket = key.fileobj
|
||||
client = key.data
|
||||
|
||||
try:
|
||||
if mask & selectors.EVENT_READ:
|
||||
recv_data = sock.recv(1024)
|
||||
if recv_data:
|
||||
self.logger.debug(
|
||||
f"[<] Received data from {client}: {recv_data!r}"
|
||||
)
|
||||
client.inbound += recv_data
|
||||
while b"\n" in client.inbound:
|
||||
line, client.inbound = client.inbound.split(b"\n", 1)
|
||||
self.logger.info(
|
||||
f"[<] Complete message from {client}: {line!r}"
|
||||
)
|
||||
|
||||
# send ACK
|
||||
ack = Message.ack().to_bytes()
|
||||
self.logger.debug(f"[>] Sending ACK to {client}")
|
||||
client.outbound += ack
|
||||
|
||||
else:
|
||||
self._disconnect(client)
|
||||
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
if client.outbound:
|
||||
self.logger.debug(
|
||||
f"[>] Sending data to {client}: {client.outbound!r}"
|
||||
)
|
||||
sent = sock.send(client.outbound)
|
||||
|
||||
client.outbound = client.outbound[sent:]
|
||||
except ConnectionResetError as e:
|
||||
self.logger.error(f"Connection reset by {client}, disconnect: {e}")
|
||||
self._disconnect(client)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Start the backend server."""
|
||||
self.running = True
|
||||
threading.Thread(
|
||||
name="BackendServer thread", target=self._loop, daemon=True
|
||||
).start()
|
||||
|
||||
def _loop(self) -> None:
|
||||
self.logger.info("Starting server loop...")
|
||||
while self.running:
|
||||
for client in self.clients.values():
|
||||
client["polled"]["cpu_usage"] = round(rn.uniform(0, 100), 1)
|
||||
client["polled"]["ram_usage"] = round(rn.uniform(0, 100), 1)
|
||||
time.sleep(1)
|
||||
"""Main server loop to handle incoming connections and data."""
|
||||
self.logger.info("Server is running...")
|
||||
try:
|
||||
while self.running:
|
||||
events = self.selector.select(timeout=1)
|
||||
for key, mask in events:
|
||||
if key.data is None:
|
||||
self._accept_connection(key.fileobj)
|
||||
else:
|
||||
self._handle_connection(key, mask)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Server error: {e}")
|
||||
raise e
|
||||
finally:
|
||||
self.selector.close()
|
||||
self.server_socket.close()
|
||||
self.logger.info("Server has stopped.")
|
||||
|
||||
self.logger.info("Server loop stopped.")
|
||||
|
||||
def get_client_data(
|
||||
self, client_id: str
|
||||
) -> dict[str, dict[str, Any]] | None:
|
||||
return self.clients.get(client_id, None)
|
||||
# def get_client_data(
|
||||
# self, client_id: str
|
||||
# ) -> dict[str, dict[str, Any]] | None:
|
||||
# return self.clients.get(client_id, None)
|
||||
|
||||
Reference in New Issue
Block a user