Mon, 08 Jan 2018 14:59:21 +0100
Refactor argument parsing + New commands
* Restructured argument parsing to use subcommands.
* Move some common options to requests session (auth + verify)
* Allow for a common location for settings file (~/.scc_access.yaml)
* Update Readme
* Add option to list available files
* Add option to download existing files
import requests
requests.packages.urllib3.disable_warnings()
import urlparse
import argparse
import os
import re
import time
import StringIO
from zipfile import ZipFile
import datetime
import logging

import yaml

logger = logging.getLogger(__name__)

# The regex to find the measurement id from the measurement page
# This should be read from the uploaded file, but would require an extra NetCDF module.
regex = "<h3>Measurement (?P<measurement_id>.{12}) <small>"


class SCC:
    """ A simple class that will attempt to upload a file on the SCC server.

    The uploading is done by simulating a normal browser session. In the current
    version no check is performed, and no feedback is given if the upload
    was successful. If everything is setup correctly, it will work.
    """

    def __init__(self, auth, output_dir, base_url):
        self.auth = auth
        self.output_dir = output_dir
        self.base_url = base_url

        # Common options (basic auth + TLS verification) are set once on the
        # session and inherited by every request made through it.
        self.session = requests.Session()
        self.session.auth = auth
        self.session.verify = False

        self.login_url = urlparse.urljoin(self.base_url, 'accounts/login/')
        self.upload_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/quick/')
        self.download_preprocessed_pattern = urlparse.urljoin(
            self.base_url, 'data_processing/measurements/{0}/download-preprocessed/')
        self.download_optical_pattern = urlparse.urljoin(
            self.base_url, 'data_processing/measurements/{0}/download-optical/')
        self.download_graph_pattern = urlparse.urljoin(
            self.base_url, 'data_processing/measurements/{0}/download-plots/')
        self.delete_measurement_pattern = urlparse.urljoin(
            self.base_url, 'admin/database/measurements/{0}/delete/')
        self.api_base_url = urlparse.urljoin(self.base_url, 'api/v1/')
        self.list_measurements_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/')

    def login(self, credentials):
        """ Login to the website.

        :param credentials: (username, password) tuple for the web form.
        :return: the requests.Response of the login POST.
        """
        logger.debug("Attempting to login to SCC, username %s." % credentials[0])
        login_credentials = {'username': credentials[0],
                             'password': credentials[1]}

        logger.debug("Accessing login page at %s." % self.login_url)

        # Get the login form first, to obtain a CSRF token.
        login_page = self.session.get(self.login_url)

        logger.debug("Submitting credentials.")
        # Submit the login data
        login_submit = self.session.post(self.login_url,
                                         data=login_credentials,
                                         headers={'X-CSRFToken': login_page.cookies['csrftoken'],
                                                  'referer': self.login_url})
        return login_submit

    def logout(self):
        # No server-side logout is performed; the session is simply abandoned.
        pass

    def upload_file(self, filename, system_id):
        """ Upload a filename for processing with a specific system.

        If the upload is successful, it returns the measurement id,
        otherwise False.
        """
        # Get submit page (needed for the CSRF token).
        upload_page = self.session.get(self.upload_url)

        # Submit the data
        upload_data = {'system': system_id}

        logger.info("Uploading of file %s started." % filename)

        # BUGFIX: the file handle was previously never closed.
        with open(filename, 'rb') as data_file:
            files = {'data': data_file}
            upload_submit = self.session.post(self.upload_url,
                                              data=upload_data,
                                              files=files,
                                              headers={'X-CSRFToken': upload_page.cookies['csrftoken'],
                                                       'referer': self.upload_url})

        if upload_submit.status_code != 200:
            logger.warning("Connection error. Status code: %s" % upload_submit.status_code)
            return False

        # Check if there was a redirect to a new page.
        if upload_submit.url == self.upload_url:
            measurement_id = False
            logger.error("Uploaded file rejected! Try to upload manually to see the error.")
        else:
            measurement_id = re.findall(regex, upload_submit.text)[0]
            # BUGFIX: success was previously logged at ERROR level.
            logger.info("Successfully uploaded measurement with id %s." % measurement_id)

        return measurement_id

    def download_files(self, measurement_id, subdir, download_url):
        """ Downloads some files from the download_url to the specified subdir.
        This method is used to download preprocessed file, optical files etc.
        """
        # Get the file
        request = self.session.get(download_url, stream=True)

        if not request.ok:
            raise Exception("Could not download files for measurement '%s'" % measurement_id)

        # Create the dir if it does not exist
        local_dir = os.path.join(self.output_dir, measurement_id, subdir)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)

        # Save the file by chunk, needed if the file is big.
        memory_file = StringIO.StringIO()

        for chunk in request.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                memory_file.write(chunk)
                memory_file.flush()

        # The server returns a zip archive; extract each member flat
        # (basename only) into the local directory.
        zip_file = ZipFile(memory_file)

        for ziped_name in zip_file.namelist():
            basename = os.path.basename(ziped_name)
            local_file = os.path.join(local_dir, basename)
            with open(local_file, 'wb') as f:
                f.write(zip_file.read(ziped_name))

    def download_preprocessed(self, measurement_id):
        """ Download preprocessed files for the measurement id. """
        # Construct the download url
        download_url = self.download_preprocessed_pattern.format(measurement_id)
        self.download_files(measurement_id, 'scc_preprocessed', download_url)

    def download_optical(self, measurement_id):
        """ Download optical files for the measurement id. """
        # Construct the download url
        download_url = self.download_optical_pattern.format(measurement_id)
        self.download_files(measurement_id, 'scc_optical', download_url)

    def download_graphs(self, measurement_id):
        """ Download profile graphs for the measurement id. """
        # Construct the download url
        download_url = self.download_graph_pattern.format(measurement_id)
        self.download_files(measurement_id, 'scc_plots', download_url)

    def rerun_processing(self, measurement_id, monitor=True):
        """ Trigger a rerun of the optical processing for a measurement and
        optionally monitor its progress. """
        measurement = self.get_measurement(measurement_id)

        if measurement:
            request = self.session.get(measurement.rerun_processing_url, stream=True)

            if request.status_code != 200:
                logger.error("Could not rerun processing for %s. Status code: %s" %
                             (measurement_id, request.status_code))
                return

            if monitor:
                self.monitor_processing(measurement_id)

    def rerun_all(self, measurement_id, monitor=True):
        """ Trigger a rerun of all processing steps for a measurement and
        optionally monitor its progress. """
        logger.debug("Started rerun_all procedure.")

        logger.debug("Getting measurement %s" % measurement_id)
        measurement = self.get_measurement(measurement_id)

        if measurement:
            logger.debug("Attempting to rerun all processing through %s." % measurement.rerun_all_url)
            request = self.session.get(measurement.rerun_all_url, stream=True)

            if request.status_code != 200:
                # BUGFIX: the message referred to "pre processing" only.
                logger.error("Could not rerun all processing for %s. Status code: %s" %
                             (measurement_id, request.status_code))
                return

            if monitor:
                self.monitor_processing(measurement_id)

    def process(self, filename, system_id, monitor):
        """ Upload a file for processing and wait for the processing to finish.
        If the processing is successful, it will download all produced files.
        """
        logger.info("--- Processing started on %s. ---" % datetime.datetime.now())
        # Upload file
        measurement_id = self.upload_file(filename, system_id)

        if monitor:
            return self.monitor_processing(measurement_id)

        return None

    def monitor_processing(self, measurement_id):
        """ Monitor the processing progress of a measurement id"""
        measurement = self.get_measurement(measurement_id)

        if measurement is not None:
            while measurement.is_running:
                logger.info("Measurement is being processed (status: %s, %s, %s). Please wait." %
                            (measurement.upload, measurement.pre_processing, measurement.processing))
                time.sleep(10)
                measurement = self.get_measurement(measurement_id)

            logger.info("Measurement processing finished (status: %s, %s, %s)." %
                        (measurement.upload, measurement.pre_processing, measurement.processing))

            # Status 127 marks a successfully finished step.
            if measurement.pre_processing == 127:
                logger.info("Downloading preprocessed files.")
                self.download_preprocessed(measurement_id)

            if measurement.processing == 127:
                logger.info("Downloading optical files.")
                self.download_optical(measurement_id)
                logger.info("Downloading graphs.")
                self.download_graphs(measurement_id)

            logger.info("--- Processing finished. ---")

        return measurement

    def get_status(self, measurement_id):
        """ Get the processing status for a measurement id through the API.

        :return: (upload, pre_processing, processing) tuple, or None if the
            measurement was not found.
        """
        measurement_url = urlparse.urljoin(self.api_base_url, 'measurements/?id__exact=%s' % measurement_id)
        response = self.session.get(measurement_url)

        response_dict = response.json()

        if response_dict['objects']:
            measurement_list = response_dict['objects']
            measurement = Measurement(self.base_url, measurement_list[0])
            return measurement.upload, measurement.pre_processing, measurement.processing
        else:
            logger.error("No measurement with id %s found on the SCC." % measurement_id)
            return None

    def get_measurement(self, measurement_id):
        """ Fetch a single Measurement object through the API, or None. """
        measurement_url = urlparse.urljoin(self.api_base_url, 'measurements/%s/' % measurement_id)
        response = self.session.get(measurement_url)

        if not response.ok:
            logger.error('Could not access API. Status code %s.' % response.status_code)
            return None

        response_dict = response.json()

        if response_dict:
            measurement = Measurement(self.base_url, response_dict)
            return measurement
        else:
            logger.error("No measurement with id %s found on the SCC." % measurement_id)
            return None

    def delete_measurement(self, measurement_id):
        """ Deletes a measurement with the provided measurement id. The user
        should have the appropriate permissions.

        The procedures is performed directly through the web interface and
        NOT through the API.
        """
        # Get the measurement object
        measurement = self.get_measurement(measurement_id)

        # Check that it exists
        if measurement is None:
            logger.warning("Nothing to delete.")
            return None

        # Go the the page confirming the deletion
        delete_url = self.delete_measurement_pattern.format(measurement_id)
        confirm_page = self.session.get(delete_url)

        # Check that the page opened properly
        if confirm_page.status_code != 200:
            logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code))
            return None

        # Delete the measurement
        delete_page = self.session.post(delete_url,
                                        data={'post': 'yes'},
                                        headers={'X-CSRFToken': confirm_page.cookies['csrftoken'],
                                                 'referer': delete_url}
                                        )
        if delete_page.status_code != 200:
            logger.warning("Something went wrong. Delete page status: {0}".format(
                delete_page.status_code))
            return None

        logger.info("Deleted measurement {0}".format(measurement_id))
        return True

    def available_measurements(self):
        """ Get a list of available measurement on the SCC. """
        measurement_url = urlparse.urljoin(self.api_base_url, 'measurements')
        response = self.session.get(measurement_url)

        response_dict = response.json()

        measurements = None
        if response_dict:
            measurement_list = response_dict['objects']
            measurements = [Measurement(self.base_url, measurement_dict) for measurement_dict in measurement_list]
            logger.info("Found %s measurements on the SCC." % len(measurements))
        else:
            logger.warning("No response received from the SCC when asked for available measurements.")

        return measurements

    def list_measurements(self, station=None, system=None, start=None, stop=None, upload_status=None,
                          processing_status=None, optical_processing=None):
        """ Scrape the measurement list page, applying the given filters.

        :return: list of Measurement objects parsed from the HTML table.
        """
        # Need to set to empty string if not specified, we won't get any results
        params = {
            "station": station if station is not None else "",
            "system": system if system is not None else "",
            "stop": stop if stop is not None else "",
            "start": start if start is not None else "",
            "upload_status": upload_status if upload_status is not None else "",
            "preprocessing_status": processing_status if processing_status is not None else "",
            "optical_processing_status": optical_processing if optical_processing is not None else ""
        }

        resp = self.session.get(self.list_measurements_pattern, params=params).text

        tbl_rgx = re.compile(r'<table id="measurements">(.*?)</table>', re.DOTALL)
        entry_rgx = re.compile(r'<tr>(.*?)</tr>', re.DOTALL)
        measurement_rgx = re.compile(
            r'.*?<td><a[^>]*>(\w+)</a>.*?<td>.*?<td>([\w-]+ [\w:]+)</td>.*<td data-order="([-]?\d+),([-]?\d+),([-]?\d+)".*',
            re.DOTALL)

        matches = tbl_rgx.findall(resp)
        if len(matches) != 1:
            return []

        ret = []
        for entry in entry_rgx.finditer(matches[0]):
            # group(0) is the whole <tr>...</tr> match.
            m = measurement_rgx.match(entry.group(0))
            if m:
                name, date, upload, preproc, optical = m.groups()
                ret.append(
                    Measurement(self.base_url, {"id": name, "upload": int(upload),
                                                "pre_processing": int(preproc),
                                                "processing": int(optical)}))

        return ret

    def measurement_id_for_date(self, t1, call_sign='bu', base_number=0):
        """ Give the first available measurement id on the SCC for the specific
        date.
        """
        date_str = t1.strftime('%Y%m%d')
        search_url = urlparse.urljoin(self.api_base_url, 'measurements/?id__startswith=%s' % date_str)

        response = self.session.get(search_url)

        response_dict = response.json()

        measurement_id = None

        if response_dict:
            measurement_list = response_dict['objects']
            existing_ids = [measurement_dict['id'] for measurement_dict in measurement_list]

            measurement_number = base_number
            measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number)

            while measurement_id in existing_ids:
                measurement_number = measurement_number + 1
                measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number)

                if measurement_number == 100:
                    raise ValueError('No available measurement id found.')

        return measurement_id


