# Source code for read_metadata_files

import numpy as np
import pandas as pd
import re
import sys


def _log(message, OF):
    # Echo a progress message both to stdout and to the run's output/log file.
    print(message)
    OF.write(message + '\n')


def _read_meteo(path):
    # Meteo file: a datetime column followed by pressure/temperature/RH.
    # skiprows=1 drops the first file line and header=0 consumes the next one
    # as a (discarded) header; since the data rows have one more column than
    # `names`, pandas uses the leading column as the index.
    df_meteofiledata = pd.read_csv(
        path, header=0,
        names=['pressure', 'temperature', 'relative humidity'],
        sep=',', skiprows=1)
    df_meteofiledata.index = pd.to_datetime(
        df_meteofiledata.index, format='%d/%m/%Y %H:%M')
    # keep only the first occurrence of any duplicated timestamp
    return df_meteofiledata[~df_meteofiledata.index.duplicated()]


def _read_tilt(path):
    # Tilt-correction file (from the PFM preparation of eddypro): after a
    # "Rotation matrices" marker, each wind sector occupies 4 lines — a
    # header line followed by a 3x3 rotation matrix.
    with open(path, 'r') as file:
        lines = file.readlines()
    # index of the first line after the "Rotation matrices" marker
    # (raises ValueError if the marker is absent)
    start = lines.index('Rotation matrices\n') + 1
    R_tilt_PFM = {}
    for i in range(start, len(lines), 4):
        # split on runs of whitespace, dropping empty strings
        elements = [e for e in re.split(r'\s+', lines[i]) if e]
        # 5th field of the sector header line holds the sector number
        sector = int(elements[4])
        matrix = [list(map(float, lines[j].strip().split()))
                  for j in range(i + 1, i + 4)]
        R_tilt_PFM[sector] = np.array(matrix)
    return R_tilt_PFM


def _read_clock_drift(path):
    # Clock-drift file: datetime index + TDC-computer / lag-drift columns.
    df_lag_clock_drift = pd.read_csv(
        path, header=0, names=['TDC-computer', 'lag drift'], sep=',')
    df_lag_clock_drift.index = pd.to_datetime(
        df_lag_clock_drift.index, format='%d/%m/%Y %H:%M')
    return df_lag_clock_drift[~df_lag_clock_drift.index.duplicated()]


def _read_presc_lag(path):
    # Prescribed time-lag file: datetime index + a single lag column.
    df_lag_prescribed = pd.read_csv(
        path, header=0, names=['time lag in s'], sep=',')
    df_lag_prescribed.index = pd.to_datetime(
        df_lag_prescribed.index, format='%d/%m/%Y %H:%M')
    df_lag_prescribed = df_lag_prescribed[~df_lag_prescribed.index.duplicated()]
    return df_lag_prescribed.dropna()


def _read_rh_lag(path):
    # Time-lag RH-dependency file: indexed by 'RH (%)', one column per mass.
    df_lag_rh_dependency = pd.read_csv(
        path, header=0, sep=',', index_col='RH (%)', skiprows=1)
    df_lag_rh_dependency = df_lag_rh_dependency[
        ~df_lag_rh_dependency.index.duplicated()].dropna()
    # overwrite each column title by its mz value, rounded at the third decimal
    df_lag_rh_dependency.columns = [
        str(round(float(match.group()), 3))
        if (match := re.search(r"[-+]?\d*\.\d+|\d+", col)) else col
        for col in df_lag_rh_dependency.columns
    ]
    return df_lag_rh_dependency


