1 #!/usr/bin/env python |
|
2 """ |
|
3 The MIT License (MIT) |
|
4 |
|
5 Copyright (c) 2015, Ioannis Binietoglou |
|
6 |
|
7 Permission is hereby granted, free of charge, to any person obtaining a copy |
|
8 of this software and associated documentation files (the "Software"), to deal |
|
9 in the Software without restriction, including without limitation the rights |
|
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
11 copies of the Software, and to permit persons to whom the Software is |
|
12 furnished to do so, subject to the following conditions: |
|
13 |
|
14 The above copyright notice and this permission notice shall be included in |
|
15 all copies or substantial portions of the Software. |
|
16 |
|
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|
23 THE SOFTWARE. |
|
24 """ |
|
25 |
|
__version__ = "0.6.0"

# Read the settings (BASE_URL, BASIC_LOGIN, DJANGO_LOGIN, OUTPUT_DIR, ...)
# from settings.py. Only a failing import is translated into the friendly
# message below; other errors raised while executing settings.py (syntax
# errors, undefined names) propagate unchanged instead of being misreported
# as a missing file.
try:
    from settings import *
except ImportError:
    raise ImportError(
        """A settings file (settings.py) is required to run the script.
You can use settings.sample.py as a template.""")
|
35 |
|
36 import requests |
|
37 requests.packages.urllib3.disable_warnings() |
|
38 |
|
39 import urlparse |
|
40 import argparse |
|
41 import os |
|
42 import re |
|
43 import time |
|
44 import StringIO |
|
45 from zipfile import ZipFile |
|
46 import datetime |
|
47 import logging |
|
48 |
|
logger = logging.getLogger(__name__)


def _scc_url(relative_path):
    """ Return the absolute SCC url for a path relative to BASE_URL. """
    return urlparse.urljoin(BASE_URL, relative_path)


# Per-measurement page prefix; '{0}' is later filled with the measurement id
# via str.format.
_MEASUREMENT_PAGE = 'data_processing/measurements/{0}/'

# Absolute URLs of the web interface.
LOGIN_URL = _scc_url('accounts/login/')
UPLOAD_URL = _scc_url('data_processing/measurements/quick/')
DOWNLOAD_PREPROCESSED = _scc_url(_MEASUREMENT_PAGE + 'download-preprocessed/')
DOWNLOAD_OPTICAL = _scc_url(_MEASUREMENT_PAGE + 'download-optical/')
DOWNLOAD_GRAPH = _scc_url(_MEASUREMENT_PAGE + 'download-plots/')
RERUN_ALL = _scc_url(_MEASUREMENT_PAGE + 'rerun-all/')
RERUN_PROCESSING = _scc_url(_MEASUREMENT_PAGE + 'rerun-optical/')

DELETE_MEASUREMENT = _scc_url('admin/database/measurements/{0}/delete/')
API_BASE_URL = _scc_url('api/v1/')