class ApiObject:
    """ A generic class object. """

    def __init__(self, base_url, dict_response):
        self.base_url = base_url

        if dict_response:
            # Add the dictionary key value pairs as object properties
            for key, value in dict_response.items():
                setattr(self, key, value)
            self.exists = True
        else:
            self.exists = False


class Measurement(ApiObject):
    """ This class represents the measurement object as returned in the SCC API.
    """

    @property
    def is_running(self):
        """ Returns True if the processing has not finished. """
        if self.upload == 0:
            return False
        if self.pre_processing == -127:
            return False
        if self.pre_processing == 127:
            if self.processing in [127, -127]:
                return False
        return True

    @property
    def rerun_processing_url(self):
        url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-optical/')
        return url_pattern.format(self.id)

    @property
    def rerun_all_url(self):
        url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-all/')
        return url_pattern.format(self.id)

    def __str__(self):
        return "%s: %s, %s, %s" % (self.id, self.upload, self.pre_processing, self.processing)


def upload_file(filename, system_id, settings):
    """ Shortcut function to upload a file to the SCC. """
    logger.info("Uploading file %s, using system %s" % (filename, system_id))

    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    measurement_id = scc.upload_file(filename, system_id)
    scc.logout()
    return measurement_id


def process_file(filename, system_id, monitor, settings):
    """ Shortcut function to process a file to the SCC. """
    logger.info("Processing file %s, using system %s" % (filename, system_id))

    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    measurement = scc.process(filename, system_id, monitor)
    scc.logout()
    return measurement


