atmospheric_lidar/licelv2.py

Fri, 14 Feb 2020 12:38:45 +0200

author
Ioannis <ioannis@inoe.ro>
date
Fri, 14 Feb 2020 12:38:45 +0200
changeset 186
97cabcc90f93
parent 181
12c895b99a3c
child 192
a73d587fd84c
permissions
-rw-r--r--

Support of naming channels by order. In principle should not be required.

import datetime
import logging
import copy
import os

import numpy as np
import pytz

from .licel import LicelFile, LicelChannelData, LicelChannel, LicelLidarMeasurement, PhotodiodeChannel
from .diva import DivaConverterMixin

logger = logging.getLogger(__name__)

c = 299792458.0  # Speed of light


class LicelChannelDataV2(LicelChannelData):
    """ A class representing a single channel found in a single Licel file."""

    def __init__(self, raw_info, raw_data, duration):
        """
        This is run when creating a new object.

        Parameters
        ----------
        raw_info : dict
           A dictionary containing raw channel information.
        raw_data : dict
           An array with raw channel data.
        duration : float
           Duration of the file, in seconds
        """
        super(LicelChannelDataV2, self).__init__(raw_info=raw_info,
                         raw_data=raw_data,
                         duration=duration,
                         use_id_as_name=False)  # Obsolete

    def _assign_properties(self):
        """ Assign properties """
        laser_polarizations = {0: "none",
                               1: "vertical",
                               2: "horizontal",
                               3: "right circular",
                               4: "left circular"}

        self.laser_polarization_int = int(self.raw_info['laser_polarization'])
        self.laser_polarization_str = laser_polarizations[self.laser_polarization_int]

        self.bin_shift = self.raw_info['bin_shift']
        self.bin_shift_decimal_places = self.raw_info['bin_shift_decimal_places']
        super(LicelChannelDataV2, self)._assign_properties()

    @property
    def is_photodiode(self):
        return self.id[0:2] == 'PD'

    @property
    def is_powermeter(self):
        return self.id[0:2] == 'PM'

    @property
    def is_standard_deviation(self):
        return self.id[0:2] == 'S2'

    @property
    def channel_name(self):
        """
        Construct the channel name adding analog photon info to avoid duplicates

        If use_id_as_name is True, the channel name will be the transient digitizer ID (e.g. BT01).
        This could be useful if the lidar system has multiple telescopes, so the descriptive name is
        not unique.

        Returns
        -------
        channel_name : str
           The channel name
        """
        return "{0.id}_L{0.laser_used}_P{0.laser_polarization_int}".format(self)

    def calculate_physical(self):
        """ Calculate physically-meaningful data from raw channel data:

        * In case of analog signals, the data are converted to mV.
        * In case of photon counting signals, data are stored as number of photons.

        In addition, some ancillary variables are also calculated (z, dz, number_of_bins).
        """
        data = self.raw_data

        norm = data / float(self.number_of_shots)
        dz = self.bin_width

        if self.is_analog:
            # If the channel is in analog mode
            ADCrange = self.discriminator  # Discriminator value already in mV

            if self.is_photodiode and (self.adcbits == 0):
                logger.info("Assuming adcbits equal 1. This is a bug in current licel format when storing photodiode data.")
                channel_data = norm * ADCrange / (2 ** self.adcbits)
            else:
                channel_data = norm * ADCrange / ((2 ** self.adcbits) - 1)  # Licel LabView code has a bug (does not account -1).

        else:
             channel_data = norm * self.number_of_shots

        # Calculate Z
        self.z = np.array([dz * bin_number + dz / 2.0 for bin_number in range(self.data_points)])
        self.dz = dz
        self.data = channel_data

    @property
    def is_analog(self):
        return self.analog_photon == '0'

    @property
    def is_photon(self):
        return self.analog_photon == '1'


