"""
GEddySoft: A Python Package for Eddy Covariance Processing
This module contains the main processing function for the GEddySoft eddy covariance
software package. It handles the complete workflow of processing raw high-frequency
data from sonic anemometers and trace gas analyzers (e.g., PTR-TOF-MS) to calculate
fluxes and quality control metrics.
Key Features
------------
* Flexible input handling for sonic and trace gas data
* Comprehensive quality control including:
* Spike detection (Vickers & Mahrt 1997)
* Stationarity tests (Foken & Wichura 1996)
* Integral turbulence characteristics (Thomas & Foken 2002)
* Instrument diagnostics
* Advanced processing options:
* Multiple coordinate rotation methods
* Time lag optimization
* Spectral corrections
* Flux uncertainty estimation
* Parallel processing support for large datasets
* HDF5-based data storage with rich metadata
Author
------
Bernard Heinesch
University of Liège, Gembloux Agro-Bio Tech
License
-------
GEddySoft is available under the terms of the MIT License.
"""
# %% Package and functions import
import numpy as np
import pandas as pd
import datetime
import os
import sys
import scipy.signal
from scipy.interpolate import interp1d
from copy import deepcopy
from print_progress import print_progress
import hdfdict # not initially available in the anaconda distribution
import re
import h5py
# import user functions/modules
from read_GHG import read_diag_val
from read_main_inputs import read_main_inputs
from read_metadata_files import read_metadata_files
from check_raw_timestamps import check_raw_timestamps
from get_list_input_files import get_list_input_files
from prepare_output import prepare_output
from get_closest_value import get_closest_value
from logBinSpectrum import logBinSpectrum
from nanlinfit import nanlinfit
from nandetrend import nandetrend
from xcov import xcov
from find_covariance_peak import find_covariance_peak
from test_steady_state_FW96 import test_steady_state_FW96
from test_steady_state_M98 import test_steady_state_M98
from test_steady_state_D99 import test_steady_state_D99
from test_ITC import test_ITC
from compute_wind_direction import compute_wind_direction
from cospectrum import cospectrum
from inst_prob_test import inst_prob_test
from add_attributes import add_attributes
from flux_uncertainties import flux_uncertainties
from clean_results import clean_results
from correction_factor_lpf import correction_factor_lpf
from spike_detection_vickers97 import spike_detection_vickers97
from map_sonic2tracer import map_sonic2tracer
from wind_rotations import wind_rotations
from flux_unit_conversion import flux_unit_conversion
# %% read inputs and prepare the process of individual days
def GEddySoft_main(str_day, ini, log_filename):
"""
Process eddy covariance data for a single day (when used in multithread mode)
or for the time period given in the ini file.
This function handles the complete processing chain for eddy covariance data,
from raw high-frequency measurements to quality-controlled fluxes. It supports
both single-threaded and parallel processing modes.
Parameters
----------
str_day : str
Date string in format 'YYYYMMDD' for the day to process.
Use 'no_multithread' for single-threaded mode.
ini : dict
Configuration dictionary containing all processing parameters:
* files: Input/output paths and file patterns
* param: Processing parameters (QC thresholds, methods, etc.)
* run_param: Runtime parameters
* sonic: Sonic anemometer configuration
* irga: Gas analyzer configuration
log_filename : str
Path to the log file where processing messages will be written
Returns
-------
unique_days : list
List of successfully processed days
Notes
-----
The processing workflow includes:
1. Data loading and synchronization
* Sonic anemometer data
* Trace gas measurements
* Auxiliary meteorological data
2. Quality control
* Spike detection and removal
* Diagnostic value checking
* Missing data handling
3. Coordinate rotations
* Double rotation or
* Planar fit method
4. Time lag optimization
* Covariance maximization
* RH-dependent correction
5. Flux calculations
* Detrending options
* Webb-Pearman-Leuning terms
6. Quality assessment
* Stationarity tests
* Integral turbulence characteristics
* Spectral analysis
7. Uncertainty estimation
* Random error (Finkelstein & Sims)
* Detection limits
The function creates two types of output files:
1. Results file (HDF5)
* Fluxes and means
* Quality flags
* Processing parameters
2. Covariance file (HDF5, optional)
* Raw covariance functions
* Time lag information
Examples
--------
>>> # Process a single day
>>> ini = read_main_inputs('config.txt')
>>> log_file = 'processing.log'
>>> processed = GEddySoft_main('20240101', ini, log_file)
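>>> # Process several days in parallel (a sketch; the day list, pool size and
>>> # call pattern are illustrative assumptions, not the shipped runner)
>>> from multiprocessing import Pool
>>> days = ['20240101', '20240102', '20240103']
>>> with Pool(3) as pool:
...     pool.starmap(GEddySoft_main, [(d, ini, log_file) for d in days])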
"""
# if multi-processing, mute the console and overwrite the file selection of the ini file
if str_day != 'no_multithread':
# Save the current stdout
original_stdout = sys.stdout
# Redirect stdout to devnull
sys.stdout = open(os.devnull, 'w')
ini['files']['date_files_selection'] = "('{}__00_00_00','{}__23_30_00')".format(str_day, str_day)
# Open logfile in append mode (already opened but see no other solution when multiprocessing)
OF = open(log_filename, 'a')
# load metadata files if provided
if ini['files']['meteo_filepath']:
df_meteofiledata = read_metadata_files(ini['files']['meteo_filepath'], OF, meteo=True)
if ini['param']['TILT_CORRECTION_MODE'] == 2:
R_tilt_PFM = read_metadata_files(ini['files']['tilt_correction_filepath'], OF, tilt=True)
if ini['files']['lag_clock_drift_filepath']:
# lag drift info are present and must be accounted for
df_lag_clock_drift = read_metadata_files(ini['files']['lag_clock_drift_filepath'], OF, clock_drift=True)
if ini['param']['LAG_DETECT_METHOD'] == 'PRESCRIBED':
df_lag_prescribed = read_metadata_files(ini['files']['lag_prescribed_filepath'], OF, presc_lag=True)
if ini['param']['LAG_RH_DEPENDENCY'] == 1:
df_lag_rh_dependency = read_metadata_files(ini['files']['lag_rh_dependency_filepath'], OF, rh_lag=True)
if ini['param']['LPFC'] != 0:
df_lpfc = read_metadata_files(ini['files']['lpfc_filepath'], OF, lpfc=ini['param']['LPFC'])
print(); OF.write('\n')
# check output folder
if not os.path.exists((ini['files']['output_folder'])):
sys.exit('output folder not found: ' + ini['files']['output_folder'])
# create sub-folder for covariance files if not existing
cov_folder = os.path.join(ini['files']['output_folder'], 'cov')
if not os.path.exists(cov_folder):
os.makedirs(cov_folder)
print(f"Created missing folder: {cov_folder}\n"); OF.write(f"Created missing folder: {cov_folder}\n\n")
# get list of sonic and tracer input files (in the given dir for sonic and also in subdir for tracer)
all_sonic_files_list, all_tracer_files_list = get_list_input_files(ini)
# map sonic file list to tracer file list if asked for
if ini['run_param']['MAP_SONIC2TRACER'] and ini['run_param']['CONCENTRATION_TYPE']:
all_sonic_files_list = map_sonic2tracer(all_sonic_files_list, all_tracer_files_list)
print('sonic files mapped to tracer files\n')
print('*************** processing data ***************' + '\n')
OF.write('*************** processing data ***************' + '\n' + '\n')
# get uniques yyyy_mm_dd in the sonic file list
unique_days = list(set(list(map(lambda x: x[len(ini['files']['sonic_files_prefix']):len(ini['files']['sonic_files_prefix']) + 10], all_sonic_files_list['name']))))
unique_days.sort()
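# e.g., with sonic_files_prefix = 'GHS50_' (an illustrative prefix), a file named
# 'GHS50_2024_01_01__00_00_00.csv' yields the 10-character day string '2024_01_01',
# matching the yyyy_mm_dd split used further below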
idx_tracers_to_process = None
# %% loop on days
for d in range(len(unique_days)):
process_irga_data_day = False
# select only the given day
# for sonic files
condition = lambda e: unique_days[d] in e
indices = [i for i, elem in enumerate(all_sonic_files_list['name']) if condition(elem)]
sonic_files_list = {'name': [], 'path': [], 'prefix': [], 'date': []}
for key in all_sonic_files_list:
sonic_files_list[key] = [all_sonic_files_list[key][i] for i in indices]
if ini['run_param']['CONCENTRATION_TYPE'] == 1:
# and for tracer files
# Extract year, month, day from the input_date
year, month, day = unique_days[d].split('_')
# Define a pattern to identify time-related placeholders
time_placeholders = re.compile(r'HH|MM|SS')
# Remove time-related placeholders from the user_format
cleaned_format = time_placeholders.sub('', ini['files']['tracer_files_date_format'])
# Replace placeholders for date components in the cleaned format
formatted_date = cleaned_format.replace('yyyy', year).replace('mm', month).replace('dd', day)
# Remove any leftover separators
formatted_date = re.sub(r'[^a-zA-Z0-9]+$', '', formatted_date)
condition = lambda e: formatted_date in e
indices = [i for i, elem in enumerate(all_tracer_files_list['name']) if condition(elem)]
tracer_files_list = {'name': [], 'path': [], 'prefix': [], 'date': []}
for key in all_tracer_files_list:
tracer_files_list[key] = [all_tracer_files_list[key][i] for i in indices]
# prepare some variables
out_len = len(sonic_files_list['name'])
upsampling_factor = int(ini['param']['SAMPLING_RATE_SONIC']/ini['param']['SAMPLING_RATE_TRACER'])
# prepare results dict structure
results = prepare_output(ini, out_len)
# prepare covariance data dict to be stored to file
cov_data = {}
temp = {'cov': [[np.NaN] * (2*ini['param']['LAG_OUTER_WINDOW_SIZE']+1)] * out_len}
cov_data['IRGA'] = dict(zip(tuple(str(x) for x in tuple(range(0, len(ini['irga']['irga_columns'])))),
[deepcopy(temp) for i in range(len(ini['irga']['irga_columns']))])) # struct containing results for each tracer
# prepare frequency axis for cospectrum
f = np.arange(0, np.ceil(ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_FINAL']/2)+1) / ini['param']['WINDOW_LENGTH'] # linear frequency
results['freq'] = logBinSpectrum(f, np.zeros(len(f)), ini['param']['NUM_FREQ_BINS'], f[1], f[-1])[0] # log-spaced frequency bins
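# worked example (illustrative values, not read from the ini): with WINDOW_LENGTH = 1800 s
# and SAMPLING_RATE_FINAL = 10 Hz, f runs from 0 Hz up to the Nyquist frequency of 5 Hz
# in steps of 1/1800 Hz (9001 points), which logBinSpectrum then collapses into
# NUM_FREQ_BINS log-spaced bins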
# %% process sonic data
# Loop on sonic files
for n in range(len(sonic_files_list['name'])):
# store current time
proc_start_time_hh = datetime.datetime.now()
# load sonic file in sonicdata np
sonicdata, error_code = read_main_inputs(sonic_files_list['path'][n], sonic_files_list['name'][n], 'sonic', ini, OF)
if error_code: continue
# store timestamp
# Filename is in UTC+1, start of the half-hour
# Timestamp in the input file is in UNIX format (number of seconds since Epoch, 1/1/1970 00:00:00 UTC).
# The instruction utcfromtimestamp converts time in the UTC timezone
# +3600 : to convert to UTC +1
first_ts = round(sonicdata[0, 0])
last_ts = round(next((i for i in reversed(sonicdata[:, 0]) if i != 0), None))
results['time'][n] = datetime.datetime.utcfromtimestamp(last_ts + 3600)
# remove sonicdata trailing zeros (the file is initially oversized and filled with zero values)
sonicdata = sonicdata[:len(sonicdata[sonicdata[:, 0] != 0, :]), :]
# check completeness of sonic data
completeness_sonicdata = sum(np.isfinite(sonicdata[:, 1])) / \
(ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_SONIC'])
# if sonic file not complete enough, skip this file
if completeness_sonicdata < ini['param']['COMPLETENESS_THRESHOLD']:
print('sonic file incomplete ({:.2%}), sonic process skipped'.format(completeness_sonicdata))
continue
# reduce sonic file to expected length
# if the sonic output frequency is slightly higher than its nominal value, "trailing" records will be discarded
# acceptable only if the number of such records is very limited (e.g. GHS50 files have between 90000 and 90005 records instead of 90000)
# the case of a sonic output frequency slightly lower than its nominal value is not considered
sonicdata = sonicdata[:ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_SONIC'], :]
# check validity of timestamp series, if present
time_series = pd.Series(np.vectorize(datetime.datetime.utcfromtimestamp)(sonicdata[:, 0]))
threshold = pd.Timedelta(milliseconds=3/ini['param']['SAMPLING_RATE_SONIC']*1000)
msg = check_raw_timestamps(time_series, threshold)
if msg[0]:
print('sonic file: ' + msg[0]); OF.write('sonic file: ' + msg[0] + '\n')
# check sonic flag and set flagged data to default value
# if SPIKE_MODE == 2 and no more than 3 consecutive raw records carry an error code, these are handled by the spike replacement
if np.any(sonicdata[:, 5] != 0):
print('Warning: detected sonic data with error code activated')
sonicdata[sonicdata[:, 5] != 0, 1:5] = -999
# check IRGA flag and set flagged data to NaN
if (ini['run_param']['CONCENTRATION_TYPE'] == 0 and ini['irga']['irga_columns'][2] > 0):
diag_count = read_diag_val(sonicdata[:, 8])
if (diag_count.drop(columns=['AGC', 'Sync', 'Aux_input']) != 0).any(axis=1).any():
sonicdata[:, 6:8] = np.NaN
print('IRGA data flagged')
# subsample sonic data from SAMPLING_RATE_SONIC to SAMPLING RATE FINAL if needed
# note : InnFlux uses upfirdn routine instead of resample
if ((ini['param']['SAMPLING_RATE_SONIC'] > ini['param']['SAMPLING_RATE_FINAL'])):
if np.sum(np.isnan(sonicdata)):
print(str(np.sum(np.isnan(sonicdata))) + ' NaNs have been detected in this sonic file. Resampling will not work.') # debug purposes, see readme for NaNs and resample
downsampling_factor = ini['param']['SAMPLING_RATE_SONIC']/ini['param']['SAMPLING_RATE_FINAL']
sonicdata = scipy.signal.resample(sonicdata, int(len(sonicdata[:, 0])/downsampling_factor))
# reconstruct the timestamp column (used later on by interp1d).
# this also corrects for possible variable transmission delays between the sonic and the final
# storage on the datalogger/computer, assuming that the sonic is sending data at a fixed frequency
sonicdata[:, 0] = (first_ts +
np.asarray([x for x in range(0, len(sonicdata))])/ini['param']['SAMPLING_RATE_FINAL']) # first column contains timestamp
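# worked example (illustrative numbers): with first_ts = 1600000000 and
# SAMPLING_RATE_FINAL = 10 Hz, the rebuilt column reads
# 1600000000.0, 1600000000.1, 1600000000.2, ... regardless of any jitter
# in the originally logged timestamps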
# detect w and T spikes (and remove if requested)
if ini['param']['SPIKE_MODE'] != 0:
sonicdata[:, 3], is_spike = spike_detection_vickers97(
sonicdata[:, 3],
spike_mode = ini['param']['SPIKE_MODE'],
avrg_len = int(ini['param']['WINDOW_LENGTH']/60),
ac_freq = ini['param']['SAMPLING_RATE_SONIC'],
spike_limit = ini['param']['SPIKE_DETECTION_THRESHOLD_W'],
max_consec_spikes = ini['param']['MAX_CONSEC_SPIKES'],
ctrplot = ini['run_param']['PLOT_SPIKE_DETECTION']
)
num_spikes_w = int(sum(is_spike))
else:
num_spikes_w = np.nan
if ini['param']['SPIKE_MODE'] != 0:
sonicdata[:, 4], is_spike = spike_detection_vickers97(sonicdata[:, 4],
spike_mode = ini['param']['SPIKE_MODE'],
avrg_len = int(ini['param']['WINDOW_LENGTH']/60),
ac_freq = ini['param']['SAMPLING_RATE_SONIC'],
spike_limit = ini['param']['SPIKE_DETECTION_THRESHOLD_T'],
max_consec_spikes = ini['param']['MAX_CONSEC_SPIKES'],
ctrplot = ini['run_param']['PLOT_SPIKE_DETECTION']
)
num_spikes_T = int(sum(is_spike))
else:
num_spikes_T = np.nan
# test on instrument malfunctions from Vitale 2020
IPT_w = inst_prob_test(sonicdata[:, 3], False, ini['param']['SAMPLING_RATE_FINAL'], ini['run_param']['PLOT_INST_PROB_TEST'], 'W')
IPT_T = inst_prob_test(sonicdata[:, 4], ini['param']['DETREND_TEMPERATURE_SIGNAL'], ini['param']['SAMPLING_RATE_FINAL'], ini['run_param']['PLOT_INST_PROB_TEST'], 'T_SONIC')
# mean and standard deviation of wind speed and direction
# note: wind direction is the angle the wind is coming from, with north = 0 and east = 90 degrees
mean_uvw = np.nanmean(sonicdata[:, 1:3+1], 0)
mean_wind_speed = np.sqrt(mean_uvw[0]**2 + mean_uvw[1]**2 + mean_uvw[2]**2)
std_wind_speed = np.std(np.sqrt(sonicdata[:, 1]**2 + sonicdata[:, 2]**2 + sonicdata[:, 3]**2))
mean_wind_dir = compute_wind_direction(mean_uvw[0], mean_uvw[1], ini['param']['SONIC_TYPE'], ini['param']['SONIC_ORIENTATION'])
# Compute wind directions using vectorized operations
wind_directions = compute_wind_direction(sonicdata[:, 1],
sonicdata[:, 2],
ini['param']['SONIC_TYPE'], ini['param']['SONIC_ORIENTATION'])
# Compute standard deviation
std_wind_dir = np.std(wind_directions)
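# note: np.std treats wind direction as a linear quantity and ignores the
# 0/360 degree wrap; a circular alternative (Yamartino 1984) would look like
# the following sketch (not used here, shown for reference only):
#   sa = np.nanmean(np.sin(np.deg2rad(wind_directions)))
#   ca = np.nanmean(np.cos(np.deg2rad(wind_directions)))
#   eps = np.sqrt(max(0.0, 1.0 - (sa**2 + ca**2)))
#   std_wdir_circ = np.rad2deg(np.arcsin(eps) * (1 + 0.1547*eps**3))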
# Coordinate rotations
if (ini['param']['TILT_CORRECTION_MODE'] != 2):
uvw_rot, rot_phi, R_tilt = wind_rotations(ini, sonicdata, mean_uvw)
else:
uvw_rot, rot_phi, R_tilt = wind_rotations(ini, sonicdata, mean_uvw, R_tilt_PFM, mean_wind_dir)
mean_uvw_rot = np.nanmean(uvw_rot, axis=0)
# create a common time vector for aligning sonic and irga/tracer data
interp_time = sonicdata[0, 0] + np.transpose(np.arange(0, ini['param']['WINDOW_LENGTH'], 1/ini['param']['SAMPLING_RATE_FINAL']))
# interpolate wind and scalar data (keep NaN and extrapolate using NaN)
interp_u = interp1d(sonicdata[:, 0], uvw_rot[:, 0], kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time)
interp_v = interp1d(sonicdata[:, 0], uvw_rot[:, 1], kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time)
interp_w = interp1d(sonicdata[:, 0], uvw_rot[:, 2], kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time)
interp_Tsonic = interp1d(sonicdata[:, 0], sonicdata[:, 4], kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time) + 273.15
# check data completeness
completeness_T = sum(np.isfinite(interp_Tsonic))/(ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_FINAL'])
# Reynolds decomposition of wind components (e.g. u = u_mean + u_prime)
u_prime = interp_u - np.nanmean(interp_u)
v_prime = interp_v - np.nanmean(interp_v)
w_prime = interp_w - np.nanmean(interp_w)
# standard deviation of wind
std_uvw = [np.nanstd(u_prime, ddof=1), np.nanstd(v_prime, ddof=1), np.nanstd(w_prime, ddof=1)]
# check for Nans and replace NaN by zero
if np.isnan(u_prime).any():
print("Warning: NaN values detected in u_prime. Replacing with 0.")
u_prime = np.nan_to_num(u_prime, nan=0)
if np.isnan(v_prime).any():
print("Warning: NaN values detected in v_prime. Replacing with 0.")
v_prime = np.nan_to_num(v_prime, nan=0)
if np.isnan(w_prime).any():
print("Warning: NaN values detected in w_prime. Replacing with 0.")
w_prime = np.nan_to_num(w_prime, nan=0)
# calculate covariances of wind components
uw = np.cov(u_prime, w_prime)[0][1]
vw = np.cov(v_prime, w_prime)[0][1]
uv = np.cov(u_prime, v_prime)[0][1]
uu = np.cov(u_prime, u_prime)[0][1]
vv = np.cov(v_prime, v_prime)[0][1]
ww = np.cov(w_prime, w_prime)[0][1]
# calculate friction velocity, u*
u_star = (uw**2 + vw**2)**0.25
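# worked example (illustrative numbers): uw = -0.09 m2 s-2 and vw = 0
# give u_star = (0.09**2 + 0**2)**0.25 = 0.0081**0.25 = 0.3 m s-1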
# calculate temperature T from virtual sonic temperature T_sonic and water vapor concentration
# T_sonic = T*(1 + 0.32*w), where w is the volume mixing ratio of water vapor in air
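# worked example (illustrative numbers): T_sonic = 300 K with H2O = 10 permille
# gives T = 300/(1 + 0.32*1e-3*10) = 300/1.0032 = 299.04 K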
if ini['run_param']['CONCENTRATION_TYPE'] == 0:
interp_H2O = interp1d(sonicdata[:,0], sonicdata[:, 4], kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time)
completeness_H2O = sum(np.isfinite(interp_H2O))/(ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_FINAL'])
else:
completeness_H2O = 0
if ini['run_param']['CONCENTRATION_TYPE'] == 0 and completeness_H2O >= ini['param']['COMPLETENESS_THRESHOLD']:
if completeness_T >= ini['param']['COMPLETENESS_THRESHOLD']:
interp_T = interp_Tsonic/(1 + 0.32*1e-3*interp_H2O) # note that H2O data is given in permille
T_mean = np.nanmean(interp_T)
dT_dt = nanlinfit(interp_T)[0]
dT_dt = dT_dt*ini['param']['SAMPLING_RATE_FINAL']
if (ini['param']['DETREND_TEMPERATURE_SIGNAL']):
T_prime = nandetrend(interp_T)
else:
T_prime = interp_T - T_mean
std_T = np.nanstd(T_prime, ddof=1)
T_prime = np.nan_to_num(T_prime, nan=0.0)
else:
# H2O concentration not available, use T_sonic
interp_T = interp_Tsonic
T_mean = np.nanmean(interp_T)
dT_dt = nanlinfit(interp_T)[0]
dT_dt = dT_dt*ini['param']['SAMPLING_RATE_FINAL']
if (ini['param']['DETREND_TEMPERATURE_SIGNAL']):
T_prime = nandetrend(interp_T)
else:
T_prime = interp_T - T_mean
std_T = np.nanstd(T_prime, ddof=1)
T_prime = np.nan_to_num(T_prime, nan=0.0)
# get mean atmospheric pressure
if ini['files']['meteo_filepath'] and ini['meteo']['pressure_column']:
p_mean = get_closest_value(df_meteofiledata.iloc[:, ini['meteo']['pressure_column']-1], results['time'][n])
else:
p_mean = ini['param']['DEFAULT_PRESSURE']
# get mean air temperature (in K, only for improved computation of air molar concentration)
if ini['files']['meteo_filepath'] and ini['meteo']['temperature_column']:
T_meteo = get_closest_value(df_meteofiledata.iloc[:, ini['meteo']['temperature_column']-1], results['time'][n]) + 273.15
else:
T_meteo = np.NaN
# get mean air relative humidity (in %, for lag-RH dependency, if any)
if ini['files']['meteo_filepath'] and ini['meteo']['relative_humidity_column']:
rh_meteo = get_closest_value(df_meteofiledata.iloc[:, ini['meteo']['relative_humidity_column']-1], results['time'][n])
else:
rh_meteo = np.NaN
# calculate air molar concentration (mol m-3)
if not pd.isna(T_meteo):
air_mol_conc = 1/((T_meteo/273.15) * (1013.25/p_mean) * 22.414e-3)
else:
air_mol_conc = 1/((T_mean/273.15) * (1013.25/p_mean) * 22.414e-3)
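# worked example (illustrative numbers): T = 293.15 K and p = 1013.25 hPa give
# air_mol_conc = 1/((293.15/273.15)*(1013.25/1013.25)*22.414e-3) = 41.6 mol m-3,
# i.e. the ideal-gas molar volume of 22.414 L mol-1 scaled from STP to ambient conditions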
if (completeness_T >= ini['param']['COMPLETENESS_THRESHOLD']):
# calculate temperature flux <w'T'>
wT = xcov(w_prime, T_prime, [0, 0])
# calculate spectrum for T
[spec_T, spec_T_scaled, _] = cospectrum(ini, T_prime, T_prime, f, mean_wind_speed, plot=ini['run_param']['PLOT_COSPECTRUM'], filename=sonic_files_list['name'][n], spectrum_type='spec')
# calculate cospectrum for wT
[cospec_wT, cospec_wT_scaled, _] = cospectrum(ini, w_prime, T_prime, f, mean_wind_speed, plot=ini['run_param']['PLOT_COSPECTRUM'], filename=sonic_files_list['name'][n], spectrum_type='cospec')
# steady state test for wT according to Foken and Wichura 1996 and to Mahrt 1998
steady_state_wT_FW96 = test_steady_state_FW96(w_prime, T_prime)
steady_state_wT_M98 = test_steady_state_M98(w_prime, T_prime)
steady_state_wT_D99 = test_steady_state_D99(w_prime, T_prime)
else:
wT = np.NaN
spec_T = [np.NaN] * ini['param']['NUM_FREQ_BINS']
spec_T_scaled = [np.NaN] * ini['param']['NUM_FREQ_BINS']
cospec_wT = [np.NaN] * ini['param']['NUM_FREQ_BINS']
cospec_wT_scaled = [np.NaN] * ini['param']['NUM_FREQ_BINS']
steady_state_wT_FW96 = np.NaN
steady_state_wT_M98 = np.NaN
steady_state_wT_D99 = np.NaN
# calculate Obukhov length, L
L = -T_mean*u_star**3/(0.4*9.81*wT)
# calculate stability parameter, z/L
zoL = ini['param']['SENSOR_HEIGHT']/L
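# worked example (illustrative numbers): T_mean = 293 K, u_star = 0.3 m s-1 and
# wT = 0.05 K m s-1 give L = -293*0.3**3/(0.4*9.81*0.05) = -40.3 m; with
# SENSOR_HEIGHT = 3 m this yields zoL = -0.074 (unstable stratification)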
# calculate potential temperature, theta
# virtual potential temperature, theta_v
# and associated wT covariance
p_0 = 1000 # reference pressure for potential temperature [hPa]
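# worked example (illustrative numbers): T_mean = 293 K at p_mean = 950 hPa gives
# theta_mean = 293*(1000/950)**0.286 = 297.3 K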
if ((completeness_T >= ini['param']['COMPLETENESS_THRESHOLD']) and p_mean):
theta_mean = T_mean*(p_0/p_mean)**(0.286)
theta_prime = T_prime*(p_0/p_mean)**(0.286)
wtheta = xcov(w_prime, theta_prime, [0, 0])
if (completeness_H2O >= ini['param']['COMPLETENESS_THRESHOLD']):
theta_v = interp_T*(p_0/p_mean)**0.286*(1 + 0.38*1e-3*interp_H2O) # note that H2O data is given in permille
theta_v_mean = np.nanmean(theta_v)
if (ini['param']['DETREND_TEMPERATURE_SIGNAL']):
theta_v_prime = nandetrend(theta_v)
else:
theta_v_prime = theta_v - theta_v_mean
theta_v_prime = np.nan_to_num(theta_v_prime, nan=0.0)
wtheta_v = xcov(w_prime, theta_v_prime, [0, 0])
else:
theta_v_mean = np.NaN
wtheta_v = np.NaN
else:
theta_mean = np.NaN
theta_v_mean = np.NaN
wtheta = np.NaN
wtheta_v = np.NaN
# test on developed turbulent conditions
ITC_w, ITC_u, ITC_T = test_ITC(w_prime, u_prime, T_prime, wT, zoL, u_star, ini['param']['LATITUDE'])
# store results for sonic data
results['MET']['u_unrot'][n] = mean_uvw[0]
results['MET']['v_unrot'][n] = mean_uvw[1]
results['MET']['w_unrot'][n] = mean_uvw[2]
results['MET']['u_rot'][n] = mean_uvw_rot[0]
results['MET']['v_rot'][n] = mean_uvw_rot[1]
results['MET']['w_rot'][n] = mean_uvw_rot[2]
results['MET']['std_u'][n] = std_uvw[0]
results['MET']['std_v'][n] = std_uvw[1]
results['MET']['std_w'][n] = std_uvw[2]
results['MET']['wsh'][n] = mean_wind_speed
results['MET']['std_wsh'][n] = std_wind_speed
results['MET']['wdir'][n] = mean_wind_dir
results['MET']['std_wdir'][n] = std_wind_dir
results['MET']['phi'][n] = rot_phi*180/np.pi
results['MET']['tilt'][str(n)] = R_tilt
results['MET']['uw'][n] = uw
results['MET']['vw'][n] = vw
results['MET']['uv'][n] = uv
results['MET']['uu'][n] = uu
results['MET']['vv'][n] = vv
results['MET']['ww'][n] = ww
results['MET']['ust'][n] = u_star
results['MET']['T'][n] = T_mean
results['MET']['std_T'][n] = std_T
results['MET']['wT'][n] = wT
results['MET']['L'][n] = L
results['MET']['zoL'][n] = zoL
results['MET']['spec_T'][n] = spec_T
results['MET']['spec_T_scaled'][n] = spec_T_scaled
results['MET']['cospec_wT'][n] = cospec_wT
results['MET']['cospec_wT_scaled'][n] = cospec_wT_scaled
results['MET']['p'][n] = p_mean
results['MET']['rho_air_molar'][n] = air_mol_conc
results['MET']['theta'][n] = theta_mean
results['MET']['theta_v'][n] = theta_v_mean
results['MET']['wtheta'][n] = wtheta
results['MET']['wtheta_v'][n] = wtheta_v
results['MET']['P_air'][n] = p_mean
results['MET']['T_air'][n] = T_meteo
results['MET']['RH_air'][n] = rh_meteo
results['MET']['qaqc']['completeness_sonic'][n] = completeness_sonicdata
results['MET']['qaqc']['num_spikes_w'][n] = num_spikes_w
results['MET']['qaqc']['num_spikes_T'][n] = num_spikes_T
results['MET']['qaqc']['SST_wT_FW96'][n] = steady_state_wT_FW96
results['MET']['qaqc']['SST_wT_M98'][n] = steady_state_wT_M98
results['MET']['qaqc']['SST_wT_D99'][n] = steady_state_wT_D99
results['MET']['qaqc']['ITC_w'][n] = ITC_w
results['MET']['qaqc']['ITC_u'][n] = ITC_u
results['MET']['qaqc']['ITC_T'][n] = ITC_T
results['MET']['qaqc']['IPT_w'][n] = IPT_w
results['MET']['qaqc']['IPT_T'][n] = IPT_T
# %% process IRGA data
# IRGA tracers
if ini['run_param']['CONCENTRATION_TYPE'] == 0:
process_irga_data_day = True
lag_clock_drift_samples = 0
# get clock-drift time lag if an input file is provided
if ini['files']['lag_clock_drift_filepath']:
# time lag drift info is present and must be accounted for
lag_clock_drift_samples = int(-df_lag_clock_drift['TDC-computer'][abs(df_lag_clock_drift.index - results['time'][n]).argmin()] * ini['param']['SAMPLING_RATE_FINAL']) # find closest time lag based on timestamp
# get prescribed time lag (instrumental + physical) if input file provided
if ini['param']['LAG_DETECT_METHOD'] == 'PRESCRIBED':
# get prescribed time lag (clock-drift + physical)
closest_index = abs(df_lag_prescribed.index - results['time'][n]).argmin()
if abs(df_lag_prescribed.index - results['time'][n]).min() == datetime.timedelta():
# a lag is present for this half-hour
prescribed_lag_samples = int(df_lag_prescribed['time lag in s'][closest_index] * ini['param']['SAMPLING_RATE_FINAL']) # find closest time lag based on timestamp
else:
# a lag is not present for this half-hour, compute the median lag of the ten closest values
# the median must be computed on the physical lag
# Synchronize on df_lag_prescribed index
df_lag_clock_drift_synchronized = df_lag_clock_drift.reindex(df_lag_prescribed.index, method='nearest')
# Calculate indices for the non-NaN values around the gap
valid_indices = range(len(df_lag_prescribed))
valid_distances = [abs(i - closest_index) for i in valid_indices]
sorted_indices = [x for _, x in sorted(zip(valid_distances, valid_indices))]
# Select the closest 10 values for median calculation
selected_indices = sorted_indices[:10]
# Calculate the median of the selected values
prescribed_lag_samples = (np.median(df_lag_prescribed.iloc[selected_indices,0].values + df_lag_clock_drift_synchronized.iloc[selected_indices,0].values) - df_lag_clock_drift_synchronized.iloc[closest_index,0])* ini['param']['SAMPLING_RATE_FINAL']
# compute flux correction factor for low-pass filtering
if ini['param']['LPFC'] == 0:
cf_lpf = 1
elif ini['param']['LPFC'] == 1:
cf_lpf = correction_factor_lpf(mean_wind_speed, zoL, ini, df_lpfc=df_lpfc, ctrplot=ini['run_param']['PLOT_CORRECTION_FACTOR_LPF'])
elif ini['param']['LPFC'] == 2:
cf_lpf = correction_factor_lpf(mean_wind_speed, zoL, ini, df_lpfc=df_lpfc)
results['IRGA']['cf_lpf'][n] = cf_lpf
# loop on concentrations
for i in range(len(ini['irga']['irga_columns'])-1):
lag_phys_wd_center = ini['param']['LAG_WINDOW_CENTER']
# replace by the rh-dependent physical lag, if the tracer is present in the rh_dependency file
if ini['param']['LAG_RH_DEPENDENCY'] == 1:
searched_mz = ini['irga']['irga_names'][i]
matching_cols = df_lag_rh_dependency.columns[
(df_lag_rh_dependency.columns.astype(float) >= searched_mz - 0.001) &
(df_lag_rh_dependency.columns.astype(float) <= searched_mz + 0.001)
]
if not matching_cols.empty:
closest_index = abs(df_lag_rh_dependency.index - min(rh_meteo, 100)).argmin()
lag_phys_wd_center = df_lag_rh_dependency.loc[closest_index, matching_cols[0]]
# detect spikes (and remove if requested)
if ini['param']['SPIKE_MODE'] != 0:
sonicdata[:, i+6], is_spike=spike_detection_vickers97(sonicdata[:, i+6],
spike_mode=ini['param']['SPIKE_MODE'],
avrg_len=int(ini['param']['WINDOW_LENGTH']/60),
ac_freq=ini['param']['SAMPLING_RATE_FINAL'],
spike_limit=ini['param']['SPIKE_DETECTION_THRESHOLD_IRGA'],
max_consec_spikes=ini['param']['MAX_CONSEC_SPIKES'],
ctrplot=ini['run_param']['PLOT_SPIKE_DETECTION']
)
num_spikes = int(sum(is_spike))
else:
num_spikes = np.nan
# test on instrument malfunctions from Vitale 2020
IPT = inst_prob_test(sonicdata[:,i+6], ini['param']['DETREND_TRACER_SIGNAL'], ini['param']['SAMPLING_RATE_FINAL'], ini['run_param']['PLOT_INST_PROB_TEST'], 'c or h')
# IPT = [np.nan for a in range(12)]
interp_c = interp1d(sonicdata[:,0], sonicdata[:,i+6], kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time)
completeness_IRGA = sum(np.isfinite(interp_c))/(ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_FINAL'])
if completeness_IRGA >= ini['param']['COMPLETENESS_THRESHOLD']:
# calculate slope of IRGA tracer signal
dc_dt = nanlinfit(interp_c)[0]
dc_dt = dc_dt*ini['param']['SAMPLING_RATE_FINAL']
# Reynolds decomposition / detrending
c_mean = np.nanmean(interp_c)
if (ini['param']['DETREND_TRACER_SIGNAL']):
c_prime = nandetrend(interp_c)
else:
c_prime = interp_c - c_mean
std_c = np.nanstd(c_prime, ddof=1)
c_prime = np.nan_to_num(c_prime, nan=0.0)
# calculate cross-covariance over a given lag-window
cov_wc = xcov(w_prime, c_prime, [lag_phys_wd_center + lag_clock_drift_samples - ini['param']['LAG_OUTER_WINDOW_SIZE'], lag_phys_wd_center + lag_clock_drift_samples + ini['param']['LAG_OUTER_WINDOW_SIZE']])
# find time lag
if ini['param']['LAG_DETECT_METHOD'] == 'CONSTANT':
lag_samples = int(lag_phys_wd_center + lag_clock_drift_samples)
elif ini['param']['LAG_DETECT_METHOD'] == 'MAX':
lag_samples = find_covariance_peak(cov_wc, ini['param']['LAG_OUTER_WINDOW_SIZE'], ini['param']['LAG_INNER_WINDOW_SIZE'], lag_phys_wd_center + lag_clock_drift_samples, ini['param']['LAG_COVPEAK_FILTER_LENGTH'], 0, ini['run_param']['PLOT_FIND_COVARIANCE_PEAK'], sonic_files_list['name'][n]) # lag in samples
elif ini['param']['LAG_DETECT_METHOD'] == 'MAX_WITH_DEFAULT':
lag_samples = find_covariance_peak(cov_wc, ini['param']['LAG_OUTER_WINDOW_SIZE'], ini['param']['LAG_INNER_WINDOW_SIZE'], lag_phys_wd_center + lag_clock_drift_samples, ini['param']['LAG_COVPEAK_FILTER_LENGTH'], 0, ini['run_param']['PLOT_FIND_COVARIANCE_PEAK'], sonic_files_list['name'][n]) # lag in samples
# if lag was found at the extremum of the search window, replace by the default
if (lag_samples == lag_phys_wd_center + lag_clock_drift_samples - ini['param']['LAG_INNER_WINDOW_SIZE'] + 1 or
lag_samples == lag_phys_wd_center + lag_clock_drift_samples + ini['param']['LAG_INNER_WINDOW_SIZE']):
lag_samples = lag_phys_wd_center + lag_clock_drift_samples
elif ini['param']['LAG_DETECT_METHOD'] == 'PRESCRIBED':
lag_samples = int(prescribed_lag_samples)
lagtime_individual = lag_samples/ini['param']['SAMPLING_RATE_FINAL'] # seconds
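# index arithmetic (illustrative numbers): cov_wc spans the lags
# [lag_phys_wd_center+lag_clock_drift_samples-LAG_OUTER_WINDOW_SIZE, ... +LAG_OUTER_WINDOW_SIZE],
# so the value at lag lag_samples sits at index
# LAG_OUTER_WINDOW_SIZE - (lag_phys_wd_center+lag_clock_drift_samples) + lag_samples;
# e.g. LAG_OUTER_WINDOW_SIZE = 200, center+drift = 50 and lag_samples = 62 give index 212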
# calculate flux in umol/m^2/s or mmol/m^2/s
flux_individual = cov_wc[ini['param']['LAG_OUTER_WINDOW_SIZE'] - (lag_phys_wd_center + lag_clock_drift_samples) + lag_samples]
# align c_prime for time-lag and trim the circular part; the lag-corrected time series is needed for further computations
if lag_samples > 0:
# Shift c_prime forward (to the right), remove front (wrapped)
c_prime_trim = np.roll(c_prime, lag_samples)[lag_samples:]
# Remove first lag_samples from w_prime to align it in time
w_prime_trim = w_prime[lag_samples:]
elif lag_samples < 0:
lag_abs = abs(lag_samples)
# Shift c_prime backward (to the left), remove end (wrapped)
c_prime_trim = np.roll(c_prime, lag_samples)[:-lag_abs]
# Remove last lag_abs from w_prime to align
w_prime_trim = w_prime[:-lag_abs]
else:
# zero lag: series are already aligned
c_prime_trim = c_prime
w_prime_trim = w_prime
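# toy example of the alignment above: with c_prime = [c0, c1, c2, c3] and
# lag_samples = 1, np.roll gives [c3, c0, c1, c2]; dropping the wrapped first
# element leaves [c0, c1, c2], paired sample-by-sample with w_prime[1:] = [w1, w2, w3]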
# calculate spectrum for c and cospectrum for wc
[spec_c, spec_c_scaled, _] = cospectrum(ini, c_prime_trim, c_prime_trim, f, mean_wind_speed, spectrum_type='spec')
[cospec_wc, cospec_wc_scaled, _] = cospectrum(ini, w_prime_trim, c_prime_trim, f, mean_wind_speed, spectrum_type='cospec')
# steady state test according to Foken and Wichura 1996 and to Mahrt 1998
steady_state_wc_FW96 = test_steady_state_FW96(w_prime_trim, c_prime_trim)
steady_state_wc_M98 = test_steady_state_M98(w_prime_trim, c_prime_trim)
steady_state_wc_D99 = test_steady_state_D99(w_prime_trim, c_prime_trim)
# compute flux uncertainties
[flux_noise_mean, flux_noise_std, flux_noise_rmse, random_error_FS, random_flux, random_error_noise] = \
flux_uncertainties(ini, w_prime_trim, c_prime_trim)
# convert IRGA flux from kinematic units to desired units
flux_individual, flux_noise_mean, flux_noise_std, flux_noise_rmse, random_error_FS, random_flux, random_error_noise = \
flux_unit_conversion(flux_individual, flux_noise_mean, flux_noise_std, flux_noise_rmse, random_error_FS, random_flux, random_error_noise,
'to umol m-2 s-1', air_mol_conc)
# store results for individual IRGA tracer
results['IRGA'][str(i)]['conc_mean'][n] = c_mean
results['IRGA'][str(i)]['conc_std'][n] = std_c
results['IRGA'][str(i)]['lagtime'][n] = lagtime_individual
results['IRGA'][str(i)]['lagtime_clock_drift'][n] = lag_clock_drift_samples/ini['param']['SAMPLING_RATE_FINAL']
results['IRGA'][str(i)]['flux'][n] = flux_individual
results['IRGA'][str(i)]['spec'][n] = spec_c
results['IRGA'][str(i)]['spec_scaled'][n] = spec_c_scaled
results['IRGA'][str(i)]['cospec'][n] = cospec_wc
results['IRGA'][str(i)]['cospec_scaled'][n] = cospec_wc_scaled
results['IRGA'][str(i)]['qaqc']['completeness_IRGA'][n] = completeness_IRGA
results['IRGA'][str(i)]['qaqc']['num_spikes'][n] = num_spikes
results['IRGA'][str(i)]['qaqc']['SST_FW96'][n] = steady_state_wc_FW96
results['IRGA'][str(i)]['qaqc']['SST_M98'][n] = steady_state_wc_M98
results['IRGA'][str(i)]['qaqc']['SST_D99'][n] = steady_state_wc_D99
results['IRGA'][str(i)]['qaqc']['IPT'][n] = IPT
results['IRGA'][str(i)]['qaqc']['flux_SNR'][n] = abs(flux_individual - flux_noise_mean)/flux_noise_std
results['IRGA'][str(i)]['qaqc']['flux_noise_mean'][n] = flux_noise_mean
results['IRGA'][str(i)]['qaqc']['flux_noise_std'][n] = flux_noise_std
results['IRGA'][str(i)]['qaqc']['flux_noise_rmse'][n] = flux_noise_rmse
results['IRGA'][str(i)]['qaqc']['random_error_FS'][n] = random_error_FS
results['IRGA'][str(i)]['qaqc']['random_flux'][n] = random_flux
results['IRGA'][str(i)]['qaqc']['random_error_noise'][n] = random_error_noise
cov_data['IRGA'][str(i)]['cov'][n] = cov_wc
# %% process TRACER data
if ini['run_param']['CONCENTRATION_TYPE'] == 1:
# load corresponding tracer file
tracerdata, error_code, idx_tracers_to_process, tracer_file_index, *rest = read_main_inputs(sonic_files_list['path'][n], sonic_files_list['name'][n], 'tracer', ini, OF, idx_tracers_to_process, tracer_files_list, results, cov_data, out_len)
if rest:
results = rest[0]
cov_data = rest[1]
# process tracer data
if not(error_code): # corresponding tracer file was found
# compute completeness of tracer data
completeness_tracerdata = np.sum(np.isfinite(tracerdata['conc']), axis=0)/(ini['param']['WINDOW_LENGTH']*ini['param']['SAMPLING_RATE_FINAL'])
# check validity of timestamp series, if present
time_series = pd.Series(np.vectorize(datetime.datetime.utcfromtimestamp)(tracerdata['time']))
threshold = pd.Timedelta(milliseconds=2/ini['param']['SAMPLING_RATE_TRACER']*1000)
msg = check_raw_timestamps(time_series, threshold)
if msg[0]:
print('tracer file: ' + msg[0]); OF.write('tracer file: ' + msg[0] + '\n')
# # debug, check for time differences between sonic and tracer timestamps
# if completeness_tracerdata[0] > ini['param']['COMPLETENESS_THRESHOLD']:
# soniclen=len(sonicdata[:,0])
# tracerlen=len(tracerdata['time'])
# timedif = sonicdata[0:min(soniclen,tracerlen),0] - tracerdata['time'][0:min(soniclen,tracerlen)]
# if abs(max(timedif)) > 0.1:
# import matplotlib.pyplot as plt
# plt.plot(timedif)
# print('timedif detected for file : ' + sonic_files_list['name'][n] ) # + '\n'
# upsample data to final sampling rate if necessary
if (ini['param']['SAMPLING_RATE_TRACER'] < ini['param']['SAMPLING_RATE_FINAL']):
first_ts = tracerdata['time'][0, 0]
upsampling_factor = ini['param']['SAMPLING_RATE_FINAL']/ini['param']['SAMPLING_RATE_TRACER']
tracerdata['conc'] = scipy.signal.resample(tracerdata['conc'], int(tracerdata['conc'].shape[0]*upsampling_factor))
# reconstruct the timestamp column (used later on by interp1d).
tracerdata['time'][:] = (first_ts +
np.asarray([x for x in range(0, len(tracerdata['time']))])/ini['param']['SAMPLING_RATE_FINAL'])
lag_clock_drift_samples = 0
# get clock-drift time lag if an input file is provided
if ini['files']['lag_clock_drift_filepath']:
# time lag drift info is present and must be accounted for
lag_clock_drift_samples = int(-df_lag_clock_drift['TDC-computer'][abs(df_lag_clock_drift.index - results['time'][n]).argmin()] * ini['param']['SAMPLING_RATE_FINAL']) # find closest time lag based on timestamp
# get prescribed time lag (instrumental + physical) if input file provided
if ini['param']['LAG_DETECT_METHOD'] == 'PRESCRIBED':
closest_index = abs(df_lag_prescribed.index - results['time'][n]).argmin()
if abs(df_lag_prescribed.index - results['time'][n]).min() == datetime.timedelta():
# a lag is present for this half-hour
prescribed_lag_samples = int(df_lag_prescribed['time lag in s'][closest_index] * ini['param']['SAMPLING_RATE_FINAL']) # find closest time lag based on timestamp
else:
# a lag is not present for this half-hour, compute the median lag of the ten closest values
# the median must be computed on the physical lag
# Synchronize on df_lag_prescribed index
df_lag_clock_drift_synchronized = df_lag_clock_drift.reindex(df_lag_prescribed.index, method='nearest')
# Calculate indices for the non-NaN values around the gap
valid_indices = range(len(df_lag_prescribed))
valid_distances = [abs(i - closest_index) for i in valid_indices]
sorted_indices = [x for _, x in sorted(zip(valid_distances, valid_indices))]
# Select the closest 10 values for median calculation
selected_indices = sorted_indices[:10]
# Calculate the median of the selected values
prescribed_lag_samples = (np.median(df_lag_prescribed.iloc[selected_indices,0].values + df_lag_clock_drift_synchronized.iloc[selected_indices,0].values) - df_lag_clock_drift_synchronized.iloc[closest_index,0])* ini['param']['SAMPLING_RATE_FINAL']
# compute flux correction factor for low-pass filtering
if ini['param']['LPFC'] == 0:
cf_lpf = 1
elif ini['param']['LPFC'] == 1:
cf_lpf = correction_factor_lpf(mean_wind_speed, zoL, ini, df_lpfc=df_lpfc, ctrplot=ini['run_param']['PLOT_CORRECTION_FACTOR_LPF'])
elif ini['param']['LPFC'] == 2:
cf_lpf = correction_factor_lpf(mean_wind_speed, zoL, ini, df_lpfc=df_lpfc)
results['TRACER']['cf_lpf'][n] = cf_lpf
# save config parameters
results['TRACER']['default_CC_kinetic'][n] = tracerdata['default_CC_kinetic']
# iterate over tracers
msg_tracer = ''
for i in range(tracerdata['conc'].shape[1]):
# skip this tracer if all values are NaN
if np.all(np.isnan(tracerdata['conc'][:, i])):
msg_tracer = msg_tracer + f"Warning: only NaN values found in tracerdata[{i}]\n"
OF.write(f"\nWarning: only NaN values found in tracerdata[{i}]")
continue
# check for NaNs, warn and skip this tracer if any are present
if np.any(np.isnan(tracerdata['conc'][:, i])):
msg_tracer = msg_tracer + f"Warning: NaN values found in tracerdata[{i}]\n"
OF.write(f"Warning: NaN values found in tracerdata[{i}]\n")
continue
# check completeness of tracer data
if(completeness_tracerdata[i] < ini['param']['COMPLETENESS_THRESHOLD']):
if i == 0:
msg_tracer = msg_tracer + 'tracer file incomplete (' + str(int(completeness_tracerdata[i] * 100)) + '%), tracer process skipped\n'
continue
lag_phys_wd_center = ini['param']['LAG_WINDOW_CENTER']
# replace by the rh-dependent physical lag, if the tracer is present in the rh_dependency file
if ini['param']['LAG_RH_DEPENDENCY'] == 1:
searched_mz = round(float(tracerdata['mz'][i]), 3)
matching_cols = df_lag_rh_dependency.columns[
(df_lag_rh_dependency.columns.astype(float) >= searched_mz - 0.001) &
(df_lag_rh_dependency.columns.astype(float) <= searched_mz + 0.001)
]
if not matching_cols.empty:
closest_index = abs(df_lag_rh_dependency.index - min(rh_meteo, 100)).argmin()
lag_phys_wd_center = df_lag_rh_dependency.loc[closest_index, matching_cols[0]]
# statistics on tracer signal and zero
c_mean = np.nanmean(tracerdata['conc'][:, i])
c_std = np.nanstd(tracerdata['conc'][:, i])
c_Q5 = np.nanpercentile(tracerdata['conc'][:, i], 5)
c_Q95 = np.nanpercentile(tracerdata['conc'][:, i], 95)
c_acc = np.nanmean(abs(tracerdata['conc_acc'][:, i]/tracerdata['conc'][:, i]))
c_prec = np.sqrt(np.nansum(tracerdata['conc_prec'][:, i]**2) / np.count_nonzero(~np.isnan(tracerdata['conc_prec'][:, i])))
c_LOD = 3*np.sqrt(np.nansum(tracerdata['zero_prec'][:, i]**2) / np.count_nonzero(~np.isnan(tracerdata['zero_prec'][:, i])))
# detect spikes (and remove if requested)
if ini['param']['SPIKE_MODE'] != 0:
tracerdata['conc'][:, i], is_spike = spike_detection_vickers97(tracerdata['conc'][:, i],
spike_mode = ini['param']['SPIKE_MODE'],
avrg_len = int(ini['param']['WINDOW_LENGTH']/60),
ac_freq = ini['param']['SAMPLING_RATE_FINAL'],
spike_limit = ini['param']['SPIKE_DETECTION_THRESHOLD_TRACER'],
max_consec_spikes = ini['param']['MAX_CONSEC_SPIKES'],
ctrplot = ini['run_param']['PLOT_SPIKE_DETECTION']
)
num_spikes = int(sum(is_spike))
else:
num_spikes = np.nan
# test on instrument malfunctions from Vitale 2020
if ini['run_param']['PROCESS_IPT_TRACERS']:
IPT = inst_prob_test(tracerdata['conc'][:, i], ini['param']['DETREND_TRACER_SIGNAL'], ini['param']['SAMPLING_RATE_FINAL'], ini['run_param']['PLOT_INST_PROB_TEST'], 'tracer')
else:
IPT = [np.nan for a in range(12)]
# calculate slope of tracer signal
dc_dt = nanlinfit(tracerdata['conc'][:, i])[0]
dc_dt = dc_dt*ini['param']['SAMPLING_RATE_TRACER']
# interpolate concentration data (keep NaNs and extrapolate using NaNs)
interp_c = interp1d(np.squeeze(tracerdata['time'][:]), np.squeeze(tracerdata['conc'][:, i]), kind='nearest', bounds_error=False, fill_value=np.NaN)(interp_time)
# Reynolds decomposition / detrending
if ini['param']['DETREND_TRACER_SIGNAL']:
c_prime = nandetrend(interp_c)
else:
c_prime = interp_c - c_mean
# trim w_prime and c_prime for NaNs, keeping the alignment
first, last = np.argmax(~np.isnan(w_prime)), len(w_prime) - np.argmax(~np.isnan(w_prime)[::-1]) - 1
w_prime_trim = w_prime[first:last + 1]; c_prime_trim = c_prime[first:last + 1]
first, last = np.argmax(~np.isnan(c_prime_trim)), len(c_prime_trim) - np.argmax(~np.isnan(c_prime_trim)[::-1]) - 1
w_prime_trim = w_prime_trim[first:last + 1]; c_prime_trim = c_prime_trim[first:last + 1]
# calculate cross-covariance over a given lag-window
cov_wc = xcov(w_prime_trim, c_prime_trim, [lag_phys_wd_center + lag_clock_drift_samples - ini['param']['LAG_OUTER_WINDOW_SIZE'], lag_phys_wd_center + lag_clock_drift_samples + ini['param']['LAG_OUTER_WINDOW_SIZE']])
# find time lag
if ini['param']['LAG_DETECT_METHOD'] == 'CONSTANT':
lag_samples = int(lag_phys_wd_center + lag_clock_drift_samples)
elif ini['param']['LAG_DETECT_METHOD'] == 'MAX':
lag_samples = find_covariance_peak(cov_wc, ini['param']['LAG_OUTER_WINDOW_SIZE'], ini['param']['LAG_INNER_WINDOW_SIZE'], lag_phys_wd_center + lag_clock_drift_samples, ini['param']['LAG_COVPEAK_FILTER_LENGTH'], 0, ini['run_param']['PLOT_FIND_COVARIANCE_PEAK'], tracer_files_list['name'][tracer_file_index]) # lag in samples
elif ini['param']['LAG_DETECT_METHOD'] == 'MAX_WITH_DEFAULT':
lag_samples = find_covariance_peak(cov_wc, ini['param']['LAG_OUTER_WINDOW_SIZE'], ini['param']['LAG_INNER_WINDOW_SIZE'], lag_phys_wd_center + lag_clock_drift_samples, ini['param']['LAG_COVPEAK_FILTER_LENGTH'], 0, ini['run_param']['PLOT_FIND_COVARIANCE_PEAK'], tracer_files_list['name'][tracer_file_index]) # lag in samples
# if lag was found at the extremum of the search window, replace by the default
if (lag_samples == lag_phys_wd_center + lag_clock_drift_samples - ini['param']['LAG_INNER_WINDOW_SIZE'] + 1 or
lag_samples == lag_phys_wd_center + lag_clock_drift_samples + ini['param']['LAG_INNER_WINDOW_SIZE']):
lag_samples = lag_phys_wd_center + lag_clock_drift_samples
elif ini['param']['LAG_DETECT_METHOD'] == 'PRESCRIBED':
lag_samples = int(prescribed_lag_samples)
lagtime_individual = lag_samples/ini['param']['SAMPLING_RATE_FINAL'] # seconds
# calculate flux in ug/m^2/s
flux_individual = cov_wc[ini['param']['LAG_OUTER_WINDOW_SIZE'] - (lag_phys_wd_center + lag_clock_drift_samples) + lag_samples]
# correct flux for low-pass filtering
flux_individual = flux_individual * cf_lpf
# align c_prime for time-lag and trim the circular part; the lag-corrected time series is needed for further computations
if lag_samples > 0:
# Shift c_prime forward (to the right), remove front (wrapped)
c_prime_trim = np.roll(c_prime_trim, lag_samples)[lag_samples:]
# Remove first lag_samples from w_prime to align it in time
w_prime_trim = w_prime_trim[lag_samples:]
elif lag_samples < 0:
lag_abs = abs(lag_samples)
# Shift c_prime backward (to the left), remove end (wrapped)
c_prime_trim = np.roll(c_prime_trim, lag_samples)[:-lag_abs]
# Remove last lag_abs from w_prime to align
w_prime_trim = w_prime_trim[:-lag_abs]
# calculate spectrum for c and cospectrum for wc
[spec_c, spec_c_scaled, _] = cospectrum(ini, c_prime_trim, c_prime_trim, f, mean_wind_speed, spectrum_type='spec')
[cospec_wc, cospec_wc_scaled, _] = cospectrum(ini, w_prime_trim, c_prime_trim, f, mean_wind_speed, spectrum_type='cospec')
# steady state test according to Foken and Wichura 1996 and to Mahrt 1998
steady_state_wc_FW96 = test_steady_state_FW96(w_prime_trim, c_prime_trim)
steady_state_wc_M98 = test_steady_state_M98(w_prime_trim, c_prime_trim)
steady_state_wc_D99 = test_steady_state_D99(w_prime_trim, c_prime_trim)
# compute flux uncertainties
[flux_noise_mean, flux_noise_std, flux_noise_rmse, random_error_FS, random_flux, random_error_noise] = \
flux_uncertainties(ini, w_prime_trim, c_prime_trim)
# convert tracer flux and associated variables from kinematic units to desired units
flux_individual, flux_noise_mean, flux_noise_std, flux_noise_rmse, random_error_FS, random_flux, random_error_noise = \
flux_unit_conversion(flux_individual, flux_noise_mean, flux_noise_std, flux_noise_rmse, random_error_FS, random_flux, random_error_noise,
'to ug m-2 s-1', air_mol_conc, tracerdata['mz'][i] - 1)
# store results for individual tracer
results['TRACER'][str(i)]['conc_mean'][n] = c_mean
results['TRACER'][str(i)]['conc_std'][n] = c_std
results['TRACER'][str(i)]['conc_Q5'][n] = c_Q5
results['TRACER'][str(i)]['conc_Q95'][n] = c_Q95
results['TRACER'][str(i)]['conc_acc'][n] = c_acc
results['TRACER'][str(i)]['conc_prec'][n] = c_prec
results['TRACER'][str(i)]['conc_LOD'][n] = c_LOD
results['TRACER'][str(i)]['lagtime'][n] = lagtime_individual
results['TRACER'][str(i)]['lagtime_clock_drift'][n] = lag_clock_drift_samples/ini['param']['SAMPLING_RATE_FINAL']
results['TRACER'][str(i)]['flux'][n] = flux_individual
results['TRACER'][str(i)]['spec'][n] = spec_c
results['TRACER'][str(i)]['spec_scaled'][n] = spec_c_scaled
results['TRACER'][str(i)]['cospec'][n] = cospec_wc
results['TRACER'][str(i)]['cospec_scaled'][n] = cospec_wc_scaled
results['TRACER'][str(i)]['qaqc']['completeness_TRACER'][n] = completeness_tracerdata[i]
results['TRACER'][str(i)]['qaqc']['num_spikes'][n] = num_spikes
results['TRACER'][str(i)]['qaqc']['SST_FW96'][n] = steady_state_wc_FW96
results['TRACER'][str(i)]['qaqc']['SST_M98'][n] = steady_state_wc_M98
results['TRACER'][str(i)]['qaqc']['SST_D99'][n] = steady_state_wc_D99
results['TRACER'][str(i)]['qaqc']['IPT'][n] = IPT
results['TRACER'][str(i)]['qaqc']['flux_SNR'][n] = abs(flux_individual - flux_noise_mean)/flux_noise_std
results['TRACER'][str(i)]['qaqc']['flux_noise_mean'][n] = flux_noise_mean
results['TRACER'][str(i)]['qaqc']['flux_noise_std'][n] = flux_noise_std
results['TRACER'][str(i)]['qaqc']['flux_noise_rmse'][n] = flux_noise_rmse
results['TRACER'][str(i)]['qaqc']['random_error_FS'][n] = random_error_FS
results['TRACER'][str(i)]['qaqc']['random_flux'][n] = random_flux
results['TRACER'][str(i)]['qaqc']['random_error_noise'][n] = random_error_noise
results['TRACER'][str(i)]['calibration'][n] = tracerdata['calibration'][i]
results['TRACER'][str(i)]['transmission'][n] = tracerdata['transmission'][i]
results['TRACER'][str(i)]['Xr0'][n] = tracerdata['Xr0'][i]
results['TRACER'][str(i)]['cluster_min'][n] = tracerdata['cluster_min'][i]
results['TRACER'][str(i)]['cluster_max'][n] = tracerdata['cluster_max'][i]
results['TRACER'][str(i)]['k_reac'][n] = tracerdata['k_reac'][i]
results['TRACER'][str(i)]['FY'][n] = tracerdata['FY'][i]
results['TRACER'][str(i)]['IF'][n] = tracerdata['IF'][i]
cov_data['TRACER'][str(i)]['cov'][n] = cov_wc
print_progress(i+1, tracerdata['conc'].shape[1], 'tracers processed', flush_=True, end_='\r', width=10)
# display time needed
proc_end_time_hh = datetime.datetime.now()
msg = 'process time: ' + str((proc_end_time_hh - proc_start_time_hh))
print('\n' + msg); OF.write("\n" + msg + "\n\n")
# display messages
print(msg_tracer)
# %% clean results for unprocessed sonic files
clean_results(results)
# %% output data
# save covariance functions to file
# if (tracer_files_list or ini['irga']['irga_columns']) and ini['run_param']['WRITE_COV_OUTPUTS']:
if ini['run_param']['WRITE_COV_OUTPUTS']:
ts = results['time'][0]
cov_filename = ini['files']['output_files_prefix'] + 'cov_%d%02d%02d.hdf5' % (ts.year, ts.month, ts.day)
hdfdict.dump(cov_data, os.path.join(cov_folder, cov_filename), mode='w')
# add attributes
with h5py.File(os.path.join(cov_folder, cov_filename), 'r+') as hdf5_f:
if ini['run_param']['CONCENTRATION_TYPE'] == 0:
# loop on tracers
for i in range(len(hdf5_f["IRGA"])):
hdf5_f['IRGA'][str(i)]['cov'].attrs['description'] = 'covariance function (cov vs lag)'; hdf5_f['IRGA'][str(i)]['cov'].attrs['units'] = 'ppbv ncps-1 m s-1 and lag in samples'
elif ini['run_param']['CONCENTRATION_TYPE'] == 1:
# loop on tracers
for i in range(len(hdf5_f["TRACER"])):
hdf5_f['TRACER'][str(i)]['cov'].attrs['description'] = 'covariance function (cov vs lag)'; hdf5_f['TRACER'][str(i)]['cov'].attrs['units'] = 'ppbv ncps-1 m s-1 and lag in samples'
# save results to file
date = datetime.datetime.now()
results['file_creation_time'] = '%d%02d%02d%02d%02d' % (date.year, date.month, date.day, date.hour, date.minute)
ts = results['time'][0]
res_filename = ini['files']['output_files_prefix'] + '%d%02d%02d.hdf5' % (ts.year, ts.month, ts.day)
results['time'] = [date_obj.strftime('%Y-%m-%d %H-%M-%S') for date_obj in results['time']]
hdfdict.dump(results, os.path.join(ini['files']['output_folder'], res_filename), mode='w')
# add attributes "name" and "units"
if 'TRACER' in results: # TRACER part has been created in the result dict (tracer data for that day)
add_attributes(ini['files']['output_folder'], res_filename, process_irga_data_day, len(ini['irga']['irga_columns']), True, tracerdata)
else:
add_attributes(ini['files']['output_folder'], res_filename, process_irga_data_day, len(ini['irga']['irga_columns']))
return unique_days
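# Reading a daily results file back (a minimal sketch; the file name is an
# illustrative assumption, group names follow the results dict built above):
#   import h5py
#   with h5py.File('results_20240101.hdf5', 'r') as hf:
#       wT = hf['MET']['wT'][:]                      # kinematic temperature flux
#       sst = hf['MET']['qaqc']['SST_wT_FW96'][:]    # steady-state flag (FW96)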