def delete_measurement(measurement_ids, settings):
    """ Shortcut function to delete measurements from the SCC. """
    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    for m_id in measurement_ids:
        logger.info("Deleting %s" % m_id)
        scc.delete_measurement(m_id)
    scc.logout()


def rerun_all(measurement_ids, monitor, settings):
    """ Shortcut function to rerun measurements from the SCC. """
    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    for m_id in measurement_ids:
        logger.info("Rerunning all products for %s" % m_id)
        scc.rerun_all(m_id, monitor)
    scc.logout()


def rerun_processing(measurement_ids, monitor, settings):
    """ Shortcut function to rerun the (optical) processing of measurements
    on the SCC. """
    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    for m_id in measurement_ids:
        logger.info("Rerunning (optical) processing for %s" % m_id)
        scc.rerun_processing(m_id, monitor)
    scc.logout()


def list_measurements(settings, station=None, system=None, start=None, stop=None, upload_status=None,
                      preprocessing_status=None, optical_processing=None):
    """List all available measurements"""
    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    ret = scc.list_measurements(station=station, system=system, start=start, stop=stop,
                                upload_status=upload_status, processing_status=preprocessing_status,
                                optical_processing=optical_processing)
    for entry in ret:
        print("%s" % entry.id)
    scc.logout()


