atmospheric_lidar/scripts/licel2qa.py

Thu, 13 Sep 2018 13:59:32 +0300

author
Iannis B <ioannis@inoe.ro>
date
Thu, 13 Sep 2018 13:59:32 +0300
changeset 153
24ce9e10906c
permissions
-rw-r--r--

Start of telecover conversion function (just for merging).

""" Command line tool to convert Licel binary files to EARLINET telecover files.
"""
import argparse
import glob
import importlib
import logging
import os
import sys

from matplotlib import pyplot as plt
import yaml

from ..licel import LicelLidarMeasurement
from ..__init__ import __version__

logger = logging.getLogger(__name__)


class TelecoverMeasurement(LicelLidarMeasurement):

    def __init__(self, file_list, use_id_as_name, licel_timezone, telecover_settings):
        self.telecover_settings = telecover_settings
        super(TelecoverMeasurement, self).__init__(file_list, use_id_as_name, licel_timezone=licel_timezone)


def create_custom_class(custom_netcdf_parameter_path, use_id_as_name=False, temperature=25., pressure=1020.,
                        licel_timezone='UTC'):
    """ This funtion creates a custom LicelLidarMeasurement subclass,
    based on the input provided by the users.

    Parameters
    ----------
    custom_netcdf_parameter_path : str
       The path to the custom channels parameters.
    use_id_as_name : bool
       Defines if channels names are descriptive or transient digitizer IDs.
    temperature : float
       The ground temperature in degrees C (default 25.0).
    pressure : float
       The ground pressure in hPa (default: 1020.0).
    licel_timezone : str
       String describing the timezone according to the tz database.

    Returns
    -------
    CustomLidarMeasurement:
       A custom sub-class of LicelLidarMeasurement
    """
    logger.debug('Reading parameter files: %s' % custom_netcdf_parameter_path)
    custom_netcdf_parameters = read_settings_file(custom_netcdf_parameter_path)

    class CustomLidarMeasurement(LicelLidarMeasurement):
        extra_netcdf_parameters = custom_netcdf_parameters

        def __init__(self, file_list=None):
            super(CustomLidarMeasurement, self).__init__(file_list, use_id_as_name, licel_timezone=licel_timezone)

        def set_PT(self):
            ''' Sets the pressure and temperature at station level. This is used if molecular_calc parameter is
            set to 0 (i.e. use US Standard atmosphere).

            The results are stored in the info dictionary.
            '''

            self.info['Temperature'] = temperature
            self.info['Pressure'] = pressure

    return CustomLidarMeasurement


def read_settings_file(settings_path):
    """ Read the settings file.

    The file should contain python code."""
    if not os.path.isfile(settings_path):
        logging.error("The provided settings path does not correspond to a file.")
        sys.exit(1)

    dirname, basename = os.path.split(settings_path)
    sys.path.append(dirname)

    module_name, _ = os.path.splitext(basename)
    settings = importlib.import_module(module_name)
    return settings


def read_cloudmask_settings_file(settings_file_path):
    """ Read the configuration file.

    The file should be in YAML syntax."""

    if not os.path.isfile(settings_file_path):
        logging.error("Wrong path for cloudmask settings file (%s)" % settings_file_path)
        sys.exit(1)

    with open(settings_file_path) as yaml_file:
        try:
            settings = yaml.load(yaml_file)
            logging.debug("Read cloudmask settings file(%s)" % settings_file_path)
        except:
            logging.error("Could not parse YAML file (%s)" % settings_file_path)
            sys.exit(1)

    return settings


def get_cloud_free_files(LidarMeasurementClass, files, settings):
    """ Find cloud free periods in the given files.

    Depending on the provided settings, it could create plots of cloud mask and
    selected cloud-free periods.

    Parameters
    ----------
    LidarMeasurementClass : class
       Class used to read the files.
    files : list
       A list of raw licel file paths.
    settings : dict
       A dictionary of cloud masking settings.

    Returns
    -------
    file_list : list of lists
       A list of lists containing paths to cloud-free files.
    """
    logger.warning("Starting cloud mask procedure. This is an experimental feature.")

    try:
        from cloudmask import cloudmask  # Import here until we setup a proper installation procedure
    except ImportError:
        logger.error("Cloud mask module could not be loaded. Please install manually.")
        sys.exit(1)

    measurement = LidarMeasurementClass(files)
    channel = measurement.channels[settings['channel']]
    cloud_mask = cloudmask.CloudMaskRaw(channel)

    idxs = cloud_mask.cloud_free_periods(settings['cloudfree_period_min'],
                                         settings['file_duration_max'],
                                         settings['max_cloud_height'])

    logger.debug('Cloud free indices: {0}'.format(idxs))

    if len(idxs) == 0:  # If no cloud-free period found
        logger.info('No cloud free period found. Nothing converted.')
        sys.exit(1)

    logger.info("{0} cloud free period(s) found.".format(len(idxs)))

    if settings['plot']:
        # Plot cloud free periods
        cloudfree_filename = "cloudfree_{0}_{1}_{2}.png".format(channel.wavelength,
                                                             channel.start_time.strftime('%Y%m%d_%H%M%S'),
                                                             channel.stop_time.strftime('%Y%m%d_%H%M%S'))
        cloudfree_path = os.path.join(settings['plot_directory'], cloudfree_filename)
        fig, _ = cloud_mask.plot_cloudfree(idxs)

        plt.savefig(cloudfree_path)
        plt.close()

        # Plot cloud mask
        cloudmask_filename = "cloudmask_{0}_{1}_{2}.png".format(channel.wavelength,
                                                                channel.start_time.strftime('%Y%m%d_%H%M%S'),
                                                                channel.stop_time.strftime('%Y%m%d_%H%M%S'))
        cloudmask_path = os.path.join(settings['plot_directory'], cloudmask_filename)

        fig, _ = cloud_mask.plot_mask()

        plt.savefig(cloudmask_path)
        plt.close()

    file_list = []
    for idx_min, idx_max in idxs:
        current_files = measurement.files[idx_min:idx_max]
        file_list.append(current_files)

    return file_list


