Sat, 09 Jan 2021 15:11:15 +0200
Multiple changes, based on wrong initial state.
scc_access/scc_access.py | file | annotate | diff | comparison | revisions |
--- a/scc_access/scc_access.py Sat Jan 09 15:10:30 2021 +0200 +++ b/scc_access/scc_access.py Sat Jan 09 15:11:15 2021 +0200 @@ -1,18 +1,26 @@ +import sys + import requests -requests.packages.urllib3.disable_warnings() -import sys -import urlparse +# Python 2 and 3 support +try: + import urllib.parse as urlparse # Python 3 +except ImportError: + from urlparse import urlparse # Python 2 + import argparse import os import re import time -import StringIO +from io import BytesIO from zipfile import ZipFile import datetime import logging import yaml +import netCDF4 as netcdf + +requests.packages.urllib3.disable_warnings() logger = logging.getLogger(__name__) @@ -34,19 +42,27 @@ self.output_dir = output_dir self.base_url = base_url self.session = requests.Session() - self.construct_urls() - def construct_urls(self): - """ Construct all URLs needed for processing. """ # Construct the absolute URLs self.login_url = urlparse.urljoin(self.base_url, 'accounts/login/') self.upload_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/quick/') - self.download_preprocessed_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-preprocessed/') - self.download_optical_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-optical/') - self.download_graph_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-plots/') + self.download_hirelpp_pattern = urlparse.urljoin(self.base_url, + 'data_processing/measurements/{0}/download-hirelpp/') + self.download_cloudmask_pattern = urlparse.urljoin(self.base_url, + 'data_processing/measurements/{0}/download-cloudmask/') + self.download_elpp_pattern = urlparse.urljoin(self.base_url, + 'data_processing/measurements/{0}/download-preprocessed/') + self.download_elda_pattern = urlparse.urljoin(self.base_url, + 'data_processing/measurements/{0}/download-optical/') + self.download_plot_pattern = urlparse.urljoin(self.base_url, + 'data_processing/measurements/{0}/download-plots/') + self.download_elic_pattern = urlparse.urljoin(self.base_url, + 'data_processing/measurements/{0}/download-elic/') self.delete_measurement_pattern = urlparse.urljoin(self.base_url, 'admin/database/measurements/{0}/delete/') self.api_base_url = urlparse.urljoin(self.base_url, 'api/v1/') + self.login_credentials = None + def login(self, credentials): """ Login the the website. """ logger.debug("Attempting to login to SCC, username %s." % credentials[0]) @@ -63,7 +79,7 @@ sys.exit(1) logger.debug("Submiting credentials.") - + # Submit the login data login_submit = self.session.post(self.login_url, data=self.login_credentials, @@ -76,9 +92,26 @@ def logout(self): pass - def upload_file(self, filename, system_id): + def upload_file(self, filename, system_id, force_upload, delete_related): """ Upload a filename for processing with a specific system. If the upload is successful, it returns the measurement id. """ + + measurement_id = self.measurement_id_from_file(filename) + + logger.debug('Checking if a measurement with the same id already exists on the SCC server.') + existing_measurement = self.get_measurement(measurement_id) + + if existing_measurement: + if force_upload: + logger.info( + "Measurement with id {} already exists on the SCC. Trying to delete it...".format(measurement_id)) + self.delete_measurement(measurement_id, delete_related) + else: + logger.error( + "Measurement with id {} already exists on the SCC. Use --force_upload flag to overwrite it.".format( + measurement_id)) + sys.exit(1) + # Get submit page upload_page = self.session.get(self.upload_url, auth=self.auth, @@ -89,12 +122,14 @@ files = {'data': open(filename, 'rb')} logger.info("Uploading of file %s started." % filename) - + logger.debug("URL: {0}, data: {1}, 'X-CSRFToken': {2}".format(self.upload_url, + upload_data, + upload_page.cookies['csrftoken'])) upload_submit = self.session.post(self.upload_url, data=upload_data, files=files, headers={'X-CSRFToken': upload_page.cookies['csrftoken'], - 'referer': self.upload_url,}, + 'referer': self.upload_url, }, verify=False, auth=self.auth) @@ -108,7 +143,26 @@ logger.error("Uploaded file rejected! Try to upload manually to see the error.") else: measurement_id = re.findall(regex, upload_submit.text)[0] - logger.error("Successfully uploaded measurement with id %s." % measurement_id) + logger.info("Successfully uploaded measurement with id %s." % measurement_id) + + return measurement_id + + @staticmethod + def measurement_id_from_file(filename): + """ Get the measurement id from the input file. """ + + if not os.path.isfile(filename): + logger.error("File {} does not exist.".format(filename)) + sys.exit(1) + + with netcdf.Dataset(filename) as f: + try: + measurement_id = f.Measurement_ID + except AttributeError: + logger.error( + "Input file {} does not contain a Measurement_ID global attribute. Wrong file format?".format( + filename)) + sys.exit(1) return measurement_id @@ -128,7 +182,7 @@ os.makedirs(local_dir) # Save the file by chunk, needed if the file is big. - memory_file = StringIO.StringIO() + memory_file = BytesIO() for chunk in request.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks @@ -145,35 +199,87 @@ with open(local_file, 'wb') as f: f.write(zip_file.read(ziped_name)) - def download_preprocessed(self, measurement_id): + def download_hirelpp(self, measurement_id): + """ Download hirelpp files for the measurement id. """ + # Construct the download url + download_url = self.download_hirelpp_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_hirelpp', download_url) + except Exception as e: + logger.error("Could not download HiRElPP files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) + + def download_cloudmask(self, measurement_id): + """ Download cloudmask files for the measurement id. """ + # Construct the download url + download_url = self.download_cloudmask_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_cloudscreen', download_url) + except Exception as e: + logger.error("Could not download cloudscreen files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) + + def download_elpp(self, measurement_id): """ Download preprocessed files for the measurement id. """ # Construct the download url - download_url = self.download_preprocessed_pattern.format(measurement_id) - self.download_files(measurement_id, 'scc_preprocessed', download_url) + download_url = self.download_elpp_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_preprocessed', download_url) + except Exception as e: + logger.error("Could not download ElPP files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def download_optical(self, measurement_id): + def download_elda(self, measurement_id): """ Download optical files for the measurement id. """ # Construct the download url - download_url = self.download_optical_pattern.format(measurement_id) - self.download_files(measurement_id, 'scc_optical', download_url) + download_url = self.download_elda_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_optical', download_url) + except Exception as e: + logger.error("Could not download ELDA files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def download_graphs(self, measurement_id): + def download_plots(self, measurement_id): """ Download profile graphs for the measurement id. """ # Construct the download url - download_url = self.download_graph_pattern.format(measurement_id) - self.download_files(measurement_id, 'scc_plots', download_url) + download_url = self.download_plot_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_plots', download_url) + except Exception as e: + logger.error("Could not download ELDA plots. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def rerun_processing(self, measurement_id, monitor=True): + def download_elic(self, measurement_id): + """ Download ELIC files for the measurement id. """ + # Construct the download url + download_url = self.download_elic_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_elic', download_url) + except Exception as e: + logger.error("Could not download ELIC files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) + + def download_eldec(self, measurement_id): + """ Download ELDEC files for the measurement id. """ + # Construct the download url + download_url = self.download_elda_pattern.format(measurement_id) # ELDA patter is used for now + try: + self.download_files(measurement_id, 'scc_eldec', download_url) + except Exception as e: + logger.error("Could not download EDELC files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) + + def rerun_elpp(self, measurement_id, monitor=True): measurement = self.get_measurement(measurement_id) if measurement: - request = self.session.get(measurement.rerun_processing_url, auth=self.auth, + request = self.session.get(measurement.rerun_elpp_url, auth=self.auth, verify=False, stream=True) if request.status_code != 200: logger.error( - "Could not rerun processing for %s. Status code: %s" % (measurement_id, request.status_code)) + "Could not rerun ELPP for %s. Status code: %s" % (measurement_id, request.status_code)) return if monitor: @@ -200,13 +306,14 @@ if monitor: self.monitor_processing(measurement_id) - def process(self, filename, system_id): + def process(self, filename, system_id, force_upload, delete_related): """ Upload a file for processing and wait for the processing to finish. If the processing is successful, it will download all produced files. """ logger.info("--- Processing started on %s. ---" % datetime.datetime.now()) + # Upload file - measurement_id = self.upload_file(filename, system_id) + measurement_id = self.upload_file(filename, system_id, force_upload, delete_related) measurement = self.monitor_processing(measurement_id) return measurement @@ -217,64 +324,77 @@ measurement = self.get_measurement(measurement_id) if measurement is not None: while measurement.is_running: - logger.info("Measurement is being processed (status: %s, %s, %s). Please wait." % (measurement.upload, - measurement.pre_processing, - measurement.processing)) + logger.info("Measurement is being processed (status: {}, {}, {}, {}, {}, {}). Please wait.".format( + measurement.upload, + measurement.hirelpp, + measurement.cloudmask, + measurement.elpp, + measurement.elda, + measurement.elic)) time.sleep(10) measurement = self.get_measurement(measurement_id) - logger.info("Measurement processing finished (status: %s, %s, %s)." % (measurement.upload, - measurement.pre_processing, - measurement.processing)) - if measurement.pre_processing == 127: - logger.info("Downloading preprocessed files.") - self.download_preprocessed(measurement_id) - if measurement.processing == 127: - logger.info("Downloading optical files.") - self.download_optical(measurement_id) + logger.info("Measurement processing finished (status: {}, {}, {}, {}, {}, {}). Please wait.".format( + measurement.upload, + measurement.hirelpp, + measurement.cloudmask, + measurement.elpp, + measurement.elda, + measurement.elic)) + if measurement.hirelpp == 127: + logger.info("Downloading HiRElPP files.") + self.download_hirelpp(measurement_id) + if measurement.cloudmask == 127: + logger.info("Downloading cloud screening files.") + self.download_cloudmask(measurement_id) + if measurement.elpp == 127: + logger.info("Downloading ELPP files.") + self.download_elpp(measurement_id) + if measurement.elda == 127: + logger.info("Downloading ELDA files.") + self.download_elda(measurement_id) logger.info("Downloading graphs.") - self.download_graphs(measurement_id) + self.download_plots(measurement_id) + if measurement.elic == 127: + logger.info("Downloading ELIC files.") + self.download_elic(measurement_id) + + # TODO: Need to check ELDEC code (when it becomes available in the API) + if measurement.is_calibration: + logger.info("Downloading ELDEC files.") + self.download_eldec(measurement_id) + logger.info("--- Processing finished. ---") return measurement - def get_status(self, measurement_id): - """ Get the processing status for a measurement id through the API. """ - measurement_url = urlparse.urljoin(self.api_base_url, 'measurements/?id__exact=%s' % measurement_id) - - response = self.session.get(measurement_url, - auth=self.auth, - verify=False) + def get_measurement(self, measurement_id): - response_dict = response.json() - - if response_dict['objects']: - measurement_list = response_dict['objects'] - measurement = Measurement(self.base_url, measurement_list[0]) - return measurement.upload, measurement.pre_processing, measurement.processing - else: - logger.error("No measurement with id %s found on the SCC." % measurement_id) + if measurement_id is None: return None - def get_measurement(self, measurement_id): measurement_url = urlparse.urljoin(self.api_base_url, 'measurements/%s/' % measurement_id) response = self.session.get(measurement_url, auth=self.auth, verify=False) - if response.status_code != 200: + response_dict = None + if response.status_code == 200: + response_dict = response.json() + if response.status_code == 404: + logger.info("No measurement with id %s found on the SCC." % measurement_id) + elif response.status_code != 200: logger.error('Could not access API. Status code %s.' % response.status_code) sys.exit(1) - response_dict = response.json() + logger.debug("Response dictionary: {}".format(response_dict)) if response_dict: - measurement = Measurement(self.base_url,response_dict) + measurement = Measurement(self.base_url, response_dict) return measurement else: - logger.error("No measurement with id %s found on the SCC." % measurement_id) return None - def delete_measurement(self, measurement_id): + def delete_measurement(self, measurement_id, delete_related=False): """ Deletes a measurement with the provided measurement id. The user should have the appropriate permissions. @@ -292,6 +412,8 @@ # Go the the page confirming the deletion delete_url = self.delete_measurement_pattern.format(measurement.id) + logger.debug("Delete url: {}".format(delete_url)) + confirm_page = self.session.get(delete_url, auth=self.auth, verify=False) @@ -301,11 +423,18 @@ logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code)) return None + # Get the delete related value + if delete_related: + delete_related_option = 'delete_related' + else: + delete_related_option = 'not_delete_related' + # Delete the measurement delete_page = self.session.post(delete_url, auth=self.auth, verify=False, - data={'post': 'yes'}, + data={'post': 'yes', + 'select_delete_related_measurements': delete_related_option}, headers={'X-CSRFToken': confirm_page.cookies['csrftoken'], 'referer': delete_url} ) @@ -314,7 +443,7 @@ delete_page.status_code)) return None - logger.info("Deleted measurement {0}".format(measurement_id)) + logger.info("Deleted measurement {0}.".format(measurement_id)) return True def available_measurements(self): @@ -325,17 +454,17 @@ verify=False) response_dict = response.json() - measurements = None if response_dict: measurement_list = response_dict['objects'] measurements = [Measurement(self.base_url, measurement_dict) for measurement_dict in measurement_list] logger.info("Found %s measurements on the SCC." % len(measurements)) else: + measurements = None logger.warning("No response received from the SCC when asked for available measurements.") return measurements - def measurement_id_for_date(self, t1, call_sign='bu', base_number=0): + def measurement_id_for_date(self, t1, call_sign, base_number=0): """ Give the first available measurement id on the SCC for the specific date. """ @@ -355,23 +484,48 @@ existing_ids = [measurement_dict['id'] for measurement_dict in measurement_list] measurement_number = base_number - measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number) + measurement_id = "%s%s%04i" % (date_str, call_sign, measurement_number) while measurement_id in existing_ids: measurement_number = measurement_number + 1 - measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number) - if measurement_number == 100: + measurement_id = "%s%s%04i" % (date_str, call_sign, measurement_number) + if measurement_number == 1000: raise ValueError('No available measurement id found.') return measurement_id + def __enter__(self): + return self -class ApiObject: - """ A generic class object. """ + def __exit__(self, *args): + logger.debug("Closing SCC connection session.") + self.session.close() + + +class Measurement: + """ This class represents the measurement object as returned in the SCC API. + """ def __init__(self, base_url, dict_response): self.base_url = base_url + # Define expected attributes to assist debuggin + self.cloudmask = None + self.elda = None + self.elic = None + self.elpp = None + self.hirelpp = None + self.id = None + self.is_calibration = None + self.is_running = None + self.pre_processing_exit_code = None + self.processing_exit_code = None + self.resource_uri = None + self.start = None + self.stop = None + self.system = None + self.upload = None + if dict_response: # Add the dictionary key value pairs as object properties for key, value in dict_response.items(): @@ -380,27 +534,14 @@ else: self.exists = False - -class Measurement(ApiObject): - """ This class represents the measurement object as returned in the SCC API. - """ + @property + def rerun_elda_url(self): + url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-elda/') + return url_pattern.format(self.id) @property - def is_running(self): - """ Returns True if the processing has not finished. - """ - if self.upload == 0: - return False - if self.pre_processing == -127: - return False - if self.pre_processing == 127: - if self.processing in [127, -127]: - return False - return True - - @property - def rerun_processing_url(self): - url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-optical/') + def rerun_elpp_url(self): + url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-elpp/') return url_pattern.format(self.id) @property @@ -409,62 +550,61 @@ return ulr_pattern.format(self.id) def __str__(self): - return "%s: %s, %s, %s" % (self.id, - self.upload, - self.pre_processing, - self.processing) + return "Measurement {}".format(self.id) -def upload_file(filename, system_id, settings): +def upload_file(filename, system_id, force_upload, delete_related, settings): """ Shortcut function to upload a file to the SCC. """ - logger.info("Uploading file %s, using sytem %s" % (filename, system_id)) + logger.info("Uploading file %s, using system %s." % (filename, system_id)) - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - measurement_id = scc.upload_file(filename, system_id) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + measurement_id = scc.upload_file(filename, system_id, force_upload, delete_related) + scc.logout() + return measurement_id -def process_file(filename, system_id, settings): +def process_file(filename, system_id, force_upload, delete_related, settings): """ Shortcut function to process a file to the SCC. """ - logger.info("Processing file %s, using sytem %s" % (filename, system_id)) + logger.info("Processing file %s, using system %s." % (filename, system_id)) - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - measurement = scc.process(filename, system_id) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + measurement = scc.process(filename, system_id, force_upload, delete_related) + scc.logout() + return measurement -def delete_measurement(measurement_id, settings): +def delete_measurement(measurement_id, settings, delete_related): """ Shortcut function to delete a measurement from the SCC. """ - logger.info("Deleting %s" % measurement_id) + logger.info("Deleting %s." % measurement_id) - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - scc.delete_measurement(measurement_id) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + scc.delete_measurement(measurement_id, delete_related) + scc.logout() def rerun_all(measurement_id, monitor, settings): """ Shortcut function to delete a measurement from the SCC. """ logger.info("Rerunning all products for %s" % measurement_id) - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - scc.rerun_all(measurement_id, monitor) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + scc.rerun_all(measurement_id, monitor) + scc.logout() -def rerun_processing(measurement_id, monitor, settings): +def rerun_elpp(measurement_id, monitor, settings): """ Shortcut function to delete a measurement from the SCC. """ logger.info("Rerunning (optical) processing for %s" % measurement_id) - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - scc.rerun_processing(measurement_id, monitor) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + scc.rerun_elpp(measurement_id, monitor) + scc.logout() def import_settings(config_file_path): @@ -480,8 +620,9 @@ try: settings = yaml.safe_load(yaml_file) logger.debug("Read settings file(%s)" % config_file_path) - except: + except Exception as e: logger.error("Could not parse YAML file (%s)" % config_file_path) + logger.debug("Error message: {}".format(e)) sys.exit(1) # YAML limitation: does not read tuples @@ -499,8 +640,13 @@ parser.add_argument("-p", "--process", help="Wait for the results of the processing.", action="store_true") parser.add_argument("--delete", help="Measurement ID to delete.") - parser.add_argument("--rerun-all", help="Measurement ID to rerun.") - parser.add_argument("--rerun-processing", help="Measurement ID to rerun processing routines.") + # parser.add_argument("--delete_related", help= + # "Delete all related measurements. Use only if you know what you are doing!", + # action="store_true") + parser.add_argument("--force_upload", help="If measurement ID exists on SCC, delete before uploading.", + action="store_true") + parser.add_argument("--rerun-all", help="Rerun all processing steps for the provided measurement ID.") + parser.add_argument("--rerun-elpp", help="Rerun low-resolution processing steps for the provided measurement ID.") # Verbosity settings from http://stackoverflow.com/a/20663028 parser.add_argument('-d', '--debug', help="Print debugging information.", action="store_const", @@ -512,6 +658,9 @@ args = parser.parse_args() + # For now, don to allow to delete related measurements + delete_related = False + # Get the logger with the appropriate level logging.basicConfig(format='%(levelname)s: %(message)s', level=args.loglevel) @@ -520,16 +669,16 @@ # If the arguments are OK, try to log-in to SCC and upload. if args.delete: # If the delete is provided, do nothing else - delete_measurement(args.delete, settings) + delete_measurement(args.delete, settings, delete_related) elif args.rerun_all: rerun_all(args.rerun_all, args.process, settings) - elif args.rerun_processing: - rerun_processing(args.rerun_processing, args.process, settings) + elif args.rerun_elpp: + rerun_elpp(args.rerun_elpp, args.process, settings) else: if (args.filename == '') or (args.system == 0): parser.error('Provide a valid filename and system parameters.\nRun with -h for help.\n') if args.process: - process_file(args.filename, args.system, settings) + process_file(args.filename, args.system, args.force_upload, delete_related, settings) else: - upload_file(args.filename, args.system, settings) + upload_file(args.filename, args.system, args.force_upload, delete_related, settings)