Source code for maxwelllink.mxl_drivers.python.models.qutip_model

import numpy as np
import os, importlib.util, ast
from typing import Dict, Optional

try:
    from .dummy_model import DummyModel
except:
    from dummy_model import DummyModel

try:
    import qutip as qt
except ImportError as e:
    raise ImportError(
        "QuTiPModel requires qutip. Install with `conda install conda-forge::qutip`."
    ) from e


def _load_spec_module(path: str):
    """
    Import a user-supplied Python file that defines ``build_model(**kwargs)``.

    Parameters
    ----------
    path : str
        Path to the Python file.

    Returns
    -------
    module
        The imported Python module containing ``build_model(**kwargs)``.

    Raises
    ------
    ImportError
        If the module cannot be loaded from the given path.
    AttributeError
        If the loaded module does not define ``build_model(**kwargs)``.
    """

    path = os.path.abspath(path)
    spec = importlib.util.spec_from_file_location("mxl_qutip_user_spec", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module from {path}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    if not hasattr(mod, "build_model"):
        raise AttributeError(
            f"{path} must define a function `build_model(**kwargs)` "
            "that returns a dict with keys: H0, mu_ops (dict), c_ops (list), rho0."
        )
    return mod


def _parse_kwargs_string(s: str) -> Dict:
    """
    Parse a compact ``'k1=v1,k2=v2'`` string into a dictionary with numbers/bools
    auto-cast. Used for preset parameters and for passing kwargs to a user spec.

    Parameters
    ----------
    s : str
        Input string.

    Returns
    -------
    dict
        Parsed key-value pairs with best-effort type casting.
    """

    if not s:
        return {}

    def _strip_quotes(s):
        if not isinstance(s, str):
            return s
        s = s.strip()
        if (len(s) >= 2) and ((s[0] == s[-1]) and s[0] in ("'", '"')):
            return s[1:-1]
        return s

    s = _strip_quotes(s)
    out = {}
    for token in s.split(","):
        if not token.strip():
            continue
        if "=" not in token:
            # allow bare flags as True
            out[token.strip()] = True
            continue
        k, v = token.split("=", 1)
        k = k.strip()
        v = v.strip()
        # try literal (int/float/bool/list/tuple) -> else string
        try:
            out[k] = ast.literal_eval(v)
        except Exception:
            # also allow "true"/"false"
            lv = v.lower()
            if lv in ("true", "false"):
                out[k] = lv == "true"
            else:
                out[k] = v
    return out


def _calc_mu_vector_expectation(rho, mu_ops):
    """
    Return :math:`\\langle\\mu_x\\rangle`, :math:`\\langle\\mu_y\\rangle`,
    :math:`\\langle\\mu_z\\rangle` as a length-3 NumPy array.

    Parameters
    ----------
    rho : qutip.Qobj
        Density matrix of the system.
    mu_ops : dict
        Dictionary with keys ``'x'``, ``'y'``, ``'z'`` and values as ``qutip.Qobj`` or
        ``None``, storing dipole operators.

    Returns
    -------
    numpy.ndarray of float, shape (3,)
        The expectation values of the dipole components.
    """

    vec = np.zeros(3, dtype=float)
    for i, key in enumerate(("x", "y", "z")):
        op = mu_ops.get(key, None)
        if op is not None:
            vec[i] = float(qt.expect(op, rho))
    return vec


def _build_model_tls(
    omega=0.242,
    mu12=187,
    orientation=2,
    pe_initial=0.0,
    gamma_relax=0.0,
    gamma_dephase=0.0,
):
    r"""
    Simple 2-level model preset like ``TLSModel``, but using QuTiP objects.

    Equations
    ---------
    - :math:`H_0 = \\lvert e \\rangle \\langle e \\rvert \\, \\omega`
    - :math:`\\boldsymbol{\\mu} = \\mu_{12}\\big(\\lvert g \\rangle \\langle e \\rvert + \\lvert e \\rangle \\langle g \\rvert\\big)` along the chosen axis
    - Lindblad: relaxation (:math:`\\sigma_-`) at rate ``gamma_relax``, pure dephasing at ``gamma_dephase``.

    This function provides a reference implementation for the ``build_model(**kwargs)`` function.

    Parameters
    ----------
    omega : float, default: 0.242
        Transition frequency in atomic units (a.u.).
    mu12 : float, default: 187
        Dipole moment in atomic units (a.u.).
    orientation : int, default: 2
        Orientation of the dipole moment; can be ``0`` (x), ``1`` (y), or ``2`` (z).
    pe_initial : float, default: 0.0
        Initial population in the excited state.
    gamma_relax : float, default: 0.0
        Relaxation rate (a.u.).
    gamma_dephase : float, default: 0.0
        Pure dephasing rate (a.u.).

    Returns
    -------
    dict
        A dictionary with keys ``H0``, ``mu_ops``, ``c_ops``, and ``rho0``.
    """

    # basis
    g = qt.basis(2, 0)
    e = qt.basis(2, 1)
    H0 = omega * e * e.dag()

    # dipole operator along chosen axis; others set to None
    sigmax = g * e.dag() + e * g.dag()
    mux = muy = muz = None
    dip = mu12 * sigmax
    if orientation == 0:
        mux = dip
    elif orientation == 1:
        muy = dip
    else:
        muz = dip
    mu_ops = {"x": mux, "y": muy, "z": muz}

    # collapse operators
    c_ops = []
    if gamma_relax > 0.0:
        c_ops.append(np.sqrt(gamma_relax) * (g * e.dag()))
    if gamma_dephase > 0.0:
        c_ops.append(np.sqrt(gamma_dephase) * (e * e.dag() - g * g.dag()))

    # initial state (density matrix form of a coherent state)
    pe = pe_initial
    rho0 = (
        (1.0 - pe) * (g * g.dag())
        + pe * (e * e.dag())
        + np.sqrt(pe * (1.0 - pe)) * (g * e.dag() + e * g.dag())
    )

    return dict(H0=H0, mu_ops=mu_ops, c_ops=c_ops, rho0=rho0)


[docs] class QuTiPModel(DummyModel): """ General N-level quantum model driven by an external E-field using QuTiP. The time-dependent Hamiltonian is :math:`H(t) = H_0 - E_x(t)\\mu_x - E_y(t)\\mu_y - E_z(t)\\mu_z`. Two options for constructing the N-level quantum model are provided: - **Preset TLS** via a simple CLI parameter, e.g.: ``--param "preset=tls,preset_kwargs=omega=0.242,mu12=187,orientation=2,pe_initial=1e-4"`` - **Fully custom model** via: ``--param "module=/path/spec.py,kwargs=..."`` The module must define a callable ``build_model(**kwargs)`` that returns a dictionary with the following keys: .. code-block:: python def build_model(**kwargs): return { "H0": qutip.Qobj, # (N x N) "mu_ops": {"x": Qobj|None, "y": Qobj|None, "z": Qobj|None}, "c_ops": [Qobj, ...], "rho0": Qobj, # ket or density matrix } Optional fields may be omitted; defaults: no ``c_ops``, and ``rho0`` is the ground state if not provided. """
[docs] def __init__( self, # --- Preset controls --- preset: str = "tls", preset_kwargs: str = "", # --- Custom module controls --- module: Optional[str] = None, kwargs: str = "", # --- finite-difference or analytical dmu/dt --- fd_dmudt: bool = False, # --- Common controls --- verbose: bool = False, checkpoint: bool = False, restart: bool = False, **extra, ): """ Initialize the necessary parameters for the QuTiP quantum dynamics model. Parameters ---------- preset : str, default: 'tls' Preset model name, e.g. ``'tls'``. Default is ``'tls'``. preset_kwargs : str Comma-separated ``key=value`` pairs for the preset, such as ``preset_kwargs=omega=0.242,mu12=187,orientation=2,pe=1e-4``. All key value pairs not recognized will be treated as preset parameters. module : str or None, default: None Path to a Python file defining ``build_model(**kwargs)``. kwargs : str Comma-separated ``key=value`` pairs for the user module, such as ``kwargs=omega=0.242,mu12=187,orientation=2,pe=1e-4``. All key value pairs not recognized will be treated as user module parameters if ``module`` is not ``None``. fd_dmudt : bool, default: False Whether to use finite-difference :math:`\\mathrm{d}\\mu/\\mathrm{d}t` for current density computation. Default is ``False``. If ``False``, an analytical derivative will be used if available. verbose : bool, default: False Whether to print verbose output. Default is ``False``. checkpoint : bool, default: False Whether to enable checkpointing. Default is ``False``. restart : bool, default: False Whether to restart from a checkpoint if available. Default is ``False``. **extra Additional keyword arguments for future extensions. """ super().__init__(verbose=verbose, checkpoint=checkpoint, restart=restart) # First deal with preset parsing and merging with top-level tokens. # Given --param `preset_kwargs=omega=0.242,mu12=187,orientation=2,pe=1e-4`, the mxl_driver will # parse it as `preset_kwargs=omega=0.242; mu12=187; orientation=2; pe=1e-4` (splitted by the comma). # By default, `mu12=187; orientation=2; pe=1e-4` will be merged into extra dict. We need to collect # them back to preset_kwargs for the preset model to consume. This brings some inconvenience. # 1. given known subkeys for the TLS preset _tls_keys = { "omega", "mu12", "orientation", "pe_initial", "gamma_relax", "gamma_dephase", } # 2. recover the first hit of 'preset_kwargs' self.preset = preset.lower().strip() self.preset_kwargs = _parse_kwargs_string(preset_kwargs) # 3. merge available top-level tokens in **extra** to TLS preset for k in list(extra.keys()): if k in _tls_keys: # here we do not attempt to pop extra[k] to avoid removing user module params self.preset_kwargs[k] = extra[k] # 4. deal with module path and module kwargs in a similar way self.module_path = module # here only the first hit of 'kwargs' is recovered self.module_kwargs = _parse_kwargs_string(kwargs) # if user chose custom module, *assume* any remaining extra items # are intended for build_model(**kwargs) unless they are our own config flags. _own_keys = { "preset", "preset_kwargs", "module", "kwargs", "fd_dmudt", "verbose", "checkpoint", "restart", } if self.preset == "custom" and self.module_path is not None: for k in list(extra.keys()): if k not in _own_keys: self.module_kwargs[k] = extra.pop(k) if self.preset != "custom" and self.module_path is None: print( f"[QuTiPModel] Using preset '{self.preset}' with params: {self.preset_kwargs}" ) else: print( f"[QuTiPModel] Using custom module '{self.module_path}' with params: {self.module_kwargs}" ) self.fd_dmudt = bool(fd_dmudt) if self.verbose: print( f"[QuTiPModel] Using {'finite-difference' if self.fd_dmudt else 'analytical'} dmu/dt for current density." ) # internal state self.H0 = None self.mu_ops = {"x": None, "y": None, "z": None} self.c_ops = [] self.rho = None self._mu_prev = np.zeros(3, float) self._mu_curr = np.zeros(3, float) self._dim = None self._eye = None # operators for evaluating analytical dmu/dt self._dmudt_op = {} # whether restarted from checkpoint self.restarted = False
# -------------- heavy-load initialization (at INIT) --------------
[docs] def initialize(self, dt_new, molecule_id): """ Initialize the model with the new time step and molecule ID. Parameters ---------- dt_new : float The new time step in atomic units (a.u.). molecule_id : int The ID of the molecule. """ self.dt = float(dt_new) self.molecule_id = int(molecule_id) self.checkpoint_filename = {} self.checkpoint_filename["rho"] = f"qutip_rho_{self.molecule_id}.qu" self.checkpoint_filename["meta"] = f"qutip_meta_{self.molecule_id}.npz" # Build model either from preset or user module if self.preset == "tls" and self.module_path is None: cfg = _build_model_tls(**self.preset_kwargs) elif self.preset == "custom" and self.module_path is not None: mod = _load_spec_module(self.module_path) cfg = mod.build_model(**self.module_kwargs) else: raise ValueError( f"Invalid quantum model: preset={self.preset}, module={self.module_path}" ) self.H0 = cfg["H0"] self.mu_ops = {"x": None, "y": None, "z": None, **cfg.get("mu_ops", {})} self.c_ops = list(cfg.get("c_ops", [])) if self.verbose: print( f"[QuTiPModel {self.molecule_id}] initialized with dt={self.dt}, " f"H0 shape={self.H0.shape}, mu_ops keys={list(self.mu_ops.keys())}, " f"c_ops count={len(self.c_ops)}" ) # Initialize density matrix rho (accept ket or density matrix) rho0 = cfg.get("rho0", None) if rho0 is None: print( f"[QuTiPModel {self.molecule_id}] Warning: No initial state rho0 provided, using ground state." ) # try ground state projector of H0 evals, evecs = self.H0.eigenstates() rho0 = evecs[0] * evecs[0].dag() elif isinstance(rho0, qt.Qobj) and rho0.isket: rho0 = rho0 * rho0.dag() self.rho = rho0 if self.verbose: print( f"[QuTiPModel {self.molecule_id}] Initial state rho0 (dim={self.rho.shape[0]}):\n{self.rho}" ) # temporary storage self._dim = self.H0.shape[0] self._eye = qt.qeye(self._dim) # optional restart if self.restart and self.checkpoint: self._reset_from_checkpoint() self.restarted = True # calculate initial dipole self._mu_curr = _calc_mu_vector_expectation(self.rho, self.mu_ops) # construct operators for analytical dmu/dt # dmu_k/dt = i*[H0, mu_k] - i sum_j E_j*[mu_j, mu_k] + sum_j [c_j^dag * mu_k * c_j * rho - 0.5 * {c_j^dag * c_j, mu_k}] # d<mu_k>/dt = Tr[rho * dmu_k/dt] if not self.fd_dmudt: axes = ("x", "y", "z") for ax in axes: mu_k = self.mu_ops.get(ax, None) if mu_k is None: continue # K0 = i * [H0, mu_k] K0 = 1j * (self.H0 * mu_k - mu_k * self.H0) # Kj = -i * [mu_j, mu_k] Kj = {} for aj in axes: mu_j = self.mu_ops.get(aj, None) if mu_j is not None: Kj[aj] = -1j * (mu_j * mu_k - mu_k * mu_j) # L = sum_j [c_j^dag * mu_k * c_j - 0.5 * {c_j^dag * c_j, mu_k}] L = 0 for c in self.c_ops: L += c.dag() * mu_k * c - 0.5 * ( c.dag() * c * mu_k + mu_k * c.dag() * c ) # combine E-field independent parts in a single operator H_indep = K0 + L self._dmudt_op[ax] = {"H_indep": H_indep, "Kj": Kj} if self.verbose: print( f"[QuTiPModel {self.molecule_id}] Analytical dmu/dt operator for axis '{ax}' constructed." ) print(f" H_indep:\n{H_indep}") for aj, K_j in Kj.items(): print(f" K_{aj}:\n{K_j}")
# ------------ internal functions ------------- def _effective_unitary_step(self, E_vec): r""" Fast path for closed-system evolution without collapse operators. This uses the effective Hamiltonian :math:`H_{\\mathrm{eff}} = H_0 - \\mathbf{E} \\cdot \\boldsymbol{\\mu}`. Parameters ---------- E_vec : array-like of float, shape (3,) Electric field vector ``[E_x, E_y, E_z]`` at the current time step. """ Heff = self.H0 if self.mu_ops["x"] is not None: Heff -= float(E_vec[0]) * self.mu_ops["x"] if self.mu_ops["y"] is not None: Heff -= float(E_vec[1]) * self.mu_ops["y"] if self.mu_ops["z"] is not None: Heff -= float(E_vec[2]) * self.mu_ops["z"] U = (-1j * Heff * self.dt).expm() self.rho = U * self.rho * U.dag() def _lindblad_step(self, E_vec): """ Slow path for open-system evolution with collapse operators using QuTiP ``mesolve``. Parameters ---------- E_vec : array-like of float, shape (3,) Electric field vector ``[E_x, E_y, E_z]`` at the current time step. """ H = self.H0 if self.mu_ops["x"] is not None: H -= float(E_vec[0]) * self.mu_ops["x"] if self.mu_ops["y"] is not None: H -= float(E_vec[1]) * self.mu_ops["y"] if self.mu_ops["z"] is not None: H -= float(E_vec[2]) * self.mu_ops["z"] # Single "macro" step: piecewise-constant E over [0, dt] res = qt.mesolve(H, self.rho, tlist=[0.0, self.dt], c_ops=self.c_ops, e_ops=[]) self.rho = res.states[-1] # -------------- one FDTD step under E-field --------------
[docs] def propagate(self, effective_efield_vec): """ Propagate the quantum molecular dynamics given the effective electric field vector. Parameters ---------- effective_efield_vec : array-like of float, shape (3,) Effective electric field vector in the form ``[E_x, E_y, E_z]``. """ self.E_vec = np.asarray(effective_efield_vec, dtype=float).reshape(3) if self.verbose: print( f"[QuTiPModel {self.molecule_id}] t={self.t:.6f} a.u., E={self.E_vec}" ) # Store starting dipole self._mu_prev = self._mu_curr # Choose fast path if no collapse operators are present if len(self.c_ops) == 0: self._effective_unitary_step(self.E_vec) else: self._lindblad_step(self.E_vec) self.t += self.dt self._mu_curr = _calc_mu_vector_expectation(self.rho, self.mu_ops)
[docs] def calc_amp_vector(self): """ Calculate the amplitude vector :math:`\\mathrm{d}\\langle\\mu\\rangle/\\mathrm{d}t` for the current time step. If ``fd_dmudt`` is ``True``, use finite differences (cheaper); otherwise, use the analytical derivative if available. Returns ------- numpy.ndarray of float, shape (3,) The amplitude vector :math:`[\\mathrm{d}\\langle\\mu_x\\rangle/\\mathrm{d}t,\\ \\mathrm{d}\\langle\\mu_y\\rangle/\\mathrm{d}t,\\ \\mathrm{d}\\langle\\mu_z\\rangle/\\mathrm{d}t]`. """ amp = np.zeros(3, float) if self.fd_dmudt: amp = (self._mu_curr - self._mu_prev) / self.dt else: # Analytical derivative for i, ax in enumerate(("x", "y", "z")): op_info = self._dmudt_op.get(ax, None) if op_info is None: continue # construct the overall operator to do trace with rho H_indep = op_info["H_indep"] Kj = op_info["Kj"] op = H_indep for j, Ej in zip(("x", "y", "z"), self.E_vec): K_j = Kj.get(j, None) if K_j is not None: op = op + Ej * K_j amp[i] = float(qt.expect(op, self.rho)) if self.verbose: print(f"[QuTiPModel {self.molecule_id}] d<mu>/dt = {amp}") return amp
# ------------ optional operation / checkpoint --------------
[docs] def append_additional_data(self): """ Append additional data to be sent back to MaxwellLink. The data can be retrieved by the user via the Python interface: ``maxwelllink.SocketMolecule.additional_data_history``, where ``additional_data_history`` is a list of dictionaries. Returns ------- dict A dictionary containing additional data. """ data = { "time_au": self.t, "mux_au": float(self._mu_curr[0]), "muy_au": float(self._mu_curr[1]), "muz_au": float(self._mu_curr[2]), "energy_au": float(qt.expect(self.H0, self.rho)), "rho_diag": np.real(np.diag(self.rho.full())).tolist(), } if self._dim == 2: data["Pg"] = float(self.rho[0, 0].real) data["Pe"] = float(self.rho[1, 1].real) data["Pge_real"] = float(self.rho[0, 1].real) if self._dim == 2 else None data["Pge_imag"] = float(self.rho[0, 1].imag) if self._dim == 2 else None return data
def _dump_to_checkpoint(self): """ Dump the internal state of the model to a checkpoint. """ qt.qsave(self.rho, self.checkpoint_filename["rho"]) np.savez( self.checkpoint_filename["meta"], t=self.t, mu_prev=self._mu_prev, mu_curr=self._mu_curr, ) def _reset_from_checkpoint(self): """ Reset the internal state of the model from a checkpoint. """ if not os.path.exists(self.checkpoint_filename["rho"]) or not os.path.exists( self.checkpoint_filename["meta"] ): # No checkpoint file found means this driver has not been paused or terminated abnormally # so we just start fresh. if self.verbose: print( f"[QuTiPModel] No checkpoint file found for id={self.molecule_id}, starting fresh." ) else: self.rho = qt.qload(self.checkpoint_filename["rho"]) meta = np.load(self.checkpoint_filename["meta"]) self.t = float(meta["t"]) self._mu_prev = np.asarray(meta["mu_prev"], float) self._mu_curr = np.asarray(meta["mu_curr"], float) if self.verbose: print( f"[QuTiPModel] Restarted from checkpoint for id={self.molecule_id}" ) def _snapshot(self): """ Return a snapshot of the internal state for propagation. Notes ----- Deep copy the arrays to avoid mutation issues. Returns ------- dict A dictionary containing the snapshot of the internal state. """ return { "time": self.t, "rho": self.rho.full().copy(), "mu_prev": self._mu_prev.copy(), "mu_curr": self._mu_curr.copy(), } def _restore(self, snapshot): """ Restore the internal state from a snapshot. Parameters ---------- snapshot : dict A dictionary containing the snapshot of the internal state. """ self.t = snapshot["time"] self.rho = qt.Qobj(snapshot["rho"]) self._mu_prev = snapshot["mu_prev"].copy() self._mu_curr = snapshot["mu_curr"].copy()