atmospheric_lidar/licel_depol.py

Tue, 27 Mar 2018 15:13:05 +0300

author
Ioannis <ioannis@inoe.ro>
date
Tue, 27 Mar 2018 15:13:05 +0300
changeset 145
a99ae250e086
parent 116
d9703af687aa
child 160
db28629317fa
permissions
-rw-r--r--

Change for reading new lines for both Windows and Linux

import logging

import numpy as np

from licel import LicelLidarMeasurement

logger = logging.getLogger(__name__)


class LicelCalibrationMeasurement(LicelLidarMeasurement):

    def __init__(self, plus45_files=None, minus45_files=None,  use_id_as_name=False, licel_timezone='UTC'):
        # Setup the empty class
        super(LicelCalibrationMeasurement, self).__init__(use_id_as_name=use_id_as_name, licel_timezone=licel_timezone)

        self.plus45_files = plus45_files
        self.minus45_files = minus45_files

        if plus45_files and minus45_files:
            self.check_equal_length()

            self.read_channel_data()
            self.update()

    def update(self):
        """ Correct timescales after each update.
        """
        super(LicelCalibrationMeasurement, self).update()
        self.correct_timescales()

    def check_equal_length(self):
        """
        Check if input time series have equal lengths.
        """
        len_plus = len(self.plus45_files)
        len_minus = len(self.minus45_files)
        if len_plus != len_minus:
            raise self.UnequalMeasurementLengthError(
                            "Input timeseries have different length: %s vs %s." % (len_plus, len_minus))

    def read_channel_data(self):

        # Read plus and minus 45 measurements
        self.plus45_measurement = LicelLidarMeasurement(self.plus45_files, self.use_id_as_name, self.licel_timezone)
        self.plus45_measurement.rename_channels(suffix='_p45')

        self.minus45_measurement = LicelLidarMeasurement(self.minus45_files, self.use_id_as_name, self.licel_timezone)
        self.minus45_measurement.rename_channels(suffix='_m45')

        # Combine them in this object
        self.channels = {}
        self.channels.update(self.plus45_measurement.channels)
        self.channels.update(self.minus45_measurement.channels)

    def correct_timescales(self):
        self.check_timescales_are_two()
        self.combine_scales()

    def check_timescales_are_two(self):
        no_timescales = len(self.variables['Raw_Data_Start_Time'])
        if no_timescales != 2:
            raise self.WrongNumberOfTimescalesError("Wrong number of timescales: %s instead of 2." % no_timescales)

    def combine_scales(self):
        start_times, end_times = self.get_ordered_timescales()
        new_start_time = start_times[0]
        new_stop_time = end_times[1]
        self.variables['Raw_Data_Start_Time'] = [new_start_time, ]
        self.variables['Raw_Data_Stop_Time'] = [new_stop_time, ]
        self.reset_timescale_id()

    def reset_timescale_id(self):
        """
        Set all timescales to 0
        :return:
        """
        timescale_dict = self.variables['id_timescale']
        self.variables['id_timescale'] = dict.fromkeys(timescale_dict, 0)

    def get_ordered_timescales(self):
        scale_start_1, scale_start_2 = self.variables['Raw_Data_Start_Time']
        scale_end_1, scale_end_2 = self.variables['Raw_Data_Stop_Time']

        if scale_start_1[0] > scale_start_2[0]:
            scale_start_1, scale_start_2 = scale_start_2, scale_start_1

        if scale_end_1[0] > scale_end_2[0]:
            scale_end_1, scale_end_2 = scale_end_2, scale_end_1

        return (scale_start_1, scale_start_2), (scale_end_1, scale_end_2)

    def add_fake_measurements(self, no_profiles, variation=0.1):
        """
        Add a number of fake measurements. This is done to allow testing with single analog profiles.

        Adds a predefined variation in each new profile.
        """
        duration = self.info['duration']
        for channel_name, channel in self.channels.items():
            base_time = channel.data.keys()[0]
            base_data = channel.data[base_time]

            for n in range(no_profiles):
                random_variation = base_data * (np.random.rand(len(base_data)) * 2 - 1) * variation

                new_time = base_time + n * duration
                new_data = channel.data[base_time].copy() + random_variation
                if 'ph' in channel_name:
                    new_data = new_data.astype('int')
                channel.data[new_time] = new_data

        self.update()

    def subset_photoncounting(self):
        """
        Subset photoncounting channels.
        """
        ph_channels = [channel for channel in self.channels.keys() if 'ph' in channel]
        new_measurement = self.subset_by_channels(ph_channels)
        return new_measurement

    def _get_scc_channel_variables(self):
        """
        Get a list of variables to put in the SCC.

        It can be overridden e.g. in the depolarization product class.

        Returns
        -------

        channel_variables: dict
           A dictionary with channel variable specifications.
        """
        channel_variables = \
            {'Background_Low': (('channels',), 'd'),
             'Background_High': (('channels',), 'd'),
             'LR_Input': (('channels',), 'i'),
             'DAQ_Range': (('channels',), 'd'),
             'Pol_Calib_Range_Min': (('channels',), 'd'),
             'Pol_Calib_Range_Max': (('channels',), 'd'),
             }
        return channel_variables

    class UnequalMeasurementLengthError(RuntimeError):
        """ Raised when the plus and minus files have different length.
        """
        pass

    class WrongNumberOfTimescalesError(RuntimeError):
        """ Raised when timescales are not two.
        """
        pass

mercurial