class LicelFileV2(LicelFile):
    """ A class representing a single binary Licel file. """

    licel_file_header_format = ['filename',
                                'start_date start_time end_date end_time altitude longitude latitude zenith_angle azimuth_angle custom_field',
                                # Appart from Site that is read manually
                                'LS1 rate_1 LS2 rate_2 number_of_datasets LS3 rate_3 0000000 0000 controller_timestamp' ]
    licel_file_channel_format = 'active analog_photon laser_used number_of_datapoints laser_polarization HV bin_width wavelength d2 d3 bin_shift bin_shift_decimal_places ADCbits number_of_shots discriminator ID'

    channel_data_class = LicelChannelDataV2

    # If True, it corrects the old Raymetrics convention of zenith angle definition (zenith = -90 degrees)
    fix_zenith_angle = False

    def __init__(self, file_path, licel_timezone="UTC", get_name_by_order=False, import_now=True):
        """
        This is run when creating a new object.

        Parameters
        ----------
        file_path : str
           The path to the Licel file.
        licel_timezone : str
           The timezone of dates found in the Licel files. Should match the available
           timezones in the TZ database.
        import_now : bool
           If True, the header and data are read immediately. If not, the user has to call the
           corresponding methods directly. This is used to speed up reading files when only
           header information are required.
        """
        super(LicelFileV2, self).__init__(file_path=file_path,
                         use_id_as_name=False,  # Obsolete
                         get_name_by_order=get_name_by_order,
                         licel_timezone=licel_timezone,
                         import_now=import_now)

    def _assign_properties(self):
        """ Assign properties from the raw_info dictionary. """
        self.number_of_datasets = int(self.raw_info['number_of_datasets'])
        self.altitude = float(self.raw_info['altitude'])
        self.longitude = float(self.raw_info['longitude'])
        self.latitude = float(self.raw_info['latitude'])

        self.zenith_angle_raw = float(self.raw_info['zenith_angle'])
        logger.debug('Fix zenith angle? %s' % self.fix_zenith_angle)

        if self.fix_zenith_angle:
            logger.debug('Fixing zenith angle.')
            self.zenith_angle = self._correct_zenith_angle(self.zenith_angle_raw)
        else:
            self.zenith_angle = self.zenith_angle_raw

        self.azimuth_angle = float(self.raw_info['azimuth_angle'])

        if "custom_field" in self.raw_info.keys():
            self.custom_field = self.raw_info['custom_field'].strip('"')

        self.laser1_shots = self.raw_info['LS1']
        self.laser1_reprate = self.raw_info['rate_1']
        self.laser2_shots = self.raw_info['LS2']
        self.laser2_reprate = self.raw_info['rate_2']
        self.laser3_shots = self.raw_info['LS3']
        self.laser3_reprate = self.raw_info['rate_3']

        if 'controller_timestamp' in self.raw_info.keys():
            self.controller_timestamp = self.raw_info['controller_timestamp']

    def import_file(self):
        """ Read the header info and data of the Licel file.
        """
        channels = {}
        photodiodes = {}
        powermeters = {}
        standard_deviations = {}

        with open(self.file_path, 'rb') as f:

            self.read_header(f)

            # Check the complete header is read
            f.readline()

            # Import the data
            for current_channel_info in self.channel_info:
                raw_data = np.fromfile(f, 'i4', int(current_channel_info['number_of_datapoints']))
                a = np.fromfile(f, 'b', 1)
                b = np.fromfile(f, 'b', 1)

                if (a[0] != 13) | (b[0] != 10):
                    logger.warning("No end of line found after record. File could be corrupt: %s" % self.file_path)
                    logger.warning('a: {0}, b: {1}.'.format(a, b))

                channel = self.channel_data_class(current_channel_info, raw_data, self.duration())

                # Assign the channel either as normal channel or photodiode
                if channel.is_photodiode:
                    if channel.channel_name in photodiodes.keys():
                        # Check if current naming convention produces unique files
                        raise IOError('Trying to import two photodiodes with the same name')
                    photodiodes[channel.channel_name] = channel
                elif channel.is_powermeter:
                    if channel.channel_name in powermeters.keys():
                        # Check if current naming convention produces unique files
                        raise IOError('Trying to import two powermeters with the same name')
                    powermeters[channel.channel_name] = channel
                elif channel.is_standard_deviation:
                    if channel.channel_name in channels.keys():
                        # Check if current naming convention does not produce unique files
                        raise IOError('Trying to import two standard deviations with the same name')
                    standard_deviations[channel.channel_name] = channel
                else:
                    if channel.channel_name in channels.keys():
                        # Check if current naming convention does not produce unique files
                        raise IOError('Trying to import two channels with the same name')
                    channels[channel.channel_name] = channel

        self.channels = channels
        self.photodiodes = photodiodes
        self.powermeters = powermeters
        self.standard_deviations = standard_deviations

        self._calculate_physical()

    def _calculate_physical(self):
        """ Calculate physical quantities from raw data for all channels in the file. """
        for channel in self.channels.values():
            channel.calculate_physical()

        for photodiode in self.photodiodes.values():
            photodiode.calculate_physical()

        for powermeter in self.powermeters.values():
            powermeter.calculate_physical()

        for standard_deviations in self.standard_deviations.values():
            standard_deviations.calculate_physical()

    @property
    def has_powermeter(self):
        return len(self.powermeters) != 0

    @property
    def has_standard_deviations(self):
        return len(self.standard_deviations) != 0


