"""
Legacy module defining molecule classes for coupling quantum molecular dynamics
with Maxwell’s equations using MEEP.
Coupling to other Maxwell solvers should define analogous molecule classes.
"""
from __future__ import annotations
import warnings
import numpy as np
from scipy.linalg import expm
from math import exp
from typing import Optional, Dict, List
from maxwelllink.sockets import SocketHub
import json
from collections import defaultdict
import atexit, weakref
try:
import meep as mp
except ImportError:
raise ImportError(
"The meep package is required for this module. Please install it: https://meep.readthedocs.io/en/latest/Installation/."
)
# ----- Two global dictionaries to hold unique state across multiple molecule instances -----
# 1. Instantaneous (per–time step) source amplitude for each unique polarization density
# (which may be shared by multiple molecules, e.g., superradiance for a collection of closely spaced molecules)
instantaneous_source_amplitudes = defaultdict(float)
# 2. The fingerprint to define unique polarization density, which may be shared by many molecules.
_fingerprint_source = {}
# The issue is if this file is called many times within the same python interpreter session (such as pytest),
# we need to ensure that the global state is reset between simulations.
[docs]
def reset_module_state():
"""
Clear shared global state so a new simulation starts from a clean slate.
Notes
-----
Resets both the per–time step source amplitude cache and the fingerprint map.
Call this between simulations (e.g., in test suites) to avoid cross-run bleed.
"""
instantaneous_source_amplitudes.clear()
_fingerprint_source.clear()
# one-time registration guard for atexit
__MXL__ATEXIT_REGISTERED = {"flag": False}
def _register_sim_cleanup(sim):
"""
Ensure global cleanup is performed when the simulation ends.
Registers an ``atexit`` handler (once) and also attaches a finalizer to the
simulation object so that shared module state is reset on GC.
"""
if not __MXL__ATEXIT_REGISTERED["flag"]:
atexit.register(reset_module_state)
__MXL__ATEXIT_REGISTERED["flag"] = True
# When `sim` is garbage-collected, run the reset, too.
weakref.finalize(sim, reset_module_state)
[docs]
def make_custom_time_src(key):
"""
Create a MEEP ``CustomSource`` that reads an accumulator keyed by ``key``.
Parameters
----------
key : hashable
Fingerprint identifying a shared polarization density.
Returns
-------
meep.CustomSource
Custom source whose time-dependent amplitude is taken from the global
``instantaneous_source_amplitudes[key]`` each MEEP step.
"""
# Meep calls this each time step; just return the current amplitude for this polarization fingerprint
return mp.CustomSource(lambda t: instantaneous_source_amplitudes[key])
[docs]
class DummyMolecule:
"""
A minimal base class for molecules coupled to MEEP.
Subclasses implement propagation, source setup, and source-amplitude updates.
"""
[docs]
def __init__(self, dt, dx, center=mp.Vector3(), size=mp.Vector3(1, 1, 1)):
"""
Initialize basic molecule metadata and storage for MEEP sources.
Parameters
----------
dt : float
Time step (MEEP units).
dx : float
Spatial resolution (MEEP units).
center : meep.Vector3, default: meep.Vector3()
Molecule center in the simulation domain.
size : meep.Vector3, default: meep.Vector3(1, 1, 1)
Molecule extent used for integrals and source placement.
"""
self.dt = dt
self.dx = dx
self.center = center
self.size = size
# each molecule has a corresponding MEEP Source object and will be used in defining the MEEP simulation object
self.sources = []
def _propagate(self, int_ep):
"""
Advance the molecule’s internal dynamics given light–matter coupling.
Parameters
----------
int_ep : float or array-like
Integral of :math:`\\mathbf{E}(\\mathbf{r})\\cdot\\mathbf{P}(\\mathbf{r})`
over the molecule volume (units depend on subclass).
Notes
-----
Must be implemented by subclasses.
"""
raise NotImplementedError("This method should be overridden by subclasses.")
def _init_sources(self):
"""
Create and attach MEEP ``Source`` objects representing this molecule.
Notes
-----
Must be implemented by subclasses. Populate ``self.sources`` with one or more
``meep.Source`` instances.
"""
raise NotImplementedError("This method should be overridden by subclasses.")
def _update_source_amplitude(self):
"""
Update the molecule’s source amplitude(s) after one propagation step.
Notes
-----
Must be implemented by subclasses. Typically computes :math:`\\mathrm{d}\\mu / \\mathrm{d}t`
or an equivalent quantity used to drive the EM field.
"""
raise NotImplementedError("This method should be overridden by subclasses.")
def _calculate_ep_integral(self, sim):
"""
Compute the coupling integral of the electric field over the molecule volume.
Parameters
----------
sim : meep.Simulation
Active MEEP simulation.
Returns
-------
float or array-like
The field integral(s) required by the propagation step.
Notes
-----
Must be implemented by subclasses.
"""
raise NotImplementedError("This method should be overridden by subclasses.")
[docs]
def update_molecule(self, sources_non_molecule=[]):
"""
Return a MEEP step function that advances this molecule each time step.
Parameters
----------
sources_non_molecule : list, optional
Additional non-molecule sources (e.g., Gaussian pulses) to include.
Returns
-------
Callable
Function of signature ``step(sim)`` suitable for ``sim.run(step, ...)``.
"""
def __step_function__(sim):
"""
Step function used within ``simulation.run`` to couple the molecule and field.
Notes
-----
At each call:
1. Compute the light–matter coupling integral.
2. Propagate the molecule.
3. Update source amplitude(s) and apply via ``sim.change_sources`` along with
any non-molecule sources.
"""
# 1. calculate the light-matter coupling: int_ep
int_ep = self._calculate_ep_integral(sim)
# 2. propagate the quantum molecular dynamics given int_ep
self._propagate(int_ep)
# 3. update the source amplitude after propagating this molecule for one time step
self._update_source_amplitude()
sim.change_sources(sources_non_molecule + self.sources)
return __step_function__
[docs]
def units_helper(self):
"""
Explain the unit system used by MEEP and its relation to atomic units.
Notes
-----
Subclasses may override to print or return detailed conversion helpers.
"""
pass
[docs]
def print_dynamics(self):
"""
Print molecule-specific dynamical information.
Notes
-----
Subclasses should override to report relevant observables over time.
"""
raise NotImplementedError(
"This method should be overridden by subclasses to print specific molecular dynamics properties."
)
[docs]
class TLSMolecule(DummyMolecule):
"""
Two-level system (TLS) molecule model.
Implements a Gaussian-like polarization profile and TLS dynamics driven by
the local electric field.
"""
[docs]
def __init__(
self,
resolution,
center=mp.Vector3(),
size=mp.Vector3(1, 1, 1),
frequency=1.0,
dipole_moment=0.1,
sigma=0.1,
orientation=mp.Ez,
dimensions=2,
rescaling_factor=1.0,
):
"""
Initialize a TLS molecule and its polarization profile.
Parameters
----------
resolution : int
MEEP resolution; sets ``dt = 0.5/resolution`` and ``dx = 1/resolution``.
center : meep.Vector3, default: meep.Vector3()
Molecule center.
size : meep.Vector3, default: meep.Vector3(1, 1, 1)
Molecule extent for integrals and sources.
frequency : float, default: 1.0
TLS transition frequency :math:`\\omega` (MEEP units).
dipole_moment : float, default: 0.1
Effective dipole moment magnitude.
sigma : float, default: 0.1
Width of the polarization density.
orientation : meep.EComponent, default: meep.Ez
Polarization component (``mp.Ex``, ``mp.Ey``, or ``mp.Ez``).
dimensions : int, default: 2
Dimensionality (1, 2, or 3). In 1D/2D the orientation is constrained to ``mp.Ez``.
rescaling_factor : float, default: 1.0
Optional scaling applied to the polarization density and dipole moment.
"""
dt = 0.5 / resolution
dx = 1.0 / resolution
super().__init__(dt=dt, dx=dx, center=center, size=size)
self.omega = frequency
self.dipole_moment = dipole_moment
self.sigma = sigma
self.orientation = orientation
self.dimensions = dimensions
self.rescaling_factor = rescaling_factor
self.t = 0.0
# for 1D or 2D simulations, the TLS orientation is fixed to mp.Ez
if self.dimensions == 2 and self.orientation != mp.Ez:
warnings.warn(
"In 2D simulations, the TLS orientation is fixed to mp.Ez. Setting orientation to mp.Ez."
)
self.orientation = mp.Ez
if self.dimensions == 1 and self.orientation != mp.Ez:
warnings.warn(
"In 1D simulations, the TLS orientation is fixed to mp.Ez. Setting orientation to mp.Ez."
)
self.orientation = mp.Ez
# the polarization prefactor in 3D
self._polarization_prefactor_3d = (
1.0
/ (2.0 * np.pi) ** (1.5)
/ self.sigma**5
* self.dipole_moment
* self.rescaling_factor
)
# the polarization prefactor in 2D
self._polarization_prefactor_2d = (
1.0
/ (2.0 * np.pi) ** (1.0)
/ self.sigma**2
* self.dipole_moment
* self.rescaling_factor
)
# the polarization prefactor in 1D
self._polarization_prefactor_1d = (
1.0
/ (2.0 * np.pi) ** (0.5)
/ self.sigma
* self.dipole_moment
* self.rescaling_factor
)
# the regularized E-field integral depends on the "fingerprint" of molecular polarization density distribution
self.polarization_fingerprint = {
"dimensions": self.dimensions,
"sigma": self.sigma,
"size": [self.size.x, self.size.y, self.size.z],
"rescaling_factor": self.rescaling_factor,
"center": [self.center.x, self.center.y, self.center.z],
}
# generate a hash value for the polarization fingerprint
# this hash value is used to identify if two SocketMolecule instances have the same polarization distribution
self.polarization_fingerprint_hash = hash(
json.dumps(self.polarization_fingerprint)
)
# initialize the instantaneous source amplitude for this fingerprint
if self.polarization_fingerprint_hash not in instantaneous_source_amplitudes:
instantaneous_source_amplitudes[self.polarization_fingerprint_hash] = 0.0
# set the Hamiltonian and density matrix for the TLS molecule
self.Hs = np.array([[0, 0], [0, self.omega]], dtype=np.complex128)
self.SIGMAX = np.array([[0, 1], [1, 0]], dtype=np.complex128)
self.expHs = expm(-1j * self.dt * self.Hs / 2.0)
self.C = np.array([[1], [0]], dtype=np.complex128)
self.rho = np.dot(self.C, self.C.conj().transpose())
self.additional_data_history = []
# initialize the sources for the TLS molecule
self._init_sources()
[docs]
def reset_tls_population(self, excited_population=0.1):
"""
Reset the TLS population to a specified excited-state fraction.
Parameters
----------
excited_population : float, default: 0.1
Population of the excited state (between 0 and 1).
"""
if excited_population < 0 or excited_population > 1:
raise ValueError("Excited population must be between 0 and 1.")
self.C = np.array(
[[(1 - excited_population) ** 0.5], [excited_population**0.5]],
dtype=np.complex128,
)
self.rho = np.dot(self.C, self.C.conj().transpose())
def _refresh_time_units(self, time_units_fs):
"""
Update internal time-unit conversions for this TLS molecule.
Parameters
----------
time_units_fs : float
New femtosecond time unit used for reporting or coupling.
"""
pass
def _init_sources(self):
"""
Initialize MEEP sources for the TLS polarization distribution.
Notes
-----
Builds (or reuses) a shared ``meep.Source`` keyed by a polarization fingerprint,
using a ``CustomSource`` whose amplitude is read from a global accumulator.
"""
# set the source corresponding to the TLS molecule
"""
import math
local_size = 10.0
if (
self.size[0] > local_size
or self.size[1] > local_size
or self.size[2] > local_size
):
raise ValueError(
"The size of the molecule is too large. Please use a smaller size."
)
x = np.arange(-local_size / 2.0, local_size / 2.0, self.dx)
y = np.arange(-local_size / 2.0, local_size / 2.0, self.dx)
z = np.arange(-local_size / 2.0, local_size / 2.0, self.dx)
if self.dimensions == 2:
[X, Y] = np.meshgrid(x, y)
R2 = X**2 + Y**2
Pz_raw = self._polarization_prefactor_2d * np.exp(-R2 / 2.0 / self.sigma**2)
start_idx_x, end_idx_x = int(
(local_size - self.size[0]) / 2 / self.dx
), int((local_size + self.size[0]) / 2 / self.dx)
start_idx_y, end_idx_y = int(
(local_size - self.size[1]) / 2 / self.dx
), int((local_size + self.size[1]) / 2 / self.dx)
Pz0 = Pz_raw[start_idx_x:end_idx_x, start_idx_y:end_idx_y]
Pz0 = np.reshape(
Pz0,
(
math.ceil(self.size[0] / self.dx),
math.ceil(self.size[1] / self.dx),
1,
),
)
polarization_profile_2d = np.copy(Pz0.astype(np.complex128), order="C")
elif self.dimensions == 3:
[X, Y, Z] = np.meshgrid(x, y, z)
R2 = X**2 + Y**2 + Z**2
if self.orientation == mp.Ez:
P3d_raw = (
self._polarization_prefactor_3d
* Z**2
* np.exp(-R2 / 2.0 / self.sigma**2)
)
elif self.orientation == mp.Ex:
P3d_raw = (
self._polarization_prefactor_3d
* X**2
* np.exp(-R2 / 2.0 / self.sigma**2)
)
elif self.orientation == mp.Ey:
P3d_raw = (
self._polarization_prefactor_3d
* Y**2
* np.exp(-R2 / 2.0 / self.sigma**2)
)
start_idx_x, end_idx_x = int(
(local_size - self.size[0]) / 2 / self.dx
), int((local_size + self.size[0]) / 2 / self.dx)
start_idx_y, end_idx_y = int(
(local_size - self.size[1]) / 2 / self.dx
), int((local_size + self.size[1]) / 2 / self.dx)
start_idx_z, end_idx_z = int(
(local_size - self.size[2]) / 2 / self.dx
), int((local_size + self.size[2]) / 2 / self.dx)
P3d0 = P3d_raw[
start_idx_x:end_idx_x, start_idx_y:end_idx_y, start_idx_z:end_idx_z
]
P3d0 = np.reshape(
P3d0,
(
math.ceil(self.size[0] / self.dx),
math.ceil(self.size[1] / self.dx),
math.ceil(self.size[2] / self.dx),
),
)
polarization_profile_3d = np.copy(P3d0.astype(np.complex128), order="C")
# 1. number of primary cells in the molecule box
Nx = int(np.ceil(self.size[0] / self.dx))
Ny = int(np.ceil(self.size[1] / self.dx))
Nz = int(np.ceil(self.size[2] / self.dx))
# 2. half-cell offsets for the Yee lattice
off = {
mp.Ex: (0.0, 0.5, 0.5),
mp.Ey: (0.5, 0.0, 0.5),
mp.Ez: (0.5, 0.5, 0.0),
}[self.orientation]
# coordinate arrays on the **Nx×Ny×Nz** grid
x = (np.arange(Nx) - (Nx - 1) / 2 + off[0]) * self.dx
y = (np.arange(Ny) - (Ny - 1) / 2 + off[1]) * self.dx
z = (np.arange(Nz) - (Nz - 1) / 2 + off[2]) * self.dx
X, Y, Z = np.meshgrid(x, y, z, indexing="ij")
R2 = X**2 + Y**2 + Z**2
if self.orientation == mp.Ex:
P = (
self._polarization_prefactor_3d
* X**2
* np.exp(-R2 / (2 * self.sigma**2))
)
elif self.orientation == mp.Ey:
P = (
self._polarization_prefactor_3d
* Y**2
* np.exp(-R2 / (2 * self.sigma**2))
)
else: # Ez
P = (
self._polarization_prefactor_3d
* Z**2
* np.exp(-R2 / (2 * self.sigma**2))
)
polarization_profile_3d = np.ascontiguousarray(P.astype(np.complex128))
"""
def amp_func_3d_x(R):
return (
self._polarization_prefactor_3d
* R.x
* R.x
* np.exp(-(R.x * R.x + R.y * R.y + R.z * R.z) / 2.0 / self.sigma**2)
)
def amp_func_3d_y(R):
return (
self._polarization_prefactor_3d
* R.y
* R.y
* np.exp(-(R.x * R.x + R.y * R.y + R.z * R.z) / 2.0 / self.sigma**2)
)
def amp_func_3d_z(R):
return (
self._polarization_prefactor_3d
* R.z
* R.z
* np.exp(-(R.x * R.x + R.y * R.y + R.z * R.z) / 2.0 / self.sigma**2)
)
def amp_func_2d(R):
return self._polarization_prefactor_2d * np.exp(
-(R.x * R.x + R.y * R.y) / 2.0 / self.sigma**2
)
def amp_func_1d(R):
return self._polarization_prefactor_1d * np.exp(
-(R.x * R.x) / 2.0 / self.sigma**2
)
key = self.polarization_fingerprint_hash
# ensure an accumulator exists for this fingerprint
_ = instantaneous_source_amplitudes[key] # initializes to 0.0 via defaultdict
# note that using amp_data is x1.5 faster than using amp_func but for now we use amp_func for simplicity
if key not in _fingerprint_source:
_fingerprint_source[key] = mp.Source(
src=make_custom_time_src(key),
component=self.orientation,
center=self.center,
size=self.size,
amplitude=1.0,
# amp_data=polarization_profile_2d if self.dimensions == 2 else polarization_profile_3d
amp_func=(
amp_func_1d
if self.dimensions == 1
else (
amp_func_2d
if self.dimensions == 2
else (
amp_func_3d_x
if self.orientation == mp.Ex
else (
amp_func_3d_y
if self.orientation == mp.Ey
else amp_func_3d_z
)
)
)
),
)
# Each TLSMolecule references the shared Source for its fingerprint
self.sources = [_fingerprint_source[key]]
def _propagate(self, int_ep):
"""
Propagate TLS dynamics under light–matter coupling.
Parameters
----------
int_ep : float
Integral of field–polarization coupling over the molecule volume.
Notes
-----
Uses a split-operator update for the TLS density matrix and logs
per-step data to ``additional_data_history``.
"""
U = np.dot(
self.expHs, np.dot(expm((1j * self.dt * int_ep) * self.SIGMAX), self.expHs)
)
self.rho = np.dot(np.dot(U, self.rho), U.conj().transpose())
self.t += self.dt
self.additional_data_history.append(self.append_additional_data())
[docs]
def append_additional_data(self):
"""
Append diagnostic data for the current TLS state.
Returns
-------
dict
Dictionary with time, populations (``Pg``, ``Pe``) and coherence terms
(``Pge_real``, ``Pge_imag``).
"""
data = {}
data["time"] = self.t
data["Pe"] = self.rho[1, 1].real
data["Pg"] = self.rho[0, 0].real
data["Pge_real"] = np.real(self.rho[0, 1])
data["Pge_imag"] = np.imag(self.rho[0, 1])
return data
def _update_source_amplitude(self):
"""
Compute instantaneous TLS source amplitude for driving the EM field.
Returns
-------
float
Amplitude proportional to :math:`-2\\,\\omega\\,\\Im\\rho_{ge}`.
"""
amp = -2.0 * self.omega * np.imag(self.rho[0, 1])
return float(amp)
def _calculate_ep_integral(self, sim):
"""
Compute the field–polarization coupling integral for the TLS.
Parameters
----------
sim : meep.Simulation
Active MEEP simulation.
Returns
-------
float
Scalar coupling integral consistent with the TLS orientation and dimension.
"""
int_ep = 0.0
if self.dimensions == 1:
int_ep = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_1d
* exp(
-((R.x - self.center.x) * (R.x - self.center.x))
/ (2.0 * self.sigma**2)
)
* (ez),
mp.Volume(size=self.size, center=self.center),
)
elif self.dimensions == 2:
int_ep = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_2d
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
)
/ (2.0 * self.sigma**2)
)
* (ez),
mp.Volume(size=self.size, center=self.center),
)
elif self.dimensions == 3:
if self.orientation == mp.Ez:
int_ep = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_3d
* (R.z - self.center.z)
* (R.z - self.center.z)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ez),
mp.Volume(size=self.size, center=self.center),
)
elif self.orientation == mp.Ex:
int_ep = sim.integrate_field_function(
[mp.Ex],
lambda R, ex: self._polarization_prefactor_3d
* (R.x - self.center.x)
* (R.x - self.center.x)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ex),
mp.Volume(size=self.size, center=self.center),
)
elif self.orientation == mp.Ey:
int_ep = sim.integrate_field_function(
[mp.Ey],
lambda R, ey: self._polarization_prefactor_3d
* (R.y - self.center.y)
* (R.y - self.center.y)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ey),
mp.Volume(size=self.size, center=self.center),
)
return int_ep
[docs]
def print_dynamics(self):
"""
Print basic TLS dynamics (e.g., excited-state population) over time.
"""
for idx, rho in enumerate(self.rho_history):
print(f"Time step {idx}: Excited-state population = {np.real(rho[1,1])}")
[docs]
def update_molecules_no_socket(sources_non_molecule=None, molecules=None):
"""
Step function for molecule collections without socket-based drivers.
Parameters
----------
sources_non_molecule : list or None
Non-molecule sources to include in the simulation.
molecules : list or None
List of molecule instances (e.g., ``TLSMolecule``).
Returns
-------
Callable
Function ``step(sim)`` to be passed to ``sim.run(step, ...)``.
Notes
-----
Accumulates per-fingerprint source amplitudes so multiple molecules sharing
the same polarization density drive a single shared ``CustomSource``.
"""
if sources_non_molecule is None:
sources_non_molecule = []
if molecules is None:
molecules = []
started = {"flag": False}
def __step_function__(sim):
# 0. First call: install the union of unique fingerprint Sources + non-molecule sources
if not started["flag"]:
# ensure every molecule has created/registered its fingerprint Source
for m in molecules:
if not m.sources:
m._init_sources()
unique_sources = list(
{id(src): src for m in molecules for src in m.sources}.values()
)
sim.change_sources(list(sources_non_molecule) + unique_sources)
# register cleanup to reset global state when sim ends or at exit
_register_sim_cleanup(sim)
started["flag"] = True
# 1. (optional) precompute field integrals by fingerprint
regularized_efield_integrals = {}
# 2. ZERO the per-fingerprint accumulator for THIS STEP
touched_keys = set()
# 3. Propagate each molecule and accumulate its amplitude into the fingerprint bin
for m in molecules:
key = m.polarization_fingerprint_hash
if key not in regularized_efield_integrals:
int_ep = m._calculate_ep_integral(sim)
regularized_efield_integrals[key] = int_ep
else:
int_ep = regularized_efield_integrals[key]
m._propagate(int_ep)
amp = m._update_source_amplitude()
# first time we see this key this step, reset its accumulator
if key not in touched_keys:
instantaneous_source_amplitudes[key] = 0.0
touched_keys.add(key)
instantaneous_source_amplitudes[key] += amp
# 4. We do not call sim.change_sources() here: CustomSource reads the updated accumulators automatically
return __step_function__
# ----------- SocketMolecule class for coupling to a local driver code via socket communication -----------
fs_to_au = 41.341373335 # 1 fs = 41.341373335 a.u. from wikipedia::atomic_units
[docs]
def meep_to_atomic_units_E(
Emu_vec3: np.ndarray, time_units_fs: float = 0.1
) -> np.ndarray:
"""
Convert E-field integrals from MEEP units to atomic units.
Parameters
----------
Emu_vec3 : array-like, shape (3,)
Field integral(s) in MEEP units.
time_units_fs : float, default: 0.1
Time unit :math:`\\tau` in femtoseconds.
Returns
-------
numpy.ndarray
Field integral(s) in atomic units.
"""
factor = 1.2929541569381223e-6 / (time_units_fs**2)
return np.asarray(Emu_vec3, dtype=float) * factor
[docs]
def atomic_to_meep_units_SourceAmp(
amp_au_vec3: np.ndarray, time_units_fs: float = 0.1
) -> np.ndarray:
"""
Convert source amplitudes (dipole-rate) from atomic units to MEEP units.
The source amplitude has units of dipole moment per unit time.
Parameters
----------
amp_au_vec3 : array-like, shape (3,)
Source amplitude(s) in atomic units.
time_units_fs : float, default: 0.1
Time unit used for conversion.
Returns
-------
numpy.ndarray
Source amplitude(s) in MEEP units.
"""
factor = 0.002209799779149953
return np.asarray(amp_au_vec3, dtype=float) * factor
[docs]
class SocketMolecule(DummyMolecule):
"""
Molecule whose dynamics are provided by an external (socket) driver.
The driver receives field–polarization integrals and returns source amplitudes
per component; the amplitudes are then applied via MEEP ``CustomSource``.
"""
[docs]
def __init__(
self,
hub: SocketHub,
molecule_id: int,
resolution: int,
center=mp.Vector3(),
size=mp.Vector3(1, 1, 1),
dimensions=2,
sigma=0.1,
init_payload: Optional[Dict] = None,
time_units_fs: float = 0.1,
rescaling_factor: float = 1.0,
):
"""
Initialize a socket-coupled molecule and register it with the hub.
Parameters
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub`
Socket hub handling driver connections.
molecule_id : int
Unique molecule identifier known to the driver.
resolution : int
MEEP resolution; sets ``dt`` and ``dx``.
center : meep.Vector3, default: meep.Vector3()
Molecule center.
size : meep.Vector3, default: meep.Vector3(1, 1, 1)
Molecule extent for integrals and sources.
dimensions : int, default: 2
Simulation dimensionality (1, 2, or 3).
sigma : float, default: 0.1
Width of the polarization density.
init_payload : dict or None, optional
Extra initialization parameters forwarded to the driver.
time_units_fs : float, default: 0.1
Time unit (fs). Driver time step is ``dt * time_units_fs * fs_to_au`` (a.u.).
rescaling_factor : float, default: 1.0
Optional scaling applied to the polarization density.
"""
super().__init__(
dt=0.5 / resolution, dx=1.0 / resolution, center=center, size=size
)
self.resolution = resolution
self.dimensions = dimensions
self.sigma = sigma
self.rescaling_factor = rescaling_factor
# MEEP by default uses the units system of epsion_0 = mu_0 = c = 1.
# SocketMolecule communicates with the driver code using atomic units.
# To connect the two unit systems, we define the time unit in fs.
# The time step in MEEP is self.dt in MEEP units, which is self.dt * time_units_fs in fs.
# So SocketMolecule will tell the driver code to use a time step of self.dt * time_units_fs * fs_to_au atomic units.
self.time_units_fs = time_units_fs
self.dt_au = self.dt * time_units_fs * fs_to_au
if dimensions not in [1, 2, 3]:
raise ValueError("SocketMolecule only supports 1D, 2D and 3D simulations.")
if hub is None:
raise ValueError("SocketHub must be provided for SocketMolecule.")
if molecule_id is None:
raise ValueError("molecule_id must be provided for SocketMolecule.")
# the polarization prefactor in 3D excluding the transient dipole moment
self._polarization_prefactor_3d = (
1.0 / (2.0 * np.pi) ** (1.5) / self.sigma**5 * self.rescaling_factor
)
# the polarization prefactor in 2D excluding the transient dipole moment
self._polarization_prefactor_2d = (
1.0 / (2.0 * np.pi) ** (1.0) / self.sigma**2 * self.rescaling_factor
)
# the polarization prefactor in 1D excluding the transient dipole moment
self._polarization_prefactor_1d = (
1.0 / (2.0 * np.pi) ** (0.5) / self.sigma * self.rescaling_factor
)
# the regularized E-field integral depends on the "fingerprint" of molecular polarization density distribution
self.polarization_fingerprint = {
"dimensions": self.dimensions,
"sigma": self.sigma,
"size": [self.size.x, self.size.y, self.size.z],
"rescaling_factor": self.rescaling_factor,
"center": [self.center.x, self.center.y, self.center.z],
}
# generate a hash value for the polarization fingerprint
# this hash value is used to identify if two SocketMolecule instances have the same polarization distribution
self.polarization_fingerprint_hash = hash(
json.dumps(self.polarization_fingerprint)
)
self._init_sources()
# define the socket hub and molecule id
self.hub = hub
self.molecule_id = molecule_id
self.init_payload = init_payload if init_payload is not None else {}
# append the time step information to the init_payload
self.init_payload.update({"dt_au": self.dt_au})
# register this molecule with the hub so it knows to expect a client
if mp.am_master():
self.hub.register_molecule(self.molecule_id)
# only the master process prints the units information
if self.molecule_id == 0:
self.units_helper()
# store the additional data the driver code transfers back to SocketMolecule. Each entry is a dict.
self.additional_data_history = []
def _refresh_time_units(self, time_units_fs):
"""
Refresh time-unit conversion used for driver coupling.
Parameters
----------
time_units_fs : float
New femtosecond time unit; updates internal ``dt_au`` and payload.
"""
self.time_units_fs = time_units_fs
self.dt_au = self.dt * time_units_fs * fs_to_au
# also refresh in init_payload for drivers
self.init_payload["dt_au"] = self.dt_au
def _init_sources(self):
"""
Initialize shared MEEP sources for this molecule’s polarization.
Notes
-----
In 1D/2D, only ``Ez`` is used; in 3D, separate sources are created for
``Ex``, ``Ey``, and ``Ez``. Sources are shared across molecules with the
same polarization fingerprint.
"""
# Define the spatial shape of the polarization density
def amp_func_3d_x(R):
return (
self._polarization_prefactor_3d
* R.x**2
* np.exp(-R.norm() ** 2 / (2.0 * self.sigma**2))
)
def amp_func_3d_y(R):
return (
self._polarization_prefactor_3d
* R.y**2
* np.exp(-R.norm() ** 2 / (2.0 * self.sigma**2))
)
def amp_func_3d_z(R):
return (
self._polarization_prefactor_3d
* R.z**2
* np.exp(-R.norm() ** 2 / (2.0 * self.sigma**2))
)
def amp_func_2d(R):
return self._polarization_prefactor_2d * np.exp(
-(R.x**2 + R.y**2) / (2.0 * self.sigma**2)
)
def amp_func_1d(R):
return self._polarization_prefactor_1d * np.exp(
-(R.x**2) / (2.0 * self.sigma**2)
)
# Build/reuse one Source per (fingerprint, component)
srcs = []
if self.dimensions == 1:
key = (self.polarization_fingerprint_hash, "Ez")
if key not in _fingerprint_source:
instantaneous_source_amplitudes[key] = 0.0 # ensure accumulator exists
_fingerprint_source[key] = mp.Source(
src=make_custom_time_src(key), # <- reads accumulator each step
component=mp.Ez,
center=self.center,
size=self.size,
amplitude=1.0, # fixed scalar; time dependence via CustomSource
amp_func=amp_func_1d,
)
srcs.append(_fingerprint_source[key])
elif self.dimensions == 2:
key = (self.polarization_fingerprint_hash, "Ez")
if key not in _fingerprint_source:
instantaneous_source_amplitudes[key] = 0.0 # ensure accumulator exists
_fingerprint_source[key] = mp.Source(
src=make_custom_time_src(key), # <- reads accumulator each step
component=mp.Ez,
center=self.center,
size=self.size,
amplitude=1.0, # fixed scalar; time dependence via CustomSource
amp_func=amp_func_2d,
)
srcs.append(_fingerprint_source[key])
elif self.dimensions == 3:
for comp, tag, amp_func in (
(mp.Ex, "Ex", amp_func_3d_x),
(mp.Ey, "Ey", amp_func_3d_y),
(mp.Ez, "Ez", amp_func_3d_z),
):
key = (self.polarization_fingerprint_hash, tag)
if key not in _fingerprint_source:
instantaneous_source_amplitudes[key] = 0.0
_fingerprint_source[key] = mp.Source(
src=make_custom_time_src(key),
component=comp,
center=self.center,
size=self.size,
amplitude=1.0,
amp_func=amp_func,
)
srcs.append(_fingerprint_source[key])
else:
raise ValueError("SocketMolecule only supports 2D or 3D.")
# This molecule references the shared sources
self.sources = srcs
def _calculate_ep_integral(self, sim):
"""
Compute vector field–polarization integrals over the molecule volume.
Parameters
----------
sim : meep.Simulation
Active MEEP simulation.
Returns
-------
list of float
``[int_x, int_y, int_z]`` (real parts), with zeroed components when
not applicable in lower dimensions.
"""
vol = mp.Volume(size=self.size, center=self.center)
int_ep_x, int_ep_y, int_ep_z = 0.0, 0.0, 0.0
if self.dimensions == 1:
int_ep_z = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_1d
* exp(
-((R.x - self.center.x) * (R.x - self.center.x))
/ (2.0 * self.sigma**2)
)
* (ez),
vol,
)
elif self.dimensions == 2:
int_ep_z = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_2d
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
)
/ (2.0 * self.sigma**2)
)
* (ez),
vol,
)
elif self.dimensions == 3:
int_ep_z = sim.integrate_field_function(
[mp.Ez],
lambda R, ez: self._polarization_prefactor_3d
* (R.z - self.center.z)
* (R.z - self.center.z)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ez),
vol,
)
int_ep_x = sim.integrate_field_function(
[mp.Ex],
lambda R, ex: self._polarization_prefactor_3d
* (R.x - self.center.x)
* (R.x - self.center.x)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ex),
vol,
)
int_ep_y = sim.integrate_field_function(
[mp.Ey],
lambda R, ey: self._polarization_prefactor_3d
* (R.y - self.center.y)
* (R.y - self.center.y)
* exp(
-(
(R.x - self.center.x) * (R.x - self.center.x)
+ (R.y - self.center.y) * (R.y - self.center.y)
+ (R.z - self.center.z) * (R.z - self.center.z)
)
/ (2.0 * self.sigma**2)
)
* (ey),
vol,
)
# MEEP returns complex numbers, but the physical interaction is real
return [np.real(int_ep_x), np.real(int_ep_y), np.real(int_ep_z)]
def _update_source_amplitude(self, amp_vec3: np.ndarray):
"""
Convert driver-returned amplitudes (a.u.) to MEEP units per component.
Parameters
----------
amp_vec3 : numpy.ndarray, shape (3,)
Source amplitudes in atomic units.
Returns
-------
numpy.ndarray, shape (3,)
Converted amplitudes in MEEP units.
"""
amp_mu = atomic_to_meep_units_SourceAmp(amp_vec3, self.time_units_fs)
return np.asarray(amp_mu, dtype=float)
[docs]
def units_helper(self):
"""
Explain the MEEP unit system and its relation to atomic units.
Notes
-----
Prints commonly used conversions for the current ``time_units_fs`` and
simulation resolution.
"""
# calculate units conversion factors
mu2efield_au = meep_to_atomic_units_E(1.0, self.time_units_fs)
mu2efield_si = mu2efield_au * 5.14220675112e11 # V/m
# audipoledt2mu = atomic_to_meep_units_SourceAmp(1.0, self.time_units_fs)
print(
"\n\n ######### MaxwellLink Units Helper #########\n",
"MEEP uses its own units system, which is based on the speed of light in vacuum (c=1), \n",
"the permittivity of free space (epsilon_0=1), and the permeability of free space (mu_0=1). \n",
"To couple MEEP with molecular dynamics, we set [c] = [epsilon_0] = [mu_0] = [hbar] = 1. \n",
"By further defining the time unit as %.2E fs, we can fix the units system of MEEP (mu).\n\n"
% self.time_units_fs,
"Given the simulation resolution = %d,\n - FDTD dt = %.2E mu (0.5/resolution) = %.2E fs\n"
% (self.resolution, self.dt, self.dt * self.time_units_fs),
"- FDTD dx = %.2E mu (1.0/resolution) = %.2E nm\n"
% (self.dx, self.dx * self.time_units_fs * 299.792458),
"- Time [t]: 1 mu = %.2E fs = %.2E a.u.\n"
% (self.time_units_fs, self.time_units_fs * fs_to_au),
"- Length [x]: 1 mu = %.2E nm\n" % (299.792458 * self.time_units_fs),
# "- Frequency (defining MEEP source frequency) [f]: 1 mu = %.4E THz\n"
# % (41.341373335 / self.time_units_fs / 2.0 / np.pi),
"- Angular frequency [omega = 2 pi * f]: 1 mu = %.4E eV = %.4E a.u.\n"
% (
0.242 / self.time_units_fs * 27.211 * 0.1,
0.242 / self.time_units_fs * 0.1,
),
"- Electric field [E]: 1 mu = %.2E V/m = %.2E a.u.\n"
% (mu2efield_si, mu2efield_au),
# "- Dipole moment [d]: 1 mu = %.2E C*m = %.2E a.u.\n"
# % (1.0 * self.dx * 299.792458, 1.0 * self.dx * 299.792458 * 1.0),
"Hope this helps!\n",
"############################################\n\n",
)
### Collective step function for many socket molecules ###
[docs]
def update_molecules_no_mpi(
hub: SocketHub, molecules: List[SocketMolecule], sources_non_molecule: List = None
):
"""
Collective step function for multiple socket molecules (no MPI).
Parameters
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub`
Socket hub to use for communication.
molecules : list of SocketMolecule
Molecules to advance.
sources_non_molecule : list, optional
Non-molecule sources to include.
Returns
-------
Callable
Function ``step(sim)`` to be passed to ``sim.run(step, ...)``.
Notes
-----
Handles binding/rebinding of drivers, performs barrier-style exchanges,
and accumulates amplitudes into per-fingerprint component bins.
"""
if sources_non_molecule is None:
sources_non_molecule = []
started = {"flag": False}
paused = {"flag": False}
def __step_function__(sim: mp.Simulation):
# === First-time barrier ===
while not started["flag"]:
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
ok = hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
if not ok:
raise RuntimeError("wait_until_bound timed out")
# register cleanup to reset global state when sim ends or at exit
_register_sim_cleanup(sim)
started["flag"] = True
# Install sources ONCE: union of non-molecule sources + unique shared sources
unique_sources = list(
{id(src): src for m in molecules for src in m.sources}.values()
)
sim.change_sources(list(sources_non_molecule) + unique_sources)
# === Mid-run guard ===
ids = [m.molecule_id for m in molecules]
while not hub.all_bound(ids, require_init=True):
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
# === Build requests (cache integrals per fingerprint) ===
requests = {}
regularized_efield_integrals = {}
for mol in molecules:
key_fp = mol.polarization_fingerprint_hash
if key_fp not in regularized_efield_integrals:
int_ep_mu = mol._calculate_ep_integral(sim)
regularized_efield_integrals[key_fp] = int_ep_mu
else:
int_ep_mu = regularized_efield_integrals[key_fp]
int_ep_au = meep_to_atomic_units_E(int_ep_mu, mol.time_units_fs)
requests[mol.molecule_id] = {
"efield_au": int_ep_au,
"meta": {"t": sim.meep_time()},
"init": {**mol.init_payload, "molecule_id": mol.molecule_id},
}
# === Hub round-trip (with reconnection pause) ===
responses = hub.step_barrier(requests)
while not responses:
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
responses = hub.step_barrier(requests)
# === Aggregate amplitudes into per-(fingerprint,component) accumulators ===
# Zero only the keys we will touch this step (avoids clearing unrelated fingerprints)
touched_keys = set()
for mol in molecules:
if mol.molecule_id not in responses:
print(
f"Warning: no response for SocketMolecule {mol.molecule_id} at t={sim.meep_time():.2f}."
)
continue
amp_au = np.asarray(responses[mol.molecule_id]["amp"], dtype=float)
amp_mu = mol._update_source_amplitude(
amp_au
) # returns [ax, ay, az] (or only z in 2D)
# Map returned vector into component keys
if mol.dimensions == 1 or mol.dimensions == 2:
key = (mol.polarization_fingerprint_hash, "Ez")
if key not in touched_keys:
instantaneous_source_amplitudes[key] = 0.0
touched_keys.add(key)
instantaneous_source_amplitudes[key] += float(amp_mu[2])
else: # 3D
for tag, val in (
("Ex", amp_mu[0]),
("Ey", amp_mu[1]),
("Ez", amp_mu[2]),
):
key = (mol.polarization_fingerprint_hash, tag)
if key not in touched_keys:
instantaneous_source_amplitudes[key] = 0.0
touched_keys.add(key)
instantaneous_source_amplitudes[key] += float(val)
# optional: stash any extra payload
extra_blob = responses[mol.molecule_id].get("extra", b"")
if extra_blob:
try:
mol.additional_data_history.append(
json.loads(extra_blob.decode("utf-8"))
)
except Exception:
pass
# === Do NOT call sim.change_sources() here ===
# Meep will sample the updated accumulators through CustomSource on the next field update.
return __step_function__
[docs]
def update_molecules(
hub: SocketHub, molecules: List[SocketMolecule], sources_non_molecule: List = None
):
"""
MPI-safe collective step function for multiple socket molecules.
Parameters
----------
hub : :class:`~maxwelllink.sockets.sockets.SocketHub`
Socket hub used by the master rank.
molecules : list of SocketMolecule
Molecules to advance.
sources_non_molecule : list, optional
Non-molecule sources to include.
Returns
-------
Callable
Function ``step(sim)`` for use with ``sim.run(step, ...)`` on all ranks.
Notes
-----
Performs driver synchronization on the master, broadcasts amplitudes to
workers, and updates shared accumulators without additional source changes.
"""
import json as _json
# --- detect MPI (optional) ---
try:
from mpi4py import MPI as _MPI
_COMM = _MPI.COMM_WORLD
_RANK = _COMM.Get_rank()
_SIZE = _COMM.Get_size()
_HAS_MPI = _SIZE > 1
except Exception:
_COMM = None
_RANK = 0
_SIZE = 1
_HAS_MPI = False
if sources_non_molecule is None:
sources_non_molecule = []
# Only the true master should touch the hub
_is_master = (not _HAS_MPI) or (_RANK == 0 and mp.am_master())
if _HAS_MPI and _RANK != 0:
hub = None # safety: non-master must never call hub methods
started = {"flag": False}
paused = {"flag": False}
def _bcast_bool(x: bool) -> bool:
if _HAS_MPI:
x = bool(_COMM.bcast(bool(x), root=0))
return bool(x)
def _bcast_array(arr, n):
"""Broadcast a flat float64 array of length n from master."""
if not _HAS_MPI:
return np.asarray(arr, dtype=float).reshape(n)
if _is_master:
buf = np.asarray(arr, dtype=float).reshape(n)
else:
buf = np.empty(n, dtype=float)
_COMM.Bcast(buf, root=0)
return buf
def __step_function__(sim: mp.Simulation):
# ============= 1) FIRST-TIME BARRIER (ALWAYS BLOCK UNTIL BOUND) =============
while not started["flag"]:
ok = True
if _is_master:
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
ok = hub.wait_until_bound(
init_payloads, require_init=True, timeout=None
)
ok = _bcast_bool(ok)
if not ok:
raise RuntimeError("wait_until_bound timed out")
# Ensure each molecule has created/registered its shared sources
for m in molecules:
if not m.sources:
m._init_sources()
# Install sources identically on all ranks ONCE
unique_sources = list(
{id(src): src for m in molecules for src in m.sources}.values()
)
sim.change_sources(list(sources_non_molecule) + unique_sources)
# register cleanup to reset global state when sim ends or at exit
_register_sim_cleanup(sim)
started["flag"] = True
# ============= 2) MID-RUN GUARD (BLOCK UNTIL EVERYONE IS STILL BOUND) =============
dropped = False
if _is_master:
ids = [m.molecule_id for m in molecules]
dropped = not hub.all_bound(ids, require_init=True)
dropped = _bcast_bool(dropped)
while dropped:
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
if _is_master:
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
_ = _bcast_bool(True) # sync point
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
dropped = False
if _is_master:
ids = [m.molecule_id for m in molecules]
dropped = not hub.all_bound(ids, require_init=True)
dropped = _bcast_bool(dropped)
# ============= 3) BUILD REQUESTS (E-FIELD INTEGRALS) =============
requests = {}
regularized_efield_integrals = {}
for mol in molecules:
key_fp = mol.polarization_fingerprint_hash
if key_fp not in regularized_efield_integrals:
int_ep_mu = mol._calculate_ep_integral(sim)
regularized_efield_integrals[key_fp] = int_ep_mu
else:
int_ep_mu = regularized_efield_integrals[key_fp]
int_ep_au = meep_to_atomic_units_E(int_ep_mu, mol.time_units_fs)
requests[mol.molecule_id] = {
"efield_au": int_ep_au,
"meta": {"t": sim.meep_time()},
"init": {**mol.init_payload, "molecule_id": mol.molecule_id},
}
# ============= 4) HUB ROUND-TRIP ON MASTER, WITH RETRY =============
nmol = len(molecules)
amps_flat = np.zeros(3 * nmol, dtype=float) # [ax,ay,az] per molecule
extras_by_id = {}
had_responses = True
if _is_master:
responses = hub.step_barrier(requests)
while not responses:
if not paused["flag"]:
print("[SocketHub] PAUSED: waiting for driver reconnection...")
paused["flag"] = True
init_payloads = {
m.molecule_id: {**m.init_payload, "molecule_id": m.molecule_id}
for m in molecules
}
hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
print("[SocketHub] RESUMED: all drivers reconnected.")
paused["flag"] = False
responses = hub.step_barrier(requests)
# Pack responses in molecule order (atomic units)
off = 0
for mol in molecules:
a = np.asarray(responses[mol.molecule_id]["amp"], dtype=float).reshape(
3
)
amps_flat[off : off + 3] = a
extras_by_id[mol.molecule_id] = responses[mol.molecule_id].get(
"extra", b""
)
off += 3
had_responses = _bcast_bool(had_responses and _is_master or (not _HAS_MPI))
if not had_responses:
return # should not happen with the retry loop
# ============= 5) BROADCAST AMPLITUDES (a.u.) TO ALL RANKS =============
amps_flat = _bcast_array(amps_flat, 3 * nmol)
# ============= 6) UPDATE ACCUMULATORS (NO change_sources) =============
# Zero only the accumulators we touch this step
touched_keys = set()
off = 0
for mol in molecules:
amp_au = amps_flat[off : off + 3]
off += 3
# Convert to Meep units using the molecule's own conversion (handles time_units_fs)
amp_mu = mol._update_source_amplitude(
amp_au
) # returns np.array([ax, ay, az])
# Sum into shared accumulators by (fingerprint, component)
if mol.dimensions == 1 or mol.dimensions == 2:
key = (mol.polarization_fingerprint_hash, "Ez")
if key not in touched_keys:
instantaneous_source_amplitudes[key] = 0.0
touched_keys.add(key)
instantaneous_source_amplitudes[key] += float(amp_mu[2])
else:
for tag, val in (
("Ex", amp_mu[0]),
("Ey", amp_mu[1]),
("Ez", amp_mu[2]),
):
key = (mol.polarization_fingerprint_hash, tag)
if key not in touched_keys:
instantaneous_source_amplitudes[key] = 0.0
touched_keys.add(key)
instantaneous_source_amplitudes[key] += float(val)
# Master keeps optional extras to avoid extra comms
if _is_master:
extra_blob = extras_by_id.get(mol.molecule_id, b"")
if extra_blob:
try:
mol.additional_data_history.append(
_json.loads(extra_blob.decode("utf-8"))
)
except Exception:
pass
# ============= 7) DONE — CustomSource will read new accumulators next tick =============
# NO sim.change_sources() here.
return __step_function__