scc_access/scc_access.py

changeset 7
415d034b0864
child 14
c2020b2fdd05
equal deleted inserted replaced
6:c02712d2ab9e 7:415d034b0864
1 #!/usr/bin/env python
2 """
3 The MIT License (MIT)
4
5 Copyright (c) 2015, Ioannis Binietoglou
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24 """
25
26 __version__ = "0.6.0"
27
# Read the site configuration from the settings.py file. The names used by
# this module without being defined here (BASE_URL, BASIC_LOGIN, DJANGO_LOGIN,
# OUTPUT_DIR) are presumably provided by that file.
try:
    from settings import *
except ImportError:
    # Only a missing/unimportable settings module is reported here; any other
    # error raised while executing settings.py (e.g. a syntax error) is left
    # to propagate unchanged so that its real cause stays visible.
    raise ImportError(
        "A settings file (settings.py) is required to run the script.\n"
        "You can use settings.sample.py as a template.")
35
36 import requests
37 requests.packages.urllib3.disable_warnings()
38
39 import urlparse
40 import argparse
41 import os
42 import re
43 import time
44 import StringIO
45 from zipfile import ZipFile
46 import datetime
47 import logging
48
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)


# Absolute URLs of the SCC pages, all resolved against BASE_URL.
# Values containing '{0}' are templates: the measurement id is filled in
# later with str.format().
_MEASUREMENTS_PATH = 'data_processing/measurements/'
_MEASUREMENT_PAGE = _MEASUREMENTS_PATH + '{0}/'

LOGIN_URL = urlparse.urljoin(BASE_URL, 'accounts/login/')
UPLOAD_URL = urlparse.urljoin(BASE_URL, _MEASUREMENTS_PATH + 'quick/')
DOWNLOAD_PREPROCESSED = urlparse.urljoin(BASE_URL, _MEASUREMENT_PAGE + 'download-preprocessed/')
DOWNLOAD_OPTICAL = urlparse.urljoin(BASE_URL, _MEASUREMENT_PAGE + 'download-optical/')
DOWNLOAD_GRAPH = urlparse.urljoin(BASE_URL, _MEASUREMENT_PAGE + 'download-plots/')
RERUN_ALL = urlparse.urljoin(BASE_URL, _MEASUREMENT_PAGE + 'rerun-all/')
RERUN_PROCESSING = urlparse.urljoin(BASE_URL, _MEASUREMENT_PAGE + 'rerun-optical/')