# Pattern that extracts the 12-character measurement id from the measurement
# page shown after a successful upload.
# This should be read from the uploaded file, but would require an extra NetCDF module.
regex = "<h3>Measurement (?P<measurement_id>.{12}) <small>"
|
67 |
|
68 |
|
class SCC:
    """ A simple client that talks to the SCC server on behalf of a user.

    Uploading, downloading, rerunning and deleting are done by simulating a
    normal browser session (login form plus CSRF token). Measurement status
    is read through the JSON API under API_BASE_URL. Failures are reported
    through the module logger; most methods then return False or None
    instead of raising.

    All requests are sent with ``verify=False``, i.e. without TLS certificate
    verification.
    """

    def __init__(self, auth=BASIC_LOGIN, output_dir=OUTPUT_DIR):
        """ Create a client with its own requests session.

        auth: passed as ``auth=`` to every request (default BASIC_LOGIN from
            settings.py).
        output_dir: root directory for downloaded files (default OUTPUT_DIR
            from settings.py).
        """
        self.auth = auth
        self.output_dir = output_dir
        self.session = requests.Session()

    def login(self, credentials=DJANGO_LOGIN):
        """ Log in to the website.

        credentials: (username, password) sequence submitted to the login
            form (default DJANGO_LOGIN from settings.py).

        Returns the response of the login POST request.
        """
        logger.debug("Attempting to login to SCC, username %s." % credentials[0])
        self.login_credentials = {'username': credentials[0],
                                  'password': credentials[1]}

        logger.debug("Accessing login page at %s." % LOGIN_URL)

        # GET the login page first: its response carries the 'csrftoken'
        # cookie that must be echoed in the X-CSRFToken header of the POST.
        login_page = self.session.get(LOGIN_URL,
                                      auth=self.auth, verify=False)

        logger.debug("Submiting credentials.")
        login_submit = self.session.post(LOGIN_URL,
                                         data=self.login_credentials,
                                         headers={'X-CSRFToken': login_page.cookies['csrftoken'],
                                                  'referer': LOGIN_URL},
                                         verify=False,
                                         auth=self.auth)
        return login_submit

    def logout(self):
        """ Placeholder for logging out; currently does nothing. """
        pass

    def upload_file(self, filename, system_id):
        """ Upload a file for processing with a specific system.

        filename: path of the local measurement file.
        system_id: sent as the 'system' field of the upload form.

        Returns the measurement id on success, or False if the request
        failed, the server rejected the file, or the id could not be read
        from the returned page.
        """
        # GET the upload page to obtain the CSRF cookie for the POST.
        upload_page = self.session.get(UPLOAD_URL,
                                       auth=self.auth,
                                       verify=False)

        upload_data = {'system': system_id}

        logger.info("Uploading of file %s started." % filename)

        # The file is opened in a 'with' block so the handle is closed once
        # the POST has been sent (the original code never closed it).
        with open(filename, 'rb') as data_file:
            upload_submit = self.session.post(UPLOAD_URL,
                                              data=upload_data,
                                              files={'data': data_file},
                                              headers={'X-CSRFToken': upload_page.cookies['csrftoken'],
                                                       'referer': UPLOAD_URL},
                                              verify=False,
                                              auth=self.auth)

        if upload_submit.status_code != 200:
            logger.warning("Connection error. Status code: %s" % upload_submit.status_code)
            return False

        # An accepted upload redirects to the measurement page; ending up on
        # the upload page again means the form was rejected.
        if upload_submit.url == UPLOAD_URL:
            logger.error("Uploaded file rejected! Try to upload manually to see the error.")
            return False

        matches = re.findall(regex, upload_submit.text)
        if not matches:
            logger.error("Upload finished, but no measurement id was found on the returned page.")
            return False

        measurement_id = matches[0]
        logger.info("Successfully uploaded measurement with id %s." % measurement_id)
        return measurement_id

    def download_files(self, measurement_id, subdir, download_url):
        """ Download a zip archive from download_url and extract its files.

        Files are written flat (the archive's directory structure is
        dropped) into <output_dir>/<measurement_id>/<subdir>. This method is
        used to download preprocessed files, optical files, plots etc.
        Nothing is written if the server does not answer with status 200.
        """
        request = self.session.get(download_url, auth=self.auth,
                                   verify=False,
                                   stream=True)

        if request.status_code != 200:
            logger.error("Could not download files from %s. Status code: %s" %
                         (download_url, request.status_code))
            # Release the streamed connection since the body is not read.
            request.close()
            return

        # Create the dir if it does not exist
        local_dir = os.path.join(self.output_dir, measurement_id, subdir)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)

        # Buffer the archive in memory chunk by chunk (needed if the file is
        # big); ZipFile requires a seekable file object.
        memory_file = StringIO.StringIO()
        for chunk in request.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                memory_file.write(chunk)

        with ZipFile(memory_file) as zip_file:
            for zipped_name in zip_file.namelist():
                basename = os.path.basename(zipped_name)
                # Directory entries ('folder/') have an empty basename;
                # opening local_dir itself for writing would fail.
                if not basename:
                    continue

                local_file = os.path.join(local_dir, basename)
                with open(local_file, 'wb') as f:
                    f.write(zip_file.read(zipped_name))

    def download_preprocessed(self, measurement_id):
        """ Download preprocessed files for the measurement id. """
        download_url = DOWNLOAD_PREPROCESSED.format(measurement_id)
        self.download_files(measurement_id, 'scc_preprocessed', download_url)

    def download_optical(self, measurement_id):
        """ Download optical files for the measurement id. """
        download_url = DOWNLOAD_OPTICAL.format(measurement_id)
        self.download_files(measurement_id, 'scc_optical', download_url)

    def download_graphs(self, measurement_id):
        """ Download profile graphs for the measurement id. """
        download_url = DOWNLOAD_GRAPH.format(measurement_id)
        self.download_files(measurement_id, 'scc_plots', download_url)

    def rerun_processing(self, measurement_id, monitor=True):
        """ Request a rerun of the (optical) processing of a measurement.

        If monitor is True, wait for the processing to finish and download
        the produced files (see monitor_processing).
        """
        measurement = self.get_measurement(measurement_id)
        if measurement is None:
            return

        # The response body is not used, so it is not streamed; reading it
        # lets requests release the connection.
        request = self.session.get(measurement.rerun_processing_url, auth=self.auth,
                                   verify=False)

        if request.status_code != 200:
            logger.error("Could not rerun processing for %s. Status code: %s" % (measurement_id, request.status_code))
            return

        if monitor:
            self.monitor_processing(measurement_id)

    def rerun_all(self, measurement_id, monitor=True):
        """ Request a rerun of all processing steps of a measurement.

        If monitor is True, wait for the processing to finish and download
        the produced files (see monitor_processing).
        """
        logger.debug("Started rerun_all procedure.")

        logger.debug("Getting measurement %s" % measurement_id)
        measurement = self.get_measurement(measurement_id)
        if measurement is None:
            return

        logger.debug("Attempting to rerun all processing through %s." % measurement.rerun_all_url)

        # The response body is not used, so it is not streamed.
        request = self.session.get(measurement.rerun_all_url, auth=self.auth,
                                   verify=False)

        if request.status_code != 200:
            logger.error("Could not rerun pre processing for %s. Status code: %s" %
                         (measurement_id, request.status_code))
            return

        if monitor:
            self.monitor_processing(measurement_id)

    def process(self, filename, system_id):
        """ Upload a file for processing and wait for the processing to finish.

        If the processing is successful, all produced files are downloaded.
        Returns the final Measurement, or None if the upload failed or the
        measurement could not be retrieved.
        """
        logger.info("--- Processing started on %s. ---" % datetime.datetime.now())
        measurement_id = self.upload_file(filename, system_id)

        # upload_file returns False on failure; monitoring a non-existent id
        # would only issue a pointless API request.
        if not measurement_id:
            logger.error("Upload of %s failed; processing not monitored." % filename)
            return None

        return self.monitor_processing(measurement_id)

    def monitor_processing(self, measurement_id):
        """ Poll a measurement every 10 seconds until it stops running.

        When finished, download the preprocessed files if the pre_processing
        status is 127, and the optical files and graphs if the processing
        status is 127. Returns the final Measurement, or None if it could
        not be retrieved.
        """
        measurement = self.get_measurement(measurement_id)
        if measurement is None:
            return None

        while measurement.is_running:
            logger.info("Measurement is being processed (status: %s, %s, %s). Please wait." % (measurement.upload,
                                                                                              measurement.pre_processing,
                                                                                              measurement.processing))
            time.sleep(10)
            measurement = self.get_measurement(measurement_id)
            # A failed poll would otherwise crash on the attribute access above.
            if measurement is None:
                logger.error("Lost track of measurement %s while monitoring." % measurement_id)
                return None

        logger.info("Measurement processing finished (status: %s, %s, %s)." % (measurement.upload,
                                                                                measurement.pre_processing,
                                                                                measurement.processing))
        if measurement.pre_processing == 127:
            logger.info("Downloading preprocessed files.")
            self.download_preprocessed(measurement_id)
        if measurement.processing == 127:
            logger.info("Downloading optical files.")
            self.download_optical(measurement_id)
            logger.info("Downloading graphs.")
            self.download_graphs(measurement_id)
        logger.info("--- Processing finished. ---")
        return measurement

    def _get_json(self, url):
        """ GET an API url and return the decoded JSON body.

        Returns None when the server does not answer with status 200, so
        callers can treat it like an empty response instead of failing on
        a non-JSON error page.
        """
        response = self.session.get(url,
                                    auth=self.auth,
                                    verify=False)
        if response.status_code != 200:
            logger.debug("Request to %s returned status code %s." % (url, response.status_code))
            return None
        return response.json()

    def get_status(self, measurement_id):
        """ Get the processing status for a measurement id through the API.

        Returns an (upload, pre_processing, processing) tuple, or None if
        no such measurement is found.
        """
        measurement_url = urlparse.urljoin(API_BASE_URL, 'measurements/?id__exact=%s' % measurement_id)

        response_dict = self._get_json(measurement_url)

        if response_dict and response_dict.get('objects'):
            measurement = Measurement(response_dict['objects'][0])
            return (measurement.upload, measurement.pre_processing, measurement.processing)

        logger.error("No measurement with id %s found on the SCC." % measurement_id)
        return None

    def get_measurement(self, measurement_id):
        """ Return the Measurement for an id from the API, or None if not found. """
        measurement_url = urlparse.urljoin(API_BASE_URL, 'measurements/%s/' % measurement_id)

        response_dict = self._get_json(measurement_url)

        if response_dict:
            return Measurement(response_dict)

        logger.error("No measurement with id %s found on the SCC." % measurement_id)
        return None

    def delete_measurement(self, measurement_id):
        """ Delete a measurement with the provided measurement id.

        The user should have the appropriate permissions. The procedure is
        performed directly through the web (admin) interface and NOT
        through the API.

        Returns True on success, None otherwise.
        """
        measurement = self.get_measurement(measurement_id)

        if measurement is None:
            logger.warning("Nothing to delete.")
            return None

        # Open the page confirming the deletion; it also provides the CSRF cookie.
        delete_url = DELETE_MEASUREMENT.format(measurement.id)

        confirm_page = self.session.get(delete_url,
                                        auth=self.auth,
                                        verify=False)

        if confirm_page.status_code != 200:
            logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code))
            return None

        # Confirm the deletion.
        delete_page = self.session.post(delete_url,
                                        auth=self.auth,
                                        verify=False,
                                        data={'post': 'yes'},
                                        headers={'X-CSRFToken': confirm_page.cookies['csrftoken'],
                                                 'referer': delete_url}
                                        )
        if delete_page.status_code != 200:
            logger.warning("Something went wrong. Delete page status: {0}".format(
                delete_page.status_code))
            return None

        logger.info("Deleted measurement {0}".format(measurement_id))
        return True

    def available_measurements(self):
        """ Get a list of available measurements on the SCC.

        Returns a list of Measurement objects, or None if the API gave no
        (usable) response.
        """
        measurement_url = urlparse.urljoin(API_BASE_URL, 'measurements')
        response_dict = self._get_json(measurement_url)

        if not response_dict:
            logger.warning("No response received from the SCC when asked for available measurements.")
            return None

        measurements = [Measurement(measurement_dict) for measurement_dict in response_dict['objects']]
        logger.info("Found %s measurements on the SCC." % len(measurements))
        return measurements

    def measurement_id_for_date(self, t1, call_sign='bu', base_number=0):
        """ Give the first measurement id not yet used on the SCC for a date.

        Ids have the form <YYYYMMDD><call_sign><NN>; NN starts at
        base_number and is incremented until an unused id is found.

        t1: object with strftime (date/datetime) giving the date part.
        Returns None if the API gave no (usable) response.
        Raises ValueError if the counter reaches 100 without a free id.
        """
        date_str = t1.strftime('%Y%m%d')
        search_url = urlparse.urljoin(API_BASE_URL, 'measurements/?id__startswith=%s' % date_str)

        response_dict = self._get_json(search_url)
        if not response_dict:
            return None

        # A set gives O(1) membership tests in the loop below.
        existing_ids = set(measurement_dict['id'] for measurement_dict in response_dict['objects'])

        measurement_number = base_number
        measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number)

        while measurement_id in existing_ids:
            measurement_number += 1
            measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number)
            if measurement_number == 100:
                raise ValueError('No available measurement id found.')

        return measurement_id