def _read_lpfc_massman(path):
    # COF/Massman-type low-pass filtering file: name,value rows grouped under
    # stability-class section labels ("all,,", "unstable,,", "stable,,").
    records = []
    stability_class = None
    with open(path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # detect a section label line
            if line.startswith(('all', 'unstable', 'stable')):
                parts = [p.strip() for p in line.split(',')]
                if parts[0] in ['all', 'unstable', 'stable']:
                    stability_class = parts[0]
                    continue
            # skip the per-section header line
            if line.startswith('name,value'):
                continue
            parts = [p.strip() for p in line.split(',')]
            if not stability_class or not parts[0]:
                continue
            try:
                value = float(parts[1])
            except (ValueError, IndexError):
                # silently skip malformed rows (best-effort parsing)
                continue
            records.append({'stability_class': stability_class,
                            'name': parts[0],
                            'value': value})
    return pd.DataFrame(records)


def _read_lpfc_sections(path):
    # Sectioned lpfc file: each "unstable"/"stable" block starts with a header
    # line (whose first field is the stability class) followed by data rows,
    # terminated by an empty line or a line made only of commas.
    dfs = []
    with open(path, encoding='utf-8') as f:
        lines = f.readlines()
    i = 0
    while i < len(lines):
        if not lines[i].strip():
            i += 1
            continue
        if lines[i].startswith(('unstable', 'stable')):
            status = lines[i].split(',')[0].strip()
            header = lines[i].strip().split(',')
            i += 1
            data = []
            while (i < len(lines) and lines[i].strip()
                   and not all(x == '' for x in lines[i].strip().split(','))):
                row = lines[i].strip().split(',')
                if len(row) == len(header):  # ignore ragged rows
                    data.append(row)
                i += 1
            df_section = pd.DataFrame(data, columns=header)
            df_section['stability_class'] = status
            dfs.append(df_section)
        i += 1
    # concatenate all sections, keep only the columns of interest
    df_lpfc = pd.concat(dfs, ignore_index=True)
    df_lpfc = df_lpfc[['stability_class', 'ws_max', 'CF_L']]
    df_lpfc['ws_max'] = pd.to_numeric(df_lpfc['ws_max'], errors='coerce')
    df_lpfc['CF_L'] = pd.to_numeric(df_lpfc['CF_L'], errors='coerce')
    return df_lpfc


def read_metadata_files(path, OF, meteo=False, tilt=False, clock_drift=False,
                        presc_lag=False, rh_lag=False, lpfc=False):
    """
    Read the requested metadata input file (one per call).

    Parameters
    ----------
    path : str
        Path+name of the file.
    OF : file-like
        Open output/log file; progress messages are appended to it.
    meteo, tilt, clock_drift, presc_lag, rh_lag : bool
        Set exactly one flag to identify the requested file type.
    lpfc : int or bool
        1 for a COF/Massman-type low-pass filtering correction file,
        2 for a sectioned ws_max/CF_L low-pass filtering correction file.

    Returns
    -------
    pandas.DataFrame or dict
        Formatted content of the requested file. For ``tilt`` a dict of 3x3
        numpy rotation matrices keyed by wind sector. None if no flag is set.

    Comments
    --------
    Written by B. Heinesch. University of Liege, Gembloux Agro-Bio Tech.
    """
    # meteo parameters
    if meteo:
        _log('using meteo file: ' + path, OF)
        return _read_meteo(path)
    # tilt correction parameters
    if tilt:
        _log('using tilt correction file: ' + path, OF)
        return _read_tilt(path)
    # clock-drift lag parameters
    if clock_drift:
        _log('using clock_drift file: ' + path, OF)
        return _read_clock_drift(path)
    # prescribed time lag (clock-drift + physical)
    if presc_lag:
        if not path:
            sys.exit('LAG_DETECT_METHOD = PRESCRIBED but no lag_prescribed_filepath given')
        _log('using presc_lag file: ' + path, OF)
        return _read_presc_lag(path)
    # time lag rh dependency
    if rh_lag:
        if not path:
            sys.exit('LAG_RH_DEPENDENCY = 1 but no lag_rh_dependency_filepath given')
        _log('using rh_lag file: ' + path, OF)
        return _read_rh_lag(path)
    # low-pass filtering correction parameters
    if lpfc == 1:
        if not path:
            sys.exit('LPFC = 1 but no COF/Massman-type lpfc_filepath given')
        _log('using lpcf file: ' + path, OF)
        return _read_lpfc_massman(path)
    elif lpfc == 2:
        if not path:
            sys.exit('LPFC = 2 but no lpcf_filepath given')
        _log('using lpcf file: ' + path, OF)
        return _read_lpfc_sections(path)