# Source code for pyftle.data_source

"""
Batch data sources for FTLE computation.

This module defines the `BatchSource` protocol and concrete implementations
(`FileBatchSource` and `AnalyticalBatchSource`) for providing particle and
velocity field data during flow map integration.

Two main use cases are supported:
1. File-based data: reading velocity and coordinate snapshots from disk.
2. Analytical data: using a user-defined velocity function evaluated at runtime.

All sources expose a consistent interface defined by the `BatchSource` protocol,
so they can be used interchangeably in higher-level FTLE computation routines.
"""

from pathlib import Path
from typing import Callable, List, Protocol, Tuple

from pyftle.file_readers import (
    read_coordinate,
    read_seed_particles_coordinates,
    read_velocity,
)
from pyftle.interpolate import Interpolator
from pyftle.my_types import Array2xN, Array3xN
from pyftle.particles import NeighboringParticles


class BatchSource(Protocol):
    """
    Structural interface shared by every FTLE batch data source.

    A batch source supplies what flow map integration needs:

    - timing information (``timestep`` and ``num_steps``) plus an ``id`` label,
    - the initial set of neighboring particles,
    - a hook that refreshes an interpolator with the data for one time step.

    Since this is a ``typing.Protocol``, file-backed and analytical sources
    can be swapped freely by higher-level FTLE computation routines.
    """

    @property
    def timestep(self) -> float:
        """Spacing in time between two consecutive snapshots or samples."""
        ...

    @property
    def num_steps(self) -> int:
        """How many time steps this source can provide."""
        ...

    @property
    def id(self) -> str:
        """
        Label identifying this batch (e.g. the first file name or a time label).
        """
        ...

    def get_particles(self) -> NeighboringParticles:
        """
        Provide the neighboring particles the flow map is built from.

        Returns
        -------
        NeighboringParticles
            Dataclass holding particle positions together with their initial
            displacements.
        """
        ...

    def update_interpolator(self, interpolator: Interpolator, step_index: int) -> None:
        """
        Load the data for a single time step into ``interpolator``.

        Parameters
        ----------
        interpolator : Interpolator
            Interpolator to refresh with velocity (and possibly coordinate) data.
        step_index : int
            Zero-based index of the time step to load.
        """
        ...
