From 8c30c4328dffe2064019f468a6138d66f31c2d89 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Thu, 28 Aug 2025 19:45:20 +0200 Subject: [PATCH] feat(connector.py): add `Connector` class --- src/judas_client/connector.py | 155 ++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 src/judas_client/connector.py diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py new file mode 100644 index 0000000..56deb23 --- /dev/null +++ b/src/judas_client/connector.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +import logging as lg + +import socket +import uuid +import time + +from judas_protocol import Message + + +class Connector: + def __init__( + self, + host: str, + port: int, + connect_timeout: float = 5.0, + ack_timeout: float = None, + ) -> None: + self.logger: lg.Logger = lg.getLogger( + f"{__name__}.{self.__class__.__name__}" + ) + self.logger.debug("Initializing Connector...") + + self.host: str = host + self.port: int = port + self.socket_timeout: None = None + self.connect_timeout: float = connect_timeout + self.ack_timeout: float = ack_timeout + + self.socket: socket.socket = socket.socket( + socket.AF_INET, socket.SOCK_STREAM + ) + + self.mac_address: str = self._get_mac_address() + + def _get_mac_address(self) -> str: + mac_address: str = ":".join( + [ + "{:02x}".format((uuid.getnode() >> ele) & 0xFF) + for ele in range(0, 8 * 6, 8) + ][::-1] + ) + self.logger.debug(f"MAC address: {mac_address}") + return mac_address + + def _send_ack(self) -> None: + self.logger.debug("[>] Sending ACK...") + try: + self.socket.sendall(Message.ack().to_bytes()) + self.logger.debug("[<] ACK sent") + except socket.error as e: + self.logger.error(f"[!] Failed to send ACK: {e}") + + def _check_ack(self) -> bool: + self.logger.debug("[.] Waiting for ACK...") + try: + self.socket.settimeout(self.ack_timeout) + ack: bytes = self.socket.recv(1024) + self.socket.settimeout(self.socket_timeout) + + if ack == Message.ack().to_bytes(): + self.logger.debug("[<] ACK received") + return True + else: + self.logger.error(f"[!] Invalid ACK received: {ack}") + + except TimeoutError as e: + self.logger.error(f"[!] ACK timeout: {e}") + + except socket.error as e: + self.logger.error(f"[!] Failed to receive ACK: {e}") + + return False + + def connect(self, retry_interval: int = 1) -> None: + self.logger.debug( + f"Connecting to {self.host}:{self.port} with timeout {self.connect_timeout}s..." + ) + try: + self.socket.settimeout(self.connect_timeout) + self.socket.connect((self.host, self.port)) + self.socket.settimeout(self.socket_timeout) + self.logger.info(f"[+] Connected to {self.host}:{self.port}") + except ( + socket.timeout, + ConnectionRefusedError, + ConnectionAbortedError, + ) as e: + self.logger.error( + f"[!] Connection to {self.host}:{self.port} failed: {e}" + ) + self.logger.info( + f"[.] Retrying connection in {retry_interval} s..." + ) + time.sleep(retry_interval) + self.connect(retry_interval=min(60, retry_interval * 2)) + + def send(self, data: bytes) -> None: + self.logger.debug(f"[>] Sending data: {data}") + try: + self.socket.sendall(data) + self.logger.info("[>] Data sent") + self._send_ack() + except BrokenPipeError as e: + self.logger.error(f"[!] Broken pipe: {e}") + self.logger.info("[.] Reconnecting...") + self.connect() + self.send(data) + except (socket.error, ValueError) as e: + self.logger.error(f"[!] Failed to send data: {e}") + + def receive(self) -> bytes: + self.logger.debug("[.] Waiting to receive data...") + try: + data: bytes = self.socket.recv(4096) + if not data: + self.logger.warning("[!] Received empty message") + self.logger.debug(f"[<] Received data: {data}") + return data + except socket.error as e: + self.logger.error(f"[!] Failed to receive data: {e}") + return b"" + + def close(self) -> None: + self.logger.debug("Closing connection...") + self.socket.close() + self.logger.info("Connection closed.") + + def _loop(self) -> None: + self.logger.debug("Starting main loop...") + while True: + data: bytes = self.receive() + if not data: + continue + message = Message.from_bytes(data.strip()) + self.logger.info(f"[<] Message received: {message}") + # if self._check_ack(): + # self.logger.debug("[.] ACK verified") + # else: + # self.logger.error("[!] ACK verification failed") + time.sleep(1) + + def run(self) -> None: + self.logger.debug("Running Connector...") + try: + self.connect() + # send hello message + self.send(Message.hello(self.mac_address).to_bytes()) + self._loop() + except KeyboardInterrupt: + self.logger.info("Interrupted by user.") + finally: + self.close()