Source code for simulaqron.netqasm_backend.factory

# Copyright (c) 2017, Stephanie Wehner and Axel Dahlberg
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. All advertising materials mentioning features or use of this software
#    must display the following acknowledgement:
#    This product includes software developed by Stephanie Wehner, QuTech.
# 4. Neither the name of the QuTech organization nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER ''AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from typing import Type, Dict

from netqasm.backend.messages import MessageHeader, ErrorCode, deserialize_host_msg, Message, \
    InitNewAppMessage
import logging
from twisted.internet.defer import DeferredLock, inlineCallbacks
from twisted.internet.protocol import Factory, Protocol, connectionDone
from twisted.internet.task import deferLater

from simulaqron.reactor import reactor
from simulaqron.general.host_config import SocketsConfig, Host
from simulaqron.netqasm_backend.qnodeos import SubroutineHandler
from simulaqron.sdk.connection import RichErrorMessage
from simulaqron.settings import network_config
from simulaqron.virtual_node.virtual import call_method


[docs]class IncompleteMessageError(ValueError): """ Raised when trying to parse an incomplete NetQASM message. """ pass
[docs]class NetQASMProtocol(Protocol): # Dictionary storing the next unique qubit id for each used app_id _next_q_id = {} # Dictionary storing the next unique entanglement id for each used # (host_app_id,remote_node,remote_app_id) _next_ent_id = {} def __init__(self, factory: "NetQASMFactory"): """ Implementation of a ``twisted.internet.protocol.Protocol`` class that handles the connection to a server capable of handling NetQASM messages. This class is the entry point when a new NetQASM message is received. It is also responsible for calling (in a "twisted deferred" way) the message handlers that implement the logic of handling and executing the message. :param factory: The :py:class:`NetQASMFactory` object that handles this object. :type factory: NetQASMFactory """ # NetQASM Factory, including our connection to the SimulaQron backend self.factory = factory # Default application ID, typically one connection per application but # we will deliberately NOT check for that since this is the task of # higher layers or an OS self.app_id = 0 # Define the backend to use. self.messageHandler = factory.backend self.messageHandler.protocol = self # Flag to determine whether we already received _all_ of the NetQASM header self.got_netqasm_header = False # Header for which we are currently processing a packet self.currHeader = None # Buffer received data (which may arrive in chunks) self.buf = None # Convenience self.name = self.factory.name self._logger = logging.getLogger(f"{self.__class__.__name__}({self.name})") self._logger.debug("Initialized Protocol")
[docs] def connectionMade(self): """ Implementation of the ``connectionMade`` method from the ``twisted.internet.protocol.Protocol`` class. """ self._logger.info("Connection made") pass
[docs] def connectionLost(self, reason=connectionDone): """ Implementation of the ``connectionLost`` method from the ``twisted.internet.protocol.Protocol`` class. """ self._logger.info(f"Connection lost: {reason}") self.factory._active_protocol = None self._cleanup_all()
def _cleanup_all(self): """ Clean up all state - Note that we allow only ONE connection to one NetQASM node server at a time, making the below safe. If we were to ever decide to allow multiple, the below is too radical and decidedly unsafe. """ self._logger.info("Cleaning up all state") # Clear qubit list self.factory.qubitList.clear() # Clear protocol class state NetQASMProtocol._next_q_id.clear() NetQASMProtocol._next_ent_id.clear() # Clear executioner class state from simulaqron.netqasm_backend.executioner import VanillaSimulaQronExecutioner VanillaSimulaQronExecutioner._next_create_id.clear() # Clear executioner instance state executor = self.messageHandler._executor if hasattr(executor, '_network_stack'): executor._network_stack._sockets.clear() if hasattr(executor, '_epr_create_requests'): executor._epr_create_requests.clear() if hasattr(executor, '_epr_recv_requests'): executor._epr_recv_requests.clear() # Reset shared memory try: from netqasm.sdk.shared_memory import SharedMemoryManager SharedMemoryManager.reset_memories() except Exception as e: self._logger.debug(f"Could not reset shared memory: {e}") self._logger.info("Cleanup complete")
[docs] def dataReceived(self, data: bytes): """ We will always wait to receive enough data for the header, and then the entire packet first before commencing processing. This method is the first entry point when the SimulaQron QNodeOS server receives a NetQASM message. Once any data is received, it attaches it to an internal buffer, and tries to parse a NetQASM message. If a message cannot be parsed, the raw data will be stored and this method will return, waiting form more data. It a message can be parsed, it will be for further processing to the message handler (usually the :py:class:´SubroutineHandler´ class). :param data: The data received from the remote. :type data: bytes """ # Read whatever we received into a buffer if self.buf: self.buf = self.buf + data else: self.buf = data try: msg_id, msg = self._parse_message() except IncompleteMessageError: return d = self.messageHandler.handle_netqasm_message(msg_id=msg_id, msg=msg) d.addCallback(self._log_handled_message) d.addErrback(self._log_error)
def _log_handled_message(self, result): self._logger.info("Finished handling message with result = %s", result) @inlineCallbacks def _log_error(self, failure): self._logger.error("Handling message failed with failure = %s", failure.value) # Use the executor's current msg_id so the client's _wait_for_done loop # can unblock for the correct subroutine rather than hanging forever. msg_id = getattr(getattr(self.messageHandler, '_executor', None), '_current_msg_id', 0) if not isinstance(msg_id, int): msg_id = 0 self._return_msg(msg=RichErrorMessage(err_code=ErrorCode.GENERAL, err_msg=str(failure.value), msg_id=msg_id)) # self.transport.abortConnection() # yield None yield deferLater(reactor, 0.1, self.stop)
[docs] def stop(self): """ Stops this NetQASM protocol object. No further messages will be handled after invoking this method. """ self.factory.stop()
def _parse_message(self): try: msg_hdr = MessageHeader.from_buffer_copy(self.buf) except ValueError: raise IncompleteMessageError if len(self.buf) < msg_hdr.length: raise IncompleteMessageError msg = deserialize_host_msg(self.buf[MessageHeader.len():]) self.buf = self.buf[msg_hdr.length:] return msg_hdr.id, msg def _handle_init_new_app(self, msg: InitNewAppMessage): app_id = msg.app_id self._add_app(app_id=app_id) max_qubits = msg.max_qubits self._logger.debug("Allocating a new unit module" "of size %d for application with app ID %d.\n", max_qubits, app_id) self._executioner.init_new_application( app_id=app_id, max_qubits=max_qubits, ) def _return_msg(self, msg: Message | bytes): """ Return a msg to the host. """ if isinstance(msg, bytes): self.transport.write(msg) else: self.transport.write(bytes(msg))
############################################################################### # # NetQASM Factory # # Twisted factory for the NetQASM protocol #
[docs]class NetQASMFactory(Factory): def __init__( self, host: Host, name: str, qnodeos_net: SocketsConfig, backend: Type[SubroutineHandler], network_name: str = "default" ): """ Factory class that creates :py:class:`NetQASMProtocol` objects. This factory will create one protocol instance per connection. This factory is handled by the internals of the twisted reactor, and should not be instantiated by the user. :param host: The hostname to listen to new connections. :type host: str :param name: A name for this protocol factory. :type name: str :param qnodeos_net: The :py:class:`SocketsConfig` object containing the *QNodeOS* sockets specifications. :type qnodeos_net: SocketsConfig :param backend: The class of QNodeOS subroutine handlers that the protocol will forward messages to. :type backend: Type[SubroutineHandler] :param network_name: The name of the network to handle messages from. :type network_name: str """ self.host = host self.name = name self.qnodeos_net = qnodeos_net self.virtRoot = None self.qReg = None self.backend = backend(self) self.network_name = network_name # Dictionary that keeps qubit dictionaries for each application self.qubitList: Dict[int, "VirtualQubitRef"] = {} # noqa: F821 # Lock governing access to the qubitList self._lock = DeferredLock() self._logger = logging.getLogger(f"{self.__class__.__name__}({name})") # Read in topology, if specified. topology=None means fully connected # topology self.topology = network_config[network_name].topology # Track active connection - we will allow only one at a time # as the code for the netqasm backend was not designed for multiple # even though the virtual Node backend of SimulaQron would allow # that self._active_protocol = None
[docs] def stop(self): """ Stops this instance of the factory. No more protocol objects will be created after invoking this method. """ yield call_method(self.virtRoot, "stop_vnode") reactor.stop()
[docs] def buildProtocol(self, addr): """ Return an instance of NetQASMProtocol when a connection is made. """ protocol = NetQASMProtocol(self) self._active_protocol = protocol return protocol
[docs] def set_virtual_node(self, virtRoot): """ Set the virtual root allowing connections to the SimulaQron backend. :param virtRoot: The virtual root object. """ self.virtRoot = virtRoot
[docs] def lookup(self, ip: int, port: int) -> str | None: """ Lookup name of remote host used within SimulaQron given ip and port number. :param ip: The IP address to look for. This value needs to be transformed into an integer value. :param port: The port to look for. :return: The name of the node that matches the IP-port pair. None if none matches. :rtype: str | None """ for entry in self.qnodeos_net.hostDict: node = self.qnodeos_net.hostDict[entry] if (node.ip == ip) and (node.port == port): return node.name self._logger.debug("No such node") return None
[docs] def is_adjacent(self, remote_host_name: str): """ Checks if remote host is adjacent to this node, according to the specified topology. :param remote_host_name: The name of the remote host :type remote_host_name: str """ # Check if a topology is defined, otherwise use fully connected if self.topology is None: return True if self.name in self.topology: if remote_host_name in self.topology[self.name]: return True else: return False else: self._logger.warning( f"Node {self.name} is not in the specified topology and is therefore " "assumed to have no neighbors" ) return False