Sat, 09 Jan 2021 17:26:30 +0200
Merged scc_access.py
scc_access/scc_access.py | file | annotate | diff | comparison | revisions |
--- a/scc_access/scc_access.py Sat Jan 09 15:24:56 2021 +0200 +++ b/scc_access/scc_access.py Sat Jan 09 17:26:30 2021 +0200 @@ -1,3 +1,5 @@ +import sys + import requests try: @@ -11,13 +13,15 @@ import os import re from io import BytesIO -import sys + import time from zipfile import ZipFile import yaml +import netCDF4 as netcdf + requests.packages.urllib3.disable_warnings() logger = logging.getLogger(__name__) @@ -53,11 +57,11 @@ self.download_cloudmask_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-cloudmask/') - self.download_preprocessed_pattern = urlparse.urljoin(self.base_url, + self.download_elpp_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-preprocessed/') - self.download_optical_pattern = urlparse.urljoin(self.base_url, + self.download_elda_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-optical/') - self.download_graph_pattern = urlparse.urljoin(self.base_url, + self.download_plots_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-plots/') self.download_elic_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/download-elic/') @@ -96,9 +100,25 @@ """ Logout from SCC """ return self.session.get(self.logout_url, stream=True) - def upload_file(self, filename, system_id, rs_filename=None, ov_filename=None, lr_filename=None): + def upload_file(self, filename, system_id, force_upload, delete_related, rs_filename=None, ov_filename=None, lr_filename=None): """ Upload a filename for processing with a specific system. If the upload is successful, it returns the measurement id. """ + measurement_id = self.measurement_id_from_file(filename) + + logger.debug('Checking if a measurement with the same id already exists on the SCC server.') + existing_measurement = self.get_measurement(measurement_id) + + if existing_measurement: + if force_upload: + logger.info( + "Measurement with id {} already exists on the SCC. Trying to delete it...".format(measurement_id)) + self.delete_measurement(measurement_id, delete_related) + else: + logger.error( + "Measurement with id {} already exists on the SCC. Use --force_upload flag to overwrite it.".format( + measurement_id)) + sys.exit(1) + # Get submit page upload_page = self.session.get(self.upload_url) @@ -156,6 +176,25 @@ return measurement_id + @staticmethod + def measurement_id_from_file(filename): + """ Get the measurement id from the input file. """ + + if not os.path.isfile(filename): + logger.error("File {} does not exist.".format(filename)) + sys.exit(1) + + with netcdf.Dataset(filename) as f: + try: + measurement_id = f.Measurement_ID + except AttributeError: + logger.error( + "Input file {} does not contain a Measurement_ID global attribute. Wrong file format?".format( + filename)) + sys.exit(1) + + return measurement_id + def download_files(self, measurement_id, subdir, download_url): """ Downloads some files from the download_url to the specified subdir. This method is used to download preprocessed file, optical @@ -192,51 +231,90 @@ f.write(zip_file.read(ziped_name)) def download_hirelpp(self, measurement_id): - """ Download HiRElPP files for the measurement id. """ + """ Download hirelpp files for the measurement id. """ # Construct the download url download_url = self.download_hirelpp_pattern.format(measurement_id) - self.download_files(measurement_id, 'hirelpp', download_url) + try: + self.download_files(measurement_id, 'scc_hirelpp', download_url) + except Exception as e: + logger.error("Could not download HiRElPP files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) def download_cloudmask(self, measurement_id): - """ Download preprocessed files for the measurement id. """ + """ Download cloudmask files for the measurement id. """ # Construct the download url download_url = self.download_cloudmask_pattern.format(measurement_id) - self.download_files(measurement_id, 'cloudmask', download_url) + try: + self.download_files(measurement_id, 'scc_cloudscreen', download_url) + except Exception as e: + logger.error("Could not download cloudscreen files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def download_preprocessed(self, measurement_id): + def download_elpp(self, measurement_id): """ Download preprocessed files for the measurement id. """ # Construct the download url - download_url = self.download_preprocessed_pattern.format(measurement_id) - self.download_files(measurement_id, 'scc_preprocessed', download_url) + download_url = self.download_elpp_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_preprocessed', download_url) + except Exception as e: + logger.error("Could not download ElPP files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def download_optical(self, measurement_id): + def download_elda(self, measurement_id): """ Download optical files for the measurement id. """ # Construct the download url - download_url = self.download_optical_pattern.format(measurement_id) - self.download_files(measurement_id, 'scc_optical', download_url) + download_url = self.download_elda_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_optical', download_url) + except Exception as e: + logger.error("Could not download ELDA files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def download_graphs(self, measurement_id): + def download_plots(self, measurement_id): """ Download profile graphs for the measurement id. """ # Construct the download url - download_url = self.download_graph_pattern.format(measurement_id) - self.download_files(measurement_id, 'scc_plots', download_url) + download_url = self.download_plots_pattern.format(measurement_id) + try: + self.download_files(measurement_id, 'scc_plots', download_url) + except Exception as e: + logger.error("Could not download ELDA plots. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) def download_elic(self, measurement_id): - """ Download profile graphs for the measurement id. """ + """ Download ELIC files for the measurement id. """ # Construct the download url download_url = self.download_elic_pattern.format(measurement_id) - self.download_files(measurement_id, 'elic', download_url) + try: + self.download_files(measurement_id, 'scc_elic', download_url) + except Exception as e: + logger.error("Could not download ELIC files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) - def rerun_processing(self, measurement_id, monitor=True): + def download_eldec(self, measurement_id): + """ Download ELDEC files for the measurement id. """ + # Construct the download url + download_url = self.download_elda_pattern.format(measurement_id) # ELDA patter is used for now + try: + self.download_files(measurement_id, 'scc_eldec', download_url) + except Exception as e: + logger.error("Could not download EDELC files. Error message: {}".format(e)) + logger.debug('Download exception:', exc_info=True) + + def rerun_elpp(self, measurement_id, monitor=True): + logger.debug("Started rerun_elpp procedure.") + + logger.debug("Getting measurement %s" % measurement_id) measurement, status = self.get_measurement(measurement_id) if measurement: - request = self.session.get(measurement.rerun_processing_url, stream=True) + logger.debug("Attempting to rerun ElPP through %s." % measurement.rerun_all_url) + request = self.session.get(measurement.rerun_elpp_url, stream=True) if request.status_code != 200: logger.error( "Could not rerun processing for %s. Status code: %s" % (measurement_id, request.status_code)) - return + else: + logger.info("Rerun-elpp command submitted successfully for id {}.".format(measurement_id)) if monitor: self.monitor_processing(measurement_id) @@ -255,19 +333,20 @@ if request.status_code != 200: logger.error("Could not rerun pre processing for %s. Status code: %s" % (measurement_id, request.status_code)) - return + else: + logger.info("Rerun-all command submitted successfully for id {}.".format(measurement_id)) if monitor: self.monitor_processing(measurement_id) - def process(self, filename, system_id, monitor, rs_filename=None, lr_filename=None, ov_filename=None): + def process(self, filename, system_id, monitor, force_upload, delete_related, rs_filename=None, lr_filename=None, ov_filename=None): """ Upload a file for processing and wait for the processing to finish. If the processing is successful, it will download all produced files. """ logger.info("--- Processing started on %s. ---" % datetime.datetime.now()) # Upload file logger.info("--- Uploading file") - measurement_id = self.upload_file(filename, system_id, + measurement_id = self.upload_file(filename, system_id, force_upload, delete_related, rs_filename=rs_filename, lr_filename=lr_filename, ov_filename=ov_filename) @@ -288,7 +367,7 @@ # try to wait for measurement to appear in API measurement = None - logger.info("Looking for measurement %s in SCC", measurement_id) + logger.info("Looking for measurement %s on SCC", measurement_id) while error_count < error_max: time.sleep(time_sleep) measurement, status = self.get_measurement(measurement_id) @@ -302,55 +381,74 @@ logger.critical("Measurement %s doesn't seem to exist", measurement_id) sys.exit(1) - logger.info('Measurement %s found', measurement_id) + logger.info('Measurement %s found.', measurement_id) if measurement is not None: while measurement.is_running: - logger.info("Measurement is being processed. Please wait.") + logger.info("Measurement is being processed. status: {}, {}, {}, {}, {}, {}). Please wait.".format( + measurement.upload, + measurement.hirelpp, + measurement.cloudmask, + measurement.elpp, + measurement.elda, + measurement.elic)) time.sleep(10) measurement, status = self.get_measurement(measurement_id) logger.info("Measurement processing finished.") if measurement.hirelpp == 127: - logger.info("Downloading hirelpp files.") + logger.info("Downloading HiRElPP files.") self.download_hirelpp(measurement_id) if measurement.cloudmask == 127: - logger.info("Downloading cloudmask files.") + logger.info("Downloading cloud screening files.") self.download_cloudmask(measurement_id) if measurement.elpp == 127: - logger.info("Downloading preprocessed files.") - self.download_preprocessed(measurement_id) + logger.info("Downloading ElPP files.") + self.download_elpp(measurement_id) if measurement.elda == 127: - logger.info("Downloading optical files.") - self.download_optical(measurement_id) + logger.info("Downloading ELDA files.") + self.download_elda(measurement_id) logger.info("Downloading graphs.") - self.download_graphs(measurement_id) + self.download_plots(measurement_id) if measurement.elic == 127: - logger.info("Downloading preprocessed files.") + logger.info("Downloading ELIC files.") self.download_elic(measurement_id) + + # TODO: Need to check ELDEC code (when it becomes available in the API) + if measurement.is_calibration: + logger.info("Downloading ELDEC files.") + self.download_eldec(measurement_id) logger.info("--- Processing finished. ---") + return measurement def get_measurement(self, measurement_id): + + if measurement_id is None: # Is this still required? + return None + measurement_url = self.api_measurement_pattern.format(measurement_id) logger.debug("Measurement API URL: %s" % measurement_url) response = self.session.get(measurement_url) - if not response.ok: + response_dict = None + + if response.status_code == 200: + response_dict = response.json() + elif response.status_code == 404: + logger.info("No measurement with id %s found on the SCC." % measurement_id) + else: logger.error('Could not access API. Status code %s.' % response.status_code) - return None, response.status_code - - response_dict = response.json() if response_dict: measurement = Measurement(self.base_url, response_dict) - return measurement, response.status_code else: - logger.error("No measurement with id %s found on the SCC." % measurement_id) - return None, response.status_code + measurement = None - def delete_measurement(self, measurement_id): + return measurement, response.status_code + + def delete_measurement(self, measurement_id, delete_related): """ Deletes a measurement with the provided measurement id. The user should have the appropriate permissions. @@ -375,9 +473,16 @@ logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code)) return None + # Get the delete related value + if delete_related: + delete_related_option = 'delete_related' + else: + delete_related_option = 'not_delete_related' + # Delete the measurement delete_page = self.session.post(delete_url, - data={'post': 'yes'}, + data={'post': 'yes', + 'select_delete_related_measurements': delete_related_option}, headers={'X-CSRFToken': confirm_page.cookies['csrftoken'], 'referer': delete_url} ) @@ -518,6 +623,13 @@ return ancillary_file, response.status_code + def __enter__(self): + return self + + def __exit__(self, *args): + logger.debug("Closing SCC connection session.") + self.session.close() + class PageNotAccessibleError(RuntimeError): pass @@ -545,21 +657,44 @@ """ This class represents the measurement object as returned in the SCC API. """ + def __init__(self, base_url, dict_response): + + # Define expected attributes to assist debugging + self.cloudmask = None + self.elda = None + self.elic = None + self.elpp = None + self.hirelpp = None + self.id = None + self.is_calibration = None + self.is_running = None + self.pre_processing_exit_code = None + self.processing_exit_code = None + self.resource_uri = None + self.start = None + self.stop = None + self.system = None + self.upload = None + + super().__init__(base_url, dict_response) + @property - def rerun_processing_url(self): + def rerun_elda_url(self): url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-elda/') return url_pattern.format(self.id) @property + def rerun_elpp_url(self): + url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-elpp/') + return url_pattern.format(self.id) + + @property def rerun_all_url(self): ulr_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-all/') return ulr_pattern.format(self.id) def __str__(self): - return "%s: %s, %s, %s" % (self.id, - self.upload, - self.pre_processing, - self.processing) + return "Measurement {}".format(self.id) class AncillaryFile(ApiObject): @@ -578,83 +713,87 @@ self.status) -def process_file(filename, system_id, settings, monitor=True, rs_filename=None, lr_filename=None, ov_filename=None): +def process_file(filename, system_id, settings, force_upload, delete_related, + monitor=True, rs_filename=None, lr_filename=None, ov_filename=None): """ Shortcut function to process a file to the SCC. """ logger.info("Processing file %s, using system %s" % (filename, system_id)) - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - measurement = scc.process(filename, system_id, - monitor=monitor, - rs_filename=rs_filename, - lr_filename=lr_filename, - ov_filename=ov_filename) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + measurement = scc.process(filename, system_id, + force_upload=force_upload, + delete_related=delete_related, + monitor=monitor, + rs_filename=rs_filename, + lr_filename=lr_filename, + ov_filename=ov_filename) + scc.logout() return measurement -def delete_measurements(measurement_ids, settings): +def delete_measurements(measurement_ids, delete_related, settings): """ Shortcut function to delete measurements from the SCC. """ - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - for m_id in measurement_ids: - logger.info("Deleting %s" % m_id) - scc.delete_measurement(m_id) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + for m_id in measurement_ids: + logger.info("Deleting %s" % m_id) + scc.delete_measurement(m_id, delete_related) + scc.logout() def rerun_all(measurement_ids, monitor, settings): """ Shortcut function to rerun measurements from the SCC. """ - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - for m_id in measurement_ids: - logger.info("Rerunning all products for %s" % m_id) - scc.rerun_all(m_id, monitor) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + for m_id in measurement_ids: + logger.info("Rerunning all products for %s" % m_id) + scc.rerun_all(m_id, monitor) + scc.logout() def rerun_processing(measurement_ids, monitor, settings): """ Shortcut function to delete a measurement from the SCC. """ - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - for m_id in measurement_ids: - logger.info("Rerunning (optical) processing for %s" % m_id) - scc.rerun_processing(m_id, monitor) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + for m_id in measurement_ids: + logger.info("Rerunning (optical) processing for %s" % m_id) + scc.rerun_elpp(m_id, monitor) + scc.logout() def list_measurements(settings, station=None, system=None, start=None, stop=None, upload_status=None, preprocessing_status=None, optical_processing=None): """List all available measurements""" - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - ret = scc.list_measurements(station=station, system=system, start=start, stop=stop, upload_status=upload_status, - processing_status=preprocessing_status, optical_processing=optical_processing) - for entry in ret: - print("%s" % entry.id) - scc.logout() + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + ret = scc.list_measurements(station=station, system=system, start=start, stop=stop, upload_status=upload_status, + processing_status=preprocessing_status, optical_processing=optical_processing) + for entry in ret: + print("%s" % entry.id) + scc.logout() def download_measurements(measurement_ids, download_preproc, download_optical, download_graph, settings): """Download all measurements for the specified IDs""" - scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) - scc.login(settings['website_credentials']) - for m_id in measurement_ids: - if download_preproc: - logger.info("Downloading preprocessed files for '%s'" % m_id) - scc.download_preprocessed(m_id) - logger.info("Complete") - if download_optical: - logger.info("Downloading optical files for '%s'" % m_id) - scc.download_optical(m_id) - logger.info("Complete") - if download_graph: - logger.info("Downloading profile graph files for '%s'" % m_id) - scc.download_graphs(m_id) - logger.info("Complete") + with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: + scc.login(settings['website_credentials']) + for m_id in measurement_ids: + if download_preproc: + logger.info("Downloading preprocessed files for '%s'" % m_id) + scc.download_elpp(m_id) + logger.info("Complete") + if download_optical: + logger.info("Downloading optical files for '%s'" % m_id) + scc.download_elda(m_id) + logger.info("Complete") + if download_graph: + logger.info("Downloading profile graph files for '%s'" % m_id) + scc.download_plots(m_id) + logger.info("Complete") + scc.logout() def settings_from_path(config_file_path): @@ -681,7 +820,9 @@ # Setup for command specific parsers def setup_delete(parser): def delete_from_args(parsed): - delete_measurements(parsed.IDs, parsed.config) + delete_measurements(parsed.IDs, + delete_related=False, + settings=parsed.config) parser.add_argument("IDs", nargs="+", help="measurement IDs to delete.") parser.set_defaults(execute=delete_from_args) @@ -697,7 +838,7 @@ parser.set_defaults(execute=rerun_all_from_args) -def setup_rerun_processing(parser): +def setup_rerun_elpp(parser): def rerun_processing_from_args(parsed): rerun_processing(parsed.IDs, parsed.process, parsed.config) @@ -707,33 +848,23 @@ parser.set_defaults(execute=rerun_processing_from_args) -def setup_process_file(parser): - """ Upload and monitor processing progress.""" - def process_file_from_args(parsed): - process_file(parsed.filename, parsed.system, parsed.config, monitor=True, +def setup_upload_file(parser): + """ Upload but do not monitor processing progress. """ + def upload_file_from_args(parsed): + process_file(parsed.filename, parsed.system, parsed.config, + monitor=parsed.process, + force_upload=parsed.force_upload, + delete_related=False, # For now, use this as default rs_filename=parsed.radiosounding, ov_filename=parsed.overlap, lr_filename=parsed.lidarratio) parser.add_argument("filename", help="Measurement file name or path.") parser.add_argument("system", help="Processing system id.") - parser.add_argument("--radiosounding", default=None, help="Radiosounding file name or path") - parser.add_argument("--overlap", default=None, help="Overlap file name or path") - parser.add_argument("--lidarratio", default=None, help="Lidar ratio file name or path") - - parser.set_defaults(execute=process_file_from_args) - - -def setup_upload_file(parser): - """ Upload but do not monitor processing progress. """ - def upload_file_from_args(parsed): - process_file(parsed.filename, parsed.system, parsed.config, monitor=False, - rs_filename=parsed.radiosounding, - ov_filename=parsed.overlap, - lr_filename=parsed.lidarratio) - - parser.add_argument("filename", help="Measurement file name or path.") - parser.add_argument("system", help="Processing system id.") + parser.add_argument("-p", "--process", help="Wait for the processing results.", + action="store_true") + parser.add_argument("--force_upload", help="If measurement ID exists on SCC, delete before uploading.", + action="store_true") parser.add_argument("--radiosounding", default=None, help="Radiosounding file name or path") parser.add_argument("--overlap", default=None, help="Overlap file name or path") parser.add_argument("--lidarratio", default=None, help="Lidar ratio file name or path") @@ -743,6 +874,9 @@ def setup_list_measurements(parser): def list_measurements_from_args(parsed): + # TODO: Fix this + logger.warning("This method needs to be updated. Cross-chceck any results.") + list_measurements(parsed.config, station=parsed.station, system=parsed.system, start=parsed.start, stop=parsed.stop, upload_status=parsed.upload_status, preprocessing_status=parsed.preprocessing_status, @@ -772,8 +906,11 @@ def setup_download_measurements(parser): def download_measurements_from_args(parsed): - preproc = parsed.download_preprocessed - optical = parsed.download_optical + # TODO: Fix this + logger.warning("This method needs to be updated. Cross-chceck any results.") + + preproc = parsed.download_elpp + optical = parsed.download_elda graphs = parsed.download_profile_graphs if not preproc and not graphs: optical = True @@ -793,18 +930,17 @@ subparsers = parser.add_subparsers() delete_parser = subparsers.add_parser("delete", help="Deletes a measurement.") - rerun_all_parser = subparsers.add_parser("rerun-all", help="Reprocess a measurement on the SCC.") - rerun_processing_parser = subparsers.add_parser("rerun-processing", - help="Rerun processing routines for a measurement.") - process_file_parser = subparsers.add_parser("process-file", help="Upload a file and download processing results.") - upload_file_parser = subparsers.add_parser("upload-file", help="Upload a file.") + rerun_all_parser = subparsers.add_parser("rerun-all", help="Rerun all processing steps for the provided measurement IDs.") + rerun_processing_parser = subparsers.add_parser("rerun-elpp", + help="Rerun low-resolution processing steps for the provided measurement ID.") + upload_file_parser = subparsers.add_parser("upload-file", help="Submit a file and, optionally, download the output products.") list_parser = subparsers.add_parser("list", help="List measurements registered on the SCC.") download_parser = subparsers.add_parser("download", help="Download selected measurements.") setup_delete(delete_parser) setup_rerun_all(rerun_all_parser) - setup_rerun_processing(rerun_processing_parser) - setup_process_file(process_file_parser) + setup_rerun_elpp(rerun_processing_parser) + setup_upload_file(upload_file_parser) setup_list_measurements(list_parser) setup_download_measurements(download_parser)