50 self.login_url = urlparse.urljoin(self.base_url, 'accounts/login/') |
50 self.login_url = urlparse.urljoin(self.base_url, 'accounts/login/') |
51 self.logout_url = urlparse.urljoin(self.base_url, 'accounts/logout/') |
51 self.logout_url = urlparse.urljoin(self.base_url, 'accounts/logout/') |
52 self.list_measurements_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/') |
52 self.list_measurements_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/') |
53 |
53 |
54 self.upload_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/quick/') |
54 self.upload_url = urlparse.urljoin(self.base_url, 'data_processing/measurements/quick/') |
|
55 self.measurement_page_pattern = urlparse.urljoin(self.base_url, 'data_processing/measurements/{0}/') |
55 self.download_hirelpp_pattern = urlparse.urljoin(self.base_url, |
56 self.download_hirelpp_pattern = urlparse.urljoin(self.base_url, |
56 'data_processing/measurements/{0}/download-hirelpp/') |
57 'data_processing/measurements/{0}/download-hirelpp/') |
57 self.download_cloudmask_pattern = urlparse.urljoin(self.base_url, |
58 self.download_cloudmask_pattern = urlparse.urljoin(self.base_url, |
58 'data_processing/measurements/{0}/download-cloudmask/') |
59 'data_processing/measurements/{0}/download-cloudmask/') |
59 |
60 |
98 |
99 |
def logout(self):
    """ Log out from the SCC.

    Issues a GET request to `self.logout_url` through `self.session`
    (with `stream=True`) and returns whatever that call returns.
    """
    response = self.session.get(self.logout_url, stream=True)
    return response
102 |
103 |
103 def upload_file(self, filename, system_id, force_upload, delete_related, rs_filename=None, ov_filename=None, lr_filename=None): |
104 def upload_file(self, filename, system_id, force_upload, delete_related, delay=0, rs_filename=None, ov_filename=None, lr_filename=None): |
104 """ Upload a filename for processing with a specific system. If the |
105 """ Upload a filename for processing with a specific system. If the |
105 upload is successful, it returns the measurement id. """ |
106 upload is successful, it returns the measurement id. """ |
106 measurement_id = self.measurement_id_from_file(filename) |
107 measurement_id = self.measurement_id_from_file(filename) |
107 |
108 |
108 logger.debug('Checking if a measurement with the same id already exists on the SCC server.') |
109 logger.debug('Checking if a measurement with the same id already exists on the SCC server.') |
121 |
122 |
122 # Get submit page |
123 # Get submit page |
123 upload_page = self.session.get(self.upload_url) |
124 upload_page = self.session.get(self.upload_url) |
124 |
125 |
125 # Submit the data |
126 # Submit the data |
126 upload_data = {'system': system_id} |
127 upload_data = {'system': system_id, |
|
128 'delay': delay} |
|
129 |
|
130 logger.debug("Submitted processing parameters - System: {}, Delay: {}".format(system_id, delay)) |
|
131 |
127 files = {'data': open(filename, 'rb')} |
132 files = {'data': open(filename, 'rb')} |
128 |
133 |
129 if rs_filename is not None: |
134 if rs_filename is not None: |
130 ancillary_file, _ = self.get_ancillary(rs_filename, 'sounding') |
135 ancillary_file, _ = self.get_ancillary(rs_filename, 'sounding') |
131 |
136 |
171 measurement_id = False |
176 measurement_id = False |
172 logger.error("Uploaded file(s) rejected! Try to upload manually to see the error.") |
177 logger.error("Uploaded file(s) rejected! Try to upload manually to see the error.") |
173 else: |
178 else: |
174 measurement_id = re.findall(regex, upload_submit.text)[0] |
179 measurement_id = re.findall(regex, upload_submit.text)[0] |
175 logger.info("Successfully uploaded measurement with id %s." % measurement_id) |
180 logger.info("Successfully uploaded measurement with id %s." % measurement_id) |
176 |
181 logger.info("You can monitor the processing progress online: {}".format(self.measurement_page_pattern.format(measurement_id))) |
177 return measurement_id |
182 return measurement_id |
178 |
183 |
179 @staticmethod |
184 @staticmethod |
180 def measurement_id_from_file(filename): |
185 def measurement_id_from_file(filename): |
181 """ Get the measurement id from the input file. """ |
186 """ Get the measurement id from the input file. """ |
337 logger.info("Rerun-all command submitted successfully for id {}.".format(measurement_id)) |
342 logger.info("Rerun-all command submitted successfully for id {}.".format(measurement_id)) |
338 |
343 |
339 if monitor: |
344 if monitor: |
340 self.monitor_processing(measurement_id) |
345 self.monitor_processing(measurement_id) |
341 |
346 |
def process(self, filename, system_id, monitor, force_upload, delete_related, delay=0, rs_filename=None, lr_filename=None, ov_filename=None):
    """ Upload a file for processing and, if requested, monitor the processing.

    The upload itself is delegated to `upload_file`, which receives the
    `delay` value and the optional ancillary file names.

    Monitoring is skipped (a warning is logged and None is returned) when
    `monitor` is set together with a positive `delay`; the warning message
    reports the delay in hours. Otherwise, when the upload yielded a
    measurement id and `monitor` is set, the result of `monitor_processing`
    is returned. In every other case the method returns None.
    """
    started_at = datetime.datetime.now()
    logger.info("--- Processing started on %s. ---" % started_at)

    # Upload file
    logger.info("Uploading file.")
    ancillary_files = {
        'rs_filename': rs_filename,
        'lr_filename': lr_filename,
        'ov_filename': ov_filename,
    }
    measurement_id = self.upload_file(filename, system_id, force_upload, delete_related,
                                      delay=delay,
                                      **ancillary_files)

    # Nothing more to do when the caller did not ask for monitoring.
    if not monitor:
        return None

    # A delayed processing will not start right away, so do not wait for it.
    if delay > 0:
        logger.warning("Will not start monitoring, since a delay was specified: {} hours.".format(delay))
        return None

    # A falsy id means that the upload did not produce a measurement.
    if not measurement_id:
        return None

    logger.info("Monitoring processing.")
    return self.monitor_processing(measurement_id)
