From 54d2ba0083194ba2d6ea2df07372050f03d8182f Mon Sep 17 00:00:00 2001 From: Artur Borecki Date: Wed, 19 Nov 2025 21:54:32 +0100 Subject: [PATCH] feat(connector.py): add `pending_acks` and handle inbound/outbound ACKs --- src/judas_client/connector.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/judas_client/connector.py b/src/judas_client/connector.py index de64b95..5c3b797 100644 --- a/src/judas_client/connector.py +++ b/src/judas_client/connector.py @@ -7,7 +7,7 @@ import socket import time from typing import Callable -from judas_protocol import Message +from judas_protocol import Category, ControlAction, Message class Connector: @@ -54,6 +54,8 @@ class Connector: self.inbound_buffer: bytes = b"" self.outbound_buffer: bytes = b"" + self.pending_acks: dict[str, tuple[Message, float]] = {} + self.running: bool = True self.on_message: Callable[[Message], None] = on_message @@ -169,7 +171,27 @@ class Connector: ) try: message = Message.from_bytes(message_bytes) - self.on_message(message) + # handle incoming ACKs + if ( + message.category == Category.CONTROL + and message.action == ControlAction.ACK + ): + if ( + message.payload.get("target_id") + in self.pending_acks + ): + target_id = message.payload["target_id"] + self.logger.debug( + f"[.] Received ACK for message ID {target_id}" + ) + del self.pending_acks[target_id] + else: + self.on_message(message) + + if message.ack_required: + ack_message = Message.ack(message.id) + self.send(ack_message) + self._send_outbound() except Exception as e: self.logger.error(f"[!] Failed to parse message: {e}")