# Page of the admin interface used to delete a measurement, and the API root.
DELETE_MEASUREMENT = urlparse.urljoin(BASE_URL, 'admin/database/measurements/{0}/delete/')
API_BASE_URL = urlparse.urljoin(BASE_URL, 'api/v1/')
63
# Pattern that pulls the 12-character measurement id out of the heading of
# the measurement page.
# The id could instead be read from the uploaded file itself, but that would
# require an extra NetCDF module.
regex = r"<h3>Measurement (?P<measurement_id>.{12}) <small>"
67
68
class SCC:
    """ A simple client for the SCC web site.

    The web pages (login, upload, re-run, download, delete) are used by
    simulating a normal browser session; the processing status is read
    through the SCC API.

    NOTE: every request is sent with ``verify=False``, i.e. the certificate
    of the server is NOT verified.
    """

    def __init__(self, auth=BASIC_LOGIN, output_dir=OUTPUT_DIR):
        """ Create a new session.

        auth is passed as the ``auth`` argument of every request and
        output_dir is the base directory where downloaded files are stored.
        """
        self.auth = auth
        self.output_dir = output_dir
        self.session = requests.Session()

    def login(self, credentials=DJANGO_LOGIN):
        """ Login to the website.

        credentials is a (username, password) pair. The response of the
        login form submission is returned.
        """
        logger.debug("Attempting to login to SCC, username %s.", credentials[0])
        self.login_credentials = {'username': credentials[0],
                                  'password': credentials[1]}

        logger.debug("Accessing login page at %s.", LOGIN_URL)

        # Open the login page first: its cookies carry the CSRF token that
        # has to be sent back together with the form.
        login_page = self.session.get(LOGIN_URL,
                                      auth=self.auth, verify=False)

        logger.debug("Submiting credentials.")
        # Submit the login data
        login_submit = self.session.post(LOGIN_URL,
                                         data=self.login_credentials,
                                         headers={'X-CSRFToken': login_page.cookies['csrftoken'],
                                                  'referer': LOGIN_URL},
                                         verify=False,
                                         auth=self.auth)
        return login_submit

    def logout(self):
        """ Placeholder; currently does nothing. """
        pass

    def upload_file(self, filename, system_id):
        """ Upload a filename for processing with a specific system.

        Returns the measurement id if the upload is successful and False
        otherwise (connection error, file rejected by the server, or no
        measurement id found in the page returned by the server).
        """
        # Get submit page (needed for the CSRF token)
        upload_page = self.session.get(UPLOAD_URL,
                                       auth=self.auth,
                                       verify=False)

        upload_data = {'system': system_id}

        # The file is opened in a with-block so that it is always closed,
        # even if the request fails.
        with open(filename, 'rb') as data_file:
            logger.info("Uploading of file %s started.", filename)

            upload_submit = self.session.post(UPLOAD_URL,
                                              data=upload_data,
                                              files={'data': data_file},
                                              headers={'X-CSRFToken': upload_page.cookies['csrftoken'],
                                                       'referer': UPLOAD_URL},
                                              verify=False,
                                              auth=self.auth)

        if upload_submit.status_code != 200:
            logger.warning("Connection error. Status code: %s", upload_submit.status_code)
            return False

        # An accepted upload redirects to a new page; still being on the
        # upload page means that the file was rejected.
        if upload_submit.url == UPLOAD_URL:
            logger.error("Uploaded file rejected! Try to upload manually to see the error.")
            return False

        found_ids = re.findall(regex, upload_submit.text)
        if not found_ids:
            logger.error("Could not find the measurement id in the page returned after the upload.")
            return False

        measurement_id = found_ids[0]
        logger.info("Successfully uploaded measurement with id %s.", measurement_id)
        return measurement_id

    def download_files(self, measurement_id, subdir, download_url):
        """ Downloads a zip archive from the download_url and extracts its
        files to <output_dir>/<measurement_id>/<subdir>.

        This method is used to download preprocessed files, optical files
        etc. The files are stored flat: only the base name of every archive
        member is kept. Nothing is written if the server does not answer
        with status 200.
        """
        # Get the file
        request = self.session.get(download_url, auth=self.auth,
                                   verify=False,
                                   stream=True)

        if request.status_code != 200:
            logger.error("Could not download files for %s from %s. Status code: %s",
                         measurement_id, download_url, request.status_code)
            return

        # Create the dir if it does not exist
        local_dir = os.path.join(self.output_dir, measurement_id, subdir)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)

        # Read the response by chunk, needed if the file is big.
        memory_file = StringIO.StringIO()
        for chunk in request.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                memory_file.write(chunk)

        with ZipFile(memory_file) as zip_file:
            for zipped_name in zip_file.namelist():
                basename = os.path.basename(zipped_name)
                if not basename:
                    # Directory entry of the archive; there is no file to write.
                    continue

                local_file = os.path.join(local_dir, basename)
                with open(local_file, 'wb') as f:
                    f.write(zip_file.read(zipped_name))

    def download_preprocessed(self, measurement_id):
        """ Download preprocessed files for the measurement id. """
        download_url = DOWNLOAD_PREPROCESSED.format(measurement_id)
        self.download_files(measurement_id, 'scc_preprocessed', download_url)

    def download_optical(self, measurement_id):
        """ Download optical files for the measurement id. """
        download_url = DOWNLOAD_OPTICAL.format(measurement_id)
        self.download_files(measurement_id, 'scc_optical', download_url)

    def download_graphs(self, measurement_id):
        """ Download profile graphs for the measurement id. """
        download_url = DOWNLOAD_GRAPH.format(measurement_id)
        self.download_files(measurement_id, 'scc_plots', download_url)

    def rerun_processing(self, measurement_id, monitor=True):
        """ Ask the SCC to rerun the (optical) processing of a measurement.

        If monitor is true, wait for the processing to finish and download
        the results (see monitor_processing). Nothing is done if the
        measurement is not found.
        """
        measurement = self.get_measurement(measurement_id)
        if not measurement:
            return

        request = self.session.get(measurement.rerun_processing_url, auth=self.auth,
                                   verify=False,
                                   stream=True)

        if request.status_code != 200:
            logger.error("Could not rerun processing for %s. Status code: %s",
                         measurement_id, request.status_code)
            return

        if monitor:
            self.monitor_processing(measurement_id)

    def rerun_all(self, measurement_id, monitor=True):
        """ Ask the SCC to rerun all the processing steps of a measurement.

        If monitor is true, wait for the processing to finish and download
        the results (see monitor_processing). Nothing is done if the
        measurement is not found.
        """
        logger.debug("Started rerun_all procedure.")

        logger.debug("Getting measurement %s", measurement_id)
        measurement = self.get_measurement(measurement_id)
        if not measurement:
            return

        logger.debug("Attempting to rerun all processing through %s.", measurement.rerun_all_url)

        request = self.session.get(measurement.rerun_all_url, auth=self.auth,
                                   verify=False,
                                   stream=True)

        if request.status_code != 200:
            logger.error("Could not rerun pre processing for %s. Status code: %s",
                         measurement_id, request.status_code)
            return

        if monitor:
            self.monitor_processing(measurement_id)

    def process(self, filename, system_id):
        """ Upload a file for processing and wait for the processing to finish.
        If the processing is successful, it will download all produced files.

        Returns the measurement object, or None if the upload failed or the
        measurement could not be found.
        """
        logger.info("--- Processing started on %s. ---", datetime.datetime.now())
        # Upload file
        measurement_id = self.upload_file(filename, system_id)

        if not measurement_id:
            # upload_file has already logged the reason; without an id there
            # is nothing to monitor.
            return None

        measurement = self.monitor_processing(measurement_id)
        return measurement

    def monitor_processing(self, measurement_id, poll_interval=10):
        """ Monitor the processing progress of a measurement id.

        The status is polled every poll_interval seconds for as long as the
        measurement is running. Afterwards the preprocessed files are
        downloaded if pre_processing equals 127, and the optical files and
        graphs if processing equals 127.
        NOTE(review): 127 presumably marks a successfully finished step and
        -127 a failed one; confirm against the SCC API.

        Returns the last measurement object, or None if the measurement is
        not (or no longer) found.
        """
        measurement = self.get_measurement(measurement_id)
        if measurement is None:
            return None

        while measurement.is_running:
            logger.info("Measurement is being processed (status: %s, %s, %s). Please wait.",
                        measurement.upload,
                        measurement.pre_processing,
                        measurement.processing)
            time.sleep(poll_interval)
            measurement = self.get_measurement(measurement_id)
            if measurement is None:
                logger.error("Measurement %s is no longer available. Monitoring stopped.",
                             measurement_id)
                return None

        logger.info("Measurement processing finished (status: %s, %s, %s).",
                    measurement.upload,
                    measurement.pre_processing,
                    measurement.processing)

        if measurement.pre_processing == 127:
            logger.info("Downloading preprocessed files.")
            self.download_preprocessed(measurement_id)

        if measurement.processing == 127:
            logger.info("Downloading optical files.")
            self.download_optical(measurement_id)
            logger.info("Downloading graphs.")
            self.download_graphs(measurement_id)

        logger.info("--- Processing finished. ---")
        return measurement

    def get_status(self, measurement_id):
        """ Get the processing status for a measurement id through the API.

        Returns the tuple (upload, pre_processing, processing), or None if
        no measurement with this id is found.
        """
        measurement_url = urlparse.urljoin(API_BASE_URL, 'measurements/?id__exact=%s' % measurement_id)

        response = self.session.get(measurement_url,
                                    auth=self.auth,
                                    verify=False)

        response_dict = response.json()

        if response_dict['objects']:
            measurement_list = response_dict['objects']
            measurement = Measurement(measurement_list[0])
            return (measurement.upload, measurement.pre_processing, measurement.processing)
        else:
            logger.error("No measurement with id %s found on the SCC.", measurement_id)
            return None

    def get_measurement(self, measurement_id):
        """ Get the measurement with the given id through the API.

        Returns a Measurement object, or None if the server does not answer
        with status 200 or answers with an empty object.
        """
        measurement_url = urlparse.urljoin(API_BASE_URL, 'measurements/%s/' % measurement_id)

        response = self.session.get(measurement_url,
                                    auth=self.auth,
                                    verify=False)

        # An error page is not a measurement description; do not try to
        # parse it.
        if response.status_code != 200:
            logger.error("Could not get measurement %s from the SCC. Status code: %s",
                         measurement_id, response.status_code)
            return None

        response_dict = response.json()

        if response_dict:
            measurement = Measurement(response_dict)
            return measurement
        else:
            logger.error("No measurement with id %s found on the SCC.", measurement_id)
            return None

    def delete_measurement(self, measurement_id):
        """ Deletes a measurement with the provided measurement id. The user
        should have the appropriate permissions.

        The procedure is performed directly through the web interface and
        NOT through the API.

        Returns True if the measurement was deleted and None otherwise.
        """
        # Get the measurement object
        measurement = self.get_measurement(measurement_id)

        # Check that it exists
        if measurement is None:
            logger.warning("Nothing to delete.")
            return None

        # Go to the page confirming the deletion
        delete_url = DELETE_MEASUREMENT.format(measurement.id)

        confirm_page = self.session.get(delete_url,
                                        auth=self.auth,
                                        verify=False)

        # Check that the page opened properly
        if confirm_page.status_code != 200:
            logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code))
            return None

        # Delete the measurement by submitting the confirmation form
        delete_page = self.session.post(delete_url,
                                        auth=self.auth,
                                        verify=False,
                                        data={'post': 'yes'},
                                        headers={'X-CSRFToken': confirm_page.cookies['csrftoken'],
                                                 'referer': delete_url}
                                        )
        if delete_page.status_code != 200:
            logger.warning("Something went wrong. Delete page status: {0}".format(
                delete_page.status_code))
            return None

        logger.info("Deleted measurement {0}".format(measurement_id))
        return True

    def available_measurements(self):
        """ Get a list of available measurements on the SCC.

        Returns a list of Measurement objects, or None if the response was
        empty.
        """
        measurement_url = urlparse.urljoin(API_BASE_URL, 'measurements')
        response = self.session.get(measurement_url,
                                    auth=self.auth,
                                    verify=False)
        response_dict = response.json()

        measurements = None
        if response_dict:
            measurement_list = response_dict['objects']
            measurements = [Measurement(measurement_dict) for measurement_dict in measurement_list]
            logger.info("Found %s measurements on the SCC.", len(measurements))
        else:
            logger.warning("No response received from the SCC when asked for available measurements.")

        return measurements

    def measurement_id_for_date(self, t1, call_sign='bu', base_number=0):
        """ Give the first available measurement id on the SCC for the specific
        date.

        The id is built as <YYYYMMDD><call_sign><two-digit number>, where the
        date is taken from t1 (any object with a strftime method) and the
        number starts at base_number and is increased until the id is not
        used on the SCC.

        Returns None if the response of the API was empty. Raises ValueError
        if the number reaches 100 without finding a free id.
        """
        date_str = t1.strftime('%Y%m%d')
        search_url = urlparse.urljoin(API_BASE_URL, 'measurements/?id__startswith=%s' % date_str)

        response = self.session.get(search_url,
                                    auth=self.auth,
                                    verify=False)

        response_dict = response.json()

        measurement_id = None

        if response_dict:
            # A set makes the repeated membership tests below cheap.
            existing_ids = set(measurement_dict['id'] for measurement_dict in response_dict['objects'])

            measurement_number = base_number
            measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number)

            while measurement_id in existing_ids:
                measurement_number = measurement_number + 1
                if measurement_number == 100:
                    raise ValueError('No available measurement id found.')
                measurement_id = "%s%s%02i" % (date_str, call_sign, measurement_number)

        return measurement_id
