Source code for src.dataset_creation.geom2bscan

"""
This module contains code to:

1) Train a CNN-based black box neural network model to approximate the B-scan response from a given dataset.

2) Use a trained model to quickly compute B-scan predictions from a (possibly) geometry-only dataset.
"""

import numpy as np
import matplotlib.pyplot as plt
import argparse

# CHOOSE GPU
import os

from sklearn.model_selection import train_test_split
from pathlib import Path
from tqdm import tqdm

def _parse_arguments():
    """
    Parses the arguments and returns the derived Namespace.
    """

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="action", required=True)

    parser_train = subparsers.add_parser("train", help='Train a model to approximate the given dataset.')
    parser_train.add_argument("-d", "--dataset_dir", type=str, help="Dataset output folder path.", required=True)
    parser_train.add_argument("-o", "--output_dir", type=str, help="Output directory for model training.", required=True)
    parser_train.add_argument("-e", "--epochs", type=str, help="Number of training epochs.", default=300)
    parser_train.add_argument("-bs", "--batch_size", type=str, help="Training batch size.", default=10)
    parser_train.add_argument("--gpu", type=int, help="GPU to use for predictions/training", default=0)


    parser_predict = subparsers.add_parser("predict", help='Use a model checkpoint to predict the given dataset.')
    parser_predict.add_argument("-d", "--dataset_dir", type=str, help="Dataset output folder path.", required=True)
    parser_predict.add_argument("-m", "--model_path", type=str, help="Model checkpoint path.", required=True)
    parser_predict.add_argument("-o", "--output_dir", type=str, help="Output directory for predictions.", required=True)
    parser_predict.add_argument("--mask_path", type=str, help="Path to the file containing the label median mask used to train the model.", default=None)
    parser_predict.add_argument("--mem_batch_size", type=int, help="Number of samples to load into memory and predict at a time.", default=None)
    parser_predict.add_argument("--gpu", type=int, help="GPU to use for predictions/training", default=0)

    args = parser.parse_args()

    return args

def _get_dataset_len(dataset_output_path: str | Path):
    dataset_output_path = Path(dataset_output_path)
    return len(list(dataset_output_path.glob("scan_*")))

