Source code for snom_analysis.lib.realign

##############################################################################
# Copyright (C) 2020-2025 Hans-Joachim Schill

# This file is part of snom_analysis.

# snom_analysis is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# snom_analysis is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with snom_analysis.  If not, see <http://www.gnu.org/licenses/>.
##############################################################################

import numpy as np
from matplotlib import pyplot as plt



[docs] def calculate_squared_mean_deviation(array_1, array_2) -> float: # check if both arrays are of equal length if len(array_1) != len(array_2): print('The lengths of the arrays must be equal!') return 0 mean_deviation = 0 N = len(array_1) for i in range(N): mean_deviation += (array_1[i]**2-array_2[i]**2)/N return mean_deviation
[docs] def calculate_squared_deviation(array_1, array_2) -> list: # check if both arrays are of equal length if len(array_1) != len(array_2): print('The lengths of the arrays must be equal!') return 0 mean_deviation = [] N = len(array_1) for i in range(N): # ignore '0' values if array_1[i] == 0 or array_2[i] == 0: mean_deviation.append(0) else: mean_deviation.append(abs(array_1[i]-array_2[i])) return mean_deviation
def _shift_array_1d_by_index(array_1, array_2, index) -> tuple: """This function shifts the second array with respect to the first array. The first array is modified to keep the same length. Args: array_1 (np.array): first and reference array array_2 (np.array): second array index (int): how much should be shifted, to the left if negative """ # print(f'in shift 1d, index: {index}') if index > 0: array_1_new = array_1 array_2_new = array_2 for i in range(abs(index)): # print(f'apply shift, i: {i}') array_1_new = np.append(array_1_new, 0) array_2_new = np.insert(array_2_new, 0, 0) elif index < 0: array_1_new = array_1 array_2_new = array_2 for i in range(abs(index)): array_1_new = np.insert(array_1_new, 0, 0) array_2_new = np.append(array_2_new, 0) else: array_1_new = array_1 array_2_new = array_2 return array_1_new, array_2_new
[docs] def shift_array_2d_by_index(array_1, array_2, index) -> tuple: """This function shifts the second 2d array relative to the first. Zeroes will be created on the outside. Args: array_1 (np.ndarray): array1 array_2 (np.ndarray): array2 index (int): how much to shift, negative means to the left Returns: tuple: shifted arrays """ x_res = len(array_1[0]) y_res = len(array_1) # print(f'xres={x_res} yres={y_res}, index={index}') x_res_new = x_res + abs(index) #create new lists with modified resolution array_1_new = np.zeros((y_res, x_res_new)) array_2_new = np.zeros((y_res, x_res_new)) for y in range(y_res): test_1, test_2 = _shift_array_1d_by_index(array_1[y], array_2[y], index) # print(f'len test1: {len(test_1)}, len test 2: {len(test_2)}') array_1_new[y] = test_1 array_2_new[y] = test_2 # array_1_new[y], array_2_new[y] = _shift_array_1d_by_index(array_1[y], array_2[y], index) return array_1_new, array_2_new
[docs] def create_mean_array(array_1, array_2): #check if dimensions are identical if len(array_1) != len(array_2) and len(array_1[0]) != len(array_2[0]): print('The length of the given arrays is not identical.') exit() else: x_res = len(array_1[0]) y_res = len(array_1) new_array = np.zeros((y_res, x_res)) for y in range(y_res): for x in range(x_res): new_array[y][x] = (array_1[y][x] + array_2[y][x])/2 return new_array
[docs] def create_mean_array_v2(array_1, array_2, index): """This variant is meant to keep the size of the original array! Args: array_1 (_type_): _description_ array_2 (_type_): _description_ Returns: _type_: _description_ """ #check if dimensions are identical if len(array_1) != len(array_2) and len(array_1[0]) != len(array_2[0]): print('The length of the given arrays is not identical.') exit() else: x_res = len(array_1[0]) y_res = len(array_1) new_array = np.zeros((y_res, x_res)) for y in range(y_res): if index > 0: for x in range(x_res - index): new_array[y][x] = (array_1[y][x+index] + array_2[y][x])/2 if index < 0: for x in range(x_res - abs(index)): new_array[y][x] = (array_1[y][x] + array_2[y][x+abs(index)])/2 return new_array
[docs] def minimize_deviation_1d(array_1, array_2, n_tries=5, display=True) -> int: """This function tries to find the optimal shift between two arrays to find the lowest deviation. Best to use leveled data. Args: array_1 (np.array): first array, typically height data array_2 (np.array): second array, typically height data n_tries (int, optional): the maximum shift expected for optimal overlap, will be applied symmetrically to right and left shift. Defaults to 5. Returns: int: the optimal shift index, left shift if negative """ # try shifting to the right mean_deviations_right = [] for i in range(n_tries): array_1_new, array_2_new = _shift_array_1d_by_index(array_1, array_2, i+1) mean_deviation_array = calculate_squared_deviation(array_1_new, array_2_new) mean_deviation = np.mean(mean_deviation_array) mean_deviations_right.append(mean_deviation) x = range(len(array_1_new)) # plt.plot(x, array_1_new, label='array_2') # plt.plot(x, array_2_new, label='array_1') # plt.plot(x, mean_deviation_array, label="Mean deviation_array") # plt.hlines(mean_deviation, label="Mean deviation", xmin=0, xmax=len(array_1)) # plt.legend() # plt.show() min_dev_right = min(mean_deviations_right) # try shifting to the left mean_deviations_left = [] for i in range(n_tries): array_1_new, array_2_new = _shift_array_1d_by_index(array_1, array_2, -(i+1)) mean_deviation_array = calculate_squared_deviation(array_1_new, array_2_new) mean_deviation = np.mean(mean_deviation_array) mean_deviations_left.append(mean_deviation) x = range(len(array_1_new)) # plt.plot(x, array_1_new, label='array_2') # plt.plot(x, array_2_new, label='array_1') # plt.plot(x, mean_deviation_array, label="Mean deviation_array") # plt.hlines(mean_deviation, label="Mean deviation", xmin=0, xmax=len(array_1)) # plt.legend() # plt.show() min_dev_left = min(mean_deviations_left) # try not shifting at all mean_deviation_array = calculate_squared_deviation(array_1, array_2) min_dev_center = np.mean(mean_deviation_array) min_dev_index = 0 if min_dev_left <= min_dev_right and min_dev_left < min_dev_center: # print(f'The minimal deviation of {min_dev_left} is reached for a shift of {mean_deviations_left.index(min_dev_left)+1} to the left') array_1_new, array_2_new = _shift_array_1d_by_index(array_1, array_2, -(mean_deviations_left.index(min_dev_left)+1)) min_dev_index = -(mean_deviations_left.index(min_dev_left)+1) elif min_dev_left > min_dev_right and min_dev_right < min_dev_center: # print(f'The minimal deviation of {min_dev_right} is reached for a shift of {mean_deviations_right.index(min_dev_right)+1} to the right') array_1_new, array_2_new = _shift_array_1d_by_index(array_1, array_2, +(mean_deviations_right.index(min_dev_right)+1)) min_dev_index = (mean_deviations_right.index(min_dev_right)+1) elif min_dev_center <= min_dev_left and min_dev_center <= min_dev_right: array_1_new = array_1 array_2_new = array_2 min_dev_index = 0 else: print(f'min_dev_left: {min_dev_left}, min_dev_center: {min_dev_center}, min_dev_right: {min_dev_right}') print('Oops, something went wrong, could not do the optimization') if display == True: # redo the best deviation: x = range(len(array_1_new)) plt.plot(x, array_1_new, label='array_1') plt.plot(x, array_2_new, label='array_2') mean_deviation_array = calculate_squared_deviation(array_1_new, array_2_new) mean_deviation = np.mean(mean_deviation_array) plt.plot(x, mean_deviation_array, label="Mean deviation_array") plt.hlines(mean_deviation, label="Mean deviation", xmin=0, xmax=len(array_1_new)) plt.legend() plt.show() return min_dev_index
[docs] def minimize_deviation_2d(array_1, array_2, n_tries=5, display=True) -> int: """This function tries to find the optimal shift between two arrays to find the lowest deviation. Best to use leveled data. Args: array_1 (np.array): first array, typically height data array_2 (np.array): second array, typically height data n_tries (int, optional): the maximum shift expected for optimal overlap, will be applied symmetrically to right and left shift. Defaults to 5. Returns: int: the optimal shift index, left shift if negative """ y_shifts = [] # contains the optimal shifts per y data line rows = len(array_1) for i in range(rows): y_shifts.append(minimize_deviation_1d(array_1[i], array_2[i], n_tries, display)) # plt.plot(range(rows), y_shifts, label="optimal dev index per row") # plt.legend(loc="upper left") # plt.show() return int(round(np.mean(y_shifts)))
def _shift_data(data, y_shifts): min_shift = min(y_shifts) max_shift = max(y_shifts) # print('shifts: ', y_shifts) if min_shift < 0: min_shift = abs(min_shift) else: min_shift = 0 if max_shift > 0: max_shift = max_shift else: max_shift = 0 # min and max shift now correspond to the empty spaces which have to be added in the beginning(min) and end(max) of the first line total_shift = min_shift + max_shift if total_shift != 0: y_res = len(data) x_res = len(data[0]) x_res_new = x_res + min_shift + max_shift data_new = np.zeros((y_res, x_res_new)) for i in range(len(y_shifts)): row_data = data[i] index = y_shifts[i] num_of_inserts = min_shift + index num_of_appends = max_shift - index if num_of_inserts > 0: for j in range(num_of_inserts): row_data = np.insert(row_data, 0, 0) if num_of_appends > 0: for j in range(num_of_appends): row_data = np.append(row_data, 0) data_new[i] = row_data else: return data ''' max_shift = abs(max(y_shifts, key=abs)) print('shifts: ', y_shifts) print('max shift: ', max_shift) y_res = len(data) x_res = len(data[0]) x_res_new = x_res + 2*max_shift data_new = np.zeros((y_res, x_res_new)) for i in range(len(y_shifts)): row_data = data[i] index = y_shifts[i] num_of_appends = max_shift - index num_of_inserts = max_shift + index if num_of_inserts > 0: for j in range(num_of_inserts): row_data = np.insert(row_data, 0, 0) if num_of_appends > 0: for j in range(num_of_appends): row_data = np.append(row_data, 0) data_new[i] = row_data ''' return data_new
[docs] def minimize_drift(data:np.array, n_tries=5, display=False) -> np.array: """This function tries to shift subsequent lines of a single measurement to minimize the deviation. The function will return the drift corrected data. Returns: np.array: the drift corrected data """ rows = len(data) # cols = len(data[0]) y_shifts = [0] # the first row will not be shifted, each subsequent shift is related to the first line for y in range(rows-1): y_shifts.append(minimize_deviation_1d(data[0], data[y+1], n_tries, display)) return _shift_data(data, y_shifts)