Source code for maxwelllink.em_solvers.laser_driven

"""
Laser driven dynamics for MaxwellLink.

This module defines a lightweight laser driven simulator that applies a user
defined driven pulse to excited the molecules. The simulation runs entirely
in atomic units and can operate with both socket-connected and embedded (non-socket)
molecular drivers.
"""

from __future__ import annotations

import json
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Union
import time

import numpy as np

from ..molecule import Molecule
from ..sockets import SocketHub, am_master
from ..units import FS_TO_AU
from .dummy_em import DummyEMUnits, MoleculeDummyWrapper, DummyEMSimulation


[docs] class LaserDrivenUnits(DummyEMUnits): """ EM units for laser driven simulations (1:1 to atomic units). """
[docs] def __init__(self): super().__init__()
[docs] class MoleculeLaserDrivenWrapper(MoleculeDummyWrapper): """ Wrapper that adapts a ``Molecule`` to LaserDrivenSimulation, handling units, sources, and IO. """
[docs] def __init__(self, molecule: Molecule, dt_au: float): """ Initialize the Laser Driven molecule wrapper. Parameters ---------- molecule : Molecule The molecule to wrap. dt_au : float Time step in atomic units. """ super().__init__(molecule=molecule) self.dt_au = float(dt_au) if self.dt_au <= 0.0: raise ValueError("dt_au must be positive.") # retrieve molecule settings from the wrapped Molecule self.mode = self.m.mode self.hub: Optional[SocketHub] = getattr(self.m, "hub", None) self.molecule_id: int = getattr(self.m, "molecule_id", -1) self.init_payload: Dict = dict(self.m.init_payload) self.last_amp: np.ndarray = np.zeros(3, dtype=float) self.time_units_fs = 1.0 / FS_TO_AU # so that dt_em == dt_au # to rescale dipoles if needed self.rescaling_factor = self.m.rescaling_factor # Refresh time-units and dt to keep init payloads consistent self.m._refresh_time_units(self.time_units_fs) self.m._refresh_time_step(self.dt_au) self.init_payload["dt_au"] = self.dt_au self.additional_data_history = self.m.additional_data_history if self.mode == "non-socket": if self.molecule_id < 0: self.molecule_id = 0 # will be overwritten by simulation self.m.molecule_id = self.molecule_id
[docs] def append_additional_data(self, time_au: float): """ Store additional molecular data supplied by non-socket drivers. Parameters ---------- time_au : float Current simulation time in atomic units. """ extra = {} if hasattr(self, "d_f"): try: extra = dict(self.d_f.append_additional_data() or {}) except Exception: extra = {} if "time_au" not in extra: extra["time_au"] = time_au if extra: self.additional_data_history.append(extra)
[docs] class LaserDrivenSimulation(DummyEMSimulation): r""" Laser driven dynamics of the MaxwellLink molecules. This class employes a user-defined electric field: .. math:: E(t) = f(t) applied on x, y, z, or any combinations of the molecular dipole vector All quantities are in atomic units. """
[docs] def __init__( self, dt_au: float, molecules: Optional[Iterable[Molecule]] = None, drive: Optional[Union[float, Callable[[float], float]]] = None, coupling_axis: str = "xyz", hub: Optional[SocketHub] = None, record_history: bool = True, ): """ Parameters ---------- dt_au : float Simulation time step in atomic units. molecules : iterable of Molecule, optional Molecules coupled to the cavity. drive : float or callable, optional Constant drive term or function ``drive(t_au)``. coupling_axis : str, default: "xyz" Component(s) of the molecular dipole used for coupling. hub : maxwelllink.sockets.SocketHub, optional Socket hub shared by all socket-mode molecules. record_history : bool, default: True Record time, field, velocity, drive, and molecular response histories. """ super().__init__(hub=hub, molecules=molecules) self.dt = float(dt_au) if self.dt <= 0.0: raise ValueError("dt_au must be positive.") self.axis = np.array([False, False, False], dtype=bool) if "x" in coupling_axis.lower(): self.axis[0] = True if "y" in coupling_axis.lower(): self.axis[1] = True if "z" in coupling_axis.lower(): self.axis[2] = True # we need True in at least one axis if not np.any(self.axis): raise ValueError( "At least one coupling axis (x, y, or z) must be specified." ) self.drive = drive if drive is not None else (lambda _: 0.0) if isinstance(self.drive, (int, float)): const = float(self.drive) self.drive = lambda _t, c=const: c molecules = list(molecules or []) self.wrappers: List[MoleculeLaserDrivenWrapper] = [ MoleculeLaserDrivenWrapper(molecule=m, dt_au=self.dt) for m in molecules ] self.socket_wrappers = [w for w in self.wrappers if w.mode == "socket"] self.non_socket_wrappers = [w for w in self.wrappers if w.mode == "non-socket"] if self.socket_wrappers: hubs = {w.hub for w in self.socket_wrappers if w.hub is not None} if hub is not None: hubs.add(hub) if len(hubs) > 1: raise ValueError( "All socket-mode molecules must share the same SocketHub." ) self.hub: SocketHub = hub or self.socket_wrappers[0].hub if self.hub is None: raise ValueError("Socket-mode molecules require a SocketHub instance.") else: self.hub = None # Assign IDs and initialize non-socket drivers # By default, SocketHub assigns IDs starting from 0, so we start # non-socket IDs after all socket ones. next_id = len(self.socket_wrappers) for wrapper in self.non_socket_wrappers: wrapper.initialize_driver(next_id) next_id += 1 self.time = 0.0 self.record_history = bool(record_history) if self.record_history: self.time_history = [] self.drive_history = [] self.molecule_response_history = []
# ------------------------------------------------------------------ # Core helpers # ------------------------------------------------------------------ def _evaluate_drive(self, time_au: float) -> float: """ Evaluate the drive term at the given time. Parameters ---------- time_au : float Current simulation time in atomic units. Returns ------- float The evaluated drive term. """ try: return float(self.drive(time_au)) except TypeError: return float(self.drive) def _ensure_socket_connections(self): if not self.socket_wrappers: return init_payloads = { w.molecule_id: {**w.init_payload, "molecule_id": w.molecule_id} for w in self.socket_wrappers } ok = self.hub.wait_until_bound(init_payloads, require_init=True, timeout=None) if not ok: raise RuntimeError("Timeout waiting for socket molecules to bind.") def _collect_socket_responses(self, efield_vec: Sequence[float]) -> Dict[int, dict]: """ Send requests to socket molecules and collect their responses. Parameters ---------- efield_vec : array-like of float, shape (3,) Effective electric field vector in atomic units. Returns ------- dict of int to dict Mapping from molecule IDs to their response payloads. """ requests = { w.molecule_id: { "efield_au": efield_vec, "meta": {"time_au": self.time}, "init": {**w.init_payload, "molecule_id": w.molecule_id}, } for w in self.socket_wrappers } responses = self.hub.step_barrier(requests) while not responses: self._ensure_socket_connections() responses = self.hub.step_barrier(requests) return responses def _calc_effective_efield(self, time_au: float) -> np.ndarray: """ Calculate the effective electric field vector for the laser driven dynamics. Parameters ---------- time_au : float Current time in atomic units. Returns ------- numpy.ndarray of float, shape (3,) Effective electric field vector in atomic units. """ efield_vec = np.ones(3, dtype=float) * self._evaluate_drive(time_au) efield_vec *= self.axis return efield_vec def _calc_dipole_vec(self) -> np.ndarray: """ Calculate the total molecular dipole vector. Returns ------- numpy.ndarray of float, shape (3,) Total molecular dipole vector in atomic units. """ dipole_vec = np.array([0.0, 0.0, 0.0], dtype=float) for wrapper in self.wrappers: if wrapper.additional_data_history: latest_data = wrapper.additional_data_history[-1] rescaling_factor = 1.0 if wrapper.rescaling_factor != None: rescaling_factor = float(wrapper.rescaling_factor) dipole_vec[0] += latest_data.get("mux_au") * rescaling_factor dipole_vec[1] += latest_data.get("muy_au") * rescaling_factor dipole_vec[2] += latest_data.get("muz_au") * rescaling_factor return dipole_vec def _step_molecules(self, efield_vec: Sequence[float]): """ Propagate all molecules for one EM step. Parameters ---------- efield_vec : array-like of float, shape (3,) Effective electric field vector in atomic units. Returns ------- float Sum of molecular dipole moment along the coupling axis. """ # Non-socket molecules for wrapper in self.non_socket_wrappers: wrapper.propagate(efield_vec) amp = wrapper.calc_amp_vector() * wrapper.rescaling_factor wrapper.last_amp = amp wrapper.append_additional_data(time_au=self.time) # Socket molecules if self.socket_wrappers: self._ensure_socket_connections() responses = self._collect_socket_responses(efield_vec) for wrapper in self.socket_wrappers: payload = responses.get(wrapper.molecule_id) if not payload: continue amp = ( np.asarray(payload.get("amp", [0.0, 0.0, 0.0]), dtype=float) * wrapper.rescaling_factor ) wrapper.last_amp = amp extra_blob = payload.get("extra", b"") if extra_blob: try: data = json.loads(extra_blob.decode("utf-8")) if isinstance(data, dict): data.setdefault("time_au", self.time) wrapper.additional_data_history.append(data) except Exception: pass # dmu/dt dmudt = np.zeros(3, dtype=float) for wrapper in self.wrappers: dmudt += wrapper.last_amp # for dipole gauge we use mu directly instead of dmu/dt dipole = self._calc_dipole_vec() # we need to filter only the coupling axis dipole = dipole * self.axis dmudt = dmudt * self.axis # print("In Function, Total Dipole vector:", dipole) # print("In Function, Total Dipole velocity (dmu/dt):", dmudt) return dipole, dmudt # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def step(self): """ Advance the simulation by one time step. """ efield_vec = self._calc_effective_efield(self.time) self.dipole, self.dmudt = self._step_molecules(efield_vec) self.time += self.dt if self.record_history: self.time_history.append(self.time) self.drive_history.append(self._evaluate_drive(self.time)) self.molecule_response_history.append(self.dmudt.copy())
[docs] def run(self, until: Optional[float] = None, steps: Optional[int] = None): """ Run the simulation for a specified duration or number of steps. Parameters ---------- until : float, optional Total simulation time (a.u.). ``steps`` must be ``None``. steps : int, optional Number of steps to execute. ``until`` must be ``None``. """ if (until is None) == (steps is None): raise ValueError("Specify exactly one of 'until' or 'steps'.") if until is not None: if until < self.time: return steps = int(np.ceil((until - self.time) / self.dt)) start_time = time.perf_counter() previous_time = start_time for idx in range(int(steps)): self.step() if (idx + 1) % 1000 == 0: current_time = time.perf_counter() avg_time_per_step = (current_time - previous_time) / 1000.0 previous_time = current_time elapsed = current_time - start_time remaining = (elapsed / (idx + 1)) * (steps - (idx + 1)) print( f"[LaserDriven] Completed {idx + 1}/{steps} [{(idx + 1) / steps * 100:.1f}%] steps, time/step: {avg_time_per_step:.2e} seconds, remaining time: {remaining:.2f} seconds." ) # close the hub if self.hub is not None: if am_master(): self.hub.stop()