"""
Meep-specific EM units and coupling utilities for MaxwellLink.
Provides conversions between Meep units and atomic units, plus helpers for
source construction and field integrations.
"""
from __future__ import annotations
import json
import numpy as np
from math import exp
from typing import Optional, Dict, List
import atexit
from ..sockets import SocketHub
from .dummy_em import DummyEMUnits, MoleculeDummyWrapper
from ..molecule import Molecule, Vector3
from ..units import EV_TO_CM_INV, FS_TO_AU
try:
import meep as mp
except ImportError:
raise ImportError(
"The meep package is required for this module. Please install it: https://meep.readthedocs.io/en/latest/Installation/."
)
# ----------------------------------------------------------------------------------
# Shared source registry (solver-native) keyed by polarization fingerprint (and comp)
# ----------------------------------------------------------------------------------
from collections import defaultdict
instantaneous_source_amplitudes = defaultdict(float)
_fingerprint_source: Dict = {}
# Helpers to reset this shared state between simulations/tests
__MXL__ATEXIT_REGISTERED = {"flag": False}
def _reset_module_state():
instantaneous_source_amplitudes.clear()
_fingerprint_source.clear()
def _register_sim_cleanup(sim):
"""
Register an interpreter-exit cleanup that resets shared module state.
Parameters
----------
sim : meep.Simulation
The active Meep simulation (unused; present for signature symmetry).
"""
if not __MXL__ATEXIT_REGISTERED["flag"]:
atexit.register(_reset_module_state)
__MXL__ATEXIT_REGISTERED["flag"] = True
def _to_mp_v3(v):
"""
Convert a 3-vector into ``mp.Vector3``.
Parameters
----------
v : Vector3 or tuple or list or mp.Vector3
Vector-like input.
Returns
-------
mp.Vector3
The corresponding Meep vector.
"""
if isinstance(v, Vector3):
return mp.Vector3(v.x, v.y, v.z)
# tolerate mp.Vector3 already, or (x,y,z)
if hasattr(v, "x") and hasattr(v, "y") and hasattr(v, "z"):
return v
if isinstance(v, (tuple, list)) and len(v) == 3:
return mp.Vector3(*v)
return mp.Vector3()
def _make_custom_time_src(key):
"""
Create a Meep ``CustomSource`` that streams time-dependent amplitudes from a
shared accumulator keyed by ``key``.
Parameters
----------
key : hashable
Lookup key for the instantaneous source amplitude.
Returns
-------
mp.CustomSource
A custom source object that queries the accumulator each time step.
"""
# MEEP calls this each time step; stream from the global accumulator
return mp.CustomSource(lambda t: instantaneous_source_amplitudes[key])
[docs]
class MeepUnits(DummyEMUnits):
[docs]
def __init__(self, time_units_fs: float = 0.1):
"""
Meep unit system with conversions to/from atomic units.
Parameters
----------
time_units_fs : float, default: 0.1
The Meep time unit expressed in femtoseconds.
"""
super().__init__()
self.time_units_fs = time_units_fs
[docs]
def efield_em_to_au(self, Emu_vec3):
"""
Convert the regularized electric-field integral from Meep units to atomic units.
Notes
-----
This helper is used directly by the Meep backend.
Parameters
----------
Emu_vec3 : array-like of float, shape (3,)
Regularized field integral in Meep units.
Returns
-------
numpy.ndarray of float, shape (3,)
Field integral in atomic units.
"""
factor = 1.2929541569381223e-6 / (self.time_units_fs**2)
return np.asarray(Emu_vec3, dtype=float) * factor
[docs]
def source_amp_au_to_em(self, amp_au_vec3):
"""
Convert source amplitude (a.u.) to Meep units.
Parameters
----------
amp_au_vec3 : array-like of float, shape (3,)
Source amplitude vector in atomic units.
Returns
-------
numpy.ndarray of float, shape (3,)
Source amplitude vector in Meep units.
"""
factor = 0.002209799779149953
return np.asarray(amp_au_vec3, dtype=float) * factor
[docs]
def time_em_to_au(self, time_em: float):
"""
Convert Meep time to atomic units.
Parameters
----------
time_em : float
Time in Meep units.
Returns
-------
float
Time in atomic units.
"""
return float(time_em) * self.time_units_fs * FS_TO_AU
[docs]
def units_helper(self, dx, dt):
"""
Explain the Meep unit system and its connection to atomic units.
This prints a summary of conversions based on the current resolution and
Courant factor.
Parameters
----------
dx : float
Spatial grid spacing in Meep units.
dt : float
Time step size in Meep units.
Raises
------
RuntimeError
If the Courant factor is not ``0.5``.
"""
# calculate units conversion factors
mu2efield_au = self.efield_em_to_au(1.0)
mu2efield_si = mu2efield_au * 5.14220675112e11 # V/m
resolution = int(1.0 / dx)
courant = dt / dx
if courant != 0.5:
raise RuntimeError("MaxwellLink currently only supports Courant=0.5!")
omega_mu_to_ev = 0.242 / self.time_units_fs * 27.211 * 0.1 * 2.0 * np.pi
omega_mu_to_cminv = (
0.242 / self.time_units_fs * 27.211 * 0.1 * EV_TO_CM_INV * 2.0 * np.pi
)
omega_mu_to_au = 0.242 / self.time_units_fs * 0.1 * 2.0 * np.pi
# audipoledt2mu = atomic_to_meep_units_SourceAmp(1.0, self.time_units_fs)
print(
"\n\n ######### MaxwellLink Units Helper #########\n",
"MEEP uses its own units system, which is based on the speed of light in vacuum (c=1), \n",
"the permittivity of free space (epsilon_0=1), and the permeability of free space (mu_0=1). \n",
"To couple MEEP with molecular dynamics, we set [c] = [epsilon_0] = [mu_0] = [hbar] = 1. \n",
"By further defining the time unit as %.4E fs, we can fix the units system of MEEP (mu).\n\n"
% self.time_units_fs,
"Given the simulation resolution = %d,\n - FDTD dt = %.4E mu (0.5/resolution) = %.4E fs\n"
% (resolution, dt, dt * self.time_units_fs),
"- FDTD dx = %.4E mu (1.0/resolution) = %.4E nm\n"
% (dx, dx * self.time_units_fs * 299.792458),
"- Time [t]: 1 mu = %.4E fs = %.4E a.u.\n"
% (self.time_units_fs, self.time_units_fs * FS_TO_AU),
"- Length [x]: 1 mu = %.4E nm\n" % (299.792458 * self.time_units_fs),
"- EM wavelength of 1 mu, angular frequency omega = 2pi mu = %.4E eV = %.4E cm-1 = %.4E a.u.\n"
% (
omega_mu_to_ev,
omega_mu_to_cminv,
omega_mu_to_au,
),
"- Note that sources and dielectrics defined in MEEP use rotational frequency (f=omega/2pi), \n",
"- so probabably we need covert 1 eV photon energy to rotational frequency f = %.4E mu\n"
% (1.0 / omega_mu_to_ev),
"- Electric field [E]: 1 mu = %.4E V/m = %.4E a.u.\n"
% (mu2efield_si, mu2efield_au),
"Hope this helps!\n",
"############################################\n\n",
)
[docs]
class MoleculeMeepWrapper(MoleculeDummyWrapper):
"""
Wrapper that adapts a ``Molecule`` to Meep, handling units, sources, and IO.
"""
[docs]
def __init__(
self,
time_units_fs: float = 0.1,
dt: Optional[float] = None,
molecule: Molecule = None,
):
"""
Initialize the Meep molecule wrapper.
Parameters
----------
time_units_fs : float, default: 0.1
The Meep time unit expressed in femtoseconds.
dt : float or None, optional
Time step in Meep units; if provided, propagated to the molecule.
molecule : Molecule
The molecule to wrap and couple to Meep.
"""
super().__init__(molecule)
# self.m = molecule
self.m._refresh_time_units(time_units_fs)
if dt is not None:
self.m._refresh_time_step(dt)
self.em_units = MeepUnits(time_units_fs=time_units_fs)
# promote member variables of Molecule for convenience
self.sigma = self.m.sigma
self.dimensions = self.m.dimensions
self.center = self.m.center
self.size = self.m.size
self.rescaling_factor = self.m.rescaling_factor
self.polarization_fingerprint_hash = self.m.polarization_fingerprint_hash
self.initial_payload = self.m.init_payload
self.molecule_id = self.m.molecule_id
self.additional_data_history = self.m.additional_data_history
self.init_payload = self.m.init_payload
self.sources = self.m.sources
self.dt_au = self.m.dt_au
self._polarization_prefactor_3d = (
1.0 / (2.0 * np.pi) ** 1.5 / self.sigma**5 * self.rescaling_factor
)
self._polarization_prefactor_2d = (
1.0 / (2.0 * np.pi) ** 1.0 / self.sigma**2 * self.rescaling_factor
)
self._polarization_prefactor_1d = (
1.0 / (2.0 * np.pi) ** 0.5 / self.sigma * self.rescaling_factor
)
def _init_sources(self):
"""
Construct Meep sources for the molecule's polarization kernel (1D/2D/3D).
Notes
-----
Sources are memoized per polarization fingerprint and component to allow
sharing across molecules with identical spatial kernels.
"""
srcs = []
center = _to_mp_v3(self.center)
size = _to_mp_v3(self.size)
def amp_func_3d_x(R):
return (
self._polarization_prefactor_3d
* R.x**2
* np.exp(-R.norm() ** 2 / (2.0 * self.sigma**2))
)
def amp_func_3d_y(R):
return (
self._polarization_prefactor_3d
* R.y**2
* np.exp(-R.norm() ** 2 / (2.0 * self.sigma**2))
)
def amp_func_3d_z(R):
return (
self._polarization_prefactor_3d
* R.z**2
* np.exp(-R.norm() ** 2 / (2.0 * self.sigma**2))
)
def amp_func_2d(R):
return self._polarization_prefactor_2d * np.exp(
-(R.x**2 + R.y**2) / (2.0 * self.sigma**2)
)
def amp_func_1d(R):
return self._polarization_prefactor_1d * np.exp(
-(R.x**2) / (2.0 * self.sigma**2)
)
if self.dimensions == 1:
key = (self.polarization_fingerprint_hash, "Ez")
if key not in _fingerprint_source:
instantaneous_source_amplitudes[key] = 0.0
_fingerprint_source[key] = mp.Source(
src=_make_custom_time_src(key),
component=mp.Ez,
center=center,
size=size,
amplitude=1.0,
amp_func=amp_func_1d,
)
srcs.append(_fingerprint_source[key])
elif self.dimensions == 2:
key = (self.polarization_fingerprint_hash, "Ez")
if key not in _fingerprint_source:
instantaneous_source_amplitudes[key] = 0.0
_fingerprint_source[key] = mp.Source(
src=_make_custom_time_src(key),
component=mp.Ez,
center=center,
size=size,
amplitude=1.0,
amp_func=amp_func_2d,
)
srcs.append(_fingerprint_source[key])
else: # 3D
for comp, tag, amp_func in (
(mp.Ex, "Ex", amp_func_3d_x),
(mp.Ey, "Ey", amp_func_3d_y),
(mp.Ez, "Ez", amp_func_3d_z),
):
key = (self.polarization_fingerprint_hash, tag)
if key not in _fingerprint_source:
instantaneous_source_amplitudes[key] = 0.0
_fingerprint_source[key] = mp.Source(
src=_make_custom_time_src(key),
component=comp,
center=center,
size=size,
amplitude=1.0,
amp_func=amp_func,
)
srcs.append(_fingerprint_source[key])
self.sources = srcs
def _calculate_ep_integral(self, sim: mp.Simulation):
"""
Compute the regularized E-field integral over the molecule's kernel.
Parameters
----------
sim : meep.Simulation
Active Meep simulation.
Returns
-------
list of float
Regularized field integrals ``[I_x, I_y, I_z]`` in Meep units.
"""
vol = mp.Volume(size=_to_mp_v3(self.size), center=_to_mp_v3(self.center))
x = y = z = 0.0
if self.dimensions == 1:
z = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_1d
* exp(
-((R.x - self.center.x) * (R.x - self.center.x))
/ (2.0 * self.sigma**2)
)
* (ez),
vol,
)
elif self.dimensions == 2:
z = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_2d
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
)
/ (2.0 * self.sigma**2)
)
* (ez),
vol,
)
else: # 3D
z = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_3d
* (R.z - self.center.z)
* (R.z - self.center.z)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ez),
vol,
)
x = sim.integrate_field_function(
[mp.Ex],
lambda R, ex: self._polarization_prefactor_3d
* (R.x - self.center.x)
* (R.x - self.center.x)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ex),
vol,
)
y = sim.integrate_field_function(
[mp.Ey],
lambda R, ey: self._polarization_prefactor_3d
* (R.y - self.center.y)
* (R.y - self.center.y)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ey),
vol,
)
return [np.real(x), np.real(y), np.real(z)]
# ---------- NON-SOCKET Step Function for MEEP ----------
[docs]
def update_molecules_no_socket(
molecules: List[MoleculeMeepWrapper], sources_non_molecule: List = None
):
"""
Create a Meep step function (no sockets) that couples molecules to the EM grid.
Parameters
----------
molecules : list of MoleculeMeepWrapper
Molecules to couple.
sources_non_molecule : list or None, optional
Additional Meep sources unrelated to molecules.
Returns
-------
callable
A step function compatible with ``meep.Simulation.run``.
"""
if sources_non_molecule is None:
sources_non_molecule = []
started = {"flag": False}
def __step_function__(sim: mp.Simulation):
# First-time barrier: bind/init all drivers
while not started["flag"]:
_reset_module_state()
for idx, m in enumerate(molecules):
# initialize molecular drivers directly
m.initialize_driver(assigned_id=idx)
if not m.sources:
m._init_sources()
unique_sources = list(
{id(src): src for m in molecules for src in m.sources}.values()
)
sim.change_sources(list(sources_non_molecule) + unique_sources)
_register_sim_cleanup(sim)
started["flag"] = True
# Build requests (E-field integrals cached by fingerprint)
regularized_efield_integrals: Dict[int, List[float]] = {}
for m in molecules:
fp = m.polarization_fingerprint_hash
if fp not in regularized_efield_integrals:
int_ep_mu = m._calculate_ep_integral(sim)
regularized_efield_integrals[fp] = int_ep_mu
else:
int_ep_mu = regularized_efield_integrals[fp]
int_ep_au = m.em_units.efield_em_to_au(int_ep_mu)
m.propagate(int_ep_au)
# Aggregate amplitudes into per-(fingerprint,component) accumulators
touched = set()
for m in molecules:
amp_au = np.asarray(m.calc_amp_vector(), dtype=float)
amp_mu = m.em_units.source_amp_au_to_em(amp_au)
if m.dimensions in (1, 2):
key = (m.polarization_fingerprint_hash, "Ez")
if key not in touched:
instantaneous_source_amplitudes[key] = 0.0
touched.add(key)
instantaneous_source_amplitudes[key] += float(amp_mu[2])
else:
for tag, val in (
("Ex", amp_mu[0]),
("Ey", amp_mu[1]),
("Ez", amp_mu[2]),
):
key = (m.polarization_fingerprint_hash, tag)
if key not in touched:
instantaneous_source_amplitudes[key] = 0.0
touched.add(key)
instantaneous_source_amplitudes[key] += float(val)
extra_blob = m.d_f.append_additional_data()
if extra_blob:
try:
m.additional_data_history.append(extra_blob)
except Exception:
pass
# No change_sources(); CustomSource reads accumulators
return __step_function__
# ---------- SOCKET: No-MPI Step Function for MEEP ----------
[docs]
def update_molecules_no_mpi(
hub, molecules: List[MoleculeMeepWrapper], sources_non_molecule: List = None
):
"""
Create a Meep step function (sockets, no MPI) that couples molecules to the EM grid.
Parameters
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub`
Socket hub for driver communication.
molecules : list of MoleculeMeepWrapper
Molecules to couple.
sources_non_molecule : list or None, optional
Additional Meep sources unrelated to molecules.
Returns
-------
callable
A step function compatible with ``meep.Simulation.run``.
"""
if sources_non_molecule is None:
sources_non_molecule = []
started = {"flag": False}
paused = {"flag": False}
def __step_function__(sim: mp.Simulation):
# First-time barrier: bind/init all drivers
while not started["flag"]:
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
ok = hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
if not ok:
raise RuntimeError("wait_until_bound timed out")
# >>> deterministic per-simulation reset <<<
_reset_module_state()
# ensure meep sources exist & install once
for m in molecules:
if not m.sources:
m._init_sources()
unique_sources = list(
{id(src): src for m in molecules for src in m.sources}.values()
)
sim.change_sources(list(sources_non_molecule) + unique_sources)
_register_sim_cleanup(sim)
started["flag"] = True
# Reconnect guard
ids = [m.molecule_id for m in molecules]
while not hub.all_bound(ids, require_init=True):
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
# Build requests (E-field integrals cached by fingerprint)
requests = {}
regularized_efield_integrals: Dict[int, List[float]] = {}
for m in molecules:
fp = m.polarization_fingerprint_hash
if fp not in regularized_efield_integrals:
int_ep_mu = m._calculate_ep_integral(sim)
regularized_efield_integrals[fp] = int_ep_mu
else:
int_ep_mu = regularized_efield_integrals[fp]
int_ep_au = m.em_units.efield_em_to_au(int_ep_mu)
requests[m.molecule_id] = {
"efield_au": int_ep_au,
"meta": {"t": sim.meep_time()},
"init": {**m.init_payload, "molecule_id": m.molecule_id},
}
# Hub round-trip with retry
responses = hub.step_barrier(requests)
while not responses:
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
responses = hub.step_barrier(requests)
# Aggregate amplitudes into per-(fingerprint,component) accumulators
touched = set()
for m in molecules:
if m.molecule_id not in responses:
print(
f"Warning: no response for SocketMolecule {m.molecule_id} at t={sim.meep_time():.2f}."
)
continue
amp_au = np.asarray(responses[m.molecule_id]["amp"], dtype=float)
amp_mu = m.em_units.source_amp_au_to_em(amp_au)
if m.dimensions in (1, 2):
key = (m.polarization_fingerprint_hash, "Ez")
if key not in touched:
instantaneous_source_amplitudes[key] = 0.0
touched.add(key)
instantaneous_source_amplitudes[key] += float(amp_mu[2])
else:
for tag, val in (
("Ex", amp_mu[0]),
("Ey", amp_mu[1]),
("Ez", amp_mu[2]),
):
key = (m.polarization_fingerprint_hash, tag)
if key not in touched:
instantaneous_source_amplitudes[key] = 0.0
touched.add(key)
instantaneous_source_amplitudes[key] += float(val)
extra_blob = responses[m.molecule_id].get("extra", b"")
if extra_blob:
try:
m.additional_data_history.append(
json.loads(extra_blob.decode("utf-8"))
)
except Exception:
pass
# No change_sources(); CustomSource reads accumulators
return __step_function__
# ---------- SOCKET: MPI Step Function for MEEP ----------
[docs]
def update_molecules(
hub, molecules: List[MoleculeMeepWrapper], sources_non_molecule: List = None
):
"""
MPI-safe step function aligned with the no-MPI version and streaming sources.
Parameters
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub`
Socket hub for driver communication.
molecules : list of MoleculeMeepWrapper
Molecules to couple.
sources_non_molecule : list or None, optional
Additional Meep sources unrelated to molecules.
Returns
-------
callable
A step function compatible with ``meep.Simulation.run``.
"""
import json as _json
# detect MPI
try:
from mpi4py import MPI as _MPI
_COMM = _MPI.COMM_WORLD
_RANK = _COMM.Get_rank()
_SIZE = _COMM.Get_size()
_HAS_MPI = _SIZE > 1
except Exception:
_COMM = None
_RANK = 0
_SIZE = 1
_HAS_MPI = False
if sources_non_molecule is None:
sources_non_molecule = []
_is_master = (not _HAS_MPI) or (_RANK == 0 and mp.am_master())
if _HAS_MPI and _RANK != 0:
hub = None
started = {"flag": False}
paused = {"flag": False}
def _bcast_bool(x: bool) -> bool:
if _HAS_MPI:
x = bool(_COMM.bcast(bool(x), root=0))
return bool(x)
def _bcast_array(arr, n):
if not _HAS_MPI:
return np.asarray(arr, dtype=float).reshape(n)
if _is_master:
buf = np.asarray(arr, dtype=float).reshape(n)
else:
buf = np.empty(n, dtype=float)
_COMM.Bcast(buf, root=0)
return buf
def __step_function__(sim: mp.Simulation):
# 1) FIRST-TIME BARRIER
while not started["flag"]:
ok = True
if _is_master:
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
ok = hub.wait_until_bound(
init_payloads, require_init=True, timeout=None
)
ok = _bcast_bool(ok)
if not ok:
raise RuntimeError("wait_until_bound timed out")
# >>> deterministic per-simulation reset (all ranks must agree) <<<
if _is_master:
_reset_module_state()
_ = _bcast_bool(True) # simple sync point
for m in molecules:
if not m.sources:
m._init_sources()
unique_sources = list(
{id(src): src for m in molecules for src in m.sources}.values()
)
sim.change_sources(list(sources_non_molecule) + unique_sources)
_register_sim_cleanup(sim)
started["flag"] = True
# 2) MID-RUN GUARD
dropped = False
if _is_master:
ids = [m.molecule_id for m in molecules]
dropped = not hub.all_bound(ids, require_init=True)
dropped = _bcast_bool(dropped)
while dropped:
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
if _is_master:
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
_ = _bcast_bool(True)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
dropped = False
if _is_master:
ids = [m.molecule_id for m in molecules]
dropped = not hub.all_bound(ids, require_init=True)
dropped = _bcast_bool(dropped)
# 3) BUILD REQUESTS
requests = {}
regularized_efield_integrals: Dict[int, List[float]] = {}
for m in molecules:
fp = m.polarization_fingerprint_hash
if fp not in regularized_efield_integrals:
int_ep_mu = m._calculate_ep_integral(sim)
regularized_efield_integrals[fp] = int_ep_mu
else:
int_ep_mu = regularized_efield_integrals[fp]
int_ep_au = m.em_units.efield_em_to_au(int_ep_mu)
requests[m.molecule_id] = {
"efield_au": int_ep_au,
"meta": {"t": sim.meep_time()},
"init": {**m.init_payload, "molecule_id": m.molecule_id},
}
# 4) MASTER EXCHANGE WITH DRIVERS
nmol = len(molecules)
amps_flat = np.zeros(3 * nmol, dtype=float)
extras_by_id = {}
had_responses = True
if _is_master:
responses = hub.step_barrier(requests)
while not responses:
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
responses = hub.step_barrier(requests)
off = 0
for m in molecules:
a = np.asarray(responses[m.molecule_id]["amp"], dtype=float).reshape(3)
amps_flat[off : off + 3] = a
extras_by_id[m.molecule_id] = responses[m.molecule_id].get("extra", b"")
off += 3
had_responses = _bcast_bool(had_responses and _is_master or (not _HAS_MPI))
if not had_responses:
return
# 5) BROADCAST AMPLITUDES
amps_flat = _bcast_array(amps_flat, 3 * nmol)
# 6) UPDATE ACCUMULATORS (NO change_sources)
touched = set()
off = 0
for m in molecules:
amp_au = amps_flat[off : off + 3]
off += 3
amp_mu = m.em_units.source_amp_au_to_em(amp_au)
if m.dimensions in (1, 2):
key = (m.polarization_fingerprint_hash, "Ez")
if key not in touched:
instantaneous_source_amplitudes[key] = 0.0
touched.add(key)
instantaneous_source_amplitudes[key] += float(amp_mu[2])
else:
for tag, val in (
("Ex", amp_mu[0]),
("Ey", amp_mu[1]),
("Ez", amp_mu[2]),
):
key = (m.polarization_fingerprint_hash, tag)
if key not in touched:
instantaneous_source_amplitudes[key] = 0.0
touched.add(key)
instantaneous_source_amplitudes[key] += float(val)
if _is_master:
extra_blob = extras_by_id.get(m.molecule_id, b"")
if extra_blob:
try:
m.additional_data_history.append(
_json.loads(extra_blob.decode("utf-8"))
)
except Exception:
pass
return __step_function__
[docs]
class MeepSimulation(mp.Simulation):
"""
A wrapper of ``meep.Simulation`` with MaxwellLink features.
This extends Meep's simulation to couple with quantum models via MaxwellLink.
Attributes
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub` or None
Manager for socket communication with external drivers.
molecules : list of MoleculeMeepWrapper
Wrapped molecules coupled to the EM grid.
time_units_fs : float
Time unit in femtoseconds for unit conversions.
"""
[docs]
def __init__(
self,
hub: Optional[SocketHub] = None,
molecules: Optional[List] = None,
time_units_fs: float = 0.1,
**kwargs,
):
"""
Initialize the simulation and wrap molecules with Meep adapters.
Parameters
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub` or None, optional
Socket hub for driver communication.
molecules : list or None, optional
Molecules to couple; will be wrapped as ``MoleculeMeepWrapper``.
time_units_fs : float, default: 0.1
The Meep time unit expressed in femtoseconds.
**kwargs
Additional keyword arguments forwarded to ``meep.Simulation``.
"""
super().__init__(**kwargs)
if self.Courant != 0.5:
raise RuntimeError("MaxwellLink currently only supports Courant=0.5!")
self.socket_hub = hub
self.molecules = molecules if molecules is not None else []
self.time_units_fs = time_units_fs
# we need to reassign the time_units_fs to each molecule
self.dx = 1.0 / self.resolution
self.dt = self.Courant * self.dx # Courant factor = 0.5
for idx in range(len(self.molecules)):
# use meep wrapper for this molecule
m = MoleculeMeepWrapper(
time_units_fs=time_units_fs, dt=self.dt, molecule=self.molecules[idx]
)
self.molecules[idx] = m
if len(self.molecules) > 0:
self.molecules[0].em_units.units_helper(self.dx, self.dt)
# overload mp.Simulation.run() function
[docs]
def run(self, *user_step_funcs, **kwargs):
"""
Run the simulation with optional user step functions and stopping conditions.
Notes
-----
If molecules are present, a MaxwellLink coupling step is automatically
inserted (socket or non-socket variant as appropriate) before user-provided
step functions.
Parameters
----------
*user_step_funcs
One or more per-step callables.
**kwargs
Passed through to ``meep.Simulation.run`` (e.g., ``until=...``).
"""
# if **kwargs contains "steps", we need to convert it to "until"
if "steps" in kwargs:
kwargs["until"] = float(kwargs.pop("steps") * self.dt)
step_funcs = list(user_step_funcs)
if self.molecules:
# auto-insert our coupling step first
if self.socket_hub is not None:
step_funcs.insert(
0, update_molecules(self.socket_hub, self.molecules, self.sources)
)
else:
step_funcs.insert(
0, update_molecules_no_socket(self.molecules, self.sources)
)
super().run(*step_funcs, **kwargs)
# after run, stop and clean up the hub
if self.socket_hub is not None:
if mp.am_master():
self.socket_hub.stop()