From b8333b64088916dbef6c965566b0c3e38741e634 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Tue, 23 Sep 2025 13:50:48 +0200 Subject: [PATCH 01/12] refactor(connector.py): avoid recursion in `connect()` --- src/judas_client/connector.py | 50 ++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index 2488e37..c86e8cd 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -68,29 +68,35 @@ class Connector: return False - def connect(self, retry_interval: int = 1) -> None: - self.logger.debug( - f"Connecting to {self.host}:{self.port} with timeout {self.connect_timeout}s..." - ) - try: - self.socket.settimeout(self.connect_timeout) - self.socket.connect((self.host, self.port)) - self.socket.settimeout(self.socket_timeout) - self.logger.info(f"[+] Connected to {self.host}:{self.port}") - self.send_hello() - except ( - socket.timeout, - ConnectionRefusedError, - ConnectionAbortedError, - ) as e: - self.logger.error( - f"[!] Connection to {self.host}:{self.port} failed: {e}" + def connect(self) -> None: + retry_interval: int = 1 + connected: bool = False + while not connected: + self.logger.debug( + f"[.] Connecting to {self.host}:{self.port} with timeout {self.connect_timeout}s..." ) - self.logger.info( - f"[.] Retrying connection in {retry_interval} s..." - ) - time.sleep(retry_interval) - self.connect(retry_interval=min(30, retry_interval * 2)) + try: + self.socket.settimeout(self.connect_timeout) + self.socket.connect((self.host, self.port)) + self.socket.settimeout(self.socket_timeout) + self.logger.info(f"[+] Connected to {self.host}:{self.port}") + self.send_hello() + connected = True + except ( + socket.timeout, + ConnectionRefusedError, + ConnectionAbortedError, + ) as e: + self.logger.error( + f"[!] Connection to {self.host}:{self.port} failed: {e}" + ) + self.logger.info( + f"[.] Retrying connection in {retry_interval} s..." + ) + time.sleep(retry_interval) + retry_interval = min( + retry_interval * 2, 30 + ) # exponential backoff def send(self, data: bytes, no_check_ack: bool = False) -> None: self.logger.debug(f"[>] Sending data: {data}") From 8646388851c0276616610550326e81ea9974f579 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Tue, 23 Sep 2025 23:32:10 +0200 Subject: [PATCH 02/12] build(uv.lock): update judas_protocol to 0.3.0 --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index ca96414..a0e37a0 100644 --- a/uv.lock +++ b/uv.lock @@ -292,8 +292,8 @@ test = [ [[package]] name = "judas-protocol" -version = "0.2.0" -source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#bc1bf46388eb904738893a2f86b5050b4ce2489e" } +version = "0.3.0" +source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#c25ee1ebdfff8ff51bf00131732720091562e101" } [[package]] name = "markdown-it-py" From 53912ed3394b0b1909f15976bd149b3a023c210c Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Tue, 23 Sep 2025 23:35:09 +0200 Subject: [PATCH 03/12] refactor(connector.py): rewrite `Connector` to use selectors --- src/judas_client/connector.py | 246 +++++++++++++--------------------- 1 file changed, 96 insertions(+), 150 deletions(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index c86e8cd..67957cd 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging as lg +import selectors import socket import time from typing import Callable @@ -16,8 +17,6 @@ class Connector: host: str, port: int, *, - connect_timeout: float = 5.0, - ack_timeout: float | None = None, on_message: Callable[[Message], None], ) -> None: self.logger: lg.Logger = lg.getLogger( @@ -27,178 +26,125 @@ class Connector: self.host: str = host self.port: int = port - self.socket_timeout: None = None - self.connect_timeout: float = connect_timeout - self.ack_timeout: float | None = ack_timeout + self.selector = selectors.DefaultSelector() self.socket: socket.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) + self.socket.setblocking(False) + + self.selector.register( + self.socket, + selectors.EVENT_READ | selectors.EVENT_WRITE, + data=None, + ) self.mac_address: str = mac_address + self.inbound_buffer: bytes = b"" + self.outbound_buffer: bytes = b"" + self.on_message: Callable[[Message], None] = on_message - def _send_ack(self) -> None: - self.logger.debug("[>] Sending ACK...") - try: - self.socket.sendall(Message.ack().to_bytes()) - self.logger.debug("[<] ACK sent") - except socket.error as e: - self.logger.error(f"[!] Failed to send ACK: {e}") - - def _check_ack(self) -> bool: - self.logger.debug("[.] Waiting for ACK...") - try: - self.socket.settimeout(self.ack_timeout) - ack: bytes = self.socket.recv(1024) - self.socket.settimeout(self.socket_timeout) - - if ack == Message.ack().to_bytes(): - self.logger.debug("[<] ACK received") - return True - else: - self.logger.error(f"[!] Invalid ACK received: {ack}") - - except TimeoutError as e: - self.logger.error(f"[!] ACK timeout: {e}") - - except socket.error as e: - self.logger.error(f"[!] Failed to receive ACK: {e}") - - return False - - def connect(self) -> None: - retry_interval: int = 1 - connected: bool = False - while not connected: - self.logger.debug( - f"[.] Connecting to {self.host}:{self.port} with timeout {self.connect_timeout}s..." - ) + def _send_outbound(self) -> None: + while self.outbound_buffer: try: - self.socket.settimeout(self.connect_timeout) - self.socket.connect((self.host, self.port)) - self.socket.settimeout(self.socket_timeout) - self.logger.info(f"[+] Connected to {self.host}:{self.port}") - self.send_hello() - connected = True - except ( - socket.timeout, - ConnectionRefusedError, - ConnectionAbortedError, - ) as e: - self.logger.error( - f"[!] Connection to {self.host}:{self.port} failed: {e}" + sent = self.socket.send(self.outbound_buffer) + self.logger.debug( + f"[>] Sent {sent} bytes: {self.outbound_buffer[:sent]!r}" ) - self.logger.info( - f"[.] Retrying connection in {retry_interval} s..." - ) - time.sleep(retry_interval) - retry_interval = min( - retry_interval * 2, 30 - ) # exponential backoff + self.outbound_buffer = self.outbound_buffer[sent:] + except BlockingIOError: + # OS buffer full, wait for next EVENT_WRITE + break + except socket.error as e: + self.logger.error(f"[!] Socket error: {e}") + self.reconnect() + break - def send(self, data: bytes, no_check_ack: bool = False) -> None: - self.logger.debug(f"[>] Sending data: {data}") - while True: - try: - self.socket.sendall(data) - - if no_check_ack: - self.logger.debug("[>] Data sent without ACK check") - break - else: - self.logger.info("[>] Data sent") - - acknowledged: bool = self._check_ack() - if acknowledged: - self.logger.debug("[.] Data acknowledged") - break - else: - self.logger.warning( - "[!] Data not acknowledged, retrying..." - ) - - except BrokenPipeError as e: - self.logger.error(f"[!] Broken pipe: {e}") - self.logger.info("[.] Reconnecting...") - self.connect() - except (socket.error, ValueError) as e: - self.logger.error(f"[!] Failed to send data: {e}") - time.sleep(1) - - def receive(self) -> bytes: - self.logger.debug("[.] Waiting to receive data...") + def _receive_inbound(self) -> None: try: data: bytes = self.socket.recv(4096) - if not data: - self.logger.warning("[!] Received empty message") - return b"" - self.logger.debug(f"[<] Received data: {data}") - return data + if data: + self.logger.debug(f"[<] Received {len(data)} bytes: {data!r}") + self.inbound_buffer += data + else: + self.logger.debug("[!] Connection closed by the server.") + self.reconnect() except socket.error as e: - self.logger.error(f"[!] Failed to receive data: {e}") - return b"" - - def close(self) -> None: - self.logger.debug("Closing connection...") - self.socket.close() - self.logger.info("Connection closed.") - - def reconnect(self) -> None: - self.logger.debug("Reconnecting...") - self.close() - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect() + self.logger.error(f"[!] Socket error: {e}") + self.reconnect() def send_hello(self) -> None: - self.logger.debug("[.] Sending hello message...") - hello_message: Message = Message.hello(self.mac_address) - acknowledged: bool = False - while not acknowledged: - self.send(hello_message.to_bytes(), no_check_ack=True) - self.logger.debug("[.] Hello message sent, waiting for ACK...") - acknowledged = self._check_ack() - if not acknowledged: - self.logger.warning( - "[!] Hello message not acknowledged, retrying..." - ) - time.sleep(1) + self.logger.debug("[*] Sending HELLO message...") + hello_message: bytes = Message.hello(self.mac_address).to_bytes() + self.outbound_buffer += hello_message + self._send_outbound() - def _loop(self) -> None: - self.logger.debug("Starting connector loop...") - while True: - time.sleep(0.1) - data: bytes = self.receive() - if not data: - self.reconnect() - continue - for line in data.split(b"\n"): - line: bytes = line.strip() + def close(self) -> None: + self.logger.debug("[*] Closing connection...") + self.selector.unregister(self.socket) + self.socket.close() + self.logger.debug("[.] Connection closed.") - if not line: - continue + def reconnect(self) -> None: + self.logger.debug("[*] Reconnecting...") + self.close() - self.logger.debug(f"[.] Raw message data: {line}") - try: - message: Message = Message.from_bytes(line) - except ValueError as e: - self.logger.error(f"[!] Failed to parse message: {e}") - continue - self.logger.info(f"[*] Message received: {message}") - self.on_message(message) + # reinit socket + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setblocking(False) + self.selector.register( + self.socket, + selectors.EVENT_READ | selectors.EVENT_WRITE, + data=None, + ) + self.connect() - # if self._check_ack(): - # self.logger.debug("[.] ACK verified") - # else: - # self.logger.error("[!] ACK verification failed") + def connect(self) -> None: + self.logger.debug(f"Connecting to {self.host}:{self.port}...") + connected: bool = False + delay: float = 1.0 + while not connected: + try: + self.socket.connect((self.host, self.port)) + connected = True + except BlockingIOError: + # Connection in progress + time.sleep(0.1) + except socket.error as e: + self.logger.error(f"[!] Connection error: {e}") + self.logger.debug(f"[.] Retrying in {delay} seconds...") + time.sleep(delay) + delay = min(delay * 2, 30) # exponential backoff + + self.logger.debug("[*] Connected, sending HELLO...") + self.send_hello() def run(self) -> None: - self.logger.debug("Running Connector...") + self.connect() try: - self.connect() - self._loop() + while True: + events = self.selector.select(timeout=1) + for key, mask in events: + if mask & selectors.EVENT_READ: + self._receive_inbound() + if mask & selectors.EVENT_WRITE: + self._send_outbound() + + # Process inbound buffer for complete messages + while b"\n" in self.inbound_buffer: + message_bytes, self.inbound_buffer = ( + self.inbound_buffer.split(b"\n", 1) + ) + try: + message = Message.from_bytes(message_bytes) + self.on_message(message) + except Exception as e: + self.logger.error(f"[!] Failed to parse message: {e}") + + time.sleep(0.1) except KeyboardInterrupt: - self.logger.info("Interrupted by user.") + self.logger.debug("[*] Interrupted by user.") finally: self.close() From 3a46ed197a1e700cf4cd66ec3accd96431873772 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Tue, 23 Sep 2025 23:36:21 +0200 Subject: [PATCH 04/12] docs(connector.py): add docstrings --- src/judas_client/connector.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index 67957cd..b9b7738 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -11,6 +11,8 @@ from judas_protocol import Message class Connector: + """Connector class for managing TCP connection and message exchange.""" + def __init__( self, mac_address: str, @@ -19,6 +21,14 @@ class Connector: *, on_message: Callable[[Message], None], ) -> None: + """Initialize the Connector. + + Args: + mac_address (str): The MAC address of the client. + host (str): The server host address. + port (int): The server port number. + on_message (Callable[[Message], None]): Callback for handling incoming messages. + """ self.logger: lg.Logger = lg.getLogger( f"{__name__}.{self.__class__.__name__}" ) @@ -47,6 +57,7 @@ class Connector: self.on_message: Callable[[Message], None] = on_message def _send_outbound(self) -> None: + """Send data from the outbound buffer.""" while self.outbound_buffer: try: sent = self.socket.send(self.outbound_buffer) @@ -63,6 +74,7 @@ class Connector: break def _receive_inbound(self) -> None: + """Receive data into the inbound buffer.""" try: data: bytes = self.socket.recv(4096) if data: @@ -76,18 +88,21 @@ class Connector: self.reconnect() def send_hello(self) -> None: + """Send a HELLO message to the server.""" self.logger.debug("[*] Sending HELLO message...") hello_message: bytes = Message.hello(self.mac_address).to_bytes() self.outbound_buffer += hello_message self._send_outbound() def close(self) -> None: + """Close the connection and clean up resources.""" self.logger.debug("[*] Closing connection...") self.selector.unregister(self.socket) self.socket.close() self.logger.debug("[.] Connection closed.") def reconnect(self) -> None: + """Reconnect to the server.""" self.logger.debug("[*] Reconnecting...") self.close() @@ -102,6 +117,7 @@ class Connector: self.connect() def connect(self) -> None: + """Establish a connection to the server.""" self.logger.debug(f"Connecting to {self.host}:{self.port}...") connected: bool = False delay: float = 1.0 @@ -122,6 +138,7 @@ class Connector: self.send_hello() def run(self) -> None: + """Run the main event loop.""" self.connect() try: while True: From 00d1d869d59e4bf5014a0c2baa5c0d9e3cc0fef5 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Sun, 5 Oct 2025 13:13:27 +0200 Subject: [PATCH 05/12] build(uv.lock): update judas_protocol to 0.4.3 --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index a0e37a0..e8ad24e 100644 --- a/uv.lock +++ b/uv.lock @@ -292,8 +292,8 @@ test = [ [[package]] name = "judas-protocol" -version = "0.3.0" -source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#c25ee1ebdfff8ff51bf00131732720091562e101" } +version = "0.4.3" +source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#5ef300ff93bb43d4db28ae019fec30f48f88152b" } [[package]] name = "markdown-it-py" From fcaa7dae70abe1139ac9e7f5d1d9e7947c0814e3 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:46:22 +0100 Subject: [PATCH 06/12] build(uv.lock): update judas_protocol to 0.5.0 --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index e8ad24e..318cb09 100644 --- a/uv.lock +++ b/uv.lock @@ -292,8 +292,8 @@ test = [ [[package]] name = "judas-protocol" -version = "0.4.3" -source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#5ef300ff93bb43d4db28ae019fec30f48f88152b" } +version = "0.5.0" +source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#c48b69ecee16f5824ffd8bce8921341d5fa326b7" } [[package]] name = "markdown-it-py" From 99e94e2f5a3e89396e0a1140665edc1ba48cb0f2 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:52:19 +0100 Subject: [PATCH 07/12] feat(connector.py): add `Connector.running` attribute --- src/judas_client/connector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index b9b7738..0eb9acf 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -54,6 +54,8 @@ class Connector: self.inbound_buffer: bytes = b"" self.outbound_buffer: bytes = b"" + self.running: bool = True + self.on_message: Callable[[Message], None] = on_message def _send_outbound(self) -> None: @@ -141,7 +143,7 @@ class Connector: """Run the main event loop.""" self.connect() try: - while True: + while self.running: events = self.selector.select(timeout=1) for key, mask in events: if mask & selectors.EVENT_READ: From 54582811864da1df0526763ef74b79544c695d01 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:53:08 +0100 Subject: [PATCH 08/12] feat(connector.py): add `Connector.send()` method --- src/judas_client/connector.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index 0eb9acf..de64b95 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -89,6 +89,17 @@ class Connector: self.logger.error(f"[!] Socket error: {e}") self.reconnect() + def send(self, message: Message) -> None: + """Send a message to the server. + + Args: + message (Message): The message to send. + """ + self.logger.debug(f"[>] Queueing message to send: {message}") + if message.ack_required: + self.pending_acks[message.id] = (message, time.time()) + self.outbound_buffer += message.to_bytes() + def send_hello(self) -> None: """Send a HELLO message to the server.""" self.logger.debug("[*] Sending HELLO message...") From 54d2ba0083194ba2d6ea2df07372050f03d8182f Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:54:32 +0100 Subject: [PATCH 09/12] feat(connector.py): add `pending_acks` and handle inbound/outbound ACKs --- src/judas_client/connector.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index de64b95..5c3b797 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -7,7 +7,7 @@ import socket import time from typing import Callable -from judas_protocol import Message +from judas_protocol import Category, ControlAction, Message class Connector: @@ -54,6 +54,8 @@ class Connector: self.inbound_buffer: bytes = b"" self.outbound_buffer: bytes = b"" + self.pending_acks: dict[str, tuple[Message, float]] = {} + self.running: bool = True self.on_message: Callable[[Message], None] = on_message @@ -169,7 +171,27 @@ class Connector: ) try: message = Message.from_bytes(message_bytes) - self.on_message(message) + # handle incoming ACKs + if ( + message.category == Category.CONTROL + and message.action == ControlAction.ACK + ): + if ( + message.payload.get("target_id") + in self.pending_acks + ): + target_id = message.payload["target_id"] + self.logger.debug( + f"[.] Received ACK for message ID {target_id}" + ) + del self.pending_acks[target_id] + else: + self.on_message(message) + + if message.ack_required: + ack_message = Message.ack(message.id) + self.send(ack_message) + self._send_outbound() except Exception as e: self.logger.error(f"[!] Failed to parse message: {e}") From a28c724145534982d56b2d4b47b429af9f7eb275 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:55:24 +0100 Subject: [PATCH 10/12] style(connector.py): add blank line --- src/judas_client/connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index 5c3b797..bab4346 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -68,6 +68,7 @@ class Connector: self.logger.debug( f"[>] Sent {sent} bytes: {self.outbound_buffer[:sent]!r}" ) + self.outbound_buffer = self.outbound_buffer[sent:] except BlockingIOError: # OS buffer full, wait for next EVENT_WRITE From bc62dbed66b08990a9aca790a831b07d8b5afcdb Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:56:38 +0100 Subject: [PATCH 11/12] feat(connector.py): handle `self.selector.unregister` exceptions in `Connector.close()` --- src/judas_client/connector.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index bab4346..8f47e31 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -106,14 +106,16 @@ class Connector: def send_hello(self) -> None: """Send a HELLO message to the server.""" self.logger.debug("[*] Sending HELLO message...") - hello_message: bytes = Message.hello(self.mac_address).to_bytes() - self.outbound_buffer += hello_message - self._send_outbound() + hello_message: Message = Message.hello(self.mac_address) + self.send(hello_message) def close(self) -> None: """Close the connection and clean up resources.""" self.logger.debug("[*] Closing connection...") - self.selector.unregister(self.socket) + try: + self.selector.unregister(self.socket) + except Exception as e: + self.logger.error(f"[!] Error unregistering socket: {e}") self.socket.close() self.logger.debug("[.] Connection closed.") From 38d663a50178523304df0c0eff0714aef5066c74 Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Sun, 30 Nov 2025 18:34:38 +0100 Subject: [PATCH 12/12] style(connector.py): add typing to variables --- src/judas_client/connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index 8f47e31..a68b64a 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -173,7 +173,7 @@ class Connector: self.inbound_buffer.split(b"\n", 1) ) try: - message = Message.from_bytes(message_bytes) + message: Message = Message.from_bytes(message_bytes) # handle incoming ACKs if ( message.category == Category.CONTROL @@ -192,7 +192,7 @@ class Connector: self.on_message(message) if message.ack_required: - ack_message = Message.ack(message.id) + ack_message: Message = Message.ack(message.id) self.send(ack_message) self._send_outbound() except Exception as e: