scc_access/scc_access.py

Sat, 09 Jan 2021 15:11:15 +0200

author
ioannis@ioannis-VirtualBox
date
Sat, 09 Jan 2021 15:11:15 +0200
changeset 43
0151be380f3c
parent 16
ddaea4327bd5
child 51
a4ca2b6d67f5
permissions
-rw-r--r--

Multiple changes, based on wrong initial state.

import sys

import requests

# Python 2 and 3 support: the rest of the module calls ``urlparse.urljoin``, so the
# name ``urlparse`` must be bound to a module on both interpreters.
try:
    import urllib.parse as urlparse  # Python 3
except ImportError:
    import urlparse  # Python 2 (the module itself, not its ``urlparse`` function)

import argparse
import os
import re
import time
from io import BytesIO
from zipfile import ZipFile
import datetime
import logging
import yaml

import netCDF4 as netcdf

# Silence the warnings emitted by urllib3; every request in this module is sent with verify=False.
requests.packages.urllib3.disable_warnings()

# Module-level logger; its level and output format are configured in main().
logger = logging.getLogger(__name__)

# Pattern used to extract the 12-character measurement id from the HTML of the measurement
# page returned after an upload. The id is also read directly from the NetCDF file
# (see SCC.measurement_id_from_file) before uploading.
regex = "<h3>Measurement (?P<measurement_id>.{12}) <small>"


class SCC:
    """ A client that talks to the SCC server by simulating a normal browser session.

    The instance keeps a single HTTP session that is used to log in to the web
    site, upload measurement files, query the API for the processing status,
    download the produced files and delete or rerun measurements.

    The class can be used as a context manager; the HTTP session is closed on exit.
    """

    # Measurement attributes reported (in this order) in the progress log messages
    _STATUS_FIELDS = ('upload', 'hirelpp', 'cloudmask', 'elpp', 'elda', 'elic')

    def __init__(self, auth, output_dir, base_url):
        """ Store the connection settings and build the absolute URLs.

        auth: credentials passed as ``auth`` to every request.
        output_dir: local directory under which downloaded files are stored.
        base_url: root URL of the SCC web site.
        """
        self.auth = auth
        self.output_dir = output_dir
        self.base_url = base_url
        self.session = requests.Session()

        def absolute(path):
            return urlparse.urljoin(self.base_url, path)

        # Construct the absolute URLs
        self.login_url = absolute('accounts/login/')
        self.upload_url = absolute('data_processing/measurements/quick/')
        self.download_hirelpp_pattern = absolute('data_processing/measurements/{0}/download-hirelpp/')
        self.download_cloudmask_pattern = absolute('data_processing/measurements/{0}/download-cloudmask/')
        self.download_elpp_pattern = absolute('data_processing/measurements/{0}/download-preprocessed/')
        self.download_elda_pattern = absolute('data_processing/measurements/{0}/download-optical/')
        self.download_plot_pattern = absolute('data_processing/measurements/{0}/download-plots/')
        self.download_elic_pattern = absolute('data_processing/measurements/{0}/download-elic/')
        self.delete_measurement_pattern = absolute('admin/database/measurements/{0}/delete/')
        self.api_base_url = absolute('api/v1/')

        self.login_credentials = None

    def _get(self, url, **kwargs):
        """ Send a GET request using the connection settings shared by all requests. """
        return self.session.get(url, auth=self.auth, verify=False, **kwargs)

    def _post(self, url, **kwargs):
        """ Send a POST request using the connection settings shared by all requests. """
        return self.session.post(url, auth=self.auth, verify=False, **kwargs)

    def _csrf_headers(self, page, referer):
        """ Build the headers required to submit a form that was served by `page`.

        The token is read from the cookies of `page`; if that response did not
        set the cookie, the copy kept by the session is used instead.
        Raises KeyError if no token is available at all.
        """
        token = page.cookies.get('csrftoken')
        if token is None:
            token = self.session.cookies.get('csrftoken')
        if token is None:
            raise KeyError('csrftoken')
        return {'X-CSRFToken': token, 'referer': referer}

    def login(self, credentials):
        """ Log in to the website.

        credentials: (username, password) pair of the website account.

        Returns the response to the submitted login form. The process exits
        with status 1 if the login page cannot be opened.
        """
        logger.debug("Attempting to login to SCC, username %s." % credentials[0])
        self.login_credentials = {'username': credentials[0],
                                  'password': credentials[1]}

        logger.debug("Accessing login page at %s." % self.login_url)

        # Get the login form (this also provides the CSRF token)
        login_page = self._get(self.login_url)

        if login_page.status_code != 200:
            logger.error('Could not access login pages. Status code %s' % login_page.status_code)
            sys.exit(1)

        logger.debug("Submiting credentials.")

        # Submit the login data
        login_submit = self._post(self.login_url,
                                  data=self.login_credentials,
                                  headers=self._csrf_headers(login_page, self.login_url))
        return login_submit

    def logout(self):
        """ Placeholder: nothing is done to log out. """
        pass

    def upload_file(self, filename, system_id, force_upload, delete_related):
        """ Upload a filename for processing with a specific system.

        If a measurement with the same id already exists on the server it is
        deleted first when `force_upload` is set; otherwise the process exits
        with status 1.

        Returns the measurement id if the upload is successful and False if the
        server returned an error status or rejected the file.
        """
        measurement_id = self.measurement_id_from_file(filename)

        logger.debug('Checking if a measurement with the same id already exists on the SCC server.')
        existing_measurement = self.get_measurement(measurement_id)

        if existing_measurement:
            if not force_upload:
                logger.error(
                    "Measurement with id {} already exists on the SCC. Use --force_upload flag to overwrite it.".format(
                        measurement_id))
                sys.exit(1)

            logger.info(
                "Measurement with id {} already exists on the SCC. Trying to delete it...".format(measurement_id))
            self.delete_measurement(measurement_id, delete_related)

        # Get submit page
        upload_page = self._get(self.upload_url)

        # Submit the data
        upload_data = {'system': system_id}
        headers = self._csrf_headers(upload_page, self.upload_url)

        logger.info("Uploading of file %s started." % filename)
        logger.debug("URL: {0}, data: {1}, 'X-CSRFToken': {2}".format(self.upload_url,
                                                                      upload_data,
                                                                      headers['X-CSRFToken']))

        # The file is closed as soon as the request has been sent
        with open(filename, 'rb') as data_file:
            upload_submit = self._post(self.upload_url,
                                       data=upload_data,
                                       files={'data': data_file},
                                       headers=headers)

        if upload_submit.status_code != 200:
            logger.warning("Connection error. Status code: %s" % upload_submit.status_code)
            return False

        # Check if there was a redirect to a new page.
        if upload_submit.url == self.upload_url:
            logger.error("Uploaded file rejected! Try to upload manually to see the error.")
            return False

        found_ids = re.findall(regex, upload_submit.text)
        if found_ids:
            measurement_id = found_ids[0]
        else:
            # The page layout is not the expected one; keep the id that was read from the file
            logger.warning(
                "Could not read the measurement id from the response page. Using the id %s found in the file."
                % measurement_id)

        logger.info("Successfully uploaded measurement with id %s." % measurement_id)
        return measurement_id

    @staticmethod
    def measurement_id_from_file(filename):
        """ Get the measurement id from the input file.

        The id is read from the Measurement_ID global attribute of the NetCDF
        file. The process exits with status 1 if the file does not exist or
        does not contain the attribute.
        """
        if not os.path.isfile(filename):
            logger.error("File {} does not exist.".format(filename))
            sys.exit(1)

        with netcdf.Dataset(filename) as f:
            try:
                measurement_id = f.Measurement_ID
            except AttributeError:
                logger.error(
                    "Input file {} does not contain a Measurement_ID global attribute. Wrong file format?".format(
                        filename))
                sys.exit(1)

        return measurement_id

    def download_files(self, measurement_id, subdir, download_url):
        """ Download a zip archive from download_url and extract its files to
        <output_dir>/<measurement_id>/<subdir>.

        This method is used to download preprocessed files, optical files etc.
        All files are stored directly in the target directory, whatever their
        path inside the archive; directory entries of the archive are skipped.
        The target directory is created only once the archive has been opened.

        Raises an exception if the server answers with an error status or if
        the received content is not a zip archive.
        """
        request = self._get(download_url, stream=True)

        try:
            # Fail with the HTTP error instead of a confusing "not a zip file" error
            request.raise_for_status()

            # Read the content by chunk, needed if the file is big.
            memory_file = BytesIO()
            for chunk in request.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    memory_file.write(chunk)
        finally:
            request.close()

        with ZipFile(memory_file) as zip_file:
            # Create the dir if it does not exist
            local_dir = os.path.join(self.output_dir, measurement_id, subdir)
            if not os.path.exists(local_dir):
                os.makedirs(local_dir)

            for zipped_name in zip_file.namelist():
                basename = os.path.basename(zipped_name)
                if not basename:  # directory entry, there is nothing to write
                    continue

                with open(os.path.join(local_dir, basename), 'wb') as f:
                    f.write(zip_file.read(zipped_name))

    def _download_product(self, measurement_id, subdir, url_pattern, description):
        """ Download one kind of product; failures are logged and not raised. """
        download_url = url_pattern.format(measurement_id)
        try:
            self.download_files(measurement_id, subdir, download_url)
        except Exception as e:
            logger.error("Could not download {0}. Error message: {1}".format(description, e))
            logger.debug('Download exception:', exc_info=True)

    def download_hirelpp(self, measurement_id):
        """ Download hirelpp files for the measurement id. """
        self._download_product(measurement_id, 'scc_hirelpp', self.download_hirelpp_pattern, 'HiRElPP files')

    def download_cloudmask(self, measurement_id):
        """ Download cloudmask files for the measurement id. """
        self._download_product(measurement_id, 'scc_cloudscreen', self.download_cloudmask_pattern,
                               'cloudscreen files')

    def download_elpp(self, measurement_id):
        """ Download preprocessed files for the measurement id. """
        self._download_product(measurement_id, 'scc_preprocessed', self.download_elpp_pattern, 'ElPP files')

    def download_elda(self, measurement_id):
        """ Download optical files for the measurement id. """
        self._download_product(measurement_id, 'scc_optical', self.download_elda_pattern, 'ELDA files')

    def download_plots(self, measurement_id):
        """ Download profile graphs for the measurement id. """
        self._download_product(measurement_id, 'scc_plots', self.download_plot_pattern, 'ELDA plots')

    def download_elic(self, measurement_id):
        """ Download ELIC files for the measurement id. """
        self._download_product(measurement_id, 'scc_elic', self.download_elic_pattern, 'ELIC files')

    def download_eldec(self, measurement_id):
        """ Download ELDEC files for the measurement id. """
        # The ELDA pattern is used for now
        self._download_product(measurement_id, 'scc_eldec', self.download_elda_pattern, 'ELDEC files')

    def rerun_elpp(self, measurement_id, monitor=True):
        """ Request a rerun of the ELPP step for the measurement id.

        Nothing is done if the measurement is not found. If `monitor` is set,
        the processing is monitored until it finishes.
        """
        measurement = self.get_measurement(measurement_id)
        if not measurement:
            return

        request = self._get(measurement.rerun_elpp_url, stream=True)

        if request.status_code != 200:
            logger.error(
                "Could not rerun ELPP for %s. Status code: %s" % (measurement_id, request.status_code))
            return

        if monitor:
            self.monitor_processing(measurement_id)

    def rerun_all(self, measurement_id, monitor=True):
        """ Request a rerun of all processing steps for the measurement id.

        Nothing is done if the measurement is not found. If `monitor` is set,
        the processing is monitored until it finishes.
        """
        logger.debug("Started rerun_all procedure.")

        logger.debug("Getting measurement %s" % measurement_id)
        measurement = self.get_measurement(measurement_id)
        if not measurement:
            return

        logger.debug("Attempting to rerun all processing through %s." % measurement.rerun_all_url)

        request = self._get(measurement.rerun_all_url, stream=True)

        if request.status_code != 200:
            logger.error("Could not rerun pre processing for %s. Status code: %s" %
                         (measurement_id, request.status_code))
            return

        if monitor:
            self.monitor_processing(measurement_id)

    def process(self, filename, system_id, force_upload, delete_related):
        """ Upload a file for processing and wait for the processing to finish.
        If the processing is successful, it will download all produced files.

        Returns the measurement object, or None if the upload failed or the
        measurement could not be found afterwards.
        """
        logger.info("--- Processing started on %s. ---" % datetime.datetime.now())

        # Upload file
        measurement_id = self.upload_file(filename, system_id, force_upload, delete_related)

        if not measurement_id:
            # The failure has already been logged by upload_file; there is nothing to monitor
            return None

        return self.monitor_processing(measurement_id)

    def _status_values(self, measurement):
        """ Return the status codes of the measurement, in the order used by the log messages. """
        return [getattr(measurement, field) for field in self._STATUS_FIELDS]

    def monitor_processing(self, measurement_id):
        """ Monitor the processing progress of a measurement id.

        The API is polled every 10 seconds while the measurement is reported
        as running. Afterwards the products of every module whose status code
        equals 127 are downloaded.

        Returns the measurement object, or None if the measurement is not
        found (initially or while polling).
        """
        measurement = self.get_measurement(measurement_id)
        if measurement is None:
            return None

        while measurement.is_running:
            logger.info("Measurement is being processed (status: {}, {}, {}, {}, {}, {}). Please wait.".format(
                *self._status_values(measurement)))
            time.sleep(10)
            measurement = self.get_measurement(measurement_id)
            if measurement is None:
                logger.error("Measurement {0} is not available on the SCC any more. Monitoring stopped.".format(
                    measurement_id))
                return None

        logger.info("Measurement processing finished (status: {}, {}, {}, {}, {}, {}).".format(
            *self._status_values(measurement)))

        if measurement.hirelpp == 127:
            logger.info("Downloading HiRElPP files.")
            self.download_hirelpp(measurement_id)
        if measurement.cloudmask == 127:
            logger.info("Downloading cloud screening files.")
            self.download_cloudmask(measurement_id)
        if measurement.elpp == 127:
            logger.info("Downloading ELPP files.")
            self.download_elpp(measurement_id)
        if measurement.elda == 127:
            logger.info("Downloading ELDA files.")
            self.download_elda(measurement_id)
            logger.info("Downloading graphs.")
            self.download_plots(measurement_id)
        if measurement.elic == 127:
            logger.info("Downloading ELIC files.")
            self.download_elic(measurement_id)

        # TODO: Need to check ELDEC code (when it becomes available in the API)
        if measurement.is_calibration:
            logger.info("Downloading ELDEC files.")
            self.download_eldec(measurement_id)

        logger.info("--- Processing finished. ---")
        return measurement

    def get_measurement(self, measurement_id):
        """ Get the measurement with the given id from the API.

        Returns a Measurement object, or None if the id is None, the API
        answers with 404 or the response is empty. The process exits with
        status 1 for any other error status.
        """
        if measurement_id is None:
            return None

        measurement_url = urlparse.urljoin(self.api_base_url, 'measurements/%s/' % measurement_id)

        response = self._get(measurement_url)

        if response.status_code == 404:
            logger.info("No measurement with id %s found on the SCC." % measurement_id)
            response_dict = None
        elif response.status_code != 200:
            logger.error('Could not access API. Status code %s.' % response.status_code)
            sys.exit(1)
        else:
            response_dict = response.json()

        logger.debug("Response dictionary: {}".format(response_dict))

        if not response_dict:
            return None

        return Measurement(self.base_url, response_dict)

    def delete_measurement(self, measurement_id, delete_related=False):
        """ Deletes a measurement with the provided measurement id. The user
        should have the appropriate permissions.

        The procedure is performed directly through the web interface and
        NOT through the API.

        Returns True if the measurement was deleted and None otherwise.
        """
        # Get the measurement object
        measurement = self.get_measurement(measurement_id)

        # Check that it exists
        if measurement is None:
            logger.warning("Nothing to delete.")
            return None

        # Go to the page confirming the deletion
        delete_url = self.delete_measurement_pattern.format(measurement.id)

        logger.debug("Delete url: {}".format(delete_url))

        confirm_page = self._get(delete_url)

        # Check that the page opened properly
        if confirm_page.status_code != 200:
            logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code))
            return None

        # Get the delete related value
        delete_related_option = 'delete_related' if delete_related else 'not_delete_related'

        # Delete the measurement
        delete_page = self._post(delete_url,
                                 data={'post': 'yes',
                                       'select_delete_related_measurements': delete_related_option},
                                 headers=self._csrf_headers(confirm_page, delete_url))

        if delete_page.status_code != 200:
            logger.warning("Something went wrong. Delete page status: {0}".format(
                delete_page.status_code))
            return None

        logger.info("Deleted measurement {0}.".format(measurement_id))
        return True

    def available_measurements(self):
        """ Get a list of available measurement on the SCC.

        Returns a list of Measurement objects, or None if the API did not
        answer successfully or returned an empty response.
        """
        measurement_url = urlparse.urljoin(self.api_base_url, 'measurements')
        response = self._get(measurement_url)

        if response.status_code != 200:
            logger.warning("Could not get the available measurements. Status code: %s" % response.status_code)
            return None

        response_dict = response.json()

        if response_dict:
            measurement_list = response_dict['objects']
            measurements = [Measurement(self.base_url, measurement_dict) for measurement_dict in measurement_list]
            logger.info("Found %s measurements on the SCC." % len(measurements))
        else:
            measurements = None
            logger.warning("No response received from the SCC when asked for available measurements.")

        return measurements

    def measurement_id_for_date(self, t1, call_sign, base_number=0):
        """ Give the first available measurement id on the SCC for the specific
        date.

        The id is built as <YYYYMMDD><call_sign><4-digit number>, starting
        from base_number and increasing the number while the id is found
        among the ids returned by the API for that date.

        Returns None if the API did not answer successfully or returned an
        empty response. Raises ValueError if the number reaches 1000 without
        finding an available id.
        """
        date_str = t1.strftime('%Y%m%d')
        search_url = urlparse.urljoin(self.api_base_url, 'measurements/?id__startswith=%s' % date_str)

        response = self._get(search_url)

        if response.status_code != 200:
            logger.warning("Could not search for existing measurements. Status code: %s" % response.status_code)
            return None

        response_dict = response.json()
        if not response_dict:
            return None

        existing_ids = set(measurement_dict['id'] for measurement_dict in response_dict['objects'])

        measurement_number = base_number
        measurement_id = "%s%s%04i" % (date_str, call_sign, measurement_number)

        while measurement_id in existing_ids:
            measurement_number = measurement_number + 1
            measurement_id = "%s%s%04i" % (date_str, call_sign, measurement_number)
            if measurement_number == 1000:
                raise ValueError('No available measurement id found.')

        return measurement_id

    def __enter__(self):
        return self

    def __exit__(self, *args):
        logger.debug("Closing SCC connection session.")
        self.session.close()


class Measurement:
    """ A measurement record, as it is returned by the SCC API.

    Every key of the API response becomes an attribute of the object. The
    attributes listed in _EXPECTED_FIELDS are always present and are None
    when the response does not provide them.
    """

    # Expected attributes, defined in advance to assist debugging
    _EXPECTED_FIELDS = ('cloudmask', 'elda', 'elic', 'elpp', 'hirelpp', 'id', 'is_calibration',
                        'is_running', 'pre_processing_exit_code', 'processing_exit_code',
                        'resource_uri', 'start', 'stop', 'system', 'upload')

    def __init__(self, base_url, dict_response):
        self.base_url = base_url

        for field_name in self._EXPECTED_FIELDS:
            setattr(self, field_name, None)

        # Copy the key/value pairs of the response onto the object
        has_data = bool(dict_response)
        if has_data:
            for field_name in dict_response:
                setattr(self, field_name, dict_response[field_name])

        # True only when a non-empty response was provided
        self.exists = has_data

    def _action_url(self, action):
        """ Absolute URL of a processing action of this measurement on the web site. """
        pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/' + action + '/')
        return pattern.format(self.id)

    @property
    def rerun_elda_url(self):
        return self._action_url('rerun-elda')

    @property
    def rerun_elpp_url(self):
        return self._action_url('rerun-elpp')

    @property
    def rerun_all_url(self):
        return self._action_url('rerun-all')

    def __str__(self):
        return "Measurement {0}".format(self.id)


def upload_file(filename, system_id, force_upload, delete_related, settings):
    """ Convenience wrapper: open a connection to the SCC, log in and upload one file.

    Returns the value returned by SCC.upload_file.
    """
    logger.info("Uploading file %s, using system %s.", filename, system_id)

    connection = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    with connection as scc:
        scc.login(settings['website_credentials'])
        uploaded_id = scc.upload_file(filename, system_id, force_upload, delete_related)
        scc.logout()

    return uploaded_id


def process_file(filename, system_id, force_upload, delete_related, settings):
    """ Convenience wrapper: open a connection to the SCC, log in, upload one file
    and follow its processing.

    Returns the value returned by SCC.process.
    """
    logger.info("Processing file %s, using system %s.", filename, system_id)

    connection = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    with connection as scc:
        scc.login(settings['website_credentials'])
        processed = scc.process(filename, system_id, force_upload, delete_related)
        scc.logout()

    return processed


def delete_measurement(measurement_id, settings, delete_related):
    """ Convenience wrapper: open a connection to the SCC, log in and delete
    the measurement with the given id.
    """
    logger.info("Deleting %s.", measurement_id)

    connection = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    with connection as scc:
        scc.login(settings['website_credentials'])
        scc.delete_measurement(measurement_id, delete_related)
        scc.logout()


def rerun_all(measurement_id, monitor, settings):
    """ Convenience wrapper: open a connection to the SCC, log in and rerun all
    processing steps of the measurement with the given id.
    """
    logger.info("Rerunning all products for %s", measurement_id)

    connection = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    with connection as scc:
        scc.login(settings['website_credentials'])
        scc.rerun_all(measurement_id, monitor)
        scc.logout()


def rerun_elpp(measurement_id, monitor, settings):
    """ Convenience wrapper: open a connection to the SCC, log in and rerun the
    ELPP step of the measurement with the given id.
    """
    logger.info("Rerunning (optical) processing for %s", measurement_id)

    connection = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    with connection as scc:
        scc.login(settings['website_credentials'])
        scc.rerun_elpp(measurement_id, monitor)
        scc.logout()


def import_settings(config_file_path):
    """ Read the configuration file and return its settings as a dictionary.

    The file should be in YAML syntax and contain a mapping with, at least,
    the `basic_credentials` and `website_credentials` entries. Both entries
    are converted to tuples, because YAML does not read tuples.

    The process exits with status 1, after logging an error, if the path is
    not a file, the content cannot be parsed, the content is not a mapping,
    or one of the credential entries is missing or cannot be converted.
    """
    if not os.path.isfile(config_file_path):
        logger.error("Wrong path for configuration file (%s)" % config_file_path)
        sys.exit(1)

    with open(config_file_path) as yaml_file:
        try:
            settings = yaml.safe_load(yaml_file)
        except Exception as e:
            logger.error("Could not parse YAML file (%s)" % config_file_path)
            logger.debug("Error message: {}".format(e))
            sys.exit(1)

    logger.debug("Read settings file(%s)" % config_file_path)

    # An empty file is loaded as None; a top-level list or scalar cannot be used either
    if not isinstance(settings, dict):
        logger.error("Configuration file (%s) does not contain a mapping of settings." % config_file_path)
        sys.exit(1)

    # YAML limitation: does not read tuples
    for key in ('basic_credentials', 'website_credentials'):
        try:
            settings[key] = tuple(settings[key])
        except (KeyError, TypeError):
            logger.error("Setting '%s' is missing or is not a list in configuration file (%s)" %
                         (key, config_file_path))
            sys.exit(1)

    return settings


def _build_argument_parser():
    """ Create the command line parser with all the arguments understood by main(). """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to configuration file")
    parser.add_argument("filename", nargs='?', default='', help="Measurement file name or path.")
    parser.add_argument("system", nargs='?', default=0, help="Processing system id.")
    parser.add_argument("-p", "--process", action="store_true",
                        help="Wait for the results of the processing.")
    parser.add_argument("--delete", help="Measurement ID to delete.")
    # NOTE: no --delete_related option is offered; see delete_related in main().
    parser.add_argument("--force_upload", action="store_true",
                        help="If measurement ID exists on SCC, delete before uploading.")
    parser.add_argument("--rerun-all", help="Rerun all processing steps for the provided measurement ID.")
    parser.add_argument("--rerun-elpp", help="Rerun low-resolution processing steps for the provided measurement ID.")

    # Verbosity settings from http://stackoverflow.com/a/20663028
    # Both flags write to the same destination; the default level is INFO.
    parser.add_argument('-d', '--debug', dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help="Print debugging information.")
    parser.add_argument('-s', '--silent', dest="loglevel", action="store_const",
                        const=logging.WARNING,
                        help="Show only warning and error messages.")
    return parser


def main():
    """ Command line entry point.

    Parses the arguments, configures logging, reads the settings file and then
    performs exactly one action: delete a measurement, rerun all steps, rerun
    ELPP, or upload (and optionally process) a file.
    """
    parser = _build_argument_parser()
    args = parser.parse_args()

    # For now, deleting related measurements is not allowed
    delete_related = False

    # Configure logging with the requested level
    logging.basicConfig(format='%(levelname)s: %(message)s', level=args.loglevel)

    settings = import_settings(args.config)

    # If a measurement id to delete is provided, do nothing else
    if args.delete:
        delete_measurement(args.delete, settings, delete_related)
        return

    if args.rerun_all:
        rerun_all(args.rerun_all, args.process, settings)
        return

    if args.rerun_elpp:
        rerun_elpp(args.rerun_elpp, args.process, settings)
        return

    # Uploading needs both the file name and the system id
    if args.filename == '' or args.system == 0:
        parser.error('Provide a valid filename and system parameters.\nRun with -h for help.\n')

    action = process_file if args.process else upload_file
    action(args.filename, args.system, args.force_upload, delete_related, settings)

mercurial