#
# Copyright (c) 2017, Stephanie Wehner and Axel Dahlberg
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. All advertising materials mentioning features or use of this software
# must display the following acknowledgement:
# This product includes software developed by Stephanie Wehner, QuTech.
# 4. Neither the name of the QuTech organization nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER ''AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import time
import socket
import logging
from cqc.cqcHeader import (
CQC_TP_NEW_OK,
CQC_TP_RECV,
CQC_TP_EPR_OK,
CQC_TP_MEASOUT,
CQC_TP_EXPIRE,
CQC_TP_INF_TIME,
Header,
CQCHeader,
CQCMeasOutHeader,
CQCTimeinfoHeader,
CQCXtraQubitHeader,
)
from cqc.entInfoHeader import EntInfoHeader
from cqc.hostConfig import cqc_node_id_from_addrinfo
from .cqc_handler import CQCHandler
from .util import CQCUnsuppError
from .qubit import qubit
try:
import simulaqron
from simulaqron.general.hostConfig import socketsConfig
from simulaqron.settings import simulaqron_settings
_simulaqron_version = simulaqron.__version__
_simulaqron_major = int(_simulaqron_version.split('.')[0])
except ImportError:
_simulaqron_major = -1
[docs]class CQCConnection(CQCHandler):
"""Handler to be used when sending commands over a socket."""
[docs] def __init__(self, name, socket_address=None, appID=None, pend_messages=False,
retry_connection=True, conn_retry_time=0.1, log_level=None, backend=None,
use_classical_communication=True, network_name=None):
"""
Initialize a connection to the cqc server.
Since version 3.0.0: If socket_address is None or use_classical_communication is True, the CQC connection
needs some way of finding the correct socket addresses. If backend is None or "simulaqron" the connection
will try to make use of the network config file setup in simulaqron. If simulaqron is not installed
- **Arguments**
:param name: Name of the host.
:param socket_address: tuple (str, int) of ip and port number.
:param appID: Application ID. If set to None, defaults to a nonused ID.
:param pend_messages: True if you want to wait with sending messages to the back end.
Use flush() to send all pending messages in one go as a sequence to the server
:param retry_connection: bool
Whether to retry a failed connection or not
:param conn_retry_time: float
How many seconds to wait between each connection retry
:param log_level: int or None
The log-level, for example logging.DEBUG (default: logging.WARNING)
:param backend: None or str
If socket_address is None or use_classical_communication is True, If None or "simulaqron" is used
the cqc library tries to use the network config file setup in simulaqron if network_config_file is None.
If network_config_file is None and simulaqron is not installed a ValueError is raised.
:param use_classical_communication: bool
Whether to use the built-in classical communication or not.
:param network_name: None or str
Used if simulaqron is used to load socket addresses for the backend
"""
super().__init__(
name=name,
app_id=appID,
pend_messages=pend_messages,
)
self._setup_logging(log_level)
# Connection retry time
self._conn_retry_time = conn_retry_time
# Buffer received data
self.buf = None
# ClassicalServer
self._classicalServer = None
# Classical connections in the application network
self._classicalConn = {}
# Get network configuraton and addresses
addr, cqc_net, app_net = self._setup_network_data(
socket_address=socket_address,
use_classical_communication=use_classical_communication,
backend=backend,
network_name=network_name,
)
self._cqcNet = cqc_net
self._appNet = app_net
# Open a socket to the backend
self._s = None
cqc_socket = self._setup_socket(addr=addr, retry_connection=retry_connection)
self._s = cqc_socket
@staticmethod
def _setup_logging(level):
"""
Sets up the logging to the specified level (default logging.WARNING)
:param level: int or None
For example logging.DEBUG
:return: None
"""
if level is None:
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(message)s", level=logging.WARNING)
else:
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(message)s", level=level)
def _setup_network_data(self, socket_address, use_classical_communication, backend,
network_name):
addr = None
cqc_net = None
app_net = None
if socket_address is None or use_classical_communication:
cqc_net, app_net = self._get_net_configs(
use_classical_communication=use_classical_communication,
backend=backend,
network_name=network_name,
)
# Host data
if self.name in cqc_net.hostDict:
myHost = cqc_net.hostDict[self.name]
else:
raise ValueError("Host name '{}' is not in the cqc network".format(self.name))
# Get IP and port number
addr = myHost.addr
if socket_address is not None:
try:
hostname, port = socket_address
if not isinstance(hostname, str):
raise TypeError()
if not isinstance(port, int):
raise TypeError()
addrs = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP, family=socket.AF_INET)
addr = addrs[0]
except Exception:
raise TypeError("When specifying the socket address, this should be a tuple (str,int).")
return addr, cqc_net, app_net
def _get_net_configs(self, use_classical_communication, backend, network_name):
cqc_net = None
app_net = None
if backend is None or backend == "simulaqron":
if _simulaqron_major < 3:
raise ValueError("If (socket_address is None or use_classical_communication is True)"
"and (backend is None or 'simulaqron'\n"
"you need simulaqron>=3.0.0 installed.")
else:
network_config_file = simulaqron_settings.network_config_file
cqc_net = socketsConfig(network_config_file, network_name=network_name, config_type="cqc")
if use_classical_communication:
app_net = socketsConfig(network_config_file, network_name=network_name, config_type="app")
else:
raise ValueError("Unknown backend")
return cqc_net, app_net
def _setup_socket(self, addr, retry_connection):
cqc_socket = None
while True:
try:
logging.debug("App {} : Trying to connect to CQC server".format(self.name))
cqc_socket = socket.socket(addr[0], addr[1], addr[2])
cqc_socket.connect(addr[4])
break
except ConnectionRefusedError as err:
logging.debug("App {} : Could not connect to CQC server, trying again...".format(self.name))
time.sleep(self._conn_retry_time)
cqc_socket.close()
if not retry_connection:
self.close()
raise err
except Exception as err:
logging.exception("App {} : Critical error when connection to CQC server: {}".format(self.name, err))
cqc_socket.close()
raise err
return cqc_socket
[docs] def commit(self, msg):
"""Send message through the socket."""
self._s.send(msg)
[docs] def close(self, release_qubits=True, notify=True):
"""Handle closing actions.
Flushes remaining headers, releases all qubits, closes the
connections, and removes the app ID from the used app IDs.
"""
super().close()
if self._s is not None:
self._s.close()
self.closeClassicalServer()
for name in list(self._classicalConn):
self.closeClassicalChannel(name)
[docs] def new_qubitID(self, print_cqc=False):
"""Provide new qubit ID.
For CQCConnection the qubit ID is given by the server. A message
has to be read and the qubit ID extracted from it.
"""
msg = self.readMessage()
otherHdr = msg[1]
if print_cqc:
self.print_CQC_msg(msg)
return otherHdr.qubit_id
[docs] def startClassicalServer(self):
"""Sets up a server for the application communication,
if not already set up.
"""
if self._appNet is None:
raise ValueError(
"Since use_classical_communication was set to False upon init, the built-in classical communication"
"cannot be used."
)
if not self._classicalServer:
logging.debug("App {}: Starting classical server".format(self.name))
# Get host data
myHost = self._appNet.hostDict[self.name]
hostaddr = myHost.addr
# Setup server
s = socket.socket(hostaddr[0], hostaddr[1], hostaddr[2])
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(hostaddr[4])
s.listen(1)
(conn, addr) = s.accept()
logging.debug("App {}: Classical server started".format(self.name))
self._classicalServer = conn
[docs] def closeClassicalServer(self):
"""Closes classical server."""
if self._classicalServer:
logging.debug("App {}: Closing classical server".format(self.name))
self._classicalServer.close()
logging.debug("App {}: Classical server closed".format(self.name))
self._classicalServer = None
[docs] def recvClassical(self, timout=1, msg_size=1024, close_after=True):
"""Receive classical message."""
if not self._classicalServer:
self.startClassicalServer()
for _ in range(10 * timout):
logging.debug("App {}: Trying to receive classical message".format(self.name))
msg = self._classicalServer.recv(msg_size)
if len(msg) > 0:
logging.debug("App {}: Received classical message".format(self.name))
if close_after:
self.closeClassicalServer()
return msg
time.sleep(0.1)
raise RuntimeError("Timeout: No message received")
[docs] def openClassicalChannel(self, name):
"""
Opens a classical connection to another host in the application network.
- **Arguments**
:name: The name of the host in the application network.
:timout: The time to try to connect to the server. When timout is reached an RuntimeError is raised.
"""
if self._appNet is None:
raise ValueError(
"Since use_classical_communication was set to False upon init, the built-in classical communication"
"cannot be used."
)
if name not in self._classicalConn:
logging.debug("App {}: Opening classical channel to {}".format(self.name, name))
if name in self._appNet.hostDict:
remoteHost = self._appNet.hostDict[name]
else:
raise ValueError("Host name '{}' is not in the cqc network".format(name))
addr = remoteHost.addr
while True:
try:
s = socket.socket(addr[0], addr[1], addr[2])
s.connect(addr[4])
logging.debug("App {}: Classical channel to {} opened".format(self.name, name))
break
except ConnectionRefusedError:
logging.debug(
"App {}: Could not open classical channel to {}, trying again..".format(self.name, name)
)
time.sleep(self._conn_retry_time)
except Exception as e:
logging.warning(
"App {} : Critical error when connection to app node {}: {}".format(self.name, name, e)
)
break
self._classicalConn[name] = s
[docs] def closeClassicalChannel(self, name):
"""
Closes a classical connection to another host in the application network.
- **Arguments**
:name: The name of the host in the application network.
"""
if name in self._classicalConn:
logging.debug("App {}: Closing classical channel to {}".format(self.name, name))
s = self._classicalConn.pop(name)
s.close()
logging.debug("App {}: Classical channel to {} closed".format(self.name, name))
[docs] def sendClassical(self, name, msg, close_after=True):
"""
Sends a classical message to another host in the application network.
- **Arguments**
:name: The name of the host in the application network.
:msg: The message to send. Should be either a int in range(0,256) or a list of such ints.
:timout: The time to try to connect to the server. When timout is reached an RuntimeError is raised.
"""
if name not in self._classicalConn:
self.openClassicalChannel(name)
try:
to_send = bytes([int(msg)])
except (TypeError, ValueError):
to_send = bytes(msg)
logging.debug("App {}: Sending classical message {} to {}".format(self.name, to_send, name))
self._classicalConn[name].send(to_send)
logging.debug("App {}: Classical message {} to {} sent".format(self.name, to_send, name))
if close_after:
self.closeClassicalChannel(name)
def _handle_create_qubits(self, num_qubits, notify):
qubits = []
for _ in range(num_qubits):
msg = self.readMessage()
self.check_error(msg[0])
if msg[0].tp != CQC_TP_NEW_OK:
raise CQCUnsuppError("Unexpected message of type {} send back from backend".format(msg[0].tp))
qubits.append(self.parse_CQC_msg(msg))
self.print_CQC_msg(msg)
if notify:
message = self.readMessage()
self._assert_done_message(message)
self.print_CQC_msg(message)
return qubits
[docs] def readMessage(self, maxsize=192): # WHAT IS GOOD SIZE?
"""Receive the whole message from cqc server.
Returns (CQCHeader,None,None), (CQCHeader,CQCNotifyHeader,None)
or (CQCHeader,CQCNotifyHeader,EntInfoHeader) depending on the
type of message.
Maxsize is the max size of message.
"""
# Initialize checks
gotCQCHeader = False
if self.buf:
checkedBuf = False
else:
checkedBuf = True
while True:
# If buf does not contain enough data, read in more
if checkedBuf:
# Receive data
data = self._s.recv(maxsize)
# Read whatever we received into a buffer
if self.buf:
self.buf += data
else:
self.buf = data
# If we don't have the CQC header yet, try and read it in full.
if not gotCQCHeader:
if len(self.buf) < CQCHeader.HDR_LENGTH:
# Not enough data for CQC header, return and wait for the rest
checkedBuf = True
continue
# Got enough data for the CQC Header so read it in
gotCQCHeader = True
rawHeader = self.buf[0:CQCHeader.HDR_LENGTH]
currHeader = CQCHeader(rawHeader)
# Remove the header from the buffer
self.buf = self.buf[CQCHeader.HDR_LENGTH : len(self.buf)]
# Check for error
self.check_error(currHeader)
# Check whether we already received all the data
if len(self.buf) < currHeader.length:
# Still waiting for data
checkedBuf = True
continue
else:
break
# We got all the data, read other headers if there is any
if currHeader.length == 0:
return currHeader, None, None
else:
if currHeader.tp == CQC_TP_INF_TIME:
timeinfo_header = self._extract_header(CQCTimeinfoHeader)
return currHeader, timeinfo_header, None
elif currHeader.tp == CQC_TP_MEASOUT:
measout_header = self._extract_header(CQCMeasOutHeader)
return currHeader, measout_header, None
elif currHeader.tp in [CQC_TP_RECV, CQC_TP_NEW_OK, CQC_TP_EXPIRE]:
xtra_qubit_header = self._extract_header(CQCXtraQubitHeader)
return currHeader, xtra_qubit_header, None
elif currHeader.tp == CQC_TP_EPR_OK:
xtra_qubit_header = self._extract_header(CQCXtraQubitHeader)
ent_info_hdr = self._extract_header(EntInfoHeader)
return currHeader, xtra_qubit_header, ent_info_hdr
def _extract_header(self, header_class):
"""
Extracts the given header class from the first part of the current buffer.
:param header_class: Subclassed from `cqc.backend.cqcHeader.Header`
:return: An instance of the class
"""
if not issubclass(header_class, Header):
raise ValueError("header_class {} is not a subclass of Header".format(header_class))
try:
rawHeader = self.buf[:header_class.HDR_LENGTH]
except IndexError:
raise ValueError("Got a header message of unexpected size")
self.buf = self.buf[header_class.HDR_LENGTH: len(self.buf)]
header = header_class(rawHeader)
return header
[docs] def sendQubit(self, q, name, remote_appID=0, notify=True, block=True, remote_socket=None):
"""Sends qubit to another node in the cqc network.
If this node is not in the network an error is raised.
- **Arguments**
:q: The qubit to send.
:Name: Name of the node as specified in the cqc network config file.
:remote_appID: The app ID of the application running on the receiving node.
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:remote_socket: tuple (str, int) of ip and port number. Needed if no cqcFile was specified
"""
q = super().sendQubit(
q=q,
name=name,
remote_appID=remote_appID,
notify=notify,
block=block,
remote_socket=remote_socket,
)
[docs] def createEPR(self, name, remote_appID=0, notify=True, block=True, remote_socket=None):
"""Creates epr with other host in cqc network.
- **Arguments**
:name: Name of the node as specified in the cqc network config file.
:remote_appID: The app ID of the application running on the receiving node.
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:remote_socket: tuple (str, int) of ip and port number. Needed if no cqcFile was specified
"""
return super().createEPR(
name,
remote_appID=remote_appID,
notify=notify,
block=block,
remote_socket=remote_socket,
)
def _handle_epr_response(self, notify):
# Get RECV message
message = self.readMessage()
otherHdr = message[1]
entInfoHdr = message[2]
q_id = otherHdr.qubit_id
self.print_CQC_msg(message)
if notify:
message = self.readMessage()
self.print_CQC_msg(message)
# initialize the qubit
q = qubit(self, createNew=False)
q._set_entanglement_info(entInfoHdr)
q._qID = q_id
# Activate and return qubit
q._set_active(True)
return q
def _handle_factory_response(self, num_iter, response_amount, should_notify=False):
"""Handles the responses from a factory command and returns a list of results"""
res = []
for _ in range(num_iter):
for _ in range(response_amount):
message = self.readMessage()
self.check_error(message[0])
# TODO handle new qubit!
res.append(self.parse_CQC_msg(message))
self.print_CQC_msg(message)
if should_notify:
message = self.readMessage()
self.check_error(message[0])
return res
[docs] def return_meas_outcome(self):
"""Return measurement outcome."""
msg = self.readMessage()
try:
otherHdr = msg[1]
return otherHdr.outcome
except AttributeError:
raise RuntimeError("Didn't receive a measurement outcome")
message = self.readMessage()
self._assert_done_message(message)
self.print_CQC_msg(message)
[docs] def get_remote_from_directory_or_address(self, name, remote_socket=None):
cqcNet = self._cqcNet
if remote_socket is None:
try:
# Get receiving host
hostDict = cqcNet.hostDict
except AttributeError:
raise ValueError(
"If a CQCConnections is initialized without specifying a cqcFile you need to also provide a"
"socket address for the remote node here."
)
if name in hostDict:
recvHost = hostDict[name]
remote_ip = recvHost.ip
remote_port = recvHost.port
else:
raise ValueError("Host name '{}' is not in the cqc network".format(name))
else:
try:
remote_host, remote_port = remote_socket
if not isinstance(remote_host, str):
raise TypeError()
if not isinstance(remote_port, int):
raise TypeError()
except Exception:
raise TypeError("When specifying the remote socket address, this should be a tuple (str,int).")
# Pack the IP
addrs = socket.getaddrinfo(remote_host, remote_port, proto=socket.IPPROTO_TCP, family=socket.AF_INET)
addr = addrs[0]
remote_ip = cqc_node_id_from_addrinfo(addr)
remote_port = addr[4][1]
return remote_ip, remote_port