389
390
class ApiObject:
    """ Generic wrapper that exposes the items of a dictionary as attributes.

    The ``exists`` attribute is True when a non-empty dictionary was given
    and False otherwise.
    """

    def __init__(self, dict_response):
        if not dict_response:
            # Nothing to copy (None or empty dictionary).
            self.exists = False
            return

        # Every key of the dictionary becomes an attribute of the object.
        for pair in dict_response.items():
            setattr(self, *pair)
        self.exists = True
403
404
class Measurement(ApiObject):
    """ A measurement, as described by a dictionary returned by the SCC API.

    The methods below read the ``id``, ``upload``, ``pre_processing`` and
    ``processing`` attributes, which are expected to be set from that
    dictionary.
    """

    @property
    def is_running(self):
        """ True as long as the processing has not finished.

        The measurement counts as finished when upload is 0, when
        pre_processing is -127, or when pre_processing is 127 and processing
        is either 127 or -127.
        NOTE(review): 127 / -127 presumably mean success / failure of a
        step; confirm against the SCC API.
        """
        if self.upload == 0 or self.pre_processing == -127:
            return False

        all_steps_done = self.pre_processing == 127 and self.processing in (127, -127)
        return not all_steps_done

    @property
    def rerun_processing_url(self):
        """ URL of the page that reruns the optical processing. """
        url_template = RERUN_PROCESSING
        return url_template.format(self.id)

    @property
    def rerun_all_url(self):
        """ URL of the page that reruns all processing steps. """
        url_template = RERUN_ALL
        return url_template.format(self.id)

    def __str__(self):
        fields = (self.id, self.upload, self.pre_processing, self.processing)
        return "%s: %s, %s, %s" % fields
