Compare commits

15 Commits

Author SHA1 Message Date
github-actions[bot]
9c1aee6bba chore(release): 0.3.0 2025-11-30 17:40:21 +00:00
7a4e062c04 Merge pull request 'chore(release): 0.3.0' (#5) from release/0.3.0 into main
Reviewed-on: #5
2025-11-30 17:39:25 +00:00
ddb55dc462 Merge pull request 'refactor: make connector use queues' (#4) from refactor/make-connector-use-queues into develop
Reviewed-on: #4
2025-11-30 17:35:33 +00:00
38d663a501 style(connector.py): add typing to variables 2025-11-30 18:34:38 +01:00
bc62dbed66 feat(connector.py): handle self.selector.unregister exceptions in Connector.close() 2025-11-19 21:56:42 +01:00
a28c724145 style(connector.py): add blank line 2025-11-19 21:55:24 +01:00
54d2ba0083 feat(connector.py): add pending_acks and handle inbound/outbound ACKs 2025-11-19 21:54:32 +01:00
5458281186 feat(connector.py): add Connector.send() method 2025-11-19 21:53:08 +01:00
99e94e2f5a feat(connector.py): add Connector.running attribute 2025-11-19 21:52:19 +01:00
fcaa7dae70 build(uv.lock): update judas_protocol to 0.5.0 2025-11-19 21:46:22 +01:00
00d1d869d5 build(uv.lock): update judas_protocol to 0.4.3 2025-10-05 13:43:57 +02:00
3a46ed197a docs(connector.py): add docstrings 2025-09-23 23:36:21 +02:00
53912ed339 refactor(connector.py): rewrite Connector to use selectors 2025-09-23 23:35:09 +02:00
8646388851 build(uv.lock): update judas_protocol to 0.3.0 2025-09-23 23:32:10 +02:00
b8333b6408 refactor(connector.py): avoid recursion in connect() 2025-09-23 13:50:48 +02:00
4 changed files with 207 additions and 146 deletions

View File

@@ -2,6 +2,60 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [0.3.0] - 2025-11-30
### Bug Fixes
- [`431d8bb`](https://gitea.pufereq.pl/judas/judas_client/commit/431d8bb3bb5171a4d12ea306bfc767776bf6fb2a) **__main__.py**: fix wrong import
- [`a00668e`](https://gitea.pufereq.pl/judas/judas_client/commit/a00668ecf3e99cf0453a7c333de4f6ab57cbcac3) **client.py**: fix wrong import
- [`b756270`](https://gitea.pufereq.pl/judas/judas_client/commit/b7562703ddc82456859cb72da59e74b4bddf2dc9) **connector.py**: handle multiple messages in one packet
### Documentation
- [`3a46ed1`](https://gitea.pufereq.pl/judas/judas_client/commit/3a46ed197a1e700cf4cd66ec3accd96431873772) **connector.py**: add docstrings
### Features
- [`bc62dbe`](https://gitea.pufereq.pl/judas/judas_client/commit/bc62dbed66b08990a9aca790a831b07d8b5afcdb) **connector.py**: handle `self.selector.unregister` exceptions in `Connector.close()`
- [`54d2ba0`](https://gitea.pufereq.pl/judas/judas_client/commit/54d2ba0083194ba2d6ea2df07372050f03d8182f) **connector.py**: add `pending_acks` and handle inbound/outbound ACKs
- [`5458281`](https://gitea.pufereq.pl/judas/judas_client/commit/54582811864da1df0526763ef74b79544c695d01) **connector.py**: add `Connector.send()` method
- [`99e94e2`](https://gitea.pufereq.pl/judas/judas_client/commit/99e94e2f5a3e89396e0a1140665edc1ba48cb0f2) **connector.py**: add `Connector.running` attribute
- [`b887525`](https://gitea.pufereq.pl/judas/judas_client/commit/b887525cf1f6295b41c92dc8415d73c70b51a667) **__init__.py**: import `Client` and `Connector`
- [`5261ad8`](https://gitea.pufereq.pl/judas/judas_client/commit/5261ad8e9fe6a07033395e6936713b48c29ae30c) **connector.py**: call `on_message()` on receive
- [`3c86a5f`](https://gitea.pufereq.pl/judas/judas_client/commit/3c86a5fefd04ae03cc1503aa552cb3b5e846017b) **connector.py**: add `on_message` handler function argument to init
- [`cc1145a`](https://gitea.pufereq.pl/judas/judas_client/commit/cc1145a4acb9b96e75e327d0060706f5f7f871ac) **connector.py**: get mac address from init argument
- [`21bc86f`](https://gitea.pufereq.pl/judas/judas_client/commit/21bc86fa8aab7e2fae1b966ee8b831187e7e24e2) **client.py**: add `Client` class
### Miscellaneous Tasks
- [`3512608`](https://gitea.pufereq.pl/judas/judas_client/commit/3512608dd19480499e7e03592db6af88c1866e65) **connector.py**: reduce loop interval to 0.1 s in `_loop()`
- [`cf78a4b`](https://gitea.pufereq.pl/judas/judas_client/commit/cf78a4b05b356fda9d31d31002af69143f5121d4) **connector.py**: clarify debug message for connector loop
### Refactor
- [`53912ed`](https://gitea.pufereq.pl/judas/judas_client/commit/53912ed3394b0b1909f15976bd149b3a023c210c) **connector.py**: rewrite `Connector` to use selectors
- [`b8333b6`](https://gitea.pufereq.pl/judas/judas_client/commit/b8333b64088916dbef6c965566b0c3e38741e634) **connector.py**: avoid recursion in `connect()`
- [`7ce3285`](https://gitea.pufereq.pl/judas/judas_client/commit/7ce32855c49435f31b0c82c7690754ae668f8709) **__main__.py**: use `Client`
- [`014c429`](https://gitea.pufereq.pl/judas/judas_client/commit/014c4296e0cd91f0920d83e2f0993ba87166a118) **connector.py**: remove unused `Connector._get_mac_address()` method
- [`1182e08`](https://gitea.pufereq.pl/judas/judas_client/commit/1182e089bcd74c1811faac2d6514832ba037ab42) **connector.py**: make `connect_timeout` and `ack_timeout` keyword-only
- [`35b3af3`](https://gitea.pufereq.pl/judas/judas_client/commit/35b3af31bef60fda7549d229fa818a2a5b607e7d) **connector.py**: fix typing of `Connector.ack_timeout`
### Revert
- [`9b077c8`](https://gitea.pufereq.pl/judas/judas_client/commit/9b077c8c4868e7ef020cb2b25203b04b3fb42554) "feat(__init__.py): import `Client` and `Connector`"
### Styling
- [`38d663a`](https://gitea.pufereq.pl/judas/judas_client/commit/38d663a50178523304df0c0eff0714aef5066c74) **connector.py**: add typing to variables
- [`a28c724`](https://gitea.pufereq.pl/judas/judas_client/commit/a28c724145534982d56b2d4b47b429af9f7eb275) **connector.py**: add blank line
### Build
- [`fcaa7da`](https://gitea.pufereq.pl/judas/judas_client/commit/fcaa7dae70abe1139ac9e7f5d1d9e7947c0814e3) **uv.lock**: update judas_protocol to 0.5.0
- [`00d1d86`](https://gitea.pufereq.pl/judas/judas_client/commit/00d1d869d59e4bf5014a0c2baa5c0d9e3cc0fef5) **uv.lock**: update judas_protocol to 0.4.3
- [`8646388`](https://gitea.pufereq.pl/judas/judas_client/commit/8646388851c0276616610550326e81ea9974f579) **uv.lock**: update judas_protocol to 0.3.0
- [`b9d0fd9`](https://gitea.pufereq.pl/judas/judas_client/commit/b9d0fd92a65bd88c5069fd5e5961b1b968ab7039) **uv.lock**: update judas_protocol to 0.2.0
## [0.2.0] - 2025-09-19 ## [0.2.0] - 2025-09-19
### Bug Fixes ### Bug Fixes

View File

@@ -4,7 +4,7 @@ build-backend = "uv_build"
[project] [project]
name = "judas_client" name = "judas_client"
version = "0.2.0" version = "0.3.0"
description = "A client for judas, a remote PC fleet management system." description = "A client for judas, a remote PC fleet management system."
readme = "README.md" readme = "README.md"
authors = [] authors = []

View File

@@ -2,24 +2,33 @@
from __future__ import annotations from __future__ import annotations
import logging as lg import logging as lg
import selectors
import socket import socket
import time import time
from typing import Callable from typing import Callable
from judas_protocol import Message from judas_protocol import Category, ControlAction, Message
class Connector: class Connector:
"""Connector class for managing TCP connection and message exchange."""
def __init__( def __init__(
self, self,
mac_address: str, mac_address: str,
host: str, host: str,
port: int, port: int,
*, *,
connect_timeout: float = 5.0,
ack_timeout: float | None = None,
on_message: Callable[[Message], None], on_message: Callable[[Message], None],
) -> None: ) -> None:
"""Initialize the Connector.
Args:
mac_address (str): The MAC address of the client.
host (str): The server host address.
port (int): The server port number.
on_message (Callable[[Message], None]): Callback for handling incoming messages.
"""
self.logger: lg.Logger = lg.getLogger( self.logger: lg.Logger = lg.getLogger(
f"{__name__}.{self.__class__.__name__}" f"{__name__}.{self.__class__.__name__}"
) )
@@ -27,172 +36,170 @@ class Connector:
self.host: str = host self.host: str = host
self.port: int = port self.port: int = port
self.socket_timeout: None = None
self.connect_timeout: float = connect_timeout
self.ack_timeout: float | None = ack_timeout
self.selector = selectors.DefaultSelector()
self.socket: socket.socket = socket.socket( self.socket: socket.socket = socket.socket(
socket.AF_INET, socket.SOCK_STREAM socket.AF_INET, socket.SOCK_STREAM
) )
self.socket.setblocking(False)
self.selector.register(
self.socket,
selectors.EVENT_READ | selectors.EVENT_WRITE,
data=None,
)
self.mac_address: str = mac_address self.mac_address: str = mac_address
self.inbound_buffer: bytes = b""
self.outbound_buffer: bytes = b""
self.pending_acks: dict[str, tuple[Message, float]] = {}
self.running: bool = True
self.on_message: Callable[[Message], None] = on_message self.on_message: Callable[[Message], None] = on_message
def _send_ack(self) -> None: def _send_outbound(self) -> None:
self.logger.debug("[>] Sending ACK...") """Send data from the outbound buffer."""
while self.outbound_buffer:
try: try:
self.socket.sendall(Message.ack().to_bytes()) sent = self.socket.send(self.outbound_buffer)
self.logger.debug("[<] ACK sent")
except socket.error as e:
self.logger.error(f"[!] Failed to send ACK: {e}")
def _check_ack(self) -> bool:
self.logger.debug("[.] Waiting for ACK...")
try:
self.socket.settimeout(self.ack_timeout)
ack: bytes = self.socket.recv(1024)
self.socket.settimeout(self.socket_timeout)
if ack == Message.ack().to_bytes():
self.logger.debug("[<] ACK received")
return True
else:
self.logger.error(f"[!] Invalid ACK received: {ack}")
except TimeoutError as e:
self.logger.error(f"[!] ACK timeout: {e}")
except socket.error as e:
self.logger.error(f"[!] Failed to receive ACK: {e}")
return False
def connect(self, retry_interval: int = 1) -> None:
self.logger.debug( self.logger.debug(
f"Connecting to {self.host}:{self.port} with timeout {self.connect_timeout}s..." f"[>] Sent {sent} bytes: {self.outbound_buffer[:sent]!r}"
) )
try:
self.socket.settimeout(self.connect_timeout)
self.socket.connect((self.host, self.port))
self.socket.settimeout(self.socket_timeout)
self.logger.info(f"[+] Connected to {self.host}:{self.port}")
self.send_hello()
except (
socket.timeout,
ConnectionRefusedError,
ConnectionAbortedError,
) as e:
self.logger.error(
f"[!] Connection to {self.host}:{self.port} failed: {e}"
)
self.logger.info(
f"[.] Retrying connection in {retry_interval} s..."
)
time.sleep(retry_interval)
self.connect(retry_interval=min(30, retry_interval * 2))
def send(self, data: bytes, no_check_ack: bool = False) -> None: self.outbound_buffer = self.outbound_buffer[sent:]
self.logger.debug(f"[>] Sending data: {data}") except BlockingIOError:
while True: # OS buffer full, wait for next EVENT_WRITE
try:
self.socket.sendall(data)
if no_check_ack:
self.logger.debug("[>] Data sent without ACK check")
break break
else: except socket.error as e:
self.logger.info("[>] Data sent") self.logger.error(f"[!] Socket error: {e}")
self.reconnect()
acknowledged: bool = self._check_ack()
if acknowledged:
self.logger.debug("[.] Data acknowledged")
break break
else:
self.logger.warning(
"[!] Data not acknowledged, retrying..."
)
except BrokenPipeError as e: def _receive_inbound(self) -> None:
self.logger.error(f"[!] Broken pipe: {e}") """Receive data into the inbound buffer."""
self.logger.info("[.] Reconnecting...")
self.connect()
except (socket.error, ValueError) as e:
self.logger.error(f"[!] Failed to send data: {e}")
time.sleep(1)
def receive(self) -> bytes:
self.logger.debug("[.] Waiting to receive data...")
try: try:
data: bytes = self.socket.recv(4096) data: bytes = self.socket.recv(4096)
if not data: if data:
self.logger.warning("[!] Received empty message") self.logger.debug(f"[<] Received {len(data)} bytes: {data!r}")
return b"" self.inbound_buffer += data
self.logger.debug(f"[<] Received data: {data}") else:
return data self.logger.debug("[!] Connection closed by the server.")
self.reconnect()
except socket.error as e: except socket.error as e:
self.logger.error(f"[!] Failed to receive data: {e}") self.logger.error(f"[!] Socket error: {e}")
return b"" self.reconnect()
def close(self) -> None: def send(self, message: Message) -> None:
self.logger.debug("Closing connection...") """Send a message to the server.
self.socket.close()
self.logger.info("Connection closed.")
def reconnect(self) -> None: Args:
self.logger.debug("Reconnecting...") message (Message): The message to send.
self.close() """
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.logger.debug(f"[>] Queueing message to send: {message}")
self.connect() if message.ack_required:
self.pending_acks[message.id] = (message, time.time())
self.outbound_buffer += message.to_bytes()
def send_hello(self) -> None: def send_hello(self) -> None:
self.logger.debug("[.] Sending hello message...") """Send a HELLO message to the server."""
self.logger.debug("[*] Sending HELLO message...")
hello_message: Message = Message.hello(self.mac_address) hello_message: Message = Message.hello(self.mac_address)
acknowledged: bool = False self.send(hello_message)
while not acknowledged:
self.send(hello_message.to_bytes(), no_check_ack=True)
self.logger.debug("[.] Hello message sent, waiting for ACK...")
acknowledged = self._check_ack()
if not acknowledged:
self.logger.warning(
"[!] Hello message not acknowledged, retrying..."
)
time.sleep(1)
def _loop(self) -> None: def close(self) -> None:
self.logger.debug("Starting connector loop...") """Close the connection and clean up resources."""
while True: self.logger.debug("[*] Closing connection...")
time.sleep(0.1)
data: bytes = self.receive()
if not data:
self.reconnect()
continue
for line in data.split(b"\n"):
line: bytes = line.strip()
if not line:
continue
self.logger.debug(f"[.] Raw message data: {line}")
try: try:
message: Message = Message.from_bytes(line) self.selector.unregister(self.socket)
except ValueError as e: except Exception as e:
self.logger.error(f"[!] Failed to parse message: {e}") self.logger.error(f"[!] Error unregistering socket: {e}")
continue self.socket.close()
self.logger.info(f"[*] Message received: {message}") self.logger.debug("[.] Connection closed.")
self.on_message(message)
# if self._check_ack(): def reconnect(self) -> None:
# self.logger.debug("[.] ACK verified") """Reconnect to the server."""
# else: self.logger.debug("[*] Reconnecting...")
# self.logger.error("[!] ACK verification failed") self.close()
# reinit socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setblocking(False)
self.selector.register(
self.socket,
selectors.EVENT_READ | selectors.EVENT_WRITE,
data=None,
)
self.connect()
def connect(self) -> None:
"""Establish a connection to the server."""
self.logger.debug(f"Connecting to {self.host}:{self.port}...")
connected: bool = False
delay: float = 1.0
while not connected:
try:
self.socket.connect((self.host, self.port))
connected = True
except BlockingIOError:
# Connection in progress
time.sleep(0.1)
except socket.error as e:
self.logger.error(f"[!] Connection error: {e}")
self.logger.debug(f"[.] Retrying in {delay} seconds...")
time.sleep(delay)
delay = min(delay * 2, 30) # exponential backoff
self.logger.debug("[*] Connected, sending HELLO...")
self.send_hello()
def run(self) -> None: def run(self) -> None:
self.logger.debug("Running Connector...") """Run the main event loop."""
try:
self.connect() self.connect()
self._loop() try:
while self.running:
events = self.selector.select(timeout=1)
for key, mask in events:
if mask & selectors.EVENT_READ:
self._receive_inbound()
if mask & selectors.EVENT_WRITE:
self._send_outbound()
# Process inbound buffer for complete messages
while b"\n" in self.inbound_buffer:
message_bytes, self.inbound_buffer = (
self.inbound_buffer.split(b"\n", 1)
)
try:
message: Message = Message.from_bytes(message_bytes)
# handle incoming ACKs
if (
message.category == Category.CONTROL
and message.action == ControlAction.ACK
):
if (
message.payload.get("target_id")
in self.pending_acks
):
target_id = message.payload["target_id"]
self.logger.debug(
f"[.] Received ACK for message ID {target_id}"
)
del self.pending_acks[target_id]
else:
self.on_message(message)
if message.ack_required:
ack_message: Message = Message.ack(message.id)
self.send(ack_message)
self._send_outbound()
except Exception as e:
self.logger.error(f"[!] Failed to parse message: {e}")
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
self.logger.info("Interrupted by user.") self.logger.debug("[*] Interrupted by user.")
finally: finally:
self.close() self.close()

6
uv.lock generated
View File

@@ -249,7 +249,7 @@ wheels = [
[[package]] [[package]]
name = "judas-client" name = "judas-client"
version = "0.2.0" version = "0.3.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "judas-protocol" }, { name = "judas-protocol" },
@@ -292,8 +292,8 @@ test = [
[[package]] [[package]]
name = "judas-protocol" name = "judas-protocol"
version = "0.2.0" version = "0.5.0"
source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#bc1bf46388eb904738893a2f86b5050b4ce2489e" } source = { git = "https://gitea.pufereq.pl/judas/judas_protocol.git#c48b69ecee16f5824ffd8bce8921341d5fa326b7" }
[[package]] [[package]]
name = "markdown-it-py" name = "markdown-it-py"