Source code for simulaqron.run.run

import os
import signal
import time
from importlib import reload
from importlib.util import find_spec
from os import PathLike
from pathlib import Path
from typing import Callable, Optional, Any, Dict, List, Union, Tuple

from multiprocess.context import ForkContext as ProcessContext
from multiprocess.pool import ApplyResult
from multiprocess.sharedctypes import SynchronizedArray
import logging
from netqasm.logging.output import (reset_struct_loggers,
                                    save_all_struct_loggers)
from netqasm.runtime import env, process_logs
from netqasm.runtime.app_config import AppConfig
from netqasm.runtime.application import ApplicationInstance
from netqasm.runtime.settings import Formalism
from netqasm.sdk.classical_communication import reset_socket_hub
from netqasm.sdk.config import LogConfig
from netqasm.sdk.shared_memory import SharedMemoryManager
from netqasm.util.yaml import dump_yaml

from simulaqron.network import Network
from simulaqron.sdk import SimulaQronConnection
from simulaqron.settings import simulaqron_settings, network_config
from simulaqron.settings import get_default_network_config_file
from simulaqron.settings.simulaqron_config import SimBackend

logger = logging.getLogger()


_SIMULAQRON_BACKENDS = {
    Formalism.STAB: SimBackend.STABILIZER,
    Formalism.KET: SimBackend.PROJECTQ,
    Formalism.DM: SimBackend.QUTIP,
}


def _as_completed(futures: List[ApplyResult], names: List[str]) -> List[Tuple[ApplyResult, str]]:
    if len(futures) is not len(names):
        raise RuntimeError("Not all registered applications have an associated name")
    return [(future, name) for future, name in zip(futures, names)]


[docs]def reset(save_loggers=False): """ Resets the SimulaQron simulation to a clean state, leaving it ready for a new application execution. :param save_loggers: Whether to save the NetQASM's struct logs in a file or not. :type save_loggers: bool """ if save_loggers: save_all_struct_loggers() SharedMemoryManager.reset_memories() reset_socket_hub() reset_struct_loggers() # Reset logging logging.shutdown() reload(logging)
def _setup_sim_backend(sim_backend: SimBackend): if sim_backend in [SimBackend.PROJECTQ, SimBackend.QUTIP]: assert find_spec(sim_backend.value) is not None, \ f"To use {sim_backend} as backend you need to install the package" simulaqron_settings.sim_backend = sim_backend # Global array helper to store PIDs of the children processes running the applications # Note; this array *will not* store the pids of the QNodeOS and/or Vnode processes _apps_pids: Optional[SynchronizedArray] = None def _worker_initializer(synced_array: SynchronizedArray): # We simply store the reference of the synced object for this process global _apps_pids _apps_pids = synced_array def _app_wrapper(**kwargs): assert _apps_pids is not None assert "__instance_num" in kwargs and isinstance(kwargs["__instance_num"], int) assert "__entry_function" in kwargs and isinstance(kwargs["__entry_function"], Callable) # Save the pid for this worker _apps_pids[kwargs["__instance_num"]] = os.getpid() entry_function = kwargs["__entry_function"] del kwargs["__entry_function"] del kwargs["__instance_num"] # TODO - Signal handler for the SIGINT signal? # Call the app main function try: return entry_function(**kwargs) except BaseException as e: _signal_other_apps() raise e def _signal_other_apps(): assert _apps_pids is not None for pid in _apps_pids: # Do not send SIGINT to self process if pid != os.getpid(): os.kill(pid, signal.SIGINT) # The signature of this function was "harmonized" with the `run_applications` method exposed # by the SquidASM simulator. The idea was to allow programs written in NetQASM to be executed # both in SimulaQron and SquidASM _with minimal changes_.
[docs]def run_applications( app_instance: ApplicationInstance, num_rounds: int = 1, network_cfg: Union[str, PathLike, Path] = None, # WARNING - The type of this argument *cannot* be harmonized nv_cfg: Any = None, # Unused; it's here for harmonization with squidasm "simulate_application" log_cfg: LogConfig = None, formalism: Formalism = Formalism.DM, use_app_config: bool = True, post_function: Optional[Callable] = None, enable_logging: bool = True, hardware: Any = None, # Unused; it's here for harmonization with squidasm "simulate_application" init_func: Callable = None, ) -> List[Dict[str, Any]]: """ Executes functions containing quantum applications. :param app_instance: A ``netqasm.runtime.Application`` instance containing the names of the nodes and the function that implements the application. The easiest way to create this object is by using the ``default_app_instance`` from the ``netqasm.runtime.application`` module. Please check the documentation from that method to get more information. :type app_instance: ApplicationInstance :param num_rounds: Number executions for this simulation. :type num_rounds: int :param network_cfg: Path of the network configuration file. :type network_cfg: str | Path | PathLike | None :param nv_cfg: Unused argument. Any parameter given here will be ignored. :type nv_cfg: Any :param log_cfg: Configuration object for the logging. Check the documentation of :py:class:`LogConfig` for more information abut how to configure the logging. :type log_cfg: LogConfig :param formalism: Qubit formalism to use for the simulation. The SimulaQron backend to use depends on this value. :type formalism: Formalism :param use_app_config: Whether to give app_config as argument to app's main(). :type use_app_config: bool :param post_function: Function to execute after all rounds have been executed. :type post_function: Optional[Callable] :param enable_logging: Whether to enable logging. :type enable_logging: bool :param hardware: Unused argument. Any parameter given here will be ignored. :type hardware: Any :param init_func: Function to execute to initialize the state of the child processes. The implemented executor uses the *spawn* method for creating new processes. In this sense, the child processes *do not receive* a copy of the full memory, but only what is needed. In particular, all modules will be reimported in the child processes, hence any state of the classes *will not transfer* to the child processes. :type init_func: Callable :return: List of dictionaries describing the application names and the simulation results. The i-th entry of the list will correspond to the i-th execution round of the simulation. :rtype: List[Dict[str, Any]] """ # Before all; we need to instruct the OMP library to use a single thread to avoid # heavy-processes deadlocks os.environ["OMP_NUM_THREADS"] = "1" # app_names = [app_cfg.app_name for app_cfg in app_cfgs] app_names: List[str] = [program.party for program in app_instance.app.programs] sim_backend: SimBackend = _SIMULAQRON_BACKENDS[formalism] timed_log_dir: str = "" if enable_logging: log_cfg = LogConfig() if log_cfg is None else log_cfg app_instance.logging_cfg = log_cfg log_dir = ( os.path.abspath("./log") if log_cfg.log_dir is None else log_cfg.log_dir ) if not os.path.exists(log_dir): os.mkdir(log_dir) timed_log_dir = env.get_timed_log_dir(log_dir) app_instance.logging_cfg.log_subroutines_dir = timed_log_dir app_instance.logging_cfg.comm_log_dir = timed_log_dir results: List[Dict[str, Any]] = [] # Read the network config if network_cfg is None: network_cfg = get_default_network_config_file() network_cfg = Path(network_cfg).resolve() network_config.read_from_file(network_cfg) network_config.read_from_file(network_cfg) for _ in range(num_rounds): network = Network( nodes=app_names, network_config_file=network_cfg, network_name="default", ) # Start the processes that support the simulator: QNodeOS + VirtualNode network.start() # Create the executor pool process_ctx = ProcessContext() synced_array = process_ctx.Array('i', len(app_instance.app.programs)) executor = process_ctx.Pool( processes=len(app_names) + 3, initializer=_worker_initializer, initargs=[synced_array] ) try: with executor: SimulaQronConnection.PROCESS_POOL = executor global _apps_pids _apps_pids = synced_array logger.debug("Starting simulaqron sim_backend process with nodes %s", app_names) _setup_sim_backend(sim_backend) # Start the application processes app_futures = [] programs = app_instance.app.programs for i, program in enumerate(programs): inputs = app_instance.program_inputs[program.party] if use_app_config: app_cfg = AppConfig( app_name=program.party, node_name=program.party, # node name should be same as app name main_func=program.entry, log_config=app_instance.logging_cfg, inputs=inputs, ) inputs["app_config"] = app_cfg inputs["__instance_num"] = i inputs["__entry_function"] = program.entry future: ApplyResult = executor.apply_async( _app_wrapper, kwds=inputs, ) app_futures.append(future) # for app_cfg in app_cfgs: # inputs = app_cfg.inputs # if use_app_config: # inputs['app_config'] = app_cfg # future = executor.submit(app_cfg.main_func, **inputs) # app_futures.append(future) # Join the application processes and the backend names = [f'app_{app_name}' for app_name in app_names] result = {} futures = _as_completed(app_futures, names) start_time = time.time() while len(result) < len(app_names): for future, name in futures: if name in result: continue if future.ready(): result[name] = future.get() time.sleep(0.1) waited_for = time.time() - start_time if 0.0 < simulaqron_settings.max_app_waiting_time < waited_for: raise TimeoutError("SimulaQron: max app waiting time exceeded; " "app did not finish in time. Please check that " "your code runs correctly standalone. If your " "code takes a long time to run, please adjust the " "value of 'max_app_waiting_time' in your simulaqron" "settings file.") # if results_file is not None: # save_results(results=results, results_file=results_file) if enable_logging: assert timed_log_dir is not None path = os.path.join(timed_log_dir, "results.yaml") dump_yaml(data=result, file_path=path) results.append(result) finally: network.stop() reset(save_loggers=True) if enable_logging: process_logs.make_last_log(log_dir=timed_log_dir) return results