Merging development branch.

Tue, 09 Feb 2021 09:41:52 +0200

author
ioannis@ioannis-VirtualBox
date
Tue, 09 Feb 2021 09:41:52 +0200
changeset 56
c9df25012168
parent 52
24dc95cbbf50 (current diff)
parent 55
c226bda2bb71 (diff)
child 57
8d487e4e20ae

Merging development branch.

--- a/CHANGELOG.rst	Sat Jan 09 19:42:15 2021 +0200
+++ b/CHANGELOG.rst	Tue Feb 09 09:41:52 2021 +0200
@@ -1,5 +1,12 @@
 Changelog
 =========
+
+0.10.0 - 2021-02-04
+------------------
+* Support for new version of web interface (to be released on 2021-02-09.
+* Output of module exit codes and their description.
+* Support of the "delay" parameter when uploading files.
+
 0.9.0 - 2021-01-09
 ------------------
 * Added force_upload option
--- a/README.rst	Sat Jan 09 19:42:15 2021 +0200
+++ b/README.rst	Tue Feb 09 09:41:52 2021 +0200
@@ -8,15 +8,17 @@
 * Download the processed files and graphs
 * Delete an existing measurement from the SCC (with appropriate privileges)
 
-The main functions are implemented in a class (SCC) that you can also import 
-and use in your custom scripts.
 
-The script does not provide any feedback if a file upload fails. Before using
+The script provides limited feedback when a file upload fails. Before using
 the script, you will need to upload some files manually and be confident that 
 your SCC file format and processing settings are correct.
 
 Please note that this is not part of the "official" SCC tools.
 
+In principle, the main functions are implemented in a class (SCC) that you can also import
+and use in your custom scripts. However, error handling has to be improved in this case. If you are interested
+in such use, please contact me at ioannis@inoe.ro.
+
 Any suggestions for improvements and new features are more than welcome.
 
 Installation
@@ -72,11 +74,20 @@
 
    scc_access -c ./path/to/settings.yaml upload-file 20110101po01.nc 125
 
-By default, the SCC will reject an uploaded file, if the specified measurement id already exists on the server. You
+By default, the SCC will reject an uploaded file if the specified measurement id already exists on the server. You
 can instruct the script to delete any existing measurement before uploading using the `--force_upload` flag::
 
    scc_access upload-file 20110101po01.nc 125 -p --force_upload
 
+When uploading a measurement you can require that the processing is delayed by a specified number of hours. This
+can be used to assure that your files are processed using model input data, that typically have a delay of more than
+24 hours. You can specify the delay (in hours) using the --delay option::
+
+   scc_access upload-file 20110101po01.nc 125 --delay 48
+
+.. note::
+   When specifying a delay, the script will not wait to download the output files.
+
 If you want to delete an existing measurement id from the database use the `delete`
 command and the measurement id::
     
@@ -134,6 +145,7 @@
 
     optional arguments:
       -h, --help            show this help message and exit
+      --delay DELAY         Delay processing by the specified number of hours (0 to 96).
       -p, --process         Wait for the processing results.
       --force_upload        If measurement ID exists on SCC, delete before
                             uploading.
--- a/scc_access/__init__.py	Sat Jan 09 19:42:15 2021 +0200
+++ b/scc_access/__init__.py	Tue Feb 09 09:41:52 2021 +0200
@@ -1,1 +1,1 @@
-__version__ = "0.9.0"
\ No newline at end of file
+__version__ = "0.10.0"
\ No newline at end of file
--- a/scc_access/scc_access.py	Sat Jan 09 19:42:15 2021 +0200
+++ b/scc_access/scc_access.py	Tue Feb 09 09:41:52 2021 +0200
@@ -52,6 +52,7 @@
         self.list_measurements_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/')
 
         self.upload_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/quick/')
+        self.measurement_page_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/')
         self.download_hirelpp_pattern = urlparse.urljoin(self.base_url,
                                                              'data_processing/measurements/{0}/download-hirelpp/')
         self.download_cloudmask_pattern = urlparse.urljoin(self.base_url,
@@ -100,7 +101,7 @@
         """ Logout from SCC """
         return self.session.get(self.logout_url, stream=True)
 
-    def upload_file(self, filename, system_id, force_upload, delete_related, rs_filename=None, ov_filename=None, lr_filename=None):
+    def upload_file(self, filename, system_id, force_upload, delete_related, delay=0, rs_filename=None, ov_filename=None, lr_filename=None):
         """ Upload a filename for processing with a specific system. If the
         upload is successful, it returns the measurement id. """
         measurement_id = self.measurement_id_from_file(filename)
@@ -117,13 +118,18 @@
                 logger.error(
                     "Measurement with id {} already exists on the SCC. Use --force_upload flag to overwrite it.".format(
                         measurement_id))
+                # TODO: Implement handling at the proper place. This does not allow the SCC class to be used by external programs.
                 sys.exit(1)
 
         # Get submit page
         upload_page = self.session.get(self.upload_url)
 
         # Submit the data
-        upload_data = {'system': system_id}
+        upload_data = {'system': system_id,
+                       'delay': delay}
+
+        logger.debug("Submitted processing parameters - System: {}, Delay: {}".format(system_id, delay))
+
         files = {'data': open(filename, 'rb')}
 
         if rs_filename is not None:
@@ -173,7 +179,7 @@
         else:
             measurement_id = re.findall(regex, upload_submit.text)[0]
             logger.info("Successfully uploaded measurement with id %s." % measurement_id)
-
+            logger.info("You can monitor the processing progress online: {}".format(self.measurement_page_pattern.format(measurement_id)))
         return measurement_id
 
     @staticmethod
@@ -339,7 +345,7 @@
             if monitor:
                 self.monitor_processing(measurement_id)
 
-    def process(self, filename, system_id, monitor,  force_upload, delete_related, rs_filename=None, lr_filename=None, ov_filename=None):
+    def process(self, filename, system_id, monitor,  force_upload, delete_related, delay=0, rs_filename=None, lr_filename=None, ov_filename=None):
         """ Upload a file for processing and wait for the processing to finish.
         If the processing is successful, it will download all produced files.
         """
@@ -347,12 +353,17 @@
         # Upload file
         logger.info("Uploading file.")
         measurement_id = self.upload_file(filename, system_id, force_upload, delete_related,
+                                          delay=delay,
                                           rs_filename=rs_filename,
                                           lr_filename=lr_filename,
                                           ov_filename=ov_filename)
 
+        if monitor and (delay > 0):
+            logger.warning("Will not start monitoring, since a delay was specified: {} hours.".format(delay))
+            return None
+
         if measurement_id and monitor:
-            logger.info("Monitoring processing")
+            logger.info("Monitoring processing.")
             return self.monitor_processing(measurement_id)
 
         return None
@@ -362,8 +373,8 @@
 
         # try to deal with error 404
         error_count = 0
-        error_max = 6
-        time_sleep = 10
+        error_max = 3
+        time_sleep = 3
 
         # try to wait for measurement to appear in API
         measurement = None
@@ -385,17 +396,13 @@
 
         if measurement is not None:
             while measurement.is_running:
-                logger.info("Measurement is being processed. status: {}, {}, {}, {}, {}, {}). Please wait.".format(
-                    measurement.upload,
-                    measurement.hirelpp,
-                    measurement.cloudmask,
-                    measurement.elpp,
-                    measurement.elda,
-                    measurement.elic))
+                measurement.log_processing_status()
                 time.sleep(10)
                 measurement, status = self.get_measurement(measurement_id)
 
             logger.info("Measurement processing finished.")
+            measurement.log_detailed_status()
+
             if measurement.hirelpp == 127:
                 logger.info("Downloading HiRElPP files.")
                 self.download_hirelpp(measurement_id)
@@ -413,9 +420,7 @@
             if measurement.elic == 127:
                 logger.info("Downloading ELIC files.")
                 self.download_elic(measurement_id)
-
-            # TODO: Need to check ELDEC code (when it becomes available in the API)
-            if measurement.is_calibration:
+            if measurement.is_calibration and measurement.eldec==0:
                 logger.info("Downloading ELDEC files.")
                 self.download_eldec(measurement_id)
             logger.info("--- Processing finished. ---")
@@ -441,6 +446,8 @@
         else:
             logger.error('Could not access API. Status code %s.' % response.status_code)
 
+        # TODO: Implement better handling for status 401.
+
         if response_dict:
             measurement = Measurement(self.base_url, response_dict)
         else:
@@ -660,16 +667,26 @@
     def __init__(self, base_url, dict_response):
 
         # Define expected attributes to assist debugging
+
+        self.hirelpp = None
+        self.hirelpp_exit_code = None
         self.cloudmask = None
+        self.cloudmask_exit_code = None
+        self.elpp = None
+        self.elpp_exit_code = None
         self.elda = None
+        self.elda_exit_code = None
         self.elic = None
-        self.elpp = None
-        self.hirelpp = None
+        self.elic_exit_code = None
+        self.eldec = None
+        self.eldec_exit_code = None
+        self.elquick = None
+        self.elquick_exit_code = None
+
         self.id = None
         self.is_calibration = None
         self.is_running = None
-        self.pre_processing_exit_code = None
-        self.processing_exit_code = None
+
         self.resource_uri = None
         self.start = None
         self.stop = None
@@ -678,6 +695,39 @@
 
         super().__init__(base_url, dict_response)
 
+    def log_processing_status(self):
+        """ Log module status. """
+        logger.info("Measurement is being processed. Status: {}, {}, {}, {}, {}, {}). Please wait.".format(
+            self.upload,
+            self.hirelpp,
+            self.cloudmask,
+            self.elpp,
+            self.elda,
+            self.elic))
+
+    def log_detailed_status(self):
+        """ Log module exit and status codes."""
+        logger.info("Measurement exit status:".format(self.id))
+        if self.is_calibration:
+            self._log_module_status('ElPP', self.elpp, self.elpp_exit_code)
+            self._log_module_status('ElDEC', self.eldec, self.eldec_exit_code)
+        else:
+            self._log_module_status('HiRElPP', self.hirelpp, self.hirelpp_exit_code)
+            self._log_module_status('CloudScreen', self.cloudmask, self.cloudmask_exit_code)
+            self._log_module_status('ElPP', self.elpp, self.elpp_exit_code)
+            self._log_module_status('ELDA', self.elda, self.elda_exit_code)
+            self._log_module_status('ELIC', self.elic, self.elic_exit_code)
+            self._log_module_status('ELQuick', self.elquick, self.elquick_exit_code)
+
+    def _log_module_status(self, name, status, exit_code):
+        if exit_code:
+            if exit_code['exit_code'] > 0:
+                logger.warning("{0} exit code: {2[exit_code]} - {2[description]}".format(name, status, exit_code))
+            else:
+                logger.info("{0} exit code: {2[exit_code]} - {2[description]}".format(name, status, exit_code))
+        else:
+            logger.info("{0} exit code: {2}".format(name, status, exit_code))
+
     @property
     def rerun_elda_url(self):
         url_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/rerun-elda/')
@@ -714,7 +764,7 @@
 
 
 def process_file(filename, system_id, settings, force_upload, delete_related,
-                 monitor=True, rs_filename=None, lr_filename=None, ov_filename=None):
+                 delay=0, monitor=True, rs_filename=None, lr_filename=None, ov_filename=None):
     """ Shortcut function to process a file to the SCC. """
     logger.info("Processing file %s, using system %s" % (filename, system_id))
 
@@ -723,6 +773,7 @@
         measurement = scc.process(filename, system_id,
                                   force_upload=force_upload,
                                   delete_related=delete_related,
+                                  delay=delay,
                                   monitor=monitor,
                                   rs_filename=rs_filename,
                                   lr_filename=lr_filename,
@@ -736,7 +787,7 @@
     with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc:
         scc.login(settings['website_credentials'])
         for m_id in measurement_ids:
-            logger.info("Deleting %s" % m_id)
+            logger.info("Deleting %s." % m_id)
             scc.delete_measurement(m_id, delete_related)
         scc.logout()
 
@@ -747,7 +798,7 @@
     with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc:
         scc.login(settings['website_credentials'])
         for m_id in measurement_ids:
-            logger.info("Rerunning all products for %s" % m_id)
+            logger.info("Rerunning all products for %s." % m_id)
             scc.rerun_all(m_id, monitor)
         scc.logout()
 
@@ -852,6 +903,7 @@
     """ Upload but do not monitor processing progress. """
     def upload_file_from_args(parsed):
         process_file(parsed.filename, parsed.system, parsed.config,
+                     delay=parsed.delay,
                      monitor=parsed.process,
                      force_upload=parsed.force_upload,
                      delete_related=False,  # For now, use this as default
@@ -859,8 +911,21 @@
                      ov_filename=parsed.overlap,
                      lr_filename=parsed.lidarratio)
 
+    def delay(arg):
+        try:
+            int_arg = int(arg)
+        except ValueError:
+            raise argparse.ArgumentTypeError("Could not convert delay argument {} to integer.".format(arg))
+
+        if 0 <= int_arg <= 96:
+            return int_arg
+        else:
+            raise argparse.ArgumentTypeError("Delay should be an integer between 0 and 96.")
+
     parser.add_argument("filename", help="Measurement file name or path.")
     parser.add_argument("system", help="Processing system id.")
+    parser.add_argument("--delay", help="Delay processing by the specified number of hours (0 to 96).",
+                        default=0, type=delay)
     parser.add_argument("-p", "--process", help="Wait for the processing results.",
                         action="store_true")
     parser.add_argument("--force_upload", help="If measurement ID exists on SCC, delete before uploading.",

mercurial