def download_measurements(measurement_ids, download_preproc, download_optical, download_graph, settings):
    """Download all measurements for the specified IDs"""
    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    for m_id in measurement_ids:
        if download_preproc:
            logger.info("Downloading preprocessed files for '%s'" % m_id)
            scc.download_preprocessed(m_id)
            logger.info("Complete")
        if download_optical:
            logger.info("Downloading optical files for '%s'" % m_id)
            scc.download_optical(m_id)
            logger.info("Complete")
        if download_graph:
            logger.info("Downloading profile graph files for '%s'" % m_id)
            scc.download_graphs(m_id)
            logger.info("Complete")


def settings_from_path(config_file_path):
    """ Read the configuration file.

    The file should be in YAML syntax."""
    if not os.path.isfile(config_file_path):
        raise argparse.ArgumentTypeError("Wrong path for configuration file (%s)" % config_file_path)

    with open(config_file_path) as yaml_file:
        try:
            settings = yaml.safe_load(yaml_file)
            logger.debug("Read settings file(%s)" % config_file_path)
        except Exception:
            raise argparse.ArgumentTypeError("Could not parse YAML file (%s)" % config_file_path)

    # YAML limitation: does not read tuples
    settings['basic_credentials'] = tuple(settings['basic_credentials'])
    settings['website_credentials'] = tuple(settings['website_credentials'])
    return settings