class FileBatchSource(BatchSource):
    """
    Batch source that loads velocity and coordinate data from files.

    This implementation supports both static and time-varying coordinate
    grids. It reads the velocity and coordinate snapshots corresponding to
    each time step, as well as the seed particle positions for flow map
    initialization.

    Parameters
    ----------
    snapshot_files : list of str
        List of file paths containing velocity field data for each time step.
    coordinate_files : list of str
        List of file paths containing the spatial grid coordinates. If every
        entry names the same file (including a single-entry list), the grid is
        treated as stationary: it is read only at step 0 and reused afterwards.
    particle_file : str
        Path to the file containing seed particle positions.
    snapshot_timestep : float
        Time step between consecutive velocity snapshots.
    flow_map_period : int or float
        Number of time steps or total time duration between flow map
        evaluations.

    Attributes
    ----------
    snapshot_files : list of str
        Paths to velocity data files.
    coordinate_files : list of str
        Paths to coordinate grid files.
    particle_file : str
        Path to the seed particle file.
    snapshot_timestep : float
        Time step between snapshots.
    flow_map_period : int or float
        Integration period between flow map outputs.
    reuse_coordinates : bool
        Whether the same coordinate file is used for all time steps.
    """

    def __init__(
        self,
        snapshot_files: List[str],
        coordinate_files: List[str],
        particle_file: str,
        snapshot_timestep: float,
        flow_map_period: int | float,
    ):
        self.snapshot_files = snapshot_files
        self.coordinate_files = coordinate_files
        self.particle_file = particle_file  # Assume single file
        self.snapshot_timestep = snapshot_timestep
        self.flow_map_period = flow_map_period

        self._n = len(snapshot_files)
        # Path.stem is already a str, so no string formatting is needed.
        self._id = Path(self.snapshot_files[0]).stem

        # All entries naming one file means the grid never changes.
        self.reuse_coordinates = len(set(coordinate_files)) == 1

    @property
    def id(self) -> str:
        """Unique identifier derived from the first snapshot file name."""
        return self._id

    @property
    def num_steps(self) -> int:
        """Number of available time steps (number of snapshot files)."""
        return self._n

    @property
    def timestep(self) -> float:
        """Time interval between consecutive velocity snapshots."""
        return self.snapshot_timestep

    def get_particles(self) -> NeighboringParticles:
        """
        Load and return the seed particles used to initialize the flow map.

        Returns
        -------
        NeighboringParticles
            Dataclass containing particle positions and neighbor offsets.
        """
        return read_seed_particles_coordinates(self.particle_file)

    def get_data_for_step(
        self, step_index: int
    ) -> Tuple[Array2xN | Array3xN, Array2xN | Array3xN | None]:
        """
        Load velocity and coordinate data for a specific time step.

        Parameters
        ----------
        step_index : int
            Index of the time step to read (0-based).

        Returns
        -------
        velocities : Array2xN or Array3xN
            Velocity field data at the current time step.
        coordinates : Array2xN or Array3xN or None
            Coordinate grid for the current time step. It is read at step 0,
            or at every step when the grid is not reused. Otherwise this is
            `None`, meaning the previously loaded grid is unchanged.
        """
        velocities = read_velocity(self.snapshot_files[step_index])

        coordinates = None
        if step_index == 0 or not self.reuse_coordinates:
            # Index coordinate_files only when a grid is actually read, so a
            # stationary grid given as a single shared entry works for every
            # step instead of raising IndexError at step_index >= 1.
            coordinates = read_coordinate(self.coordinate_files[step_index])
        return velocities, coordinates

    def update_interpolator(self, interpolator: Interpolator, step_index: int) -> None:
        """
        Update the velocity interpolator with the data for the specified time step.

        Parameters
        ----------
        interpolator : Interpolator
            Interpolator instance to update. It receives `None` for the
            coordinates when the stationary grid is being reused.
        step_index : int
            Index of the time step to load.
        """
        velocities, coordinates = self.get_data_for_step(step_index)
        interpolator.update(velocities, coordinates)
class AnalyticalBatchSource(BatchSource):
    """
    Batch source for analytical velocity fields.

    This implementation allows using a user-defined velocity function instead
    of reading data from files. It provides access to precomputed particle
    positions and a sequence of time values.

    Parameters
    ----------
    velocity_fn : Callable
        Analytical function that returns the velocity field given space and
        time. Expected signature:
        `velocity_fn(x: Array2xN | Array3xN, t: float) -> Array2xN | Array3xN`
    particles : NeighboringParticles
        Particle data used for initialization.
    timestep : float
        Time step between consecutive evaluations.
    times : array-like of float
        Sequence of time values at which the velocity function is evaluated.
    """

    def __init__(
        self,
        velocity_fn: Callable,  # TODO: narrow to the signature in the docstring
        particles: NeighboringParticles,
        timestep: float,
        times,  # TODO: add type -- 1D array of floats
    ):
        self.velocity_fn = velocity_fn
        self.particles = particles
        self._timestep = timestep
        self.times = times

    @property
    def id(self) -> str:
        """
        Unique identifier based on the initial time value.

        The first entry of `times` is formatted with six decimal places
        (format spec ``06f``).
        """
        return f"{self.times[0]:06f}"

    @property
    def num_steps(self) -> int:
        """Number of available time samples (the length of `times`)."""
        return len(self.times)

    @property
    def timestep(self) -> float:
        """Fixed time step between consecutive evaluations."""
        return self._timestep

    def get_particles(self) -> NeighboringParticles:
        """
        Return the particle data associated with this analytical source.

        Returns
        -------
        NeighboringParticles
            Dataclass containing particle positions and neighbor offsets.
        """
        return self.particles

    def update_interpolator(self, interpolator: Interpolator, step_index: int) -> None:
        """
        No-op: analytical velocity fields do not require data updates.

        The signature matches the `BatchSource` protocol so this source can be
        used interchangeably with file-based sources.

        Parameters
        ----------
        interpolator : Interpolator
            The interpolator instance (unused).
        step_index : int
            Index of the current step (unused).
        """
        pass