84 |
84 |
def logout(self):
    """ Logout from SCC.

    Sends a GET request to ``self.logout_url`` through ``self.session``
    and returns the value produced by that call.
    """
    # NOTE(review): stream=True is forwarded to the session's get();
    # presumably so the response body is not fetched eagerly -- confirm.
    return self.session.get(self.logout_url, stream=True)
88 |
88 |
89 def upload_file(self, filename, system_id, rs_filename=None, ol_filename=None, lr_filename=None): |
89 def upload_file(self, filename, system_id, rs_filename=None, ov_filename=None, lr_filename=None): |
90 """ Upload a filename for processing with a specific system. If the |
90 """ Upload a filename for processing with a specific system. If the |
91 upload is successful, it returns the measurement id. """ |
91 upload is successful, it returns the measurement id. """ |
92 # Get submit page |
92 # Get submit page |
93 upload_page = self.session.get(self.upload_url) |
93 upload_page = self.session.get(self.upload_url) |
94 |
94 |
98 |
98 |
99 if rs_filename is not None: |
99 if rs_filename is not None: |
100 logger.debug('Adding sounding file %s' % rs_filename) |
100 logger.debug('Adding sounding file %s' % rs_filename) |
101 files['sounding_file'] = open(rs_filename, 'rb') |
101 files['sounding_file'] = open(rs_filename, 'rb') |
102 |
102 |
103 if ol_filename is not None: |
103 if ov_filename is not None: |
104 logger.debug('Adding overlap file %s' % ol_filename) |
104 logger.debug('Adding overlap file %s' % ov_filename) |
105 files['overlap_file'] = open(ol_filename, 'rb') |
105 files['overlap_file'] = open(ov_filename, 'rb') |
106 |
106 |
107 if lr_filename is not None: |
107 if lr_filename is not None: |
108 logger.debug('Adding lidar ratio file %s' % lr_filename) |
108 logger.debug('Adding lidar ratio file %s' % lr_filename) |
109 files['lidar_ratio_file'] = open(lr_filename, 'rb') |
109 files['lidar_ratio_file'] = open(lr_filename, 'rb') |
110 |
110 |
111 logger.info("Uploading of file %s started." % filename) |
111 logger.info("Uploading of file(s) %s started." % filename) |
112 |
112 |
113 upload_submit = self.session.post(self.upload_url, |
113 upload_submit = self.session.post(self.upload_url, |
114 data=upload_data, |
114 data=upload_data, |
115 files=files, |
115 files=files, |
116 headers={'X-CSRFToken': upload_page.cookies['csrftoken'], |
116 headers={'X-CSRFToken': upload_page.cookies['csrftoken'], |
121 return False |
121 return False |
122 |
122 |
123 # Check if there was a redirect to a new page. |
123 # Check if there was a redirect to a new page. |
124 if upload_submit.url == self.upload_url: |
124 if upload_submit.url == self.upload_url: |
125 measurement_id = False |
125 measurement_id = False |
126 logger.error("Uploaded file rejected! Try to upload manually to see the error.") |
126 logger.error("Uploaded file(s) rejected! Try to upload manually to see the error.") |
127 else: |
127 else: |
128 measurement_id = re.findall(regex, upload_submit.text)[0] |
128 measurement_id = re.findall(regex, upload_submit.text)[0] |
129 logger.info("Successfully uploaded measurement with id %s." % measurement_id) |
129 logger.info("Successfully uploaded measurement with id %s." % measurement_id) |
130 |
130 |
131 return measurement_id |
131 return measurement_id |
133 def download_files(self, measurement_id, subdir, download_url): |
133 def download_files(self, measurement_id, subdir, download_url): |
134 """ Downloads some files from the download_url to the specified |
134 """ Downloads some files from the download_url to the specified |
135 subdir. This method is used to download preprocessed file, optical |
135 subdir. This method is used to download preprocessed file, optical |
136 files etc. |
136 files etc. |
137 """ |
137 """ |
|
138 # TODO: Make downloading more robust (e.g. in case that files do not exist on server). |
138 # Get the file |
139 # Get the file |
139 request = self.session.get(download_url, stream=True) |
140 request = self.session.get(download_url, stream=True) |
140 |
141 |
141 if not request.ok: |
142 if not request.ok: |
142 raise Exception("Could not download files for measurement '%s'" % measurement_id) |
143 raise Exception("Could not download files for measurement '%s'" % measurement_id) |
145 local_dir = os.path.join(self.output_dir, measurement_id, subdir) |
146 local_dir = os.path.join(self.output_dir, measurement_id, subdir) |
146 if not os.path.exists(local_dir): |
147 if not os.path.exists(local_dir): |
147 os.makedirs(local_dir) |
148 os.makedirs(local_dir) |
148 |
149 |
149 # Save the file by chunk, needed if the file is big. |
150 # Save the file by chunk, needed if the file is big. |
150 memory_file = StringIO() |
151 memory_file = BytesIO() |
151 |
152 |
152 for chunk in request.iter_content(chunk_size=1024): |
153 for chunk in request.iter_content(chunk_size=1024): |
153 if chunk: # filter out keep-alive new chunks |
154 if chunk: # filter out keep-alive new chunks |
154 memory_file.write(chunk) |
155 memory_file.write(chunk) |
155 memory_file.flush() |
156 memory_file.flush() |
180 """ Download profile graphs for the measurement id. """ |
181 """ Download profile graphs for the measurement id. """ |
181 # Construct the download url |
182 # Construct the download url |
182 download_url = self.download_graph_pattern.format(measurement_id) |
183 download_url = self.download_graph_pattern.format(measurement_id) |
183 self.download_files(measurement_id, 'scc_plots', download_url) |
184 self.download_files(measurement_id, 'scc_plots', download_url) |
184 |
185 |
|
186 # TODO: Add download method for other types of files. |
|
187 |
185 def rerun_processing(self, measurement_id, monitor=True): |
188 def rerun_processing(self, measurement_id, monitor=True): |
186 measurement, status = self.get_measurement(measurement_id) |
189 measurement, status = self.get_measurement(measurement_id) |
187 |
190 |
188 if measurement: |
191 if measurement: |
189 request = self.session.get(measurement.rerun_processing_url, stream=True) |
192 request = self.session.get(measurement.rerun_processing_url, stream=True) |
213 return |
216 return |
214 |
217 |
215 if monitor: |
218 if monitor: |
216 self.monitor_processing(measurement_id) |
219 self.monitor_processing(measurement_id) |
217 |
220 |
def process(self, filename, system_id, monitor, rs_filename=None, lr_filename=None, ov_filename=None):
    """ Upload a file for processing and optionally wait for the processing to finish.

    Parameters
    ----------
    filename
        Measurement file handed to ``self.upload_file``.
    system_id
        System id handed to ``self.upload_file``.
    monitor
        If true (and the upload produced a measurement id), the call blocks
        in ``self.monitor_processing`` and returns its result.
    rs_filename, lr_filename, ov_filename
        Optional sounding, lidar ratio and overlap files, forwarded to
        ``self.upload_file`` as keyword arguments.

    Returns
    -------
    The value returned by ``self.monitor_processing`` when monitoring takes
    place, otherwise ``None`` (upload rejected, or ``monitor`` is false).
    """
    logger.info("--- Processing started on %s. ---", datetime.datetime.now())

    # Upload file
    logger.info("--- Uploading file")
    measurement_id = self.upload_file(filename, system_id,
                                      rs_filename=rs_filename,
                                      lr_filename=lr_filename,
                                      ov_filename=ov_filename)

    # Only monitor when the upload yielded a usable id; a falsy id means
    # the upload did not succeed and there is nothing to wait for.
    if measurement_id and monitor:
        logger.info("--- Monitoring processing")
        return self.monitor_processing(measurement_id)

    return None
231 |
238 |
232 def monitor_processing(self, measurement_id): |
239 def monitor_processing(self, measurement_id): |
233 """ Monitor the processing progress of a measurement id""" |
240 """ Monitor the processing progress of a measurement id""" |
234 |
241 |
237 error_max = 6 |
244 error_max = 6 |
238 time_sleep = 10 |
245 time_sleep = 10 |
239 |
246 |
240 # try to wait for measurement to appear in API |
247 # try to wait for measurement to appear in API |
241 measurement = None |
248 measurement = None |
242 logger.info("looking for measurement %s in SCC", measurement_id) |
249 logger.info("Looking for measurement %s in SCC", measurement_id) |
243 while error_count < error_max: |
250 while error_count < error_max: |
244 time.sleep(time_sleep) |
251 time.sleep(time_sleep) |
245 measurement, status = self.get_measurement(measurement_id) |
252 measurement, status = self.get_measurement(measurement_id) |
246 if status != 200 and error_count < error_max: |
253 if status != 200 and error_count < error_max: |
247 logger.error("measurement not found. waiting %ds", time_sleep) |
254 logger.error("Measurement not found. waiting %ds", time_sleep) |
248 error_count += 1 |
255 error_count += 1 |
249 else: |
256 else: |
250 break |
257 break |
251 |
258 |
252 if error_count == error_max: |
259 if error_count == error_max: |
253 logger.critical("measurement %s doesn't seem to exist", measurement_id) |
260 logger.critical("Measurement %s doesn't seem to exist", measurement_id) |
254 sys.exit(1) |
261 sys.exit(1) |
255 |
262 |
256 logger.info('measurement %s found', measurement_id) |
263 logger.info('Measurement %s found', measurement_id) |
257 |
264 |
258 if measurement is not None: |
265 if measurement is not None: |
259 while measurement.is_running: |
266 while measurement.is_running: |
260 logger.info("Measurement is being processed (status: %s, %s, %s). Please wait.", measurement.upload, measurement.pre_processing, measurement.processing) |
267 logger.info("Measurement is being processed (status: %s, %s, %s). Please wait.", measurement.upload, measurement.pre_processing, measurement.processing) |
261 time.sleep(10) |
268 time.sleep(10) |
456 self.upload, |
463 self.upload, |
457 self.pre_processing, |
464 self.pre_processing, |
458 self.processing) |
465 self.processing) |
459 |
466 |
460 |
467 |
def process_file(filename, system_id, settings, monitor=True, rs_filename=None, lr_filename=None, ov_filename=None):
    """ Shortcut function to process a file to the SCC.

    Creates an ``SCC`` object from ``settings``, logs in, calls
    ``SCC.process`` and logs out again.

    Parameters
    ----------
    filename
        Measurement file to process.
    system_id
        System id to process the file with.
    settings
        Mapping providing the keys ``basic_credentials``, ``output_dir``,
        ``base_url`` and ``website_credentials``.
    monitor
        Forwarded to ``SCC.process``.
    rs_filename, lr_filename, ov_filename
        Optional sounding, lidar ratio and overlap files, forwarded to
        ``SCC.process``.

    Returns
    -------
    Whatever ``SCC.process`` returns.
    """
    logger.info("Processing file %s, using system %s", filename, system_id)

    scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url'])
    scc.login(settings['website_credentials'])
    try:
        measurement = scc.process(filename, system_id,
                                  monitor=monitor,
                                  rs_filename=rs_filename,
                                  lr_filename=lr_filename,
                                  ov_filename=ov_filename)
    finally:
        # Always close the session, even if processing raised.
        scc.logout()
    return measurement
481 |
481 |
482 |
482 |
483 def delete_measurement(measurement_ids, settings): |
483 def delete_measurements(measurement_ids, settings): |
484 """ Shortcut function to delete measurements from the SCC. """ |
484 """ Shortcut function to delete measurements from the SCC. """ |
485 scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) |
485 scc = SCC(settings['basic_credentials'], settings['output_dir'], settings['base_url']) |
486 scc.login(settings['website_credentials']) |
486 scc.login(settings['website_credentials']) |
487 for m_id in measurement_ids: |
487 for m_id in measurement_ids: |
488 logger.info("Deleting %s" % m_id) |
488 logger.info("Deleting %s" % m_id) |
566 |
566 |
567 |
567 |
568 # Setup for command specific parsers |
568 # Setup for command specific parsers |
def setup_delete(parser):
    """ Configure ``parser`` for the delete command.

    Registers the positional ``IDs`` argument (one or more values) and sets
    the parser's ``execute`` default to a callback that passes the parsed
    ids and ``parsed.config`` to ``delete_measurements``.
    """
    def delete_from_args(parsed):
        delete_measurements(parsed.IDs, parsed.config)

    parser.add_argument("IDs", nargs="+", help="measurement IDs to delete.")
    parser.set_defaults(execute=delete_from_args)
575 |
575 |
576 |
576 |
593 action="store_true") |
593 action="store_true") |
594 parser.set_defaults(execute=rerun_processing_from_args) |
594 parser.set_defaults(execute=rerun_processing_from_args) |
595 |
595 |
596 |
596 |
def setup_process_file(parser):
    """ Upload and monitor processing progress.

    Registers the positional ``filename`` and ``system`` arguments and the
    optional ``--radiosounding``, ``--overlap`` and ``--lidarratio``
    arguments on ``parser``, then sets the parser's ``execute`` default to
    a callback that calls ``process_file`` with ``monitor=True``.
    """
    def process_file_from_args(parsed):
        # Attribute names must match the argument names registered below
        # ("filename", not "file").
        process_file(parsed.filename, parsed.system, parsed.config, monitor=True,
                     rs_filename=parsed.radiosounding,
                     ov_filename=parsed.overlap,
                     lr_filename=parsed.lidarratio)

    parser.add_argument("filename", help="Measurement file name or path.")
    parser.add_argument("system", help="Processing system id.")
    parser.add_argument("--radiosounding", default=None, help="Radiosounding file name or path")
    parser.add_argument("--overlap", default=None, help="Overlap file name or path")
    parser.add_argument("--lidarratio", default=None, help="Lidar ratio file name or path")

    parser.set_defaults(execute=process_file_from_args)
