import socket
import struct
from typing import Type

from .packet import *
from .registries import *

# Largest size requested from a single ``recv`` call. The message length is
# read from the peer's 4-byte prefix and can be as large as 2**32 - 1; the
# socket documentation recommends a relatively small power of two for
# ``bufsize``, so large messages are read in bounded chunks instead.
_MAX_RECV_CHUNK = 65536


def _send_msg(sock: socket.socket, msg: bytearray) -> None:
    """
    Send a message through the socket, prefixed with its length.

    The prefix is the length of ``msg`` packed as a 4-byte big-endian
    unsigned integer (``'>I'``), which is what :func:`_recv_msg` reads back.

    :param sock: The socket to send the message through.
    :type sock: socket.socket
    :param msg: The message to send.
    :type msg: bytearray
    """
    sock.sendall(struct.pack('>I', len(msg)) + msg)


def _recv_msg(sock: socket.socket) -> bytearray | None:
    """
    Receive one length-prefixed message from the socket.

    Reads the 4-byte big-endian length prefix first, then exactly that many
    payload bytes.

    :param sock: The socket to receive the message from.
    :type sock: socket.socket
    :return: The received message data, or None if the prefix or the payload
        could not be read completely.
    :rtype: bytearray or None
    """
    raw_msglen = _recvall(sock, 4)
    if not raw_msglen:
        return None
    msglen = struct.unpack('>I', raw_msglen)[0]
    return _recvall(sock, msglen)


def _recvall(sock: socket.socket, n: int) -> bytearray | None:
    """
    Helper function to receive n bytes or return None if EOF is hit.

    :param sock: The socket to receive data from.
    :type sock: socket.socket
    :param n: The number of bytes to receive.
    :type n: int
    :return: The received data, or None if the peer closed the connection
        before ``n`` bytes arrived or the connection was aborted.
    :rtype: bytearray or None
    """
    try:
        data = bytearray()
        while len(data) < n:
            # Never ask for more than one chunk at a time: ``n`` may come
            # straight from an untrusted length prefix.
            packet = sock.recv(min(n - len(data), _MAX_RECV_CHUNK))
            if not packet:
                # An empty read means the peer closed the connection.
                return None
            data.extend(packet)
        return data
    except ConnectionAbortedError:
        return None
def readStructuredPacket(sock: socket.socket, registry: registry.Registry[Type[StructuredPacket]]) -> StructuredPacket | None:
    """
    Read one structured packet from the socket.

    A single length-prefixed message is received, its leading string is
    parsed into an identifier, and that identifier is looked up in the
    registry. The class found there unpacks the rest of the message.

    :param sock: The socket to read from.
    :type sock: socket.socket
    :param registry: The registry of structured packet types.
    :type registry: registry.Registry[Type[StructuredPacket]]
    :return: The unpacked packet, or None when no message could be received
        or the identifier has no entry in the registry.
    :rtype: StructuredPacket or None
    """
    payload = _recv_msg(sock)
    if payload is None:
        return None

    reader = Buffer(payload)
    packet_cls = registry.get(Identifier.from_string(reader.read_string()))
    if packet_cls is None:
        return None
    return packet_cls.unpack(reader)
def sendStructuredPacket(sock: socket.socket, packet: StructuredPacket, registry: registry.Registry[Type[StructuredPacket]]):
    """
    Send a structured packet through the socket.

    The registry id of the packet's type is written first as a string, then
    the packet packs itself into the same buffer, and the result is sent as
    one length-prefixed message.

    :param sock: The socket to send through.
    :type sock: socket.socket
    :param packet: The structured packet to send.
    :type packet: StructuredPacket
    :param registry: The registry of structured packet types.
    :type registry: registry.Registry[Type[StructuredPacket]]
    :raises ValueError: If the packet type is not registered in the registry.
    """
    type_id = registry.get_id(type(packet))
    if type_id is None:
        raise ValueError(f"Packet({packet}) must be registered in the registry!")

    out = Buffer(bytearray())
    # The id goes first so the receiver can pick the class before unpacking.
    out.write_string(str(type_id))
    packet.pack(out)
    _send_msg(sock, out.buffer)