435
436
def upload_file(filename, system_id, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut: login to the SCC, upload a file and logout.

    Returns whatever SCC.upload_file returns for the file.
    """
    logger.info("Uploading file %s, using sytem %s" % (filename, system_id))

    client = SCC(auth)
    client.login(credential)
    uploaded_id = client.upload_file(filename, system_id)
    client.logout()

    return uploaded_id
446
447
def process_file(filename, system_id, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut: login to the SCC, process a file and logout.

    Returns whatever SCC.process returns for the file.
    """
    logger.info("Processing file %s, using sytem %s" % (filename, system_id))

    client = SCC(auth)
    client.login(credential)
    processed = client.process(filename, system_id)
    client.logout()

    return processed
457
458
def delete_measurement(measurement_id, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut: login to the SCC, delete a measurement and logout. """
    logger.info("Deleting %s" % measurement_id)

    client = SCC(auth)
    client.login(credential)
    client.delete_measurement(measurement_id)
    client.logout()
466
467
def rerun_all(measurement_id, monitor, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut function to rerun all the processing of a measurement on the SCC.

    monitor is passed on to SCC.rerun_all: if it is true, the function waits
    for the processing to finish before logging out.
    """
    logger.info("Rerunning all products for %s" % measurement_id)
    scc = SCC(auth)
    scc.login(credential)
    scc.rerun_all(measurement_id, monitor)
    scc.logout()
475
476
def rerun_processing(measurement_id, monitor, auth=BASIC_LOGIN, credential=DJANGO_LOGIN):
    """ Shortcut function to rerun the (optical) processing of a measurement on the SCC.

    monitor is passed on to SCC.rerun_processing: if it is true, the function
    waits for the processing to finish before logging out.
    """
    logger.info("Rerunning (optical) processing for %s" % measurement_id)
    scc = SCC(auth)
    scc.login(credential)
    scc.rerun_processing(measurement_id, monitor)
    scc.logout()
484
def main():
    """ Command line entry point.

    Parses the arguments, configures the logging and then performs exactly
    one action: delete, rerun-all, rerun-processing or (by default) upload a
    file, optionally waiting for the results.
    """
    # Define the command line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("filename", nargs='?', default='',
                        help="Measurement file name or path.")
    parser.add_argument("system", nargs='?', default=0,
                        help="Processing system id.")
    parser.add_argument("-p", "--process", action="store_true",
                        help="Wait for the results of the processing.")
    parser.add_argument("--delete", help="Measurement ID to delete.")
    parser.add_argument("--rerun-all", help="Measurement ID to rerun.")
    parser.add_argument("--rerun-processing", help="Measurement ID to rerun processing routings.")

    # Verbosity settings from http://stackoverflow.com/a/20663028
    # Both flags write to the same destination; without any flag the level
    # is INFO.
    parser.add_argument('-d', '--debug', action="store_const",
                        dest="loglevel", const=logging.DEBUG, default=logging.INFO,
                        help="Print debugging information.")
    parser.add_argument('-s', '--silent', action="store_const",
                        dest="loglevel", const=logging.WARNING,
                        help="Show only warning and error messages.")

    options = parser.parse_args()

    # Get the logger with the appropriate level
    logging.basicConfig(format='%(levelname)s: %(message)s', level=options.loglevel)

    # The special actions exclude each other and the upload, in this order.
    if options.delete:
        delete_measurement(options.delete)
        return

    if options.rerun_all:
        rerun_all(options.rerun_all, options.process)
        return

    if options.rerun_processing:
        rerun_processing(options.rerun_processing, options.process)
        return

    # Default action: upload, which needs both positional arguments.
    if (options.filename == '') or (options.system == 0):
        parser.error('Provide a valid filename and system parameters.\nRun with -h for help.\n')

    action = process_file if options.process else upload_file
    action(options.filename, options.system)
525
526
# Run the command line interface when the module is executed as a script.
if __name__ == "__main__":
    main()

mercurial