def get_corrected_measurement_id(args, n):
    """ Correct the provided measurement id, in case of multiple cloud-free periods. """
    if args.measurement_id is not None:
        order = int(args.measurement_id[-2:])
        new_no = order + n
        measurement_id = args.measurement_id[:-2] + str(new_no)
        measurement_no = args.measurement_number  # The same
    else:
        measurement_no = str(int(args.measurement_number) + n).zfill(2)
        measurement_id = None

    return measurement_id, measurement_no


def convert_to_scc(CustomLidarMeasurement, files, dark_pattern, measurement_id, measurement_number):
    """ Convert files to SCC. """
    measurement = CustomLidarMeasurement(files)
    # Get a list of files containing dark measurements
    if dark_pattern != "":
        dark_files = glob.glob(dark_pattern)

        if dark_files:
            logger.debug("Using %s as dark measurements files!" % ', '.join(dark_files))
            measurement.dark_measurement = CustomLidarMeasurement(dark_files)
        else:
            logger.warning(
                'No dark measurement files found when searching for %s. Will not use any dark measurements.' % dark_pattern)
    try:
        measurement = measurement.subset_by_scc_channels()
    except ValueError as err:
        logger.error(err)
        sys.exit(1)

    # Save the netcdf
    logger.info("Saving netcdf")
    measurement.set_measurement_id(measurement_id, measurement_number)
    measurement.save_as_SCC_netcdf()
    logger.info("Created file %s" % measurement.scc_filename)


def main():
    # Define the command line argument
    parser = argparse.ArgumentParser(
        description="A program to convert Licel binary files to EARLIENT telecover ASCII format")
    parser.add_argument("settings_file", help="The path to a parameter YAML.")
    parser.add_argument("files",
                        help="Location of licel files. Use relative path and filename wildcards. (default './*.*')",
                        default="./*.*")
    parser.add_argument("-i", '--id_as_name',
                        help="Use transient digitizer ids as channel names, instead of descriptive names",
                        action="store_true")
    parser.add_argument("-t", "--temperature", type=float,
                        help="The temperature (in C) at lidar level, required if using US Standard atmosphere",
                        default="25")
    parser.add_argument("-p", "--pressure", type=float,
                        help="The pressure (in hPa) at lidar level, required if using US Standard atmosphere",
                        default="1020")
    parser.add_argument('-D', '--dark_measurements',
                        help="Location of files containing dark measurements. Use relative path and filename wildcars, see 'files' parameter for example.",
                        default="", dest="dark_files"
                        )
    parser.add_argument('--licel_timezone', help="String describing the timezone according to the tz database.",
                        default="UTC", dest="licel_timezone",
                        )

    # Verbosity settings from http://stackoverflow.com/a/20663028
    parser.add_argument('-d', '--debug', help="Print dubuging information.", action="store_const",
                        dest="loglevel", const=logging.DEBUG, default=logging.INFO,
                        )
    parser.add_argument('-s', '--silent', help="Show only warning and error messages.", action="store_const",
                        dest="loglevel", const=logging.WARNING
                        )
    parser.add_argument('--version', help="Show current version.", action='store_true')

    args = parser.parse_args()

    # Get the logger with the appropriate level
    logging.basicConfig(format='%(levelname)s: %(message)s', level=args.loglevel)
    logger = logging.getLogger(__name__)

    # Check for version
    if args.version:
        print("Version: %s" % __version__)
        sys.exit(0)

    # Get a list of files to process
    files = glob.glob(args.files)

    # If not files found, exit
    if len(files) == 0:
        logger.error("No files found when searching for %s." % args.files)
        sys.exit(1)

    # If everything OK, proceed
    logger.info("Found {0} files matching {1}".format(len(files), os.path.abspath(args.files)))

    CustomLidarMeasurement = create_custom_class(args.parameter_file, args.id_as_name, args.temperature,
                                                 args.pressure, args.licel_timezone)

    convert_to_telecover(CustomLidarMeasurement, files, args.dark_files, args.measurement_id, args.measurement_number)

mercurial