359 |
369 |
360 def monitor_processing(self, measurement_id): |
370 def monitor_processing(self, measurement_id): |
706 self._log_module_status('ELIC', self.elic, self.elic_exit_code) |
716 self._log_module_status('ELIC', self.elic, self.elic_exit_code) |
707 self._log_module_status('ELQuick', self.elquick, self.elquick_exit_code) |
717 self._log_module_status('ELQuick', self.elquick, self.elquick_exit_code) |
708 |
718 |
def _log_module_status(self, name, status, exit_code):
    """ Log the exit code reported for one processing module.

    A falsy `exit_code` is logged verbatim at info level. Otherwise
    `exit_code` is indexed with the keys 'exit_code' and 'description';
    a value above zero is reported as a warning, anything else as info.

    `status` does not appear in the logged text; it is only passed to
    `format` so that `exit_code` stays at positional index 2.
    """
    if not exit_code:
        logger.info("{0} exit code: {2}".format(name, status, exit_code))
        return

    # Choose the severity first, then build the message once.
    if exit_code['exit_code'] > 0:
        log_message = logger.warning
    else:
        log_message = logger.info

    log_message("{0} exit code: {2[exit_code]} - {2[description]}".format(name, status, exit_code))
714 |
727 |
715 @property |
728 @property |
716 def rerun_elda_url(self): |
729 def rerun_elda_url(self): |
746 self.filename, |
759 self.filename, |
747 self.status) |
760 self.status) |
748 |
761 |
749 |
762 |
750 def process_file(filename, system_id, settings, force_upload, delete_related, |
763 def process_file(filename, system_id, settings, force_upload, delete_related, |
751 monitor=True, rs_filename=None, lr_filename=None, ov_filename=None): |
764 delay=0, monitor=True, rs_filename=None, lr_filename=None, ov_filename=None): |
752 """ Shortcut function to process a file to the SCC. """ |
765 """ Shortcut function to process a file to the SCC. """ |
753 logger.info("Processing file %s, using system %s" % (filename, system_id)) |
766 logger.info("Processing file %s, using system %s" % (filename, system_id)) |
754 |
767 |
755 with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: |
768 with SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) as scc: |
756 scc.login(settings['website_credentials']) |
769 scc.login(settings['website_credentials']) |
757 measurement = scc.process(filename, system_id, |
770 measurement = scc.process(filename, system_id, |
758 force_upload=force_upload, |
771 force_upload=force_upload, |
759 delete_related=delete_related, |
772 delete_related=delete_related, |
|
773 delay=delay, |
760 monitor=monitor, |
774 monitor=monitor, |
761 rs_filename=rs_filename, |
775 rs_filename=rs_filename, |
762 lr_filename=lr_filename, |
776 lr_filename=lr_filename, |
763 ov_filename=ov_filename) |
777 ov_filename=ov_filename) |
764 scc.logout() |
778 scc.logout() |
def delete_measurements(measurement_ids, delete_related, settings):
    """ Shortcut function to delete several measurements from the SCC.

    Opens an `SCC` context built from the 'basic_credentials', 'output_dir'
    and 'base_url' entries of `settings`, logs in with
    `settings['website_credentials']`, calls `delete_measurement` for every
    id in `measurement_ids` (forwarding `delete_related`) and logs out.
    """
    connection = SCC(settings['basic_credentials'],
                     settings['output_dir'],
                     settings['base_url'])

    with connection as scc:
        scc.login(settings['website_credentials'])

        for measurement_id in measurement_ids:
            logger.info("Deleting %s." % measurement_id)
            scc.delete_measurement(measurement_id, delete_related)

        scc.logout()
