import socket
import time
from typing import Optional
import dill
import logging
from netqasm.sdk.classical_communication.message import StructuredMessage
from netqasm.sdk.classical_communication.socket import Socket as _Socket
from simulaqron.general.host_config import SocketsConfig, Host
from simulaqron.settings import network_config
class Socket(_Socket):
    """
    Classical application socket between two SimulaQron nodes, backed by a
    plain TCP connection.

    One end of the connection acts as server (binds, listens and accepts exactly
    one connection) and the other end acts as client (connects, retrying while
    the connection is refused). Unless forced through ``as_server``, the role is
    derived from the node names, see :py:attr:`_should_be_server`.

    Messages are written to the TCP stream without any framing, and each receive
    call performs a single ``recv`` of at most ``maxsize`` bytes.
    """

    # Seconds to wait between two consecutive bind/connect attempts.
    RETRY_TIME = 0.2
    # Maximum number of bind/connect attempts before giving up.
    MAX_RETRIES = 20
    # Number of bytes read when a caller passes ``maxsize=None``.
    _DEFAULT_MAXSIZE = 1024

    def __init__(
        self,
        app_name: str,
        remote_app_name: str,
        socket_id: int = 0,
        timeout: Optional[int] = None,
        use_callbacks: bool = False,
        network_name: str = "default",
        log_config=None,
        as_server: Optional[bool] = None,
    ):
        """
        Creates the socket and immediately connects it to the remote node.

        :param app_name: Name of the local node.
        :type app_name: str
        :param remote_app_name: Name of the remote node.
        :type remote_app_name: str
        :param socket_id: Must be 0; the socket is configured through the config file instead.
        :type socket_id: int
        :param timeout: Timeout (in seconds) applied to the listening socket while the
            server end waits for the incoming connection. ``None`` waits indefinitely.
        :type timeout: int | None
        :param use_callbacks: Stored on the instance; not used by this class itself.
        :type use_callbacks: bool
        :param network_name: Name of the network whose app configuration is used to
            look up the node addresses.
        :type network_name: str
        :param log_config: Accepted for interface compatibility; not used by this class.
        :param as_server: Forces the role of this end (``True`` for server, ``False``
            for client). When ``None`` the role is derived from the node names.
        :type as_server: bool | None
        """
        # Defined first so that close() / __del__ are safe even when the
        # constructor fails before the connection is established.
        self._app_socket: Optional[socket.socket] = None

        assert socket_id == 0, (
            "SimulaQron socket does not support setting socket ID, this is instead done in the config file"
        )
        self._node_name = app_name
        self._remote_node_name = remote_app_name
        self._use_callbacks = use_callbacks
        self._network_name = network_name
        self._logger = logging.getLogger(f"{self.__class__.__name__}(L: {app_name} <-> R: {remote_app_name})")
        self._timeout = timeout

        self._app_socket = self._connect(as_server)

    def __del__(self):
        self.close()

    def close(self):
        """
        Closes this socket. No more messages can be sent or received after
        invoking this method.
        """
        # getattr: the attribute may be missing if __init__ never got to run.
        app_socket = getattr(self, "_app_socket", None)
        if app_socket is not None:
            app_socket.close()

    def send(self, msg: str):
        """
        Sends a message to the remote node.

        :param msg: The message to send.
        :type msg: str
        """
        self._logger.debug("Sending msg '%s'", msg)
        raw_msg = self._serialize_msg(msg=msg)
        # sendall: plain send() may transmit only part of the payload.
        self._app_socket.sendall(raw_msg)

    def send_structured(self, msg: StructuredMessage):
        """
        Sends a message to the remote node as a ``StructuredMessage`` object.

        :param msg: The message to send.
        :type msg: StructuredMessage
        """
        self._logger.debug("Sending structured msg '%s'", msg)
        raw_msg = self._serialize_structured_msg(msg=msg)
        # sendall: plain send() may transmit only part of the payload.
        self._app_socket.sendall(raw_msg)

    def send_silent(self, msg: str):
        """
        Sends a message to the remote node without logging it.

        :param msg: The message to send.
        :type msg: str
        """
        # Deliberately not delegating to send(), which logs the message.
        self._app_socket.sendall(self._serialize_msg(msg=msg))

    def _base_recv(self, block: bool, timeout: Optional[float], maxsize: Optional[int]) -> bytes:
        """
        Performs a single ``recv`` on the connected socket.

        The timeout configured on the socket before the call is restored
        afterwards, also when the read fails.

        :param block: Whether to wait for data. When ``False`` and no ``timeout``
            is given, the socket is only polled.
        :type block: bool
        :param timeout: Max time (in seconds) to wait for data, ``None`` for no limit.
        :type timeout: float | None
        :param maxsize: Maximum number of bytes to read; ``None`` selects the default size.
        :type maxsize: int | None
        :return: The raw bytes received.
        :rtype: bytes
        :raises RuntimeError: If ``block`` is ``False`` and no data could be read.
        """
        if maxsize is None:
            maxsize = self._DEFAULT_MAXSIZE

        # A non-blocking read without an explicit timeout polls the socket;
        # an explicitly given timeout is always honoured.
        if not block and timeout is None:
            recv_timeout: Optional[float] = 0.0
        else:
            recv_timeout = timeout

        app_socket = self._app_socket
        # Read the previous timeout *before* touching the socket mode.
        previous_timeout = app_socket.gettimeout()
        app_socket.settimeout(recv_timeout)
        try:
            raw_msg = app_socket.recv(maxsize)
        except BlockingIOError as err:
            if block:
                raise
            raise RuntimeError("No message to receive (not blocking)") from err
        finally:
            app_socket.settimeout(previous_timeout)

        if not block and not raw_msg:
            raise RuntimeError("No message to receive (not blocking)")
        return raw_msg

    def recv(
        self,
        block: bool = True,
        timeout: Optional[float] = None,
        maxsize: Optional[int] = 1024
    ) -> str:
        """
        Receive a message from the remote node.

        :param block: Whether the underlying read operation should be blocking or not.
        :type block: bool
        :param timeout: Max time (in seconds) to wait for a message from the remote.
        :type timeout: float | None
        :param maxsize: Maximum size of bytes to read from the remote.
        :type maxsize: int | None
        :return: The received message as a string.
        :rtype: str
        :raises RuntimeError: If ``block`` is ``False`` and there is no message to receive.
        """
        self._logger.debug("Receiving msg")
        raw_msg = self._base_recv(block, timeout, maxsize)
        msg = self._deserialize_msg(raw_msg=raw_msg)
        self._logger.debug("Msg '%s' received", msg)
        return msg

    def recv_structured(
        self,
        block: bool = True,
        timeout: Optional[float] = None,
        maxsize: Optional[int] = 1024,
    ) -> StructuredMessage:
        """
        Receives a message from the remote node and parses it as a ``StructuredMessage``.

        :param block: Whether the underlying read operation should be blocking or not.
        :type block: bool
        :param timeout: Max time (in seconds) to wait for a message from the remote.
        :type timeout: float | None
        :param maxsize: Maximum size of bytes to read from the remote.
        :type maxsize: int | None
        :return: The parsed message.
        :rtype: StructuredMessage
        :raises RuntimeError: If ``block`` is ``False`` and there is no message to receive.
        """
        self._logger.debug("Receiving structured msg")
        raw_msg = self._base_recv(block, timeout, maxsize)
        msg = self._deserialize_structured_msg(raw_msg=raw_msg)
        self._logger.debug("Msg '%s' received", msg)
        return msg

    def recv_silent(
        self,
        block: bool = True,
        timeout: Optional[float] = None,
        maxsize: Optional[int] = None,
    ) -> str:
        """
        Receives a message without logging it. All arguments passed to this
        invocation are ignored: the read is always blocking, without timeout
        and with the default maximum size. For more fine-grain control, please
        check the :py:meth:`recv` method.

        :param block: Ignored
        :type block: bool
        :param timeout: Ignored
        :type timeout: float | None
        :param maxsize: Ignored
        :type maxsize: int | None
        :return: The received message.
        :rtype: str
        """
        # Deliberately not delegating to recv(), which logs the message.
        raw_msg = self._base_recv(True, None, self._DEFAULT_MAXSIZE)
        return self._deserialize_msg(raw_msg=raw_msg)

    @staticmethod
    def _serialize_msg(msg: str) -> bytes:
        """Encodes a text message as UTF-8 bytes."""
        return msg.encode('utf-8')

    @staticmethod
    def _deserialize_msg(raw_msg: bytes) -> str:
        """Decodes UTF-8 bytes into a text message."""
        return raw_msg.decode('utf-8')

    @staticmethod
    def _serialize_structured_msg(msg: StructuredMessage) -> bytes:
        """Serializes a ``StructuredMessage`` with ``dill``."""
        return dill.dumps(msg)

    @staticmethod
    def _deserialize_structured_msg(raw_msg: bytes) -> StructuredMessage:
        """Deserializes bytes produced by :py:meth:`_serialize_structured_msg`."""
        # SECURITY NOTE(review): dill.loads can execute arbitrary code embedded
        # in the payload. Only use structured messages with trusted peers.
        return dill.loads(raw_msg)

    @property
    def _should_be_server(self) -> bool:
        """
        Check whether the local end of this socket will be acting as server or not. The decision
        is made based on the node names: the name which compares lower (``<`` on strings) will
        act as server.
        For example: If the socket connects "Alice" with "Bob", "Alice" will act as server, since
        the string "Alice" comes lexicographically before the string "Bob".

        :return: Whether this end of the socket should act as server or not.
        :rtype: bool
        """
        # Server will always be the "first"
        return self._node_name < self._remote_node_name

    def _connect(self, as_server: Optional[bool] = None) -> socket.socket:
        """
        Establishes the connection with the remote node.

        :param as_server: Forces the role of this end; ``None`` derives it from
            the node names.
        :type as_server: bool | None
        :return: The connected socket used to exchange messages.
        :rtype: socket.socket
        """
        self_is_server = self._should_be_server if as_server is None else as_server
        # The address to bind/connect to is always the one of the server end,
        # regardless of whether the role was forced or derived.
        server_name = self._node_name if self_is_server else self._remote_node_name
        addr = self._get_addr_info(name=server_name)

        if self_is_server:
            connected_socket = self._accept_as_server(addr)
        else:
            connected_socket = self._connect_as_client(addr)
        self._logger.debug("Application socket opened")
        return connected_socket

    def _accept_as_server(self, addr) -> socket.socket:
        """Binds to ``addr``, waits for a single incoming connection and returns it."""
        self._logger.debug("Trying to open application socket as server")
        listener = socket.socket(addr[0], addr[1], addr[2])
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            attempt = 0
            while True:
                attempt += 1
                try:
                    listener.bind(addr[4])
                    break
                except OSError as err:
                    self._logger.debug(
                        "Could not bind socket since: %s, Trying again in %.2fs (attempt %d of %d)...",
                        err, self.RETRY_TIME, attempt, self.MAX_RETRIES, exc_info=err
                    )
                    if attempt >= self.MAX_RETRIES:
                        raise
                    time.sleep(self.RETRY_TIME)
            listener.listen(1)
            listener.settimeout(self._timeout)
            conn, _ = listener.accept()
        finally:
            # Only one connection is ever accepted, so the listening socket is
            # released as soon as it is no longer needed (or on failure).
            listener.close()
        self._logger.debug("Classical socket as server accepted connection.")
        return conn

    def _connect_as_client(self, addr) -> socket.socket:
        """Connects to ``addr``, retrying while the connection is refused."""
        self._logger.debug("Trying to open application socket as client")
        attempt = 0
        while True:
            attempt += 1
            # A fresh socket per attempt: the state of a socket after a failed
            # connect() is unspecified, so it must not be reused.
            # NOTE(review): self._timeout is not applied to the client end; the
            # original code had that call commented out.
            app_socket = socket.socket(addr[0], addr[1], addr[2])
            try:
                app_socket.connect(addr[4])
            except ConnectionRefusedError as err:
                app_socket.close()
                self._logger.debug(
                    "Could not open application socket, trying again in %f s (attempt %d of %d)...",
                    self.RETRY_TIME, attempt, self.MAX_RETRIES, exc_info=err
                )
                if attempt >= self.MAX_RETRIES:
                    raise
                time.sleep(self.RETRY_TIME)
            except Exception:
                app_socket.close()
                self._logger.exception("Could not open application socket due to unexpected error")
                raise
            else:
                self._logger.debug("Classical socket connected as client")
                return app_socket

    def _get_addr_info(self, name):
        """
        Looks up the address information of a node in the app network.

        :param name: Name of the node to look up.
        :return: The ``addr`` attribute of the matching host entry.
        :raises ValueError: If ``name`` is not part of the app network.
        """
        app_net = self._get_app_net_config()
        host: Optional[Host] = app_net.hostDict.get(name)
        if host is None:
            raise ValueError(f"Host name '{name}' is not in the app network")
        return host.addr

    def _get_app_net_config(self) -> SocketsConfig:
        """Builds the app-level sockets configuration of the configured network."""
        return SocketsConfig(network_config, network_name=self._network_name, config_type="app")