# Setup for command specific parsers
def setup_delete(parser):
    def delete_from_args(parsed):
        delete_measurement(parsed.IDs, parsed.config)

    parser.add_argument("IDs", nargs="+", help="measurement IDs to delete.")
    parser.set_defaults(execute=delete_from_args)


def setup_rerun_all(parser):
    def rerun_all_from_args(parsed):
        rerun_all(parsed.IDs, parsed.process, parsed.config)

    parser.add_argument("IDs", nargs="+", help="Measurement IDs to rerun.")
    parser.add_argument("-p", "--process", help="Wait for the results of the processing.",
                        action="store_true")
    parser.set_defaults(execute=rerun_all_from_args)


def setup_rerun_processing(parser):
    def rerun_processing_from_args(parsed):
        rerun_processing(parsed.IDs, parsed.process, parsed.config)

    parser.add_argument("IDs", nargs="+", help="Measurement IDs to rerun the processing on.")
    parser.add_argument("-p", "--process", help="Wait for the results of the processing.",
                        action="store_true")
    parser.set_defaults(execute=rerun_processing_from_args)


def setup_process_file(parser):
    def process_file_from_args(parsed):
        # BUGFIX: the positional argument is named "filename", so
        # parsed.file raised AttributeError.
        process_file(parsed.filename, parsed.system, parsed.process, parsed.config)

    parser.add_argument("filename", help="Measurement file name or path.")
    parser.add_argument("system", help="Processing system id.")
    parser.add_argument("-p", "--process", help="Wait for the results of the processing.",
                        action="store_true")
    parser.set_defaults(execute=process_file_from_args)


def setup_upload_file(parser):
    def upload_file_from_args(parsed):
        # BUGFIX: the positional argument is named "filename", so
        # parsed.file raised AttributeError.
        upload_file(parsed.filename, parsed.system, parsed.config)

    parser.add_argument("filename", help="Measurement file name or path.")
    parser.add_argument("system", help="Processing system id.")
    parser.set_defaults(execute=upload_file_from_args)