class LicelV2Channel(LicelChannel):

    def __init__(self):

        self.laser_polarization_int = None
        self.laser_polarization_str = None
        self.bin_shift = None
        self.bin_shift_decimal_places = None

        self.zenith_angles = []
        self.azimuth_angles = []

        super(LicelV2Channel, self).__init__()

    def append_file(self, current_file, file_channel):
        """ Append file to the current object """

        super(LicelV2Channel, self).append_file(current_file, file_channel)
        self.zenith_angles.append(current_file.zenith_angle)
        self.azimuth_angles.append(current_file.azimuth_angle)

    def _assign_properties(self, current_file, file_channel):

        self._assign_unique_property('laser_polarization_int', file_channel.laser_polarization_int)
        self._assign_unique_property('laser_polarization_str', file_channel.laser_polarization_str)
        self._assign_unique_property('bin_shift', file_channel.bin_shift)
        self._assign_unique_property('bin_shift_decimal_places', file_channel.bin_shift_decimal_places)

        super(LicelV2Channel, self)._assign_properties(current_file, file_channel)

    @property
    def is_analog(self):
        return self.analog_photon_string == 'an'

    @property
    def is_photon_counting(self):
        return self.analog_photon_string == 'ph'

    def __unicode__(self):
        return "<Licel channel: %s>" % self.name

    def __str__(self):
        return str(self).encode('utf-8')


class LicelLidarMeasurementV2(LicelLidarMeasurement):

    file_class = LicelFileV2
    channel_class = LicelV2Channel
    photodiode_class = PhotodiodeChannel
    powermeter_class = PhotodiodeChannel
    standard_deviation_class = LicelV2Channel

    def __init__(self, file_list=None, get_name_by_order=False, licel_timezone='UTC'):

        self.standard_deviations = {}
        self.powermeters = {}

        super(LicelLidarMeasurementV2, self).__init__(file_list=file_list,
                                                      get_name_by_order=get_name_by_order,
                                                      use_id_as_name=False,
                                                      licel_timezone=licel_timezone)

    def _create_or_append_channel(self, current_file):

        for channel_name, channel in current_file.channels.items():
            if channel_name not in self.channels:
                self.channels[channel_name] = self.channel_class()
            self.channels[channel_name].append_file(current_file, channel)

        for photodiode_name, photodiode in current_file.photodiodes.items():
            if photodiode_name not in self.photodiodes:
                self.photodiodes[photodiode_name] = self.photodiode_class()
            self.photodiodes[photodiode_name].append_file(current_file, photodiode)

        for powermeter_name, powermeter in current_file.powermeters.items():
            if powermeter_name not in self.powermeters:
                self.powermeters[powermeter_name] = self.powermeter_class()
            self.powermeters[powermeter_name].append_file(current_file, powermeter)

        for std_deviation_name, std_deviation in current_file.standard_deviations.items():
            if std_deviation_name not in self.photodiodes:
                self.standard_deviations[std_deviation_name] = self.standard_deviation_class()
            self.standard_deviations[std_deviation_name].append_file(current_file, std_deviation)

    def _import_file(self, filename):

        if filename in self.files:
            logger.warning("File has been imported already: %s" % filename)
        else:
            logger.debug('Importing file {0}'.format(filename))
            current_file = self.file_class(filename, licel_timezone=self.licel_timezone)
            self.raw_info[current_file.file_path] = current_file.raw_info
            self.durations[current_file.file_path] = current_file.duration()

            file_laser_shots = []

            self._create_or_append_channel(current_file)

            self.laser_shots.append(file_laser_shots)
            self.files.append(current_file.file_path)

mercurial