__all__ = ["Kirchhoff"]
import logging
import os
from typing import Optional, Tuple, Union
import numpy as np
from pylops import LinearOperator
from pylops.signalprocessing import Convolve1D
from pylops.utils import deps
from pylops.utils.decorators import reshaped
from pylops.utils.typing import DTypeLike, NDArray
skfmm_message = deps.skfmm_import("the kirchhoff module")
jit_message = deps.numba_import("the kirchhoff module")
if skfmm_message is None:
import skfmm
if jit_message is None:
from numba import jit, prange
# detect whether to use parallel or not
numba_threads = int(os.getenv("NUMBA_NUM_THREADS", "1"))
parallel = True if numba_threads != 1 else False
else:
prange = range
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.WARNING)
[docs]
class Kirchhoff(LinearOperator):
r"""Kirchhoff single-sided, demigration operator.
Kirchhoff-based demigration/migration operator for single-sided propagation (from
subsurface to surface). Uses a high-frequency approximation of Green's function
propagators based on ``trav``.
Parameters
----------
z : :obj:`numpy.ndarray`
Depth axis
x : :obj:`numpy.ndarray`
Spatial axis
t : :obj:`numpy.ndarray`
Time axis for data
recs : :obj:`numpy.ndarray`
Receivers in array of size :math:`\lbrack 2 (3) \times n_r \rbrack`
The first axis should be ordered as (``y``,) ``x``, ``z``.
vel : :obj:`numpy.ndarray` or :obj:`float`
Velocity model of size :math:`\lbrack (n_y\,\times)\; n_x
\times n_z \rbrack` (or constant)
wav : :obj:`numpy.ndarray`
Wavelet.
wavcenter : :obj:`int`
Index of wavelet center
y : :obj:`numpy.ndarray`
Additional spatial axis (for 3-dimensional problems)
mode : :obj:`str`, optional
Computation mode (``analytic``, ``eikonal`` or ``byot``, see Notes for
more details)
wavfilter : :obj:`bool`, optional
Apply wavelet filter (``True``) or not (``False``)
trav : :obj:`numpy.ndarray` or :obj:`tuple`, optional
Traveltime table of size
:math:`\lbrack (n_y) n_x n_z \times n_r \rbrack` (to be provided if ``mode='byot'``).
engine : :obj:`str`, optional
Engine used for computations (``numpy`` or ``numba``).
dtype : :obj:`str`, optional
Type of elements in input array.
name : :obj:`str`, optional
Name of operator (to be used by :func:`pylops.utils.describe.describe`)
Attributes
----------
shape : :obj:`tuple`
Operator shape
explicit : :obj:`bool`
Operator contains a matrix that can be solved explicitly (``True``) or
not (``False``)
Raises
------
NotImplementedError
If ``mode`` is neither ``analytic``, ``eikonal``, or ``byot``
Notes
-----
The Kirchhoff single-sided demigration operator synthesizes seismic data given a
propagation velocity model :math:`v` and a source distribution :math:`m`.
In forward mode:
.. math::
d(\mathbf{x_r}, \mathbf{x_s}, t) =
\widetilde{w}(t) * \int_V G(\mathbf{x_r}, \mathbf{x_s}, t)
m(\mathbf{x_s})\,\mathrm{d}\mathbf{x_s}
where :math:`m(\mathbf{x_s})` represents the source distribution
at every location in the subsurface, :math:`G(\mathbf{x_r}, \mathbf{x_s}, t)`
is the Green's function from source-to-receiver, and finally :math:`\widetilde{w}(t)` is
a filtered version of the wavelet :math:`w(t)` [1]_ (or the wavelet itself when
``wavfilter=False``). In our implementation, the following high-frequency
approximation of the Green's functions is adopted:
.. math::
G(\mathbf{x_r}, \mathbf{x_s}, \omega) = e^{j \omega t(\mathbf{x_r}, \mathbf{x_s})}
where :math:`t(\mathbf{x_r}, \mathbf{x})` is the traveltime and no amplitude term is applied
Depending on the choice of ``mode`` the traveltime and amplitude of the Green's
function will be also computed differently:
* ``mode=analytic`` or ``mode=eikonal``: traveltimes are computed for every
source-receiver pair and the Green's functions are implemented from traveltime
look-up tables, placing the source distribution values at corresponding source-to-receiver
time in the data.
* ``byot``: bring your own tables. The traveltime table is provided
directly by user using ``trav`` input parameter.
Finally, the adjoint of the demigration operator is a *migration* operator which
projects the data in the model domain creating an image of the source distribution.
.. [1] Safron, L. "Multicomponent least-squares Kirchhoff depth migration",
MSc Thesis, 2018.
"""
def __init__(
self,
z,
x,
t,
recs,
vel,
wav,
wavcenter,
y=None,
mode="eikonal",
wavfilter=False,
trav=None,
engine="numpy",
dtype="float64",
name="K",
) -> None:
# identify geometry
(
self.ndims,
_,
dims,
self.ny,
self.nx,
self.nz,
nr,
_,
_,
_,
_,
_,
) = Kirchhoff._identify_geometry(z, x, recs, y=y)
self.dt = t[1] - t[0]
self.nt = len(t)
# compute traveltime
if mode in ["analytic", "eikonal", "byot"]:
if mode in ["analytic", "eikonal"]:
# compute traveltime table
self.trav_recs = Kirchhoff._traveltime_table(z, x, recs, vel, y=y, mode=mode)
else:
self.trav_recs = trav
else:
raise NotImplementedError("method must be analytic, eikonal or byot")
# create wavelet operator
if wavfilter:
self.wav = self._wavelet_reshaping(
wav, self.dt, recs.shape[0], self.ndims
)
else:
self.wav = wav
self.cop = Convolve1D(
(nr, self.nt), h=self.wav, offset=wavcenter, axis=1, dtype=dtype
)
# dimensions
self.nr = nr
self.ni = np.prod(dims)
dims = tuple(dims) if self.ndims == 2 else (dims[0] * dims[1], dims[2])
dimsd = (nr, self.nt)
super().__init__(dtype=np.dtype(dtype), dims=dims, dimsd=dimsd, name=name)
self._register_multiplications(engine)
@staticmethod
def _identify_geometry(
z: NDArray,
x: NDArray,
recs: NDArray,
y: Optional[NDArray] = None,
) -> Tuple[
int,
int,
NDArray,
int,
int,
int,
int,
float,
float,
float,
NDArray,
NDArray,
]:
"""Identify geometry and acquisition size and sampling"""
nr = recs.shape[1]
nz, nx = len(z), len(x)
dz = np.abs(z[1] - z[0])
dx = np.abs(x[1] - x[0])
if y is None:
ndims = 2
shiftdim = 0
ny = 1
dy = None
dims = np.array([nx, nz])
dsamp = np.array([dx, dz])
origin = np.array([x[0], z[0]])
else:
ndims = 3
shiftdim = 1
ny = len(y)
dy = np.abs(y[1] - y[0])
dims = np.array([ny, nx, nz])
dsamp = np.array([dy, dx, dz])
origin = np.array([y[0], x[0], z[0]])
return ndims, shiftdim, dims, ny, nx, nz, nr, dy, dx, dz, dsamp, origin
@staticmethod
def _traveltime_table(
z: NDArray,
x: NDArray,
recs: NDArray,
vel: Union[float, NDArray],
y: Optional[NDArray] = None,
mode: str = "eikonal",
) -> NDArray:
r"""Traveltime table
Compute traveltimes along the source-subsurface-receivers triplet
in 2- or 3-dimensional media given a constant or depth- and space variable
velocity.
Parameters
----------
z : :obj:`numpy.ndarray`
Depth axis
x : :obj:`numpy.ndarray`
Spatial axis
recs : :obj:`numpy.ndarray`
Receivers in array of size :math:`\lbrack 2 (3) \times n_r \rbrack`
vel : :obj:`numpy.ndarray` or :obj:`float`
Velocity model of size :math:`\lbrack (n_y \times)\, n_x
\times n_z \rbrack` (or constant)
y : :obj:`numpy.ndarray`
Additional spatial axis (for 3-dimensional problems)
mode : :obj:`numpy.ndarray`, optional
Computation mode (``eikonal``, ``analytic`` - only for constant velocity)
Returns
-------
trav_recs : :obj:`numpy.ndarray`
Receiver-to-subsurface traveltime table of size
:math:`\lbrack (n_y) n_x n_z \times n_r \rbrack`
"""
# define geometry
(
ndims,
shiftdim,
dims,
ny,
nx,
nz,
nr,
_,
_,
_,
dsamp,
origin,
) = Kirchhoff._identify_geometry(z, x, recs, y=y)
# compute traveltimes
if mode == "analytic":
if not isinstance(vel, (float, int)):
raise ValueError("vel must be scalar for mode=analytical")
# compute grid
if ndims == 2:
X, Z = np.meshgrid(x, z, indexing="ij")
X, Z = X.ravel(), Z.ravel()
else:
Y, X, Z = np.meshgrid(y, x, z, indexing="ij")
Y, X, Z = Y.ravel(), X.ravel(), Z.ravel()
dist_recs2 = np.zeros((ny * nx * nz, nr))
for irec, rec in enumerate(recs.T):
dist_recs2[:, irec] = (X - rec[0 + shiftdim]) ** 2 + (
Z - rec[1 + shiftdim]
) ** 2
if ndims == 3:
dist_recs2[:, irec] += (Y - rec[0]) ** 2
trav_recs = np.sqrt(dist_recs2) / vel
elif mode == "eikonal":
if skfmm_message is None:
trav_recs = np.zeros((ny * nx * nz, nr), dtype=np.float32)
for irec, rec in enumerate(recs.T):
rec = np.round((rec - origin) / dsamp).astype(np.int32)
phi = np.ones_like(vel)
if ndims == 2:
phi[rec[0], rec[1]] = -1
else:
phi[rec[0], rec[1], rec[2]] = -1
trav_recs[:, irec] = (
skfmm.travel_time(phi=phi, speed=vel, dx=dsamp)
).ravel()
else:
raise NotImplementedError(skfmm_message)
else:
raise NotImplementedError("method must be analytic or eikonal")
return trav_recs
@staticmethod
def _wavelet_reshaping(
wav: NDArray,
dt: float,
dimrec: int,
dimv: int,
) -> NDArray:
"""Apply wavelet reshaping as from theory in [1]_"""
f = np.fft.rfftfreq(len(wav), dt)
W = np.fft.rfft(wav, n=len(wav))
if dimv == 2:
# 2D
Wfilt = W * (2 * np.pi * f)
elif dimrec == 2 and dimv == 3:
# 2.5D
raise NotImplementedError("2.D wavelet currently not available")
elif dimrec == 3 and dimv == 3:
# 3D
Wfilt = W * (-1j * 2 * np.pi * f)
wavfilt = np.fft.irfft(Wfilt, n=len(wav))
return wavfilt
@staticmethod
def _trav_kirch_matvec(
x: NDArray,
y: NDArray,
nr: int,
nt: int,
ni: int,
dt: float,
trav_recs: NDArray,
) -> NDArray:
for irec in range(nr):
trav = trav_recs[:, irec]
itrav = (trav / dt).astype("int32")
travd = trav / dt - itrav
for ii in range(ni):
itravii = itrav[ii]
travdii = travd[ii]
if 0 <= itravii < nt - 1:
y[irec, itravii] += x[ii] * (1 - travdii)
y[irec, itravii + 1] += x[ii] * travdii
return y
@staticmethod
def _trav_kirch_rmatvec(
x: NDArray,
y: NDArray,
nr: int,
nt: int,
ni: int,
dt: float,
trav_recs: NDArray,
) -> NDArray:
for ii in prange(ni):
trav_recsii = trav_recs[ii]
for irec in range(nr):
travii = trav_recsii[irec]
itravii = int(travii / dt)
travdii = travii / dt - itravii
if 0 <= itravii < nt - 1:
y[ii] += (
x[irec, itravii] * (1 - travdii)
+ x[irec, itravii + 1] * travdii
)
return y
def _register_multiplications(self, engine: str) -> None:
if engine not in ["numpy", "numba"]:
raise KeyError("engine must be numpy or numba")
if engine == "numba" and jit is not None:
# numba
numba_opts = dict(
nopython=True, nogil=True, parallel=parallel
) # fastmath=True,
self._kirch_matvec = jit(**numba_opts)(self._trav_kirch_matvec)
self._kirch_rmatvec = jit(**numba_opts)(self._trav_kirch_rmatvec)
else:
if engine == "numba" and jit is None:
logging.warning(jit_message)
self._kirch_matvec = self._trav_kirch_matvec
self._kirch_rmatvec = self._trav_kirch_rmatvec
@reshaped
def _matvec(self, x: NDArray) -> NDArray:
y = np.zeros((self.nr, self.nt), dtype=self.dtype)
inputs = (
x.ravel(),
y,
self.nr,
self.nt,
self.ni,
self.dt,
self.trav_recs,
)
y = self._kirch_matvec(*inputs)
y = self.cop._matvec(y.ravel())
return y
@reshaped
def _rmatvec(self, x: NDArray) -> NDArray:
x = self.cop._rmatvec(x.ravel())
x = x.reshape(self.nr, self.nt)
y = np.zeros(self.ni, dtype=self.dtype)
inputs = (
x,
y,
self.nr,
self.nt,
self.ni,
self.dt,
self.trav_recs,
)
y = self._kirch_rmatvec(*inputs)
return y