|
389 |
|
390 |
|
class ApiObject:
    """ Generic wrapper exposing an API response dictionary as attributes.

    Every key/value pair of the response becomes an attribute of the
    instance. ``exists`` is True when the response was non-empty and False
    otherwise (empty dict or None).
    """

    def __init__(self, dict_response):
        has_content = bool(dict_response)

        if has_content:
            for name, value in dict_response.items():
                setattr(self, name, value)

        # Assigned after the copy so an 'exists' key in the response cannot
        # override the flag.
        self.exists = has_content
|
403 |
|
404 |
|
class Measurement(ApiObject):
    """ A measurement object as returned by the SCC API.

    Attributes are copied from the API response; the members below read
    ``id``, ``upload``, ``pre_processing`` and ``processing``.
    """

    @property
    def is_running(self):
        """ True if the processing has not finished.

        Returns False when the upload status is 0, when the pre_processing
        status is -127, or when pre_processing is 127 and processing is
        either 127 or -127; True in every other case.
        """
        if self.upload == 0 or self.pre_processing == -127:
            return False
        processing_done = self.pre_processing == 127 and self.processing in (127, -127)
        return not processing_done

    @property
    def rerun_processing_url(self):
        """ Url that triggers a rerun of the (optical) processing. """
        return RERUN_PROCESSING.format(self.id)

    @property
    def rerun_all_url(self):
        """ Url that triggers a rerun of all processing steps. """
        return RERUN_ALL.format(self.id)

    def __str__(self):
        status_fields = (self.id, self.upload, self.pre_processing, self.processing)
        return "%s: %s, %s, %s" % status_fields
