Source code for maxwelllink.em_solvers.single_mode_cavity

"""
Single-mode cavity electrodynamics for MaxwellLink.

This module defines a lightweight cavity simulator that treats the EM field as
one damped harmonic oscillator coupled to MaxwellLink molecules. The simulation
runs entirely in atomic units and can operate with both socket-connected and
embedded (non-socket) molecular drivers.
"""

from __future__ import annotations

import json
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Union
import time

import numpy as np

from ..molecule import Molecule
from ..sockets import SocketHub, am_master
from ..units import FS_TO_AU
from .dummy_em import DummyEMUnits, MoleculeDummyWrapper, DummyEMSimulation


[docs] class SingleModeUnits(DummyEMUnits): """ EM units for single-mode cavity simulations (1:1 to atomic units). """
[docs] def __init__(self): super().__init__()
[docs] class MoleculeSingleModeWrapper(MoleculeDummyWrapper): """ Wrapper that adapts a ``Molecule`` to SingleModeSimulation, handling units, sources, and IO. """
[docs] def __init__(self, molecule: Molecule, dt_au: float): """ Initialize the SingleMode molecule wrapper. Parameters ---------- molecule : Molecule The molecule to wrap. dt_au : float Time step in atomic units. axis : int or str Axis of the molecule to be coupled to the cavity mode. Accepted values: ``0``, ``1``, ``2`` or ``"x"``, ``"y"``, ``"z"`` (case-insensitive). """ super().__init__(molecule=molecule) self.dt_au = float(dt_au) if self.dt_au <= 0.0: raise ValueError("dt_au must be positive.") # retrieve molecule settings from the wrapped Molecule self.mode = self.m.mode self.hub: Optional[SocketHub] = getattr(self.m, "hub", None) self.molecule_id: int = getattr(self.m, "molecule_id", -1) self.init_payload: Dict = dict(self.m.init_payload) self.last_amp: np.ndarray = np.zeros(3, dtype=float) self.time_units_fs = 1.0 / FS_TO_AU # so that dt_em == dt_au # to rescale dipoles if needed self.rescaling_factor = self.m.rescaling_factor # Refresh time-units and dt to keep init payloads consistent self.m._refresh_time_units(self.time_units_fs) self.m._refresh_time_step(self.dt_au) self.init_payload["dt_au"] = self.dt_au self.additional_data_history = self.m.additional_data_history if self.mode == "non-socket": if self.molecule_id < 0: self.molecule_id = 0 # will be overwritten by simulation self.m.molecule_id = self.molecule_id
[docs] def append_additional_data(self, time_au: float): """ Store additional molecular data supplied by non-socket drivers. Parameters ---------- time_au : float Current simulation time in atomic units. """ extra = {} if hasattr(self, "d_f"): try: extra = dict(self.d_f.append_additional_data() or {}) except Exception: extra = {} if "time_au" not in extra: extra["time_au"] = time_au if extra: self.additional_data_history.append(extra)
[docs] class SingleModeSimulation(DummyEMSimulation): r""" Damped harmonic oscillator coupled to MaxwellLink molecules. Under the dipole gauge, the total light-matter Hamiltonian is .. math:: H = H_{mol} + \frac{1}{2} p_{\rm c}^2 + \frac{1}{2} \omega_{\rm c}^2 (q_{\rm c} + \frac{\varepsilon}{\omega_{\rm c}^2} \sum_i \mu_i)^2 The cavity mode (classical harmonic oscillator) obeys .. math:: \ddot{q}_{\rm c} = -\omega_c^{2}\, q_{\rm c} \; - \; \varepsilon \sum_i \mu_i \;-\; \gamma_{\rm c} \, p_{\rm c} \;+\; D(t), where the effective electric field of this cavity mode is .. math:: E(t) = -\varepsilon q_{\rm c}(t) - \frac{\varepsilon^2}{\omega_{\rm c}^2} \sum_i \mu_i(t), where :math:`\varepsilon` is ``coupling_strength`` and the sum runs over the selected molecular axis of all molecules. The second term in the electric field accounts for the dipole self-energy term if enabled. Under the velocity gauge, :math:`d\mu/dt` is coupled to cavity momentum instead of :math:`\mu` to cavity position. The total Hamiltonian is .. math:: H = H_{mol} + \frac{1}{2} (p_{\rm c} - \frac{\varepsilon}{\omega_{\rm c}} \sum_i \mu_i)^2 + \frac{1}{2} \omega_{\rm c}^2 q_{\rm c}^2 The cavity mode (classical harmonic oscillator) obeys .. math:: \ddot{q}_{\rm c} = -\omega_c^{2}\, q_{\rm c} \; - \; \frac{\varepsilon}{\omega_{\rm c}} \sum_i \frac{d\mu_i}{dt} \;-\; \gamma_{\rm c} \, p_{\rm c} \;+\; D(t), where the effective electric field of this cavity mode is .. math:: E(t) = \frac{\varepsilon}{\omega_{\rm c}} \dot{q}_{\rm c}(t) = \frac{\varepsilon}{\omega_{\rm c}} (p_c - \frac{\varepsilon}{\omega_{\rm c}} \sum_i \mu_i). All quantities are in atomic units. """
[docs] def __init__( self, dt_au: float, frequency_au: float, damping_au: float, molecules: Optional[Iterable[Molecule]] = None, drive: Optional[Union[float, Callable[[float], float]]] = None, coupling_strength: float = 1.0, coupling_axis: str = "xy", hub: Optional[SocketHub] = None, qc_initial: Optional[list] = None, pc_initial: Optional[list] = None, mu_initial: Optional[list] = None, dmudt_initial: Optional[list] = None, record_history: bool = True, include_dse: bool = False, molecule_half_step: bool = False, shift_dipole_baseline: bool = False, gauge="dipole", ): r""" Parameters ---------- dt_au : float Simulation time step in atomic units. frequency_au : float Cavity angular frequency :math:`\omega_{\rm c}` (a.u.). damping_au : float Damping constant :math:`\kappa` (a.u.). molecules : iterable of Molecule, optional Molecules coupled to the cavity. drive : float or callable, optional Constant drive term or function ``drive(t_au)``. coupling_strength : float, default: 1.0 Prefactor :math:`\varepsilon`. coupling_axis : str, default: "xy" Component(s) of the molecular dipole used for coupling. hub : :class:`~maxwelllink.sockets.sockets.SocketHub`, optional Socket hub shared by all socket-mode molecules. qc_initial : list, default: [0.0, 0.0, 0.0] Initial cavity field coordinate (a.u.). pc_initial : list, default: [0.0, 0.0, 0.0] Initial cavity field momentum (a.u.). mu_initial : list, default: [0.0, 0.0, 0.0] Initial total molecular dipole vector (a.u.). dmudt_initial : list, default: [0.0, 0.0, 0.0] Initial time derivative of the total molecular dipole vector (a.u.). record_history : bool, default: True Record time, field, velocity, drive, and molecular response histories. include_dse : bool, default: True Include dipole self-energy term in the simulation. molecule_half_step : bool, default: True Whether to further evaluate molecular info for another half time step. shift_dipole_baseline : bool, default: False Whether to shift all dipole values using the initial dipole value, so initial dipole value is changed to zero. Setting this to True can facilitate simulating strong coupling systems with large permanent dipoles. gauge : str, default: "dipole" Gauge choice for light-matter coupling: "dipole" or "velocity". Using velocity gauge will couple to dmu/dt to cavity momentum instead of mu to the cavity position. """ super().__init__(hub=hub, molecules=molecules) self.dt = float(dt_au) if self.dt <= 0.0: raise ValueError("dt_au must be positive.") self.frequency = float(frequency_au) self.damping = float(damping_au) self.coupling_strength = float(coupling_strength) self.axis = np.array([False, False, False], dtype=bool) self.gauge = gauge.lower() if self.gauge not in ["dipole", "velocity"]: raise ValueError("gauge must be either 'dipole' or 'velocity'.") if "x" in coupling_axis.lower(): self.axis[0] = True if "y" in coupling_axis.lower(): self.axis[1] = True if "z" in coupling_axis.lower(): self.axis[2] = True # we need True in at least one axis if not np.any(self.axis): raise ValueError( "At least one coupling axis (x, y, or z) must be specified." ) self.drive = drive if drive is not None else (lambda _: 0.0) if isinstance(self.drive, (int, float)): const = float(self.drive) self.drive = lambda _t, c=const: c molecules = list(molecules or []) self.wrappers: List[MoleculeSingleModeWrapper] = [ MoleculeSingleModeWrapper(molecule=m, dt_au=self.dt) for m in molecules ] self.socket_wrappers = [w for w in self.wrappers if w.mode == "socket"] self.non_socket_wrappers = [w for w in self.wrappers if w.mode == "non-socket"] if self.socket_wrappers: hubs = {w.hub for w in self.socket_wrappers if w.hub is not None} if hub is not None: hubs.add(hub) if len(hubs) > 1: raise ValueError( "All socket-mode molecules must share the same SocketHub." ) self.hub: SocketHub = hub or self.socket_wrappers[0].hub if self.hub is None: raise ValueError("Socket-mode molecules require a SocketHub instance.") else: self.hub = None # Assign IDs and initialize non-socket drivers # By default, SocketHub assigns IDs starting from 0, so we start # non-socket IDs after all socket ones. next_id = len(self.socket_wrappers) for wrapper in self.non_socket_wrappers: wrapper.initialize_driver(next_id) next_id += 1 self.time = 0.0 if qc_initial is None: qc_initial = [0.0, 0.0, 0.0] if pc_initial is None: pc_initial = [0.0, 0.0, 0.0] if mu_initial is None: mu_initial = [0.0, 0.0, 0.0] if dmudt_initial is None: dmudt_initial = [0.0, 0.0, 0.0] self.qc = np.array(qc_initial, dtype=float) * self.axis self.pc = np.array(pc_initial, dtype=float) * self.axis self.dipole = np.array(mu_initial, dtype=float) * self.axis self.dipole_prev = self.dipole.copy() self.dmudt = np.array(dmudt_initial, dtype=float) * self.axis self.dmudt_prev = self.dmudt.copy() self.acceleration = np.zeros(3, dtype=float) self.include_dse = bool(include_dse) self.molecule_half_step = bool(molecule_half_step) self.shift_dipole_baseline = bool(shift_dipole_baseline) if self.shift_dipole_baseline: # shift all dipole values using the initial dipole value, so initial dipole value is zero self.dipole_baseline = self.dipole.copy() self.dipole -= self.dipole_baseline self.dipole_prev = self.dipole.copy() print( "[SingleModeCavity] Shifted dipole baseline by:", self.dipole_baseline ) self.record_history = bool(record_history) if self.record_history: self.time_history = [] self.qc_history = [] self.pc_history = [] self.drive_history = [] self.molecule_response_history = [] self.energy_history = []
# ------------------------------------------------------------------ # Core helpers # ------------------------------------------------------------------ def _evaluate_drive(self, time_au: float) -> float: """ Evaluate the drive term at the given time. Parameters ---------- time_au : float Current simulation time in atomic units. Returns ------- float The evaluated drive term. """ try: return float(self.drive(time_au)) except TypeError: return float(self.drive) def _ensure_socket_connections(self): if not self.socket_wrappers: return init_payloads = { w.molecule_id: {**w.init_payload, "molecule_id": w.molecule_id} for w in self.socket_wrappers } ok = self.hub.wait_until_bound(init_payloads, require_init=True, timeout=None) if not ok: raise RuntimeError("Timeout waiting for socket molecules to bind.") def _collect_socket_responses(self, efield_vec: Sequence[float]) -> Dict[int, dict]: """ Send requests to socket molecules and collect their responses. Parameters ---------- efield_vec : array-like of float, shape (3,) Effective electric field vector in atomic units. Returns ------- dict of int to dict Mapping from molecule IDs to their response payloads. """ requests = { w.molecule_id: { "efield_au": efield_vec, "meta": {"time_au": self.time}, "init": {**w.init_payload, "molecule_id": w.molecule_id}, } for w in self.socket_wrappers } responses = self.hub.step_barrier(requests) while not responses: self._ensure_socket_connections() responses = self.hub.step_barrier(requests) return responses def _calc_acceleration( self, time: float, mu: np.ndarray, qc: np.ndarray ) -> np.ndarray: """ Calculate the cavity mode acceleration at the given time. Parameters ---------- time : float Current simulation time in atomic units. mu : np.ndarray of float, shape (3,) Sum of individual molecular dipole moment vectors. qc : np.ndarray of float, shape (3,) Current cavity field coordinates. Returns ------- float The calculated acceleration. """ drive_val = self._evaluate_drive(time) acceleration = ( drive_val - self.coupling_strength * mu - (self.frequency**2) * qc ) # print("In Function, Cavity acceleration:", acceleration) acceleration *= self.axis return acceleration def _calc_effective_efield(self, qc: np.ndarray, mu: np.ndarray) -> np.ndarray: """ Calculate the effective electric field vector for the cavity mode. Parameters ---------- qc : float Current cavity mode coordinate vector. mu : float Current total molecular dipole vector. Returns ------- numpy.ndarray of float, shape (3,) Effective electric field vector in atomic units. """ efield_vec = np.array([0.0, 0.0, 0.0], dtype=float) efield_vec = -self.coupling_strength * qc # add dipole self-energy term for the electric field if enabled if self.include_dse: efield_vec -= self.coupling_strength**2 / self.frequency**2 * mu efield_vec *= self.axis # print("In Function, Effective E-field vector:", efield_vec) return efield_vec def _calc_energy(self, pc, qc, mu) -> float: """ Calculate the total energy of the cavity + molecular system. Parameters ---------- pc : float Current cavity mode momentum. qc : float Current cavity mode coordinate. mu : float Current total molecular dipole along the coupling axis. Returns ------- float Total energy of the cavity + molecular system. """ kinetic_energy = 0.5 * pc**2 potential_energy = ( 0.5 * (self.frequency**2) * qc**2 + qc * self.coupling_strength * mu ) if self.include_dse: potential_energy += ( 0.5 * (self.coupling_strength * mu / self.frequency) ** 2 ) e_molecule = sum( wrapper.additional_data_history[-1]["energy_au"] for wrapper in self.wrappers ) total_energy = ( np.sum((kinetic_energy + potential_energy) * self.axis) + e_molecule ) return total_energy def _calc_dipole_vec(self) -> np.ndarray: """ Calculate the total molecular dipole vector. Returns ------- numpy.ndarray of float, shape (3,) Total molecular dipole vector in atomic units. """ dipole_vec = np.array([0.0, 0.0, 0.0], dtype=float) for wrapper in self.wrappers: if wrapper.additional_data_history: latest_data = wrapper.additional_data_history[-1] rescaling_factor = 1.0 if wrapper.rescaling_factor != None: rescaling_factor = float(wrapper.rescaling_factor) dipole_vec[0] += latest_data.get("mux_au") * rescaling_factor dipole_vec[1] += latest_data.get("muy_au") * rescaling_factor dipole_vec[2] += latest_data.get("muz_au") * rescaling_factor if self.shift_dipole_baseline: dipole_vec -= self.dipole_baseline return dipole_vec def _step_molecules(self, efield_vec: Sequence[float]): """ Propagate all molecules for one EM step and collect their dipole moments. Parameters ---------- efield_vec : array-like of float, shape (3,) Effective electric field vector in atomic units. Returns ------- float Sum of molecular dipole moment along the coupling axis. """ # Non-socket molecules for wrapper in self.non_socket_wrappers: wrapper.propagate(efield_vec) amp = wrapper.calc_amp_vector() * wrapper.rescaling_factor wrapper.last_amp = amp wrapper.append_additional_data(time_au=self.time) # Socket molecules if self.socket_wrappers: self._ensure_socket_connections() responses = self._collect_socket_responses(efield_vec) for wrapper in self.socket_wrappers: payload = responses.get(wrapper.molecule_id) if not payload: continue amp = ( np.asarray(payload.get("amp", [0.0, 0.0, 0.0]), dtype=float) * wrapper.rescaling_factor ) wrapper.last_amp = amp extra_blob = payload.get("extra", b"") if extra_blob: try: data = json.loads(extra_blob.decode("utf-8")) if isinstance(data, dict): data.setdefault("time_au", self.time) wrapper.additional_data_history.append(data) except Exception: pass # dmu/dt dmudt = np.zeros(3, dtype=float) for wrapper in self.wrappers: dmudt += wrapper.last_amp # for dipole gauge we use mu directly instead of dmu/dt dipole = self._calc_dipole_vec() # we need to filter only the coupling axis dipole = dipole * self.axis dmudt = dmudt * self.axis # print("In Function, Total Dipole vector:", dipole) # print("In Function, Total Dipole velocity (dmu/dt):", dmudt) return dipole, dmudt def _step_dipole_gauge(self): """ Advance the simulation by one time step under the dipole gauge. """ # 1. update momentum to half step pc_half = self.pc + 0.5 * self.dt * self.acceleration # 2. update position to full step qc_prev = self.qc.copy() self.qc += self.dt * pc_half # updating E-field at half step using interpolated dipole # this interpolation is not very accurate # dipole = self.dipole + 0.5 * self.dt * (1.5 * self.dmudt - 0.5 * self.dmudt_prev) # the following expression is accurate to the order of dt^4 dipole = self.dipole_prev + self.dt * ( 9.0 / 8.0 * self.dmudt + 3.0 / 8.0 * self.dmudt_prev ) efield_vec = self._calc_effective_efield( qc_prev + 0.5 * self.dt * pc_half, dipole ) # update dipole info self.dipole_prev = self.dipole.copy() self.dmudt_prev = self.dmudt.copy() # the value for n+1/2 time step self.dipole, self.dmudt = self._step_molecules(efield_vec) # extrapolate to n+1 time step (ONLY NEEDED FOR VELOCITY VERLET MOLECULE PROPAGATION) if self.molecule_half_step: self.dipole = 2.0 * self.dipole - self.dipole_prev self.dmudt = 2.0 * self.dmudt - self.dmudt_prev acceleration = self._calc_acceleration(self.time, self.dipole, self.qc) # 3. update momentum from half step to full step self.pc = pc_half + 0.5 * self.dt * acceleration self.pc *= np.exp(-self.damping * self.dt) # 4. update acceleration and time self.time += self.dt self.acceleration = acceleration.copy() if self.record_history: self.time_history.append(self.time) self.qc_history.append(self.qc.copy()) self.pc_history.append(self.pc.copy()) self.drive_history.append(self._evaluate_drive(self.time)) self.molecule_response_history.append(self.dmudt.copy()) self.energy_history.append(self._calc_energy(self.pc, self.qc, self.dipole)) def _step_velocity_gauge(self): """ Advance the simulation by one time step under the velocity gauge. """ pass # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def step(self): """ Advance the simulation by one time step. """ if self.gauge == "dipole": self._step_dipole_gauge() elif self.gauge == "velocity": self._step_velocity_gauge() else: raise ValueError("gauge must be either 'dipole' or 'velocity'.")
[docs] def run(self, until: Optional[float] = None, steps: Optional[int] = None): """ Run the simulation for a specified duration or number of steps. Parameters ---------- until : float, optional Total simulation time (a.u.). ``steps`` must be ``None``. steps : int, optional Number of steps to execute. ``until`` must be ``None``. """ if (until is None) == (steps is None): raise ValueError("Specify exactly one of 'until' or 'steps'.") if until is not None: if until < self.time: return steps = int(np.ceil((until - self.time) / self.dt)) start_time = time.perf_counter() previous_time = start_time for idx in range(int(steps)): self.step() if (idx + 1) % 1000 == 0: current_time = time.perf_counter() avg_time_per_step = (current_time - previous_time) / 1000.0 previous_time = current_time elapsed = current_time - start_time remaining = (elapsed / (idx + 1)) * (steps - (idx + 1)) print( f"[SingleModeCavity] Completed {idx + 1}/{steps} [{(idx + 1) / steps * 100:.1f}%] steps, time/step: {avg_time_per_step:.2e} seconds, remaining time: {remaining:.2f} seconds." ) # close the hub if self.hub is not None: if am_master(): self.hub.stop()