def setup_list_measurements(parser):
    def list_measurements_from_args(parsed):
        list_measurements(parsed.config, station=parsed.station, system=parsed.system,
                          start=parsed.start, stop=parsed.stop,
                          upload_status=parsed.upload_status,
                          preprocessing_status=parsed.preprocessing_status,
                          optical_processing=parsed.optical_processing_status)

    def status(arg):
        if -127 <= int(arg) <= 127:
            return arg
        else:
            raise argparse.ArgumentTypeError("Status must be between -127 and 127")

    def date(arg):
        # Anchored with $ so that trailing garbage is rejected.
        if re.match(r'\d{4}-\d{2}-\d{2}$', arg):
            return arg
        else:
            raise argparse.ArgumentTypeError("Date must be in format 'YYYY-MM-DD'")

    # BUGFIX: the help strings below were all a copy-paste of the --station one.
    parser.add_argument("--station", help="Filter for only the selected station")
    parser.add_argument("--system", help="Filter for only the selected system")
    parser.add_argument("--start", help="Filter for measurements after this date (YYYY-MM-DD)", type=date)
    parser.add_argument("--stop", help="Filter for measurements before this date (YYYY-MM-DD)", type=date)
    parser.add_argument("--upload-status", help="Filter for the selected upload status", type=status)
    parser.add_argument("--preprocessing-status", help="Filter for the selected preprocessing status",
                        type=status)
    parser.add_argument("--optical-processing-status", help="Filter for the selected optical processing status",
                        type=status)
    parser.set_defaults(execute=list_measurements_from_args)


def setup_download_measurements(parser):
    def download_measurements_from_args(parsed):
        preproc = parsed.download_preprocessed
        optical = parsed.download_optical
        graphs = parsed.download_profile_graphs
        # Default to optical files when no explicit download option is given.
        if not preproc and not graphs:
            optical = True
        download_measurements(parsed.IDs, preproc, optical, graphs, parsed.config)

    parser.add_argument("IDs", help="Measurement IDs that should be downloaded.", nargs="+")
    parser.add_argument("--download-preprocessed", action="store_true",
                        help="Download preprocessed files.")
    parser.add_argument("--download-optical", action="store_true",
                        help="Download optical files (default if no other download is used).")
    parser.add_argument("--download-profile-graphs", action="store_true",
                        help="Download profile graph files.")
    parser.set_defaults(execute=download_measurements_from_args)


def main():
    # Define the command line arguments.
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    delete_parser = subparsers.add_parser("delete", help="Deletes a measurement.")
    rerun_all_parser = subparsers.add_parser("rerun-all", help="Rerun a measurement.")
    rerun_processing_parser = subparsers.add_parser("rerun-processing",
                                                    help="Rerun processing routings for a measurement.")
    process_file_parser = subparsers.add_parser("process-file", help="Process a file.")
    upload_file_parser = subparsers.add_parser("upload-file", help="Upload a file.")
    list_parser = subparsers.add_parser("list", help="List all measurements.")
    download_parser = subparsers.add_parser("download", help="Download selected measurements.")

    setup_delete(delete_parser)
    setup_rerun_all(rerun_all_parser)
    setup_rerun_processing(rerun_processing_parser)
    setup_process_file(process_file_parser)
    setup_upload_file(upload_file_parser)
    setup_list_measurements(list_parser)
    setup_download_measurements(download_parser)

    # Verbosity settings from http://stackoverflow.com/a/20663028
    parser.add_argument('-d', '--debug', help="Print debugging information.", action="store_const",
                        dest="loglevel", const=logging.DEBUG, default=logging.INFO,
                        )
    parser.add_argument('-s', '--silent', help="Show only warning and error messages.", action="store_const",
                        dest="loglevel", const=logging.WARNING
                        )

    home = os.path.expanduser("~")
    default_config_location = os.path.abspath(os.path.join(home, ".scc_access.yaml"))
    # NOTE: argparse applies `type` to string defaults, so the default config
    # file is parsed (and must exist) whenever -c is not provided.
    parser.add_argument("-c", "--config", help="Path to the config file.", type=settings_from_path,
                        default=default_config_location)

    args = parser.parse_args()

    # Get the logger with the appropriate level
    logging.basicConfig(format='%(levelname)s: %(message)s', level=args.loglevel)

    # Dispatch to appropriate function
    args.execute(args)


# When running through terminal
if __name__ == '__main__':
    main()