607 |
612 |
608 |
613 |
def setup_upload_file(parser):
    """ Upload but do not monitor processing progress.

    Registers the positional ``filename`` and ``system`` arguments and the
    optional ``--radiosounding``, ``--overlap`` and ``--lidarratio``
    arguments on ``parser``, then sets the parser's ``execute`` default to
    a callback that calls ``process_file`` with ``monitor=False``.
    """
    def upload_file_from_args(parsed):
        # Attribute names must match the argument names registered below
        # ("filename", not "file").
        process_file(parsed.filename, parsed.system, parsed.config, monitor=False,
                     rs_filename=parsed.radiosounding,
                     ov_filename=parsed.overlap,
                     lr_filename=parsed.lidarratio)

    parser.add_argument("filename", help="Measurement file name or path.")
    parser.add_argument("system", help="Processing system id.")
    parser.add_argument("--radiosounding", default=None, help="Radiosounding file name or path")
    parser.add_argument("--overlap", default=None, help="Overlap file name or path")
    parser.add_argument("--lidarratio", default=None, help="Lidar ratio file name or path")

    parser.set_defaults(execute=upload_file_from_args)
618 |
629 |
619 |
630 |
620 def setup_list_measurements(parser): |
631 def setup_list_measurements(parser): |
667 # Define the command line arguments. |
678 # Define the command line arguments. |
668 parser = argparse.ArgumentParser() |
679 parser = argparse.ArgumentParser() |
669 subparsers = parser.add_subparsers() |
680 subparsers = parser.add_subparsers() |
670 |
681 |
671 delete_parser = subparsers.add_parser("delete", help="Deletes a measurement.") |
682 delete_parser = subparsers.add_parser("delete", help="Deletes a measurement.") |
672 rerun_all_parser = subparsers.add_parser("rerun-all", help="Rerun a measurement.") |
683 rerun_all_parser = subparsers.add_parser("rerun-all", help="Reprocess a measurement on the SCC.") |
673 rerun_processing_parser = subparsers.add_parser("rerun-processing", |
684 rerun_processing_parser = subparsers.add_parser("rerun-processing", |
674 help="Rerun processing routings for a measurement.") |
685 help="Rerun processing routings for a measurement.") |
675 process_file_parser = subparsers.add_parser("process-file", help="Process a file.") |
686 process_file_parser = subparsers.add_parser("process-file", help="Upload a file and download procesing results.") |
676 upload_file_parser = subparsers.add_parser("upload-file", help="Upload a file.") |
687 upload_file_parser = subparsers.add_parser("upload-file", help="Upload a file.") |
677 list_parser = subparsers.add_parser("list", help="List all measurements.") |
688 list_parser = subparsers.add_parser("list", help="List measurements registered on the SCC.") |
678 download_parser = subparsers.add_parser("download", help="Download selected measurements.") |
689 download_parser = subparsers.add_parser("download", help="Download selected measurements.") |
679 |
690 |
680 setup_delete(delete_parser) |
691 setup_delete(delete_parser) |
681 setup_rerun_all(rerun_all_parser) |
692 setup_rerun_all(rerun_all_parser) |
682 setup_rerun_processing(rerun_processing_parser) |
693 setup_rerun_processing(rerun_processing_parser) |
691 ) |
702 ) |
692 parser.add_argument('-s', '--silent', help="Show only warning and error messages.", action="store_const", |
703 parser.add_argument('-s', '--silent', help="Show only warning and error messages.", action="store_const", |
693 dest="loglevel", const=logging.WARNING |
704 dest="loglevel", const=logging.WARNING |
694 ) |
705 ) |
695 |
706 |
|
707 # Setup default config location |
696 home = os.path.expanduser("~") |
708 home = os.path.expanduser("~") |
697 default_config_location = os.path.abspath(os.path.join(home, ".scc_access.yaml")) |
709 default_config_location = os.path.abspath(os.path.join(home, ".scc_access.yaml")) |
698 parser.add_argument("-c", "--config", help="Path to the config file.", type=settings_from_path, |
710 parser.add_argument("-c", "--config", help="Path to the config file.", type=settings_from_path, |
699 default=default_config_location) |
711 default=default_config_location) |
700 |
712 |