|
435 |
|
436 |
|
def upload_file(filename, system_id, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut: log in, upload one file to the SCC and log out.

    Returns the result of SCC.upload_file (the measurement id, or False on
    failure).
    """
    start_message = "Uploading file %s, using sytem %s" % (filename, system_id)
    logger.info(start_message)

    client = SCC(auth)
    client.login(credential)
    new_measurement_id = client.upload_file(filename, system_id)
    client.logout()

    return new_measurement_id
|
446 |
|
447 |
|
def process_file(filename, system_id, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut: log in, upload a file, wait for its processing and log out.

    Returns the result of SCC.process.
    """
    start_message = "Processing file %s, using sytem %s" % (filename, system_id)
    logger.info(start_message)

    client = SCC(auth)
    client.login(credential)
    processed = client.process(filename, system_id)
    client.logout()

    return processed
|
457 |
|
458 |
|
def delete_measurement(measurement_id, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut function to delete a measurement from the SCC.

    Logs in, deletes the measurement through the web interface and logs out.
    Returns the result of SCC.delete_measurement (True on success, None
    otherwise) so callers can tell whether the deletion worked; previously
    the result was discarded.
    """
    logger.info("Deleting %s" % measurement_id)
    scc = SCC(auth)
    scc.login(credential)
    deleted = scc.delete_measurement(measurement_id)
    scc.logout()
    return deleted
|
466 |
|
467 |
|
def rerun_all(measurement_id, monitor=True, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut function to rerun all processing steps of a measurement.

    Logs in, requests the rerun through SCC.rerun_all and logs out.
    monitor: if True, wait for the processing to finish and download the
        produced files (default matches SCC.rerun_all).
    """
    logger.info("Rerunning all products for %s" % measurement_id)
    scc = SCC(auth)
    scc.login(credential)
    scc.rerun_all(measurement_id, monitor)
    scc.logout()
|
475 |
|
476 |
|
def rerun_processing(measurement_id, monitor=True, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut function to rerun the (optical) processing of a measurement.

    Logs in, requests the rerun through SCC.rerun_processing and logs out.
    monitor: if True, wait for the processing to finish and download the
        produced files (default matches SCC.rerun_processing).
    """
    logger.info("Rerunning (optical) processing for %s" % measurement_id)
    scc = SCC(auth)
    scc.login(credential)
    scc.rerun_processing(measurement_id, monitor)
    scc.logout()
|
484 |
|
485 |
|
# When running through terminal
if __name__ == '__main__':

    arg_parser = argparse.ArgumentParser()

    # Positional inputs used for a plain upload / process run.
    arg_parser.add_argument("filename", nargs='?', default='',
                            help="Measurement file name or path.")
    arg_parser.add_argument("system", nargs='?', default=0,
                            help="Processing system id.")
    arg_parser.add_argument("-p", "--process", action="store_true",
                            help="Wait for the results of the processing.")

    # Maintenance actions on an existing measurement id.
    arg_parser.add_argument("--delete", help="Measurement ID to delete.")
    arg_parser.add_argument("--rerun-all", help="Measurement ID to rerun.")
    arg_parser.add_argument("--rerun-processing", help="Measurement ID to rerun processing routings.")

    # Verbosity settings from http://stackoverflow.com/a/20663028
    # Both flags write the same 'loglevel' destination; INFO is the default.
    arg_parser.add_argument('-d', '--debug', action="store_const", dest="loglevel",
                            const=logging.DEBUG, default=logging.INFO,
                            help="Print debugging information.")
    arg_parser.add_argument('-s', '--silent', action="store_const", dest="loglevel",
                            const=logging.WARNING,
                            help="Show only warning and error messages.")

    options = arg_parser.parse_args()

    logging.basicConfig(format='%(levelname)s: %(message)s', level=options.loglevel)

    # Actions are checked in this order and only the first one set is
    # performed; uploading happens only when none of them is given.
    if options.delete:
        delete_measurement(options.delete)
    elif options.rerun_all:
        rerun_all(options.rerun_all, options.process)
    elif options.rerun_processing:
        rerun_processing(options.rerun_processing, options.process)
    else:
        missing_input = options.filename == '' or options.system == 0
        if missing_input:
            # parser.error exits the program, so nothing below runs.
            arg_parser.error('Provide a valid filename and system parameters.\nRun with -h for help.\n')

        shortcut = process_file if options.process else upload_file
        shortcut(options.filename, options.system)
|