Source code for fracspy.modelling.trueamp_kirchhoff

__all__ = ["TAKirchhoff"]


import logging
import os
from typing import Optional

import numpy as np

from pylops import LinearOperator
from pylops.signalprocessing import Convolve1D
from pylops.utils import deps
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, NDArray

from fracspy.modelling.kirchhoff import Kirchhoff

jit_message = deps.numba_import("the kirchhoff module")

if jit_message is None:
    from numba import jit, prange

    # detect whether to use parallel or not
    numba_threads = int(os.getenv("NUMBA_NUM_THREADS", "1"))
    parallel = True if numba_threads != 1 else False
else:
    prange = range

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.WARNING)


[docs] class TAKirchhoff(LinearOperator): r"""True-amplitude Kirchhoff single-sided, demigration operator. True-amplitude Kirchhoff-based demigration/migration operator for single-sided propagation (from subsurface to surface). Uses a high-frequency approximation of Green's function propagators based on ``trav`` and ``amp``. Parameters ---------- z : :obj:`numpy.ndarray` Depth axis x : :obj:`numpy.ndarray` Spatial axis t : :obj:`numpy.ndarray` Time axis for data recs : :obj:`numpy.ndarray` Receivers in array of size :math:`\lbrack 2 (3) \times n_r \rbrack` The first axis should be ordered as (``y``,) ``x``, ``z``. vel : :obj:`numpy.ndarray` or :obj:`float` Velocity model of size :math:`\lbrack (n_y\,\times)\; n_x \times n_z \rbrack` (or constant) wav : :obj:`numpy.ndarray` Wavelet. wavcenter : :obj:`int` Index of wavelet center y : :obj:`numpy.ndarray` Additional spatial axis (for 3-dimensional problems) wavfilter : :obj:`bool`, optional Apply wavelet filter (``True``) or not (``False``) trav : :obj:`numpy.ndarray` or :obj:`tuple`, optional Traveltime table of size :math:`\lbrack (n_y) n_x n_z \times n_r \rbrack`. amp : :obj:`numpy.ndarray`, optional Amplitude table of size :math:`\lbrack (n_y) n_x n_z \times n_r \rbrack`. engine : :obj:`str`, optional Engine used for computations (``numpy`` or ``numba``). dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional Name of operator (to be used by :func:`pylops.utils.describe.describe`) Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) Notes ----- The True-amplitude Kirchhoff single-sided demigration operator synthesizes seismic data given a propagation velocity model :math:`v` and a source distribution :math:`m`. In forward mode: .. math:: d(\mathbf{x_r}, \mathbf{x_s}, t) = \widetilde{w}(t) * \int_V G(\mathbf{x_r}, \mathbf{x_s}, t) m(\mathbf{x_s})\,\mathrm{d}\mathbf{x_s} where :math:`m(\mathbf{x_s})` represents the source distribution at every location in the subsurface, :math:`G(\mathbf{x_r}, \mathbf{x_s}, t)` is the Green's function from source-to-receiver, and finally :math:`\widetilde{w}(t)` is a filtered version of the wavelet :math:`w(t)` [1]_ (or the wavelet itself when ``wavfilter=False``). In our implementation, the following high-frequency approximation of the Green's functions is adopted: .. math:: G(\mathbf{x_r}, \mathbf{x}, \omega) = a(\mathbf{x_r}, \mathbf{x}) e^{j \omega t(\mathbf{x_r}, \mathbf{x})} where :math:`a(\mathbf{x_r}, \mathbf{x})` is the amplitude and :math:`t(\mathbf{x_r}, \mathbf{x})` is the traveltime. These must be pre-computed and passed directly to the operator. Finally, the adjoint of the demigration operator is a *migration* operator which projects the data in the model domain creating an image of the source distribution. .. [1] Safron, L. "Multicomponent least-squares Kirchhoff depth migration", MSc Thesis, 2018. """ # Register static methods from Kirchhoff _identify_geometry = Kirchhoff._identify_geometry _traveltime_table = Kirchhoff._traveltime_table _wavelet_reshaping = Kirchhoff._wavelet_reshaping def __init__( self, z, x, t, recs, wav, wavcenter, y=None, wavfilter=False, trav=None, amp=None, engine="numpy", dtype="float64", name="K", ) -> None: # identify geometry ( self.ndims, _, dims, self.ny, self.nx, self.nz, nr, _, _, _, _, _, ) = Kirchhoff._identify_geometry(z, x, recs, y=y) self.dt = t[1] - t[0] self.nt = len(t) # store pre-computed traveltimes and amplitudes self.trav = trav self.amp = amp # create wavelet operator if wavfilter: self.wav = Kirchhoff._wavelet_reshaping( wav, self.dt, recs.shape[0], self.ndims ) else: self.wav = wav self.cop = Convolve1D( (nr, self.nt), h=self.wav, offset=wavcenter, axis=1, dtype=dtype ) # dimensions self.nr = nr self.ni = np.prod(dims) dims = tuple(dims) if self.ndims == 2 else (dims[0] * dims[1], dims[2]) dimsd = (nr, self.nt) super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dimsd, name=name) self._register_multiplications(engine) @staticmethod def _trav_kirch_matvec( x: NDArray, y: NDArray, nr: int, nt: int, ni: int, dt: float, trav_recs: NDArray, amp_recs: NDArray, ) -> NDArray: for irec in range(nr): trav = trav_recs[:, irec] amp = amp_recs[:, irec] itrav = (trav / dt).astype("int32") travd = trav / dt - itrav for ii in range(ni): itravii = itrav[ii] travdii = travd[ii] ampii = amp[ii] if 0 <= itravii < nt - 1: y[irec, itravii] += (x[ii] * ampii) * (1 - travdii) y[irec, itravii + 1] += (x[ii] * ampii) * travdii return y @staticmethod def _trav_kirch_rmatvec( x: NDArray, y: NDArray, nr: int, nt: int, ni: int, dt: float, trav_recs: NDArray, amp_recs: NDArray, ) -> NDArray: for ii in prange(ni): trav_recsii = trav_recs[ii] amp_recsii = amp_recs[ii] for irec in range(nr): travii = trav_recsii[irec] ampii = amp_recsii[irec] itravii = int(travii / dt) travdii = travii / dt - itravii if 0 <= itravii < nt - 1: y[ii] += ( (x[irec, itravii] * ampii) * (1 - travdii) + (x[irec, itravii + 1] * ampii) * travdii ) return y def _register_multiplications(self, engine: str) -> None: if engine not in ["numpy", "numba"]: raise KeyError("engine must be numpy or numba") if engine == "numba" and jit is not None: # numba numba_opts = dict( nopython=True, nogil=True, parallel=parallel ) # fastmath=True, self._kirch_matvec = jit(**numba_opts)(self._trav_kirch_matvec) self._kirch_rmatvec = jit(**numba_opts)(self._trav_kirch_rmatvec) else: if engine == "numba" and jit is None: logging.warning(jit_message) self._kirch_matvec = self._trav_kirch_matvec self._kirch_rmatvec = self._trav_kirch_rmatvec @reshaped def _matvec(self, x: NDArray) -> NDArray: y = np.zeros((self.nr, self.nt), dtype=self.dtype) inputs = ( x.ravel(), y, self.nr, self.nt, self.ni, self.dt, self.trav, self.amp, ) y = self._kirch_matvec(*inputs) y = self.cop._matvec(y.ravel()) return y @reshaped def _rmatvec(self, x: NDArray) -> NDArray: x = self.cop._rmatvec(x.ravel()) x = x.reshape(self.nr, self.nt) y = np.zeros(self.ni, dtype=self.dtype) inputs = ( x, y, self.nr, self.nt, self.ni, self.dt, self.trav, self.amp, ) y = self._kirch_rmatvec(*inputs) return y