"""
This module contains code to:
1) Train a CNN-based black box neural network model to approximate the B-scan response from a given dataset.
2) Use a trained model to quickly compute B-scan predictions from a (possibly) geometry-only dataset.
"""
import numpy as np
import matplotlib.pyplot as plt
import argparse
# CHOOSE GPU
import os
from sklearn.model_selection import train_test_split
from pathlib import Path
from tqdm import tqdm
def _parse_arguments():
"""
Parses the arguments and returns the derived Namespace.
"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="action", required=True)
parser_train = subparsers.add_parser("train", help='Train a model to approximate the given dataset.')
parser_train.add_argument("-d", "--dataset_dir", type=str, help="Dataset output folder path.", required=True)
parser_train.add_argument("-o", "--output_dir", type=str, help="Output directory for model training.", required=True)
parser_train.add_argument("-e", "--epochs", type=str, help="Number of training epochs.", default=300)
parser_train.add_argument("-bs", "--batch_size", type=str, help="Training batch size.", default=10)
parser_train.add_argument("--gpu", type=int, help="GPU to use for predictions/training", default=0)
parser_predict = subparsers.add_parser("predict", help='Use a model checkpoint to predict the given dataset.')
parser_predict.add_argument("-d", "--dataset_dir", type=str, help="Dataset output folder path.", required=True)
parser_predict.add_argument("-m", "--model_path", type=str, help="Model checkpoint path.", required=True)
parser_predict.add_argument("-o", "--output_dir", type=str, help="Output directory for predictions.", required=True)
parser_predict.add_argument("--mask_path", type=str, help="Path to the file containing the label median mask used to train the model.", default=None)
parser_predict.add_argument("--mem_batch_size", type=int, help="Number of samples to load into memory and predict at a time.", default=None)
parser_predict.add_argument("--gpu", type=int, help="GPU to use for predictions/training", default=0)
args = parser.parse_args()
return args
def _get_dataset_len(dataset_output_path: str | Path):
dataset_output_path = Path(dataset_output_path)
return len(list(dataset_output_path.glob("scan_*")))
[docs]
def load_dataset(dataset_output_path: str | Path = Path("dataset_bscan/gprmax_output_files"), indexes_interval: tuple[int,int] | None = None, verbose = True):
"""
Loads the B-scan dataset at the specified location.
Performs filtering of the PML bug related to steel sleepers.
Parameters
----------
dataset_output_path : str | Path, optional
location of the output folder of the dataset, by default Path("dataset_bscan/gprmax_output_files")
indexes_interval : tuple[int, int] | None
If specified, the interval of indexes to load, upper limit excluded. Default : None.
verbose : bool
Controls weather to print info and show a progress bar for the data loading. Default: True.
Returns
-------
tuple[np.ndarray, np.ndarray, list[str]]
data, labels, sample names
"""
geometries = []
bscans = []
dataset_output_path = Path(dataset_output_path)
if verbose:
print("Loading dataset...")
folders = list(dataset_output_path.glob("scan_*"))
folders.sort()
if indexes_interval is not None:
folders = folders[indexes_interval[0] : indexes_interval[1]]
if len(folders) == 0:
raise ValueError("No sample in the dataset, check dataset path and indexes interval.")
sample_names = [f.name for f in folders]
if verbose:
folders = tqdm(folders)
for f in folders:
geom = np.load(f / f"{f.name}_geometry.npy")
bscan_path = f / f"{f.name}_merged.out"
geometries.append(geom[0:2])
if bscan_path.exists():
bscan, dt = get_output_data(bscan_path, 1, "Ez")
bscan = cv2.resize(bscan, (192, 224))
bscans.append(bscan)
geometries = np.asarray(geometries)
if len(bscans) == 0:
bscans = None
else:
bscans = np.asarray(bscans)
geometries, bscans = filter_PML_bug_bscans(geometries, bscans)
bscans = np.expand_dims(bscans, -1)
# geometries, bscans = preprocess_data(geometries, bscans)
geometries = geometries.transpose(0, 2, 3, 1)
return geometries, bscans, sample_names
[docs]
def split_dataset(geometries: np.ndarray, bscans: np.ndarray, random_state: int = 42):
"""
Splits the dataset into train and test set.
Parameters
----------
geometries : np.ndarray
dataset geometries (input data)
bscans : np.ndarray
dataset bscans (labels)
random_state : int, optional
seed for the dataset split, by default 42
Returns
-------
tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
train data, test data, train labels, test labels.
"""
train_data, test_data, train_labels, test_labels = train_test_split(geometries, bscans, random_state=random_state)
return train_data, train_labels, test_data, test_labels
[docs]
def filter_PML_bug_bscans(geometries: np.ndarray, bscans: np.ndarray, upper_limit: float = 1.4e6):
"""
Filters the samples of the dataset to remove the ones in which the PML bug appears.
This is done using a threshold value on the sum of the absolute value of the pixels of B-scans.
The reason is that the samples with the PML bug exibit much more reflections, so are well divided from the other ones.
Parameters
----------
geometries : np.ndarray of shape [B, C, H, W]
sample geometries
bscans : np.ndarray of shape [B, H, W]
sample bscans
upper_limit : float, optional
the upper limit threshold. A value of 1.4e6 has been empirically found to divide the dataset in a clean way.
Returns
-------
tuple[np.ndarray, np.ndarray]
the filtered geometries and bscans
"""
coefficient = np.absolute(bscans).sum((1, 2))
to_keep = coefficient <= upper_limit
return geometries[to_keep], bscans[to_keep]
[docs]
def preprocess_data(geoms: np.ndarray, bscans: np.ndarray):
"""
Preprocesses the data by applying a rescaling of 1/80 to the relative permittivity values
and the cube root to both the conductivity and bscan values
Parameters
----------
geoms : np.ndarray of shape [B, 2, H, W]
sample geometries
bscans : np.ndarray of shape [B, H, W]
sample bscans
Returns
-------
tuple[np.ndarray, np.ndarray]
processed geometries and bscans
"""
epsilon = geoms[:, 0, :, :]
sigma = geoms[:, 1, :, :]
epsilon = epsilon / 80.
sigma = np.cbrt(sigma)
# bscans = np.cbrt(bscans)
return geoms, bscans
[docs]
def filter_initial_wave(train_labels: np.ndarray, test_labels: np.ndarray):
"""
Filters the initial wave in the labels by removing the median of the values present in the pixels of the train labels.
Parameters
----------
train_labels : np.ndarray of shape [B, H, W, C]
the train B-scan labels
test_labels : np.ndarray of shape [B, H, W, C]
the test B-scan labels
Returns
-------
tuple[np.ndarray, np.ndarray, np.ndarray]
train labels, test labels, median image used for the filtering
"""
median = np.median(train_labels, axis=(0, 3))
train_labels = train_labels - median[:, :, None]
test_labels = test_labels - median[:, :, None]
return train_labels, test_labels, median
[docs]
def build_network():
"""
Builds a geom2bscan keras model
Returns
-------
tf.keras.Model
the geom2bscan keras model
"""
# Build model
def cross_attention(x1, x2, filters, h=8):
v1 = keras.layers.Conv2D(filters, (1,1), padding='same', strides=1)(x1)
q2 = keras.layers.Conv2D(filters, (1,1), padding='same', strides=1)(x2)
a1 = keras.layers.MultiHeadAttention(num_heads=h, key_dim=filters//h)(q2, v1)
o1 = keras.layers.LayerNormalization()(a1 + v1)
out1 = keras.layers.LayerNormalization()(o1 + keras.layers.Conv2D(filters, (1,1), padding='same', strides=1)(o1))
return out1
def down_block(x, filters, kernel_size=(3,3), padding='same', strides=1):
c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(x)
c = keras.layers.Activation('relu')(c)
c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(c)
c = keras.layers.Activation('relu')(c)
p = keras.layers.MaxPool2D((2, 2), (2, 2))(c)
return p
def connect_block(x, filters, kernel_size=(3,3)):
c = keras.layers.Conv2D(filters, kernel_size, padding='same', strides=(3,3))(x)
c = keras.layers.Activation('relu')(c)
c = keras.layers.Conv2DTranspose(filters, kernel_size, padding='valid', strides=1)(c)
c = keras.layers.Activation('relu')(c)
c = keras.layers.Conv2D(filters, kernel_size, padding='same', strides=1)(c)
c = keras.layers.Activation('relu')(c)
return c
def up_block(x, filters, kernel_size=(3,3), padding='same', strides=1):
us_x = keras.layers.UpSampling2D((2, 2))(x)
c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(us_x)
c = keras.layers.Activation('relu')(c)
c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(c)
c = keras.layers.Activation('relu')(c)
return c
data_sizeX = 750
data_sizeY = 850
num_channel = 1
# Feature Map Channel
f0 = 16
f = [f0, f0*2, f0*4, f0*8, f0*16, f0*32]
inputs1 = keras.layers.Input((data_sizeY, data_sizeX, num_channel))
inputs2 = keras.layers.Input((data_sizeY, data_sizeX, num_channel))
# Encoder1
p10 = inputs1
p11 = down_block(p10, f[0])
p12 = down_block(p11, f[1])
p13 = down_block(p12, f[2])
p14 = down_block(p13, f[3])
p15 = down_block(p14, f[4])
e1 = down_block(p15, f[4])
# Encoder2
p20 = inputs2
p21 = down_block(p20, f[0])
p22 = down_block(p21, f[1])
p23 = down_block(p22, f[2])
p24 = down_block(p23, f[3])
p25 = down_block(p24, f[4])
e2 = down_block(p25, f[4])
# Fusion
fu1 = cross_attention(e1, e2, f[4])
fu2 = cross_attention(e2, e1, f[4])
fu = connect_block(keras.layers.Concatenate()([fu1, fu2]), f[5])
# Decoder
u1 = up_block(fu, f[4])
u2 = up_block(u1, f[3])
u3 = up_block(u2, f[2])
u4 = up_block(u3, f[1])
u5 = up_block(u4, f[0])
outputs = keras.layers.Conv2D(1, (1, 1), padding='same')(u5)
model = tf.keras.Model([inputs1, inputs2], outputs)
return model
[docs]
def train(dataset_output_path:str | Path, output_path: str | Path, epochs: int, batch_size: int):
"""
Trains a geom2bscan model with the (labelled) data in the dataset.
Parameters
----------
dataset_output_path : str | Path
dataset output folder.
output_path : str | Path
Directory in which all training results will be saved.
epochs : int
number of training epochs.
batch_size : int
training batch size.
"""
def get_lr_metric(optimizer):
def learning_rate(y_true, y_pred):
return optimizer.learning_rate
return learning_rate
output_path = Path(output_path)
geometries, bscans, _ = load_dataset(dataset_output_path)
train_data, train_labels, test_data, test_labels = split_dataset(geometries, bscans)
train_labels, test_labels, median = filter_initial_wave(train_labels, test_labels)
np.save(output_path / "median_mask.npy", median)
train_data1 = np.expand_dims(train_data[:, :, :, 0], -1)
train_data2 = np.expand_dims(train_data[:, :, :, 1], -1)
train_mask = train_labels
test_data1 = np.expand_dims(test_data[:, :, :, 0], -1)
test_data2 = np.expand_dims(test_data[:, :, :, 1], -1)
test_mask = test_labels
model = build_network()
model.summary()
# Training
model_path = output_path / "model_checkpoint.keras"
output_path.mkdir(exist_ok=True)
Adam = keras.optimizers.Adam(learning_rate=1e-4)
lr_metric = get_lr_metric(Adam)
model.compile(optimizer=Adam, loss='mse', metrics=[lr_metric])
model_checkpoint = keras.callbacks.ModelCheckpoint(model_path, monitor='val_loss', verbose=1, save_best_only=True)
lr_checkpoint = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.98, patience=1, min_lr=0)
history = model.fit(x=[train_data1,train_data2], y=train_mask, batch_size=batch_size, epochs=epochs, verbose=2, \
validation_data=([test_data1,test_data2], test_mask), callbacks=[model_checkpoint,lr_checkpoint])
# plot history
print(history.history.keys())
plt.plot(history.history["loss"], label="train loss")
plt.plot(history.history["val_loss"], label = "val loss")
plt.legend()
plt.savefig(output_path/"loss_history.png")
# Testing
model.load_weights(model_path)
model.evaluate(x=[test_data1,test_data2], y=test_mask)
vmin, vmax = test_mask.min(), test_mask.max()
vmax = max(np.absolute(vmin), vmax)
vmin = -vmax
vmin = vmin / 3
vmax = vmax / 3
test_pred = model.predict([test_data1,test_data2])
test_pred = np.asarray(test_pred)
def get_ranked_losses(preds, labels):
mse = (np.square(preds - labels)).mean(axis=(1, 2))
mse = mse.squeeze()
indexes = np.argsort(mse)
return mse, indexes
mse, mse_indexes = get_ranked_losses(test_pred, test_mask)
with open(output_path / "mse_rank.txt", "w") as f:
for i in mse_indexes:
f.write(f"{i}: {mse[i]}\n")
print("Saving test set predictions...")
for i, (e, s, p, l) in tqdm(enumerate(zip(test_data1, test_data2, test_pred, test_mask)), total=len(test_mask)):
results_subfolder = output_path / "figures_test_set" / str(i).zfill(4)
results_subfolder.mkdir(exist_ok=True, parents=True)
plt.imsave(results_subfolder / "epsilon_r.png", e.squeeze(), vmin=1, vmax=20, cmap="jet")
plt.close()
plt.imsave(results_subfolder / "sigma.png", s.squeeze(), vmin=0, vmax=0.05, cmap="jet")
plt.close()
plt.imsave(results_subfolder / "prediction.png", p.squeeze(), vmin=vmin, vmax=vmax, cmap="jet")
plt.close()
plt.imsave(results_subfolder / "label.png", l.squeeze(), vmin=vmin, vmax=vmax, cmap="jet")
plt.close()
plt.imsave(results_subfolder / "diff.png", p.squeeze() - l.squeeze(), vmin=vmin, vmax=vmax, cmap="jet")
plt.close()
[docs]
def predict_batch(geometries: np.ndarray, model, mask: np.ndarray | None):
"""
Predicts the B-scans for the specified geometries
Parameters
----------
geometries : np.ndarray
Input geometries
model : tf.keras.Model
Model used for inference
mask : np.ndarray | None
Mask added to the predictions of the model to calculate the B-scans, or None.
Returns
-------
np.ndarray
predictions
"""
epsilon_maps = np.expand_dims(geometries[:, :, :, 0], -1)
sigma_maps = np.expand_dims(geometries[:, :, :, 1], -1)
predictions = model.predict([epsilon_maps, sigma_maps])
predictions = np.asarray(predictions).squeeze()
if mask is not None:
predictions += mask
return predictions
[docs]
def predict_batch_wide(geometries: np.ndarray, model, mask: np.ndarray | None):
"""
Splits wide geometries into multiple inputs for the model, then combines the predictions back into a single B-scan.
Uses a sliding window approach for predictions, with an offset of half image (192/2 = 96 pixels for the pretrained model).
Parameters
----------
geometries : np.ndarray
input wide geometries.
model : tf.keras.Model
Model used for inference.
mask : np.ndarray | None
label mask, or None.
Returns
-------
np.ndarray
predictions
"""
model_input_width = model.input_shape[0][2]
model_output_height, model_output_width = model.output_shape[1:3]
geometries_width = geometries.shape[2]
batch_size = geometries.shape[0]
offsets_per_image = 2
input_offset = model_input_width // offsets_per_image
output_offset = model_output_width // offsets_per_image
if not input_offset * offsets_per_image == model_input_width:
raise ValueError(f"Error: model output width ({model_output_width}) is not even.")
num_images = geometries_width // input_offset
if not (num_images * input_offset == geometries_width):
raise ValueError(f"Error: sample width ({geometries_width}) is not a multiple of the input offset ({input_offset}).")
if geometries_width <= model_output_width + output_offset:
raise ValueError(f"Error: sample width ({geometries_width}) must be at least double the model output width ({model_output_width}).")
num_images -= 1
# split the wide geometries into smaller inputs
split_indexes = np.arange(0, num_images, 1) * input_offset
split_geometries = [geometries[:, :, i:i+model_input_width, :] for i in split_indexes]
# predict the outputs
split_geometries = np.concatenate(split_geometries)
split_predictions = predict_batch(split_geometries, model, mask)
split_predictions = split_predictions.reshape(num_images, batch_size, model_output_height, model_output_width)
# create masks
stair_mask = np.linspace(0, 1, output_offset)
first_mask = np.ones(model_output_width)
first_mask[output_offset:] = np.flip(stair_mask)
last_mask = np.ones(model_output_width)
last_mask[:output_offset] = stair_mask
intermediate_mask = np.ones(model_output_width)
intermediate_mask[:output_offset] = stair_mask
intermediate_mask[output_offset:] = np.flip(stair_mask)
masks = [first_mask] + [intermediate_mask]*(num_images - 2) + [last_mask]
masks = np.asarray(masks)
# apply the masks
# shape: B x H x num_images x W
split_predictions = split_predictions.transpose(1, 2, 0, 3)
split_predictions = split_predictions * masks
predictions = np.zeros((batch_size, model_output_height, output_offset * (num_images + 1)), np.float32)
# add the predictions in a overlapped fashion
# (concatenating odd and even predictions and then doing a simgle sum might improve performance)
for i in range(num_images):
offset = i * output_offset
predictions[:, :, offset : offset + model_output_width] += split_predictions[:, :, i, :]
return predictions
[docs]
def save_predictions(preds: np.ndarray, output_dir: str | Path, sample_names: list[str]):
"""
Saves the predictions, each in its own file.
Parameters
----------
preds : np.ndarray
Predictions
output_dir : str | Path
Directory in which to save the files
sample_names : list[str]
ordered list of names for the samples to save
"""
output_dir = Path(output_dir)
for p, name in zip(preds, sample_names):
np.save((output_dir / name).with_suffix(".npy"), p)
[docs]
def check_shapes_equal(dataset_output_path: str | Path, model) -> bool:
"""
Checks if the shapes of model input and dataset samples are equal.
Parameters
----------
dataset_output_path : str | Path
dataset output folder path.
model : keras.Model
model used for inference.
Returns
-------
bool
True if the model input and geometry shapes are equal, False otherwise.
"""
model_input_shape_1, model_input_shape_2 = model.input_shape
assert model_input_shape_1 == model_input_shape_2, f"Error: the model has different input shapes for epsilon_r ({model_input_shape_1}) and sigma ({model_input_shape_2})."
model_height, model_width = model_input_shape_1[1:3]
first_geometry, _, _ = load_dataset(dataset_output_path, (0, 1), verbose=False)
geometry_shape = first_geometry.shape
geometry_height, geometry_width = geometry_shape[1:3]
if not model_height == geometry_height:
raise ValueError(f"Error: model input height ({model_height}) is different from dataset sample height ({geometry_height}).")
if model_width > geometry_width:
raise ValueError(f"Error: model input width ({model_width}) is greater than dataset sample width ({geometry_width}).")
if model_width < geometry_width:
return False
return True
[docs]
def predict(dataset_output_path: str | Path, model_checkpoint_path: str | Path, output_dir: str | Path, label_mask_path: str | Path | None, memory_batch_size: int | None = None):
"""
Predicts B-scans for the full dataset given and stores them as numpy arrays.
Parameters
----------
dataset_output_path : str | Path
Path to the dataset output folder with samples to predict.
model_checkpoint_path : str | Path
Path to the keras model checkpoint to use for prediction.
output_dir : str | Path
Directory in which the predictions will be stored.
label_mask_path : str | Path | None
Path to the label median mask used for preprocessing labels during training.
This will be added to the predictions to obtain the output B-scan.
If None, no postprocessing is done.
memory_batch_size : int | None
Size of the sample batches loaded into memory for each predictions cycle.
If None, the full dataset is loaded into memory.
Returns
-------
np.ndarray | None
predictions, only if memory_batch_size is None.
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
model_checkpoint_path = Path(model_checkpoint_path)
if model_checkpoint_path.suffix == ".keras":
model = keras.models.load_model(model_checkpoint_path)
else:
model = build_network()
model.load_weights(model_checkpoint_path)
prediction_fn = predict_batch
wide_geometries = not check_shapes_equal(dataset_output_path, model)
if wide_geometries:
print(f"Detected model input width smaller than dataset sample width. Using sliding windows for prediction.")
prediction_fn = predict_batch_wide
if label_mask_path is not None:
mask = np.load(label_mask_path)
else:
mask = None
print()
print("WARNING: label mask not set. This will likely cause wrong predictions.")
print()
if memory_batch_size is None:
geometries, _, sample_names = load_dataset(dataset_output_path)
predictions = prediction_fn(geometries, model, mask)
save_predictions(predictions, output_dir, sample_names)
return predictions
else:
num_samples = _get_dataset_len(dataset_output_path)
for start_index in range(0, num_samples, memory_batch_size):
end_index = min(start_index + memory_batch_size, num_samples)
geometries, _, sample_names = load_dataset(dataset_output_path, indexes_interval=(start_index, end_index))
predictions = prediction_fn(geometries, model, mask)
save_predictions(predictions, output_dir, sample_names)
if __name__ == "__main__":
args = _parse_arguments()
# set the gpu device used by keras
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu)
import tensorflow as tf
import keras
from tools.plot_Bscan import get_output_data
import cv2
if args.action == "train":
train(args.dataset_dir, args.output_dir, args.epochs, args.batch_size)
elif args.action == "predict":
predict(args.dataset_dir, args.model_path, args.output_dir, args.mask_path, args.mem_batch_size)