776 |
790 |
777 |
791 |
def rerun_all(measurement_ids, monitor, settings):
    """ Shortcut function to rerun all products of several measurements.

    Opens an `SCC` context built from the 'basic_credentials', 'output_dir'
    and 'base_url' entries of `settings`, logs in with
    `settings['website_credentials']`, calls `scc.rerun_all` for every id in
    `measurement_ids` (forwarding `monitor`) and logs out.
    """
    connection = SCC(settings['basic_credentials'],
                     settings['output_dir'],
                     settings['base_url'])

    with connection as scc:
        scc.login(settings['website_credentials'])

        for measurement_id in measurement_ids:
            logger.info("Rerunning all products for %s." % measurement_id)
            scc.rerun_all(measurement_id, monitor)

        scc.logout()
787 |
801 |
788 |
802 |
789 def rerun_processing(measurement_ids, monitor, settings): |
803 def rerun_processing(measurement_ids, monitor, settings): |
884 |
898 |
885 def setup_upload_file(parser): |
899 def setup_upload_file(parser): |
886 """ Upload but do not monitor processing progress. """ |
900 """ Upload but do not monitor processing progress. """ |
887 def upload_file_from_args(parsed): |
901 def upload_file_from_args(parsed): |
888 process_file(parsed.filename, parsed.system, parsed.config, |
902 process_file(parsed.filename, parsed.system, parsed.config, |
|
903 delay=parsed.delay, |
889 monitor=parsed.process, |
904 monitor=parsed.process, |
890 force_upload=parsed.force_upload, |
905 force_upload=parsed.force_upload, |
891 delete_related=False, # For now, use this as default |
906 delete_related=False, # For now, use this as default |
892 rs_filename=parsed.radiosounding, |
907 rs_filename=parsed.radiosounding, |
893 ov_filename=parsed.overlap, |
908 ov_filename=parsed.overlap, |
894 lr_filename=parsed.lidarratio) |
909 lr_filename=parsed.lidarratio) |
895 |
910 |
|
911 def delay(arg): |
|
912 try: |
|
913 int_arg = int(arg) |
|
914 except ValueError: |
|
915 raise argparse.ArgumentTypeError("Could not convert delay argument {} to integer.".format(arg)) |
|
916 |
|
917 if 0 <= int_arg <= 96: |
|
918 return int_arg |
|
919 else: |
|
920 raise argparse.ArgumentTypeError("Delay should be an integer between 0 and 96.") |
|
921 |
896 parser.add_argument("filename", help="Measurement file name or path.") |
922 parser.add_argument("filename", help="Measurement file name or path.") |
897 parser.add_argument("system", help="Processing system id.") |
923 parser.add_argument("system", help="Processing system id.") |
|
924 parser.add_argument("--delay", help="Delay processing by the specified number of hours (0 to 96).", |
|
925 default=0, type=delay) |
898 parser.add_argument("-p", "--process", help="Wait for the processing results.", |
926 parser.add_argument("-p", "--process", help="Wait for the processing results.", |
899 action="store_true") |
927 action="store_true") |
900 parser.add_argument("--force_upload", help="If measurement ID exists on SCC, delete before uploading.", |
928 parser.add_argument("--force_upload", help="If measurement ID exists on SCC, delete before uploading.", |
901 action="store_true") |
929 action="store_true") |
902 parser.add_argument("--radiosounding", default=None, help="Radiosounding file name or path") |
930 parser.add_argument("--radiosounding", default=None, help="Radiosounding file name or path") |