Source code for pyftle.integrate

from abc import ABC, abstractmethod
from typing import Optional, cast

import numpy as np
from numba import njit  # type: ignore

from pyftle.interpolate import Interpolator
from pyftle.my_types import Array2xN, Array3xN, ArrayNx2, ArrayNx3
from pyftle.particles import NeighboringParticles

# ============================================================
# Low-level numerical kernels (Numba-accelerated)
# ============================================================


[docs] @njit(inline="always") def euler_step_inplace( positions: ArrayNx2 | ArrayNx3, h: float, velocity: Array2xN | Array3xN, ): r""" Perform an in-place Euler integration step. Updates particle positions according to: .. math:: x_{n+1} = x_n + h \, v_n Parameters ---------- positions : ArrayNx2 | ArrayNx3 Particle positions at the current time. Updated in-place. h : float Time step size. velocity : Array2xN | Array3xN Velocity field evaluated at the current positions. Notes ----- This is a low-level kernel designed for performance with Numba. It does not allocate memory or perform error checking. """ positions += h * velocity
[docs] @njit(inline="always") def adams_bashforth_2_step_inplace( positions: ArrayNx2 | ArrayNx3, h: float, v_current: Array2xN | Array3xN, v_prev: Array2xN | Array3xN, ): """ Perform an in-place second-order Adams–Bashforth (AB2) integration step. Updates particle positions according to: .. math:: x_{n+1} = x_n + h \\left( \\tfrac{3}{2} v_n - \\tfrac{1}{2} v_{n-1} \\right) Parameters ---------- positions : ArrayNx2 or ArrayNx3 Particle positions at the current time. Updated in-place. h : float Time step size. v_current : Array2xN or Array3xN Velocity field evaluated at the current positions. v_prev : Array2xN or Array3xN Velocity field evaluated at the previous positions. """ positions += h * (1.5 * v_current - 0.5 * v_prev)
[docs] @njit(inline="always") def runge_kutta_4_step_inplace( positions: ArrayNx2 | ArrayNx3, h: float, k1: ArrayNx2 | ArrayNx3, k2: ArrayNx2 | ArrayNx3, k3: ArrayNx2 | ArrayNx3, k4: ArrayNx2 | ArrayNx3, ): """ Perform an in-place fourth-order Runge-Kutta (RK4) integration step. Updates particle positions according to: .. math:: x_{n+1} = x_n + \\frac{h}{6}(k_1 + 2k_2 + 2k_3 + k_4) Parameters ---------- positions : ArrayNx2 | ArrayNx3 Particle positions at the current time. Updated in-place. h : float Time step size. k1, k2, k3, k4 : ArrayNx2 | ArrayNx3 Intermediate slope (velocity) evaluations for RK4. """ positions += (h / 6.0) * (k1 + 2.0 * k2 + 2.0 * k3 + k4)
# ============================================================ # Integrator Base Class # =========================================================
[docs] class Integrator(ABC): """ Abstract base class for numerical ODE integrators. All specific integrator implementations (Euler, AB2, RK4) must derive from this class and implement the :meth:`integrate` method. Parameters ---------- interpolator : Interpolator Interpolator used to evaluate velocity from particle positions. **kwargs Optional extra arguments specific to the integrator implementation. """ def __init__(self, interpolator: Interpolator, **kwargs) -> None: # noqa: ARG002 self.interpolator = interpolator
[docs] @abstractmethod def integrate(self, h: float, particles: NeighboringParticles) -> None: """ Perform a single integration step. Parameters ---------- h : float Time step size. particles : NeighboringParticles Dataclass containing particle positions and neighboring data. Notes ----- This method must mutate ``particles.positions`` in-place. Implementations should use Numba-accelerated kernels for efficiency. """ pass
# ============================================================ # Euler Integrator # ============================================================
[docs] class EulerIntegrator(Integrator): r""" First-order explicit Euler integrator. The Euler method approximates the solution of an ODE by advancing the state using the current derivative (velocity): .. math:: x_{n+1} = x_n + h \, v_n """ def __init__(self, interpolator: Interpolator, **kwargs) -> None: super().__init__(interpolator, **kwargs) # Preallocate a temporary velocity buffer self._velocity = None
[docs] def integrate(self, h: float, particles: NeighboringParticles) -> None: """ Perform a single Euler integration step in-place. Parameters ---------- h : float Time step size. particles : NeighboringParticles Particle data with current positions to be updated. """ # Get or allocate buffer if self._velocity is None or self._velocity.shape != particles.positions.shape: self._velocity = np.empty_like(particles.positions) # Interpolate velocity directly into the preallocated buffer np.copyto(self._velocity, self.interpolator.interpolate(particles.positions)) # In-place update using numba kernel euler_step_inplace(particles.positions, h, self._velocity)
# ============================================================ # Adams-Bashforth 2 Integrator # ============================================================
[docs] class AdamsBashforth2Integrator(Integrator): """ Second-order explicit Adams-Bashforth (AB2) integrator. Uses a linear combination of the current and previous velocities to achieve second-order accuracy: .. math:: x_{n+1} = x_n + h \\left( \\frac{3}{2} v_n - \\frac{1}{2} v_{n-1} \\right) On the first iteration (when no previous velocity is available), it defaults to a single Euler step. """ def __init__(self, interpolator: Interpolator, **kwargs) -> None: super().__init__(interpolator, **kwargs) self.previous_velocity = None self._velocity = None
[docs] def integrate(self, h: float, particles: NeighboringParticles) -> None: """ Perform a single AB2 integration step in-place. Parameters ---------- h : float Time step size. particles : NeighboringParticles Particle data with current positions to be updated. Notes ----- Uses Euler integration for the first step, when no previous velocity field is available. """ if self._velocity is None or self._velocity.shape != particles.positions.shape: self._velocity = np.empty_like(particles.positions) # Compute current velocity np.copyto(self._velocity, self.interpolator.interpolate(particles.positions)) if self.previous_velocity is None: # First step: Euler fallback euler_step_inplace(particles.positions, h, self._velocity) else: # Use AB2 formula in-place adams_bashforth_2_step_inplace( particles.positions, h, self._velocity, self.previous_velocity ) # Swap references to avoid reallocations if self.previous_velocity is None: self.previous_velocity = np.empty_like(self._velocity) np.copyto(self.previous_velocity, self._velocity)
# ============================================================ # Runge-Kutta 4 Integrator # ============================================================
[docs] class RungeKutta4Integrator(Integrator): """ Fourth-order explicit Runge-Kutta (RK4) integrator. Uses four slope evaluations per step to achieve fourth-order accuracy: .. math:: x_{n+1} = x_n + \\frac{h}{6} (k_1 + 2k_2 + 2k_3 + k_4) where: k₁ = f(t, x) k₂ = f(t + h/2, x + h k₁/2) k₃ = f(t + h/2, x + h k₂/2) k₄ = f(t + h, x + h k₃) """ def __init__(self, interpolator: Interpolator, **kwargs) -> None: super().__init__(interpolator, **kwargs) # Preallocate buffers for intermediate slopes self._k1: Optional[ArrayNx2 | ArrayNx3 | None] = None self._k2: Optional[ArrayNx2 | ArrayNx3 | None] = None self._k3: Optional[ArrayNx2 | ArrayNx3 | None] = None self._k4: Optional[ArrayNx2 | ArrayNx3 | None] = None # temporary array for intermediate positions self._tmp: Optional[ArrayNx2 | ArrayNx3 | None] = None
[docs] def integrate(self, h: float, particles: NeighboringParticles) -> None: """ Perform a single RK4 integration step in-place. Parameters ---------- h : float Time step size. particles : NeighboringParticles Particle data with current positions to be updated. Notes ----- This method allocates intermediate buffers lazily and reuses them between calls to minimize memory allocations. """ npos = particles.positions.shape # Lazily allocate working buffers if self._k1 is None or self._k1.shape != npos: self._k1 = np.empty_like(particles.positions) self._k2 = np.empty_like(particles.positions) self._k3 = np.empty_like(particles.positions) self._k4 = np.empty_like(particles.positions) self._tmp = np.empty_like(particles.positions) k1 = cast(ArrayNx2 | ArrayNx3, self._k1) k2 = cast(ArrayNx2 | ArrayNx3, self._k2) k3 = cast(ArrayNx2 | ArrayNx3, self._k3) k4 = cast(ArrayNx2 | ArrayNx3, self._k4) tmp = cast(ArrayNx2 | ArrayNx3, self._tmp) # k1 = f(t, y) np.copyto(k1, self.interpolator.interpolate(particles.positions)) # k2 = f(t + h/2, y + h/2 * k1) np.copyto(tmp, particles.positions) tmp += 0.5 * h * k1 np.copyto(k2, self.interpolator.interpolate(tmp)) # k3 = f(t + h/2, y + h/2 * k2) np.copyto(tmp, particles.positions) tmp += 0.5 * h * k2 np.copyto(k3, self.interpolator.interpolate(tmp)) # k4 = f(t + h, y + h * k3) np.copyto(tmp, particles.positions) tmp += h * k3 np.copyto(k4, self.interpolator.interpolate(tmp)) # In-place RK4 update runge_kutta_4_step_inplace(particles.positions, h, k1, k2, k3, k4)
# ============================================================ # Factory # ============================================================
[docs] def create_integrator(integrator_name: str, interpolator: Interpolator) -> Integrator: """ Factory function to instantiate a numerical integrator. Parameters ---------- integrator_name : str Name of the integrator to create. Supported values: ``"euler"``, ``"ab2"``, ``"rk4"``. interpolator : Interpolator Interpolator used to compute velocity from particle positions. Returns ------- Integrator An instance of the requested integrator type. Raises ------ ValueError If the provided ``integrator_name`` is not recognized. """ integrator_name = integrator_name.lower() # Normalize input to lowercase integrator_map: dict[str, type[Integrator]] = { "ab2": AdamsBashforth2Integrator, # Uses Euler for the first step, then AB2 "euler": EulerIntegrator, "rk4": RungeKutta4Integrator, } if integrator_name not in integrator_map: raise ValueError( f"Invalid integrator name '{integrator_name}'. " f"Choose from {list(integrator_map.keys())}." ) return integrator_map[integrator_name](interpolator)