# Source code for pyftle.api

from datetime import datetime
from typing import Callable, Optional, cast

import numpy as np

from pyftle.data_source import AnalyticalBatchSource, BatchSource
from pyftle.file_writers import create_writer
from pyftle.ftle_solver import FTLESolver
from pyftle.integrate import create_integrator
from pyftle.interpolate import create_interpolator
from pyftle.parallel import ParallelExecutor
from pyftle.particles import NeighboringParticles


class AnalyticalSolver:
    """
    A notebook-friendly FTLE manager for in-memory analytical velocity fields.

    This class provides a high-level interface to compute Finite-Time
    Lyapunov Exponents (FTLE) from an analytical velocity field. It generates
    time batches, integrates particle trajectories, and computes flow maps
    entirely in memory, with optional parallelization and output writing.

    The solver is particularly suited for analytical or synthetic flow fields
    defined as Python functions rather than on a discrete spatial grid.

    Parameters
    ----------
    velocity_fn : Callable[[np.ndarray, float], np.ndarray]
        A user-defined function that returns the velocity field at a given
        time. The callable must have the signature:

        ``velocity_fn(positions: ArrayNx2 | ArrayNx3, t: float)
        -> ArrayNx2 | ArrayNx3``

        where ``positions`` are the spatial coordinates of the particles and
        ``t`` is the time.
    particles : NeighboringParticles
        Initial particle positions and neighbor information used for
        computing local flow map Jacobians.
    timestep : float
        Time step size used for particle integration. Positive values
        correspond to forward-time integration, and negative values to
        backward-time. Must be non-zero.
    flow_map_period : float
        Total duration over which each flow map is integrated (in the same
        units as `timestep`).
    num_ftles : int
        Number of FTLE fields to compute. Each FTLE field corresponds to a
        distinct start time separated by `timestep`.
    integrator_name : str
        Name of the time integrator to use. Must be one of the registered
        integrators available in :mod:`pyftle.integrate`
        (e.g., "rk4", "euler").
    num_processes : int, default=1
        Number of parallel processes used for computing FTLE fields.
    save_output : bool, default=False
        If True, the computed FTLE fields will be written to disk instead of
        being returned as NumPy arrays.
    output_format : str, default="vtk"
        File format used for writing the output (e.g., "vtk" or "npy").
    output_dir_name : Optional[str], default=None
        Output directory name. If not provided and `save_output=True`, a
        timestamped directory name (e.g., "run-2025-11-07-18h-42m-00s") will
        be automatically created.

    Attributes
    ----------
    velocity_fn : Callable
        Analytical velocity function used for interpolation.
    particles : NeighboringParticles
        Initial and neighboring particle configuration.
    timestep : float
        Time step used for numerical integration.
    flow_map_period : float
        Integration duration for each FTLE computation.
    num_ftles : int
        Number of FTLE computations to perform.
    num_snapshots : int
        Number of snapshots per flow map integration (derived from
        `flow_map_period / |timestep| + 1`, tolerant to floating-point
        round-off in the division).
    writer : Optional[BaseWriter]
        Writer object for saving output (if enabled).
    executor : ParallelExecutor
        Manager for executing FTLE computations in parallel.
    integrator : BaseIntegrator
        Time integrator created via :func:`create_integrator`.

    Raises
    ------
    ValueError
        If `timestep` is zero.
    """

    def __init__(
        self,
        velocity_fn: Callable[[np.ndarray, float], np.ndarray],
        particles: NeighboringParticles,
        timestep: float,
        flow_map_period: float,
        num_ftles: int,
        integrator_name: str,  # TODO: improve this
        num_processes: int = 1,
        save_output: bool = False,
        output_format: str = "vtk",
        output_dir_name: Optional[str] = None,
    ):
        self.velocity_fn = velocity_fn
        self.particles = particles
        self.timestep = timestep
        self.flow_map_period = flow_map_period
        self.num_ftles = num_ftles
        # Validates `timestep` before any executor/integrator is created.
        self.num_snapshots = self._count_snapshots(flow_map_period, timestep)
        self.writer = None
        self.executor = ParallelExecutor(num_processes)

        interpolator = create_interpolator("analytical", velocity_fn=velocity_fn)
        self.integrator = create_integrator(integrator_name, interpolator)

        if self.timestep < 0:
            print("Running backward-time FTLE")
        else:
            print("Running forward-time FTLE")

        if save_output:
            if output_dir_name is None:
                now = datetime.now()
                output_dir_name = now.strftime("run-%Y-%m-%d-%Hh-%Mm-%Ss")
            self.writer = create_writer(output_format, output_dir_name)

    @staticmethod
    def _count_snapshots(flow_map_period: float, timestep: float) -> int:
        """
        Return the number of snapshots needed to span one flow map.

        The count is ``flow_map_period / |timestep| + 1``. Plain truncation
        of the quotient under-counts whenever round-off leaves it a hair
        below a whole number (``0.3 / 0.1 == 2.9999999999999996``), so the
        quotient is snapped to the nearest integer when it is within
        relative round-off of it, and truncated otherwise.

        Parameters
        ----------
        flow_map_period : float
            Integration duration of a single flow map.
        timestep : float
            Signed integration step; only its magnitude is used.

        Returns
        -------
        int
            Number of snapshots, including the initial one.

        Raises
        ------
        ValueError
            If `timestep` is zero.
        """
        if timestep == 0:
            raise ValueError("timestep must be non-zero.")

        ratio = flow_map_period / abs(timestep)
        nearest = round(ratio)
        # atol=0 keeps the comparison purely relative, so a genuinely
        # fractional ratio (e.g. 3.5) is still truncated as before.
        if np.isclose(ratio, nearest, rtol=1e-9, atol=0.0):
            return int(nearest) + 1
        return int(ratio) + 1

    @staticmethod
    def _collect_fields(results) -> np.ndarray:
        """
        Validate the per-batch results and assemble the returned array.

        Parameters
        ----------
        results : list
            One entry per FTLE batch; an entry is None when no field was
            produced for that batch.

        Returns
        -------
        np.ndarray
            The single field itself when there is exactly one result,
            otherwise all fields stacked along a new leading axis.

        Raises
        ------
        RuntimeError
            If there are no results, or if any entry is None.
        """
        all_results = list(results) if results else []
        fields = [field for field in all_results if field is not None]

        if not fields:
            raise RuntimeError("No FTLE fields were returned (all results were None).")

        missing = len(all_results) - len(fields)
        if missing:
            # Dropping the failed entries would silently shift which start
            # time each row belongs to, so fail loudly instead.
            raise RuntimeError(
                f"{missing} of {len(all_results)} FTLE computations returned no field."
            )

        if len(fields) == 1:
            # Kept for backward compatibility: no leading axis is added.
            return fields[0]

        return np.stack(cast(list[np.ndarray], fields), axis=0)

    def _create_batches(self) -> list[BatchSource]:
        """
        Generate a list of analytical batch sources for each FTLE computation.

        Each batch corresponds to one FTLE field, containing the time
        sequence of integration snapshots based on the analytical velocity
        function.

        Returns
        -------
        list[BatchSource]
            List of :class:`AnalyticalBatchSource` objects, one per FTLE run.
        """
        # Start time for each FTLE batch
        start_times = np.arange(self.num_ftles) * self.timestep
        # Offsets within each batch
        offsets = np.arange(self.num_snapshots) * self.timestep
        # Broadcast addition: one row of snapshot times per FTLE batch
        time_batches = start_times[:, None] + offsets

        tasks: list[BatchSource] = [
            AnalyticalBatchSource(
                self.velocity_fn,
                self.particles,
                self.timestep,
                batch_times,
            )
            for batch_times in time_batches
        ]
        return tasks

    def _worker(self, batch_source: BatchSource, progress_queue):
        """
        Worker routine executed in parallel processes to compute one FTLE
        field.

        This function instantiates an :class:`FTLESolver` for the given batch
        and runs it using the configured integrator and optional writer.

        Parameters
        ----------
        batch_source : BatchSource
            Batch source representing the time series for one FTLE
            computation.
        progress_queue : multiprocessing.Queue
            Queue for tracking progress among parallel workers.

        Returns
        -------
        np.ndarray or None
            The computed FTLE field as a NumPy array if `save_output` is
            False. Otherwise, returns None after writing the data to disk.
        """
        solver = FTLESolver(
            batch_source,
            integrator=self.integrator,
            progress_queue=progress_queue,
            output_writer=self.writer,
        )
        return solver.run()

    def run(self) -> Optional[np.ndarray]:
        """
        Execute all FTLE computations, either sequentially or in parallel.

        This method orchestrates the full FTLE workflow:
        batch generation → particle integration → flow map computation →
        FTLE evaluation → optional output writing.

        Returns
        -------
        np.ndarray or None
            - If `save_output` is False and more than one field is computed,
              returns the fields stacked along a new leading axis, i.e. an
              array of shape ``(num_ftles, n_points)``.
            - If `save_output` is False and exactly one field is computed,
              returns that field as-is, without the leading axis.
            - If `save_output` is True, returns None (fields are written to
              disk).

        Raises
        ------
        RuntimeError
            If no FTLE fields are returned, or if any FTLE computation
            returned no field.
        """
        batches = self._create_batches()
        results = self.executor.run(batches, self._worker)

        # A writer was used: the data is already on disk, nothing to return.
        if self.writer is not None:
            return None

        return self._collect_fields(results)