[docs] def load_dataset(dataset_output_path: str | Path = Path("dataset_bscan/gprmax_output_files"), indexes_interval: tuple[int,int] | None = None, verbose = True): """ Loads the B-scan dataset at the specified location. Performs filtering of the PML bug related to steel sleepers. Parameters ---------- dataset_output_path : str | Path, optional location of the output folder of the dataset, by default Path("dataset_bscan/gprmax_output_files") indexes_interval : tuple[int, int] | None If specified, the interval of indexes to load, upper limit excluded. Default : None. verbose : bool Controls weather to print info and show a progress bar for the data loading. Default: True. Returns ------- tuple[np.ndarray, np.ndarray, list[str]] data, labels, sample names """ geometries = [] bscans = [] dataset_output_path = Path(dataset_output_path) if verbose: print("Loading dataset...") folders = list(dataset_output_path.glob("scan_*")) folders.sort() if indexes_interval is not None: folders = folders[indexes_interval[0] : indexes_interval[1]] if len(folders) == 0: raise ValueError("No sample in the dataset, check dataset path and indexes interval.") sample_names = [f.name for f in folders] if verbose: folders = tqdm(folders) for f in folders: geom = np.load(f / f"{f.name}_geometry.npy") bscan_path = f / f"{f.name}_merged.out" geometries.append(geom[0:2]) if bscan_path.exists(): bscan, dt = get_output_data(bscan_path, 1, "Ez") bscan = cv2.resize(bscan, (192, 224)) bscans.append(bscan) geometries = np.asarray(geometries) if len(bscans) == 0: bscans = None else: bscans = np.asarray(bscans) geometries, bscans = filter_PML_bug_bscans(geometries, bscans) bscans = np.expand_dims(bscans, -1) # geometries, bscans = preprocess_data(geometries, bscans) geometries = geometries.transpose(0, 2, 3, 1) return geometries, bscans, sample_names
[docs] def split_dataset(geometries: np.ndarray, bscans: np.ndarray, random_state: int = 42): """ Splits the dataset into train and test set. Parameters ---------- geometries : np.ndarray dataset geometries (input data) bscans : np.ndarray dataset bscans (labels) random_state : int, optional seed for the dataset split, by default 42 Returns ------- tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] train data, test data, train labels, test labels. """ train_data, test_data, train_labels, test_labels = train_test_split(geometries, bscans, random_state=random_state) return train_data, train_labels, test_data, test_labels
[docs] def filter_PML_bug_bscans(geometries: np.ndarray, bscans: np.ndarray, upper_limit: float = 1.4e6): """ Filters the samples of the dataset to remove the ones in which the PML bug appears. This is done using a threshold value on the sum of the absolute value of the pixels of B-scans. The reason is that the samples with the PML bug exibit much more reflections, so are well divided from the other ones. Parameters ---------- geometries : np.ndarray of shape [B, C, H, W] sample geometries bscans : np.ndarray of shape [B, H, W] sample bscans upper_limit : float, optional the upper limit threshold. A value of 1.4e6 has been empirically found to divide the dataset in a clean way. Returns ------- tuple[np.ndarray, np.ndarray] the filtered geometries and bscans """ coefficient = np.absolute(bscans).sum((1, 2)) to_keep = coefficient <= upper_limit return geometries[to_keep], bscans[to_keep]
[docs] def preprocess_data(geoms: np.ndarray, bscans: np.ndarray): """ Preprocesses the data by applying a rescaling of 1/80 to the relative permittivity values and the cube root to both the conductivity and bscan values Parameters ---------- geoms : np.ndarray of shape [B, 2, H, W] sample geometries bscans : np.ndarray of shape [B, H, W] sample bscans Returns ------- tuple[np.ndarray, np.ndarray] processed geometries and bscans """ epsilon = geoms[:, 0, :, :] sigma = geoms[:, 1, :, :] epsilon = epsilon / 80. sigma = np.cbrt(sigma) # bscans = np.cbrt(bscans) return geoms, bscans
[docs] def filter_initial_wave(train_labels: np.ndarray, test_labels: np.ndarray): """ Filters the initial wave in the labels by removing the median of the values present in the pixels of the train labels. Parameters ---------- train_labels : np.ndarray of shape [B, H, W, C] the train B-scan labels test_labels : np.ndarray of shape [B, H, W, C] the test B-scan labels Returns ------- tuple[np.ndarray, np.ndarray, np.ndarray] train labels, test labels, median image used for the filtering """ median = np.median(train_labels, axis=(0, 3)) train_labels = train_labels - median[:, :, None] test_labels = test_labels - median[:, :, None] return train_labels, test_labels, median
[docs] def build_network(): """ Builds a geom2bscan keras model Returns ------- tf.keras.Model the geom2bscan keras model """ # Build model def cross_attention(x1, x2, filters, h=8): v1 = keras.layers.Conv2D(filters, (1,1), padding='same', strides=1)(x1) q2 = keras.layers.Conv2D(filters, (1,1), padding='same', strides=1)(x2) a1 = keras.layers.MultiHeadAttention(num_heads=h, key_dim=filters//h)(q2, v1) o1 = keras.layers.LayerNormalization()(a1 + v1) out1 = keras.layers.LayerNormalization()(o1 + keras.layers.Conv2D(filters, (1,1), padding='same', strides=1)(o1)) return out1 def down_block(x, filters, kernel_size=(3,3), padding='same', strides=1): c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(x) c = keras.layers.Activation('relu')(c) c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(c) c = keras.layers.Activation('relu')(c) p = keras.layers.MaxPool2D((2, 2), (2, 2))(c) return p def connect_block(x, filters, kernel_size=(3,3)): c = keras.layers.Conv2D(filters, kernel_size, padding='same', strides=(3,3))(x) c = keras.layers.Activation('relu')(c) c = keras.layers.Conv2DTranspose(filters, kernel_size, padding='valid', strides=1)(c) c = keras.layers.Activation('relu')(c) c = keras.layers.Conv2D(filters, kernel_size, padding='same', strides=1)(c) c = keras.layers.Activation('relu')(c) return c def up_block(x, filters, kernel_size=(3,3), padding='same', strides=1): us_x = keras.layers.UpSampling2D((2, 2))(x) c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(us_x) c = keras.layers.Activation('relu')(c) c = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(c) c = keras.layers.Activation('relu')(c) return c data_sizeX = 750 data_sizeY = 850 num_channel = 1 # Feature Map Channel f0 = 16 f = [f0, f0*2, f0*4, f0*8, f0*16, f0*32] inputs1 = keras.layers.Input((data_sizeY, data_sizeX, num_channel)) inputs2 = keras.layers.Input((data_sizeY, data_sizeX, num_channel)) # Encoder1 p10 = inputs1 p11 = down_block(p10, f[0]) p12 = down_block(p11, f[1]) p13 = down_block(p12, f[2]) p14 = down_block(p13, f[3]) p15 = down_block(p14, f[4]) e1 = down_block(p15, f[4]) # Encoder2 p20 = inputs2 p21 = down_block(p20, f[0]) p22 = down_block(p21, f[1]) p23 = down_block(p22, f[2]) p24 = down_block(p23, f[3]) p25 = down_block(p24, f[4]) e2 = down_block(p25, f[4]) # Fusion fu1 = cross_attention(e1, e2, f[4]) fu2 = cross_attention(e2, e1, f[4]) fu = connect_block(keras.layers.Concatenate()([fu1, fu2]), f[5]) # Decoder u1 = up_block(fu, f[4]) u2 = up_block(u1, f[3]) u3 = up_block(u2, f[2]) u4 = up_block(u3, f[1]) u5 = up_block(u4, f[0]) outputs = keras.layers.Conv2D(1, (1, 1), padding='same')(u5) model = tf.keras.Model([inputs1, inputs2], outputs) return model
[docs] def train(dataset_output_path:str | Path, output_path: str | Path, epochs: int, batch_size: int): """ Trains a geom2bscan model with the (labelled) data in the dataset. Parameters ---------- dataset_output_path : str | Path dataset output folder. output_path : str | Path Directory in which all training results will be saved. epochs : int number of training epochs. batch_size : int training batch size. """ def get_lr_metric(optimizer): def learning_rate(y_true, y_pred): return optimizer.learning_rate return learning_rate output_path = Path(output_path) geometries, bscans, _ = load_dataset(dataset_output_path) train_data, train_labels, test_data, test_labels = split_dataset(geometries, bscans) train_labels, test_labels, median = filter_initial_wave(train_labels, test_labels) np.save(output_path / "median_mask.npy", median) train_data1 = np.expand_dims(train_data[:, :, :, 0], -1) train_data2 = np.expand_dims(train_data[:, :, :, 1], -1) train_mask = train_labels test_data1 = np.expand_dims(test_data[:, :, :, 0], -1) test_data2 = np.expand_dims(test_data[:, :, :, 1], -1) test_mask = test_labels model = build_network() model.summary() # Training model_path = output_path / "model_checkpoint.keras" output_path.mkdir(exist_ok=True) Adam = keras.optimizers.Adam(learning_rate=1e-4) lr_metric = get_lr_metric(Adam) model.compile(optimizer=Adam, loss='mse', metrics=[lr_metric]) model_checkpoint = keras.callbacks.ModelCheckpoint(model_path, monitor='val_loss', verbose=1, save_best_only=True) lr_checkpoint = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.98, patience=1, min_lr=0) history = model.fit(x=[train_data1,train_data2], y=train_mask, batch_size=batch_size, epochs=epochs, verbose=2, \ validation_data=([test_data1,test_data2], test_mask), callbacks=[model_checkpoint,lr_checkpoint]) # plot history print(history.history.keys()) plt.plot(history.history["loss"], label="train loss") plt.plot(history.history["val_loss"], label = "val loss") plt.legend() plt.savefig(output_path/"loss_history.png") # Testing model.load_weights(model_path) model.evaluate(x=[test_data1,test_data2], y=test_mask) vmin, vmax = test_mask.min(), test_mask.max() vmax = max(np.absolute(vmin), vmax) vmin = -vmax vmin = vmin / 3 vmax = vmax / 3 test_pred = model.predict([test_data1,test_data2]) test_pred = np.asarray(test_pred) def get_ranked_losses(preds, labels): mse = (np.square(preds - labels)).mean(axis=(1, 2)) mse = mse.squeeze() indexes = np.argsort(mse) return mse, indexes mse, mse_indexes = get_ranked_losses(test_pred, test_mask) with open(output_path / "mse_rank.txt", "w") as f: for i in mse_indexes: f.write(f"{i}: {mse[i]}\n") print("Saving test set predictions...") for i, (e, s, p, l) in tqdm(enumerate(zip(test_data1, test_data2, test_pred, test_mask)), total=len(test_mask)): results_subfolder = output_path / "figures_test_set" / str(i).zfill(4) results_subfolder.mkdir(exist_ok=True, parents=True) plt.imsave(results_subfolder / "epsilon_r.png", e.squeeze(), vmin=1, vmax=20, cmap="jet") plt.close() plt.imsave(results_subfolder / "sigma.png", s.squeeze(), vmin=0, vmax=0.05, cmap="jet") plt.close() plt.imsave(results_subfolder / "prediction.png", p.squeeze(), vmin=vmin, vmax=vmax, cmap="jet") plt.close() plt.imsave(results_subfolder / "label.png", l.squeeze(), vmin=vmin, vmax=vmax, cmap="jet") plt.close() plt.imsave(results_subfolder / "diff.png", p.squeeze() - l.squeeze(), vmin=vmin, vmax=vmax, cmap="jet") plt.close()
[docs] def predict_batch(geometries: np.ndarray, model, mask: np.ndarray | None): """ Predicts the B-scans for the specified geometries Parameters ---------- geometries : np.ndarray Input geometries model : tf.keras.Model Model used for inference mask : np.ndarray | None Mask added to the predictions of the model to calculate the B-scans, or None. Returns ------- np.ndarray predictions """ epsilon_maps = np.expand_dims(geometries[:, :, :, 0], -1) sigma_maps = np.expand_dims(geometries[:, :, :, 1], -1) predictions = model.predict([epsilon_maps, sigma_maps]) predictions = np.asarray(predictions).squeeze() if mask is not None: predictions += mask return predictions
[docs] def predict_batch_wide(geometries: np.ndarray, model, mask: np.ndarray | None): """ Splits wide geometries into multiple inputs for the model, then combines the predictions back into a single B-scan. Uses a sliding window approach for predictions, with an offset of half image (192/2 = 96 pixels for the pretrained model). Parameters ---------- geometries : np.ndarray input wide geometries. model : tf.keras.Model Model used for inference. mask : np.ndarray | None label mask, or None. Returns ------- np.ndarray predictions """ model_input_width = model.input_shape[0][2] model_output_height, model_output_width = model.output_shape[1:3] geometries_width = geometries.shape[2] batch_size = geometries.shape[0] offsets_per_image = 2 input_offset = model_input_width // offsets_per_image output_offset = model_output_width // offsets_per_image if not input_offset * offsets_per_image == model_input_width: raise ValueError(f"Error: model output width ({model_output_width}) is not even.") num_images = geometries_width // input_offset if not (num_images * input_offset == geometries_width): raise ValueError(f"Error: sample width ({geometries_width}) is not a multiple of the input offset ({input_offset}).") if geometries_width <= model_output_width + output_offset: raise ValueError(f"Error: sample width ({geometries_width}) must be at least double the model output width ({model_output_width}).") num_images -= 1 # split the wide geometries into smaller inputs split_indexes = np.arange(0, num_images, 1) * input_offset split_geometries = [geometries[:, :, i:i+model_input_width, :] for i in split_indexes] # predict the outputs split_geometries = np.concatenate(split_geometries) split_predictions = predict_batch(split_geometries, model, mask) split_predictions = split_predictions.reshape(num_images, batch_size, model_output_height, model_output_width) # create masks stair_mask = np.linspace(0, 1, output_offset) first_mask = np.ones(model_output_width) first_mask[output_offset:] = np.flip(stair_mask) last_mask = np.ones(model_output_width) last_mask[:output_offset] = stair_mask intermediate_mask = np.ones(model_output_width) intermediate_mask[:output_offset] = stair_mask intermediate_mask[output_offset:] = np.flip(stair_mask) masks = [first_mask] + [intermediate_mask]*(num_images - 2) + [last_mask] masks = np.asarray(masks) # apply the masks # shape: B x H x num_images x W split_predictions = split_predictions.transpose(1, 2, 0, 3) split_predictions = split_predictions * masks predictions = np.zeros((batch_size, model_output_height, output_offset * (num_images + 1)), np.float32) # add the predictions in a overlapped fashion # (concatenating odd and even predictions and then doing a simgle sum might improve performance) for i in range(num_images): offset = i * output_offset predictions[:, :, offset : offset + model_output_width] += split_predictions[:, :, i, :] return predictions
[docs] def save_predictions(preds: np.ndarray, output_dir: str | Path, sample_names: list[str]): """ Saves the predictions, each in its own file. Parameters ---------- preds : np.ndarray Predictions output_dir : str | Path Directory in which to save the files sample_names : list[str] ordered list of names for the samples to save """ output_dir = Path(output_dir) for p, name in zip(preds, sample_names): np.save((output_dir / name).with_suffix(".npy"), p)
[docs] def check_shapes_equal(dataset_output_path: str | Path, model) -> bool: """ Checks if the shapes of model input and dataset samples are equal. Parameters ---------- dataset_output_path : str | Path dataset output folder path. model : keras.Model model used for inference. Returns ------- bool True if the model input and geometry shapes are equal, False otherwise. """ model_input_shape_1, model_input_shape_2 = model.input_shape assert model_input_shape_1 == model_input_shape_2, f"Error: the model has different input shapes for epsilon_r ({model_input_shape_1}) and sigma ({model_input_shape_2})." model_height, model_width = model_input_shape_1[1:3] first_geometry, _, _ = load_dataset(dataset_output_path, (0, 1), verbose=False) geometry_shape = first_geometry.shape geometry_height, geometry_width = geometry_shape[1:3] if not model_height == geometry_height: raise ValueError(f"Error: model input height ({model_height}) is different from dataset sample height ({geometry_height}).") if model_width > geometry_width: raise ValueError(f"Error: model input width ({model_width}) is greater than dataset sample width ({geometry_width}).") if model_width < geometry_width: return False return True
[docs] def predict(dataset_output_path: str | Path, model_checkpoint_path: str | Path, output_dir: str | Path, label_mask_path: str | Path | None, memory_batch_size: int | None = None): """ Predicts B-scans for the full dataset given and stores them as numpy arrays. Parameters ---------- dataset_output_path : str | Path Path to the dataset output folder with samples to predict. model_checkpoint_path : str | Path Path to the keras model checkpoint to use for prediction. output_dir : str | Path Directory in which the predictions will be stored. label_mask_path : str | Path | None Path to the label median mask used for preprocessing labels during training. This will be added to the predictions to obtain the output B-scan. If None, no postprocessing is done. memory_batch_size : int | None Size of the sample batches loaded into memory for each predictions cycle. If None, the full dataset is loaded into memory. Returns ------- np.ndarray | None predictions, only if memory_batch_size is None. """ output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) model_checkpoint_path = Path(model_checkpoint_path) if model_checkpoint_path.suffix == ".keras": model = keras.models.load_model(model_checkpoint_path) else: model = build_network() model.load_weights(model_checkpoint_path) prediction_fn = predict_batch wide_geometries = not check_shapes_equal(dataset_output_path, model) if wide_geometries: print(f"Detected model input width smaller than dataset sample width. Using sliding windows for prediction.") prediction_fn = predict_batch_wide if label_mask_path is not None: mask = np.load(label_mask_path) else: mask = None print() print("WARNING: label mask not set. This will likely cause wrong predictions.") print() if memory_batch_size is None: geometries, _, sample_names = load_dataset(dataset_output_path) predictions = prediction_fn(geometries, model, mask) save_predictions(predictions, output_dir, sample_names) return predictions else: num_samples = _get_dataset_len(dataset_output_path) for start_index in range(0, num_samples, memory_batch_size): end_index = min(start_index + memory_batch_size, num_samples) geometries, _, sample_names = load_dataset(dataset_output_path, indexes_interval=(start_index, end_index)) predictions = prediction_fn(geometries, model, mask) save_predictions(predictions, output_dir, sample_names)
if __name__ == "__main__": args = _parse_arguments() # set the gpu device used by keras os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu) import tensorflow as tf import keras from tools.plot_Bscan import get_output_data import cv2 if args.action == "train": train(args.dataset_dir, args.output_dir, args.epochs, args.batch_size) elif args.action == "predict": predict(args.dataset_dir, args.model_path, args.output_dir, args.mask_path, args.mem_batch_size)