Tue, 27 Mar 2018 15:13:05 +0300
Change for reading new lines for both Windows and Linux
import logging import numpy as np from licel import LicelLidarMeasurement logger = logging.getLogger(__name__) class LicelCalibrationMeasurement(LicelLidarMeasurement): def __init__(self, plus45_files=None, minus45_files=None, use_id_as_name=False, licel_timezone='UTC'): # Setup the empty class super(LicelCalibrationMeasurement, self).__init__(use_id_as_name=use_id_as_name, licel_timezone=licel_timezone) self.plus45_files = plus45_files self.minus45_files = minus45_files if plus45_files and minus45_files: self.check_equal_length() self.read_channel_data() self.update() def update(self): """ Correct timescales after each update. """ super(LicelCalibrationMeasurement, self).update() self.correct_timescales() def check_equal_length(self): """ Check if input time series have equal lengths. """ len_plus = len(self.plus45_files) len_minus = len(self.minus45_files) if len_plus != len_minus: raise self.UnequalMeasurementLengthError( "Input timeseries have different length: %s vs %s." % (len_plus, len_minus)) def read_channel_data(self): # Read plus and minus 45 measurements self.plus45_measurement = LicelLidarMeasurement(self.plus45_files, self.use_id_as_name, self.licel_timezone) self.plus45_measurement.rename_channels(suffix='_p45') self.minus45_measurement = LicelLidarMeasurement(self.minus45_files, self.use_id_as_name, self.licel_timezone) self.minus45_measurement.rename_channels(suffix='_m45') # Combine them in this object self.channels = {} self.channels.update(self.plus45_measurement.channels) self.channels.update(self.minus45_measurement.channels) def correct_timescales(self): self.check_timescales_are_two() self.combine_scales() def check_timescales_are_two(self): no_timescales = len(self.variables['Raw_Data_Start_Time']) if no_timescales != 2: raise self.WrongNumberOfTimescalesError("Wrong number of timescales: %s instead of 2." % no_timescales) def combine_scales(self): start_times, end_times = self.get_ordered_timescales() new_start_time = start_times[0] new_stop_time = end_times[1] self.variables['Raw_Data_Start_Time'] = [new_start_time, ] self.variables['Raw_Data_Stop_Time'] = [new_stop_time, ] self.reset_timescale_id() def reset_timescale_id(self): """ Set all timescales to 0 :return: """ timescale_dict = self.variables['id_timescale'] self.variables['id_timescale'] = dict.fromkeys(timescale_dict, 0) def get_ordered_timescales(self): scale_start_1, scale_start_2 = self.variables['Raw_Data_Start_Time'] scale_end_1, scale_end_2 = self.variables['Raw_Data_Stop_Time'] if scale_start_1[0] > scale_start_2[0]: scale_start_1, scale_start_2 = scale_start_2, scale_start_1 if scale_end_1[0] > scale_end_2[0]: scale_end_1, scale_end_2 = scale_end_2, scale_end_1 return (scale_start_1, scale_start_2), (scale_end_1, scale_end_2) def add_fake_measurements(self, no_profiles, variation=0.1): """ Add a number of fake measurements. This is done to allow testing with single analog profiles. Adds a predefined variation in each new profile. """ duration = self.info['duration'] for channel_name, channel in self.channels.items(): base_time = channel.data.keys()[0] base_data = channel.data[base_time] for n in range(no_profiles): random_variation = base_data * (np.random.rand(len(base_data)) * 2 - 1) * variation new_time = base_time + n * duration new_data = channel.data[base_time].copy() + random_variation if 'ph' in channel_name: new_data = new_data.astype('int') channel.data[new_time] = new_data self.update() def subset_photoncounting(self): """ Subset photoncounting channels. """ ph_channels = [channel for channel in self.channels.keys() if 'ph' in channel] new_measurement = self.subset_by_channels(ph_channels) return new_measurement def _get_scc_channel_variables(self): """ Get a list of variables to put in the SCC. It can be overridden e.g. in the depolarization product class. Returns ------- channel_variables: dict A dictionary with channel variable specifications. """ channel_variables = \ {'Background_Low': (('channels',), 'd'), 'Background_High': (('channels',), 'd'), 'LR_Input': (('channels',), 'i'), 'DAQ_Range': (('channels',), 'd'), 'Pol_Calib_Range_Min': (('channels',), 'd'), 'Pol_Calib_Range_Max': (('channels',), 'd'), } return channel_variables class UnequalMeasurementLengthError(RuntimeError): """ Raised when the plus and minus files have different length. """ pass class WrongNumberOfTimescalesError(RuntimeError): """ Raised when timescales are not two. """ pass