"""
This module contains utility functions to convert results of the gprMax computation into numpy ndarrays.
"""
import re
import numpy as np
from skimage.measure import block_reduce
import h5py
from pathlib import Path
from tqdm import tqdm
def _update_debye_er(materials_list: list[str], debye_materials: list[str], frequency: float=1e9):
"""
Calculates the relative permittivity of the Debye materials at the specified frequency,
given the list of materials and debye declarations.
Most of this code has been adapted from gprMax source code.
Returns
-------
list
the list of materials, with updated relative permittivity.
"""
from gprMax.materials import Material
for dm in debye_materials:
name = dm[-1]
mat = [m for m in materials_list if name in m]
if len(mat) == 0:
raise ValueError(f"Error: trying to calculate Debye epsilon r for material '{name}', but material declaration not found!")
if len(mat) > 1:
raise ValueError(f"More than 1 material with the same name:", name)
mat = mat[0]
# setup material
m = Material(0, name)
m.er = float(mat[0])
m.se = float(mat[1])
m.mr = float(mat[2])
m.sm = float(mat[3])
poles = int(dm[0])
m.type = 'debye'
m.poles = poles
m.averagable = False
for pole in range(1, 2 * poles, 2):
if float(dm[pole]) > 0:
m.deltaer.append(float(dm[pole]))
m.tau.append(float(dm[pole + 1]))
# calculate er
er = m.calculate_er(frequency)
index = materials_list.index(mat)
materials_list[index][0] = er.real
return materials_list
def _parse_materials_file(file_path: str | Path) -> list:
"""
Parses the geometry materials file.
"""
with open(file_path, "r") as f:
lines = f.read().splitlines()
materials = [l.split() for l in lines if "#material" in l]
materials = [l[1:] for l in materials]
debye_materials = [l.split() for l in lines if "#add_dispersion_debye" in l]
debye_materials = [l[1:] for l in debye_materials]
materials = _update_debye_er(materials, debye_materials)
materials = [l[:-1] for l in materials]
return materials
[docs]
def convert_geometry_to_np(filename: str | Path, output_file: str | Path | None = None, remove_files: bool = False) -> np.ndarray:
"""
Converts the given geometry and materials files into numpy arrays of shape [4, height, width].
Parameters
----------
filename : str | Path
filename of the h5 file containing the geometry. The materials file must be
in the same folder and named the same, adding "_materials.txt"
output_file : str | Path, default: None
output file to store the result in .npy format. If 'None', then does not write to disk, but returns the array.
remove_files : bool, default: False
if set, deletes the initial h5 and txt files.
Returns
-------
ret : np.ndarray of shape [4, height, width]
- `ret[0]` contains the relative permittivity (epsilon),
- `ret[1]` contains the conductivity (sigma),
- `ret[2]` contains the relative permeability (mu),
- `ret[3]` contains the magnetic loss (sigma*).
"""
h5_path = Path(filename).with_suffix(".h5")
txt_path = h5_path.with_name(h5_path.with_suffix("").name + "_materials").with_suffix(".txt")
materials = _parse_materials_file(txt_path)
h5_file = h5py.File(h5_path)
data = np.array(h5_file["data"]).squeeze()
new_data = np.zeros((4, *data.shape), dtype=np.float32)
for index in range(len(materials)):
indexes = data == index
relative_permittivity = materials[index][0]
conductivity = materials[index][1]
relative_permeability = materials[index][2]
magnetic_loss = materials[index][3]
new_data[0, indexes] = relative_permittivity
new_data[1, indexes] = conductivity
new_data[2, indexes] = relative_permeability
new_data[3, indexes] = magnetic_loss
# fix axis order and orientation:
new_data = np.flip(new_data.transpose(0, 2, 1), 1)
if remove_files:
h5_path.unlink()
txt_path.unlink()
if output_file is None:
return new_data
else:
np.save(output_file, new_data)
def _get_trailing_number(s):
"""
Returns the trailing number in the input string, or None
"""
m = re.search(r'\d+$', s)
return int(m.group()) if m else None
[docs]
def convert_snapshots_to_np(snapshot_folder : str | Path,
output_file: str | Path,
remove_files: bool = False,
pool_window: tuple[int, int] | None = None) -> dict[str, np.ndarray]:
"""
Converts snapshots related to a full B-scan into numpy arrays.
Creates a single `.npz` file containing all the arrays.
Each array inside the npz file is of shape:
- [n_snapshots, height, width] for E-fields (z component of the field)
- [n_snapshots, 2, height, width] for H-fields (x, y components)
Parameters
----------
snapshot_folder : str | Path
path to the folder containing all the snapshots of a B-scan.
output_file : str | Path
path to the output to store the results in `.npz` format
remove_files : bool, default: False
if set, deletes the original `.vti` files.
pool_window : tuple[int, int] | None, default: None
if set, applies average pooling of the specified size to the resulting snapshot.
Returns
-------
dict[str, np.ndarray]
the in-memory dictionary that has been saved to disk
"""
snapshot_folder = Path(snapshot_folder)
folders = snapshot_folder.glob("scan_*_snaps*")
folders = [f for f in folders]
def extract_time_from_snapshot_path(snapshot_file_path : Path):
time = float(snapshot_file_path.stem.lstrip("snap_"))
return time
full_data = {}
for folder in folders:
a_scan_number = _get_trailing_number(folder.stem) - 1
snapshot_files = [f for f in folder.glob("snap_*.vti")]
snapshot_files.sort(key=extract_time_from_snapshot_path)
times = [extract_time_from_snapshot_path(f) for f in snapshot_files]
times.sort()
a_scan_data_e_field = []
a_scan_data_h_field = []
for file in snapshot_files:
e_field, h_field = extract_snapshot_fields_numpy(file)
e_field = e_field[:, :, 2]
h_field = h_field[:, :, 0:2]
h_field = h_field.transpose(2, 0, 1)
a_scan_data_e_field.append(e_field)
a_scan_data_h_field.append(h_field)
if remove_files:
file.unlink()
a_scan_data_e_field = np.asarray(a_scan_data_e_field)
a_scan_data_h_field = np.asarray(a_scan_data_h_field)
# apply pooling
if pool_window is not None:
if a_scan_data_e_field.size > 0:
a_scan_data_e_field = block_reduce(a_scan_data_e_field, block_size=(1, pool_window[0], pool_window[1]), func=np.mean)
if a_scan_data_h_field.size > 0:
a_scan_data_h_field = block_reduce(a_scan_data_h_field, block_size=(1, 1, pool_window[0], pool_window[1]), func=np.mean)
full_data[f"{str(a_scan_number).zfill(5)}_E"] = a_scan_data_e_field
full_data[f"{str(a_scan_number).zfill(5)}_H"] = a_scan_data_h_field
full_data[f"{str(a_scan_number).zfill(5)}_times"] = times
if remove_files:
folder.rmdir()
np.savez(output_file, **full_data)
return full_data
if __name__ == "__main__":
import matplotlib.pyplot as plt
# convert_geometry_to_np("data/geometry_2D_cylinders_clean_materials.txt")
# data = convert_snapshots_to_np("gprmax_output_files_old/scan_0000", "scan_0000_snapshots")
# pooled_data = convert_snapshots_to_np("gprmax_output_files_old/scan_0000", "scan_0000_snapshots_pooled", pool_window=(3, 3))
# print(data.keys())
# from ..visualization.misc import save_field_animation
# save_field_animation(data["0000_E"], "gprmax_output_files_old/scan_0000/snapshots0.mp4")
# save_field_animation(pooled_data["0000_E"], "gprmax_output_files_old/scan_0000/snapshots0_pooled.mp4")
print(convert_snapshots_to_np("gprmax_tmp_files/", "prova_wavefield").keys())