diff --git a/src/judas_server/backend/backend_server.py b/src/judas_server/backend/backend_server.py index 98f49d4..7bf4f20 100644 --- a/src/judas_server/backend/backend_server.py +++ b/src/judas_server/backend/backend_server.py @@ -55,6 +55,8 @@ class BackendServer: ] = {} self._initialize_handlers() + self.pending_acks: list[tuple[Client, Message, float]] = [] + self.running: bool = False def _initialize_handlers(self) -> None: @@ -141,6 +143,22 @@ class BackendServer: ) time.sleep(1) + def send(self, client: Client, msg: Message) -> None: + """Send a message to a client. + + Args: + client (Client): The client to send the message to. + msg (Message): The message to send. + """ + msg_bytes: bytes = msg.to_bytes() + self.logger.info( + f"[>] Sending message {msg.id} to {client}, category: {msg.category}, action: {msg.action}, ack_required: {msg.ack_required}" + ) + self.logger.debug(f"[>] Message bytes: {msg_bytes!r}") + if msg.ack_required: + self.pending_acks.append((client, msg, time.time())) + client.outbound += msg_bytes + def send_ack(self, client: Client, target_id: str) -> None: """Send an ACK message to a client. @@ -148,9 +166,9 @@ class BackendServer: client (Client): The client to send the ACK to. target_id (str): The id of the ACK'd message. """ - ack: bytes = Message.Control.ack(target_id=target_id).to_bytes() + ack: Message = Message.Control.ack(target_id=target_id) self.logger.info(f"[>] Sending ACK to {client}") - client.outbound += ack + self.send(client, ack) def send_close(self, client: Client) -> None: """Send a CLOSE message to a client. @@ -158,9 +176,9 @@ class BackendServer: Args: client (Client): The client to send the CLOSE message to. """ - close_msg: bytes = Message.Control.close().to_bytes() + close_msg: Message = Message.Control.close() self.logger.info(f"[>] Sending CLOSE to {client}") - client.outbound += close_msg + self.send(client, close_msg) def _accept_connection(self, sock: socket.socket) -> None: """Accept a new client connection. @@ -335,6 +353,15 @@ class BackendServer: and now - client.last_seen > 60 * 60 * 24 # 24 hours ): self.clients[client.id].status = ClientStatus.STALE + + # check pending ACKs + for client, msg, timestamp in self.pending_acks[:]: + if time.time() - timestamp > 5: # 5 second timeout + self.logger.warning( + f"ACK timeout for message {msg.id} to {client}, resending..." + ) + self.send(client, msg) + self.pending_acks.remove((client, msg, timestamp)) time.sleep(0.001) # prevent 100% CPU usage except Exception as e: