feat(backend_server.py): track message ACKs and resend if no ACK recv'd within 5 seconds
This commit is contained in:
@@ -55,6 +55,8 @@ class BackendServer:
|
|||||||
] = {}
|
] = {}
|
||||||
self._initialize_handlers()
|
self._initialize_handlers()
|
||||||
|
|
||||||
|
self.pending_acks: list[tuple[Client, Message, float]] = []
|
||||||
|
|
||||||
self.running: bool = False
|
self.running: bool = False
|
||||||
|
|
||||||
def _initialize_handlers(self) -> None:
|
def _initialize_handlers(self) -> None:
|
||||||
@@ -141,6 +143,22 @@ class BackendServer:
|
|||||||
)
|
)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
def send(self, client: Client, msg: Message) -> None:
|
||||||
|
"""Send a message to a client.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client (Client): The client to send the message to.
|
||||||
|
msg (Message): The message to send.
|
||||||
|
"""
|
||||||
|
msg_bytes: bytes = msg.to_bytes()
|
||||||
|
self.logger.info(
|
||||||
|
f"[>] Sending message {msg.id} to {client}, category: {msg.category}, action: {msg.action}, ack_required: {msg.ack_required}"
|
||||||
|
)
|
||||||
|
self.logger.debug(f"[>] Message bytes: {msg_bytes!r}")
|
||||||
|
if msg.ack_required:
|
||||||
|
self.pending_acks.append((client, msg, time.time()))
|
||||||
|
client.outbound += msg_bytes
|
||||||
|
|
||||||
def send_ack(self, client: Client, target_id: str) -> None:
|
def send_ack(self, client: Client, target_id: str) -> None:
|
||||||
"""Send an ACK message to a client.
|
"""Send an ACK message to a client.
|
||||||
|
|
||||||
@@ -148,9 +166,9 @@ class BackendServer:
|
|||||||
client (Client): The client to send the ACK to.
|
client (Client): The client to send the ACK to.
|
||||||
target_id (str): The id of the ACK'd message.
|
target_id (str): The id of the ACK'd message.
|
||||||
"""
|
"""
|
||||||
ack: bytes = Message.Control.ack(target_id=target_id).to_bytes()
|
ack: Message = Message.Control.ack(target_id=target_id)
|
||||||
self.logger.info(f"[>] Sending ACK to {client}")
|
self.logger.info(f"[>] Sending ACK to {client}")
|
||||||
client.outbound += ack
|
self.send(client, ack)
|
||||||
|
|
||||||
def send_close(self, client: Client) -> None:
|
def send_close(self, client: Client) -> None:
|
||||||
"""Send a CLOSE message to a client.
|
"""Send a CLOSE message to a client.
|
||||||
@@ -158,9 +176,9 @@ class BackendServer:
|
|||||||
Args:
|
Args:
|
||||||
client (Client): The client to send the CLOSE message to.
|
client (Client): The client to send the CLOSE message to.
|
||||||
"""
|
"""
|
||||||
close_msg: bytes = Message.Control.close().to_bytes()
|
close_msg: Message = Message.Control.close()
|
||||||
self.logger.info(f"[>] Sending CLOSE to {client}")
|
self.logger.info(f"[>] Sending CLOSE to {client}")
|
||||||
client.outbound += close_msg
|
self.send(client, close_msg)
|
||||||
|
|
||||||
def _accept_connection(self, sock: socket.socket) -> None:
|
def _accept_connection(self, sock: socket.socket) -> None:
|
||||||
"""Accept a new client connection.
|
"""Accept a new client connection.
|
||||||
@@ -335,6 +353,15 @@ class BackendServer:
|
|||||||
and now - client.last_seen > 60 * 60 * 24 # 24 hours
|
and now - client.last_seen > 60 * 60 * 24 # 24 hours
|
||||||
):
|
):
|
||||||
self.clients[client.id].status = ClientStatus.STALE
|
self.clients[client.id].status = ClientStatus.STALE
|
||||||
|
|
||||||
|
# check pending ACKs
|
||||||
|
for client, msg, timestamp in self.pending_acks[:]:
|
||||||
|
if time.time() - timestamp > 5: # 5 second timeout
|
||||||
|
self.logger.warning(
|
||||||
|
f"ACK timeout for message {msg.id} to {client}, resending..."
|
||||||
|
)
|
||||||
|
self.send(client, msg)
|
||||||
|
self.pending_acks.remove((client, msg, timestamp))
|
||||||
time.sleep(0.001) # prevent 100% CPU usage
|
time.sleep(0.001) # prevent 100% CPU usage
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user