import requests
try:
import urllib.parse as urlparse # Python 3
except ImportError:
import urlparse # Python 2
import argparse
import datetime
import logging
import os
import re
from io import BytesIO
import sys
import time
import urlparse
from zipfile import ZipFile
import yaml
# Turn off the warnings emitted by the urllib3 copy bundled with requests
# (presumably the insecure-request warnings triggered by sessions that skip
# certificate verification -- confirm).
requests.packages.urllib3.disable_warnings()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The regex to find the measurement id from the measurement page
# This should be read from the uploaded file, but would require an extra NetCDF module.
# NOTE(review): the literal below looks damaged -- the string is split over two
# lines and the named group "(?P" has no "<name>" part, so it is neither valid
# Python nor a valid pattern as shown. Angle-bracketed text (markup around
# "Measurement" and the group name) was presumably lost; restore it from the
# upstream file before relying on this pattern.
regex = "
Measurement (?P.{12,15}) " # {12, 15} to handle both old- and new-style measurement ids.
class SCC:
"""A simple class that will attempt to upload a file on the SCC server.
The uploading is done by simulating a normal browser session. In the current
version no check is performed, and no feedback is given if the upload
was successful. If everything is setup correctly, it will work.
"""
def __init__(self, auth, output_dir, base_url):
    """Store the connection settings and pre-compute every URL used by the class.

    :param auth: stored as ``self.auth`` and set as the ``auth`` attribute of the requests session.
    :param output_dir: stored as ``self.output_dir``; downloads are written below this directory.
    :param base_url: root URL of the SCC site; every page and API URL is joined onto it.
    """
    self.auth = auth
    self.output_dir = output_dir
    self.base_url = base_url

    # A single requests session is reused for all calls; certificate
    # verification is switched off on it.
    session = requests.Session()
    session.auth = auth
    session.verify = False
    self.session = session

    def site(path):
        # URL of a page relative to the site root.
        return urlparse.urljoin(self.base_url, path)

    # Account handling and measurement pages.
    self.login_url = site('accounts/login/')
    self.logout_url = site('accounts/logout/')
    self.list_measurements_url = site('data_processing/measurements/')
    self.upload_url = site('data_processing/measurements/quick/')

    # Download patterns; "{0}" is filled in later with a measurement id.
    self.download_hirelpp_pattern = site('data_processing/measurements/{0}/download-hirelpp/')
    self.download_cloudmask_pattern = site('data_processing/measurements/{0}/download-cloudmask/')
    self.download_preprocessed_pattern = site('data_processing/measurements/{0}/download-preprocessed/')
    self.download_optical_pattern = site('data_processing/measurements/{0}/download-optical/')
    self.download_graph_pattern = site('data_processing/measurements/{0}/download-plots/')
    self.download_elic_pattern = site('data_processing/measurements/{0}/download-elic/')
    self.delete_measurement_pattern = site('admin/database/measurements/{0}/delete/')

    # API end points, all relative to the API root.
    self.api_base_url = site('api/v1/')

    def api(path):
        # URL of an API resource relative to the API root.
        return urlparse.urljoin(self.api_base_url, path)

    self.api_measurement_pattern = api('measurements/{0}/')
    self.api_measurements_url = api('measurements')
    self.api_sounding_search_pattern = api('sounding_files/?filename={0}')
    self.api_lidarratio_search_pattern = api('lidarratio_files/?filename={0}')
    self.api_overlap_search_pattern = api('overlap_files/?filename={0}')
def login(self, credentials):
    """Log in to the SCC web interface.

    The login page is requested first; its ``csrftoken`` cookie is then sent
    back in the ``X-CSRFToken`` header together with the credentials.

    :param credentials: sequence indexed as ``[0]`` = username, ``[1]`` = password.
    :return: the response of the POST that submits the credentials.
    :raises: ``self.PageNotAccessibleError`` if the login page request is not ok.
    """
    username = credentials[0]
    logger.debug("Attempting to login to SCC, username %s." % username)
    form_data = {'username': username,
                 'password': credentials[1]}

    # Fetch the login form (this also provides the CSRF cookie).
    logger.debug("Accessing login page at %s." % self.login_url)
    login_page = self.session.get(self.login_url)
    if not login_page.ok:
        message = 'Could not access login pages. Status code %s' % login_page.status_code
        raise self.PageNotAccessibleError(message)

    # Send the credentials back to the same URL.
    logger.debug("Submitting credentials.")
    csrf_headers = {'X-CSRFToken': login_page.cookies['csrftoken'],
                    'referer': self.login_url}
    return self.session.post(self.login_url, data=form_data, headers=csrf_headers)
def logout(self):
    """Request the SCC logout page with the current session and return the response."""
    response = self.session.get(self.logout_url, stream=True)
    return response
def upload_file(self, filename, system_id, rs_filename=None, ov_filename=None, lr_filename=None):
    """Upload a measurement file for processing with a specific system.

    Ancillary files are attached only when ``self.get_ancillary`` reports
    that they are not already on the SCC; otherwise a warning is logged and
    the file is left out. Every file opened for the upload is closed again
    before the method returns, whether or not the upload succeeds.

    :param filename: path of the measurement file to upload.
    :param system_id: value sent as the ``system`` form field.
    :param rs_filename: optional path of a sounding file.
    :param ov_filename: optional path of an overlap file.
    :param lr_filename: optional path of a lidar ratio file.
    :return: the measurement id found in the result page, or ``False`` if the
        server did not answer with status 200, the upload was not redirected
        to a new page, or no measurement id could be found in that page.
    """
    # Get submit page (its csrftoken cookie is needed for the POST).
    upload_page = self.session.get(self.upload_url)

    upload_data = {'system': system_id}

    # (path, type given to get_ancillary, label used in logs, form field name)
    ancillary_specs = (
        (rs_filename, 'sounding', 'sounding', 'sounding_file'),
        (ov_filename, 'overlap', 'overlap', 'overlap_file'),
        (lr_filename, 'lidarratio', 'lidar ratio', 'lidar_ratio_file'),
    )

    files = {}
    try:
        files['data'] = open(filename, 'rb')

        for path, file_type, label, field in ancillary_specs:
            if path is None:
                continue
            ancillary_file, _ = self.get_ancillary(path, file_type)
            if ancillary_file.already_on_scc:
                logger.warning("%s file %s already on the SCC with id %s. Ignoring it.",
                               label.capitalize(), ancillary_file.filename, ancillary_file.id)
            else:
                logger.debug('Adding %s file %s', label, path)
                files[field] = open(path, 'rb')

        logger.info("Uploading of file(s) %s started.", filename)
        upload_submit = self.session.post(self.upload_url,
                                          data=upload_data,
                                          files=files,
                                          headers={'X-CSRFToken': upload_page.cookies['csrftoken'],
                                                   'referer': self.upload_url})
    finally:
        # Close every handle opened above, also when a lookup or the POST raised.
        for handle in files.values():
            handle.close()

    if upload_submit.status_code != 200:
        logger.warning("Connection error. Status code: %s", upload_submit.status_code)
        return False

    # Check if there was a redirect to a new page.
    if upload_submit.url == self.upload_url:
        logger.error("Uploaded file(s) rejected! Try to upload manually to see the error.")
        return False

    matches = re.findall(regex, upload_submit.text)
    if not matches:
        # Redirected, but the page does not contain the expected id.
        logger.error("Could not find a measurement id in the page returned after uploading %s.", filename)
        return False

    measurement_id = matches[0]
    logger.info("Successfully uploaded measurement with id %s.", measurement_id)
    return measurement_id
def download_files(self, measurement_id, subdir, download_url):
    """Download a zip archive and extract its files into a local directory.

    The archive is fetched from ``download_url`` and its members are written,
    without their internal folder structure, to
    ``<output_dir>/<measurement_id>/<subdir>``. This method is used to
    download preprocessed files, optical files etc.

    :param measurement_id: id of the measurement; also used as folder name.
    :param subdir: name of the folder, below the measurement folder, that receives the files.
    :param download_url: URL of the zip archive.
    :raises Exception: if the download request is not ok.
    """
    # TODO: Make downloading more robust (e.g. in case that files do not exist on server).

    # Get the file
    request = self.session.get(download_url, stream=True)
    if not request.ok:
        raise Exception("Could not download files for measurement '%s'" % measurement_id)

    # Create the dir if it does not exist
    local_dir = os.path.join(self.output_dir, measurement_id, subdir)
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)

    # Read the response chunk by chunk; note that the complete archive is
    # still kept in memory because ZipFile needs a seekable file object.
    memory_file = BytesIO()
    for chunk in request.iter_content(chunk_size=1024):
        if chunk:  # filter out keep-alive new chunks
            memory_file.write(chunk)

    with ZipFile(memory_file) as zip_file:
        for zipped_name in zip_file.namelist():
            basename = os.path.basename(zipped_name)
            if not basename:
                # Folder entries end with "/" and have no file name; trying
                # to open them for writing would fail.
                continue

            local_file = os.path.join(local_dir, basename)
            with open(local_file, 'wb') as f:
                f.write(zip_file.read(zipped_name))
def download_hirelpp(self, measurement_id):
    """Download the HiRElPP files of a measurement into the 'hirelpp' subfolder."""
    self.download_files(measurement_id, 'hirelpp',
                        self.download_hirelpp_pattern.format(measurement_id))
def download_cloudmask(self, measurement_id):
    """Download the cloudmask files of a measurement into the 'cloudmask' subfolder."""
    self.download_files(measurement_id, 'cloudmask',
                        self.download_cloudmask_pattern.format(measurement_id))
def download_preprocessed(self, measurement_id):
    """Download the preprocessed files of a measurement into the 'scc_preprocessed' subfolder."""
    self.download_files(measurement_id, 'scc_preprocessed',
                        self.download_preprocessed_pattern.format(measurement_id))
def download_optical(self, measurement_id):
    """Download the optical files of a measurement into the 'scc_optical' subfolder."""
    self.download_files(measurement_id, 'scc_optical',
                        self.download_optical_pattern.format(measurement_id))
def download_graphs(self, measurement_id):
    """Download the plots of a measurement into the 'scc_plots' subfolder."""
    self.download_files(measurement_id, 'scc_plots',
                        self.download_graph_pattern.format(measurement_id))
def download_elic(self, measurement_id):
    """Download the ELIC files of a measurement into the 'elic' subfolder."""
    self.download_files(measurement_id, 'elic',
                        self.download_elic_pattern.format(measurement_id))
def rerun_processing(self, measurement_id, monitor=True):
    """Ask the SCC to rerun the processing of a measurement.

    Nothing is done if the measurement cannot be retrieved. If the rerun
    request does not answer with status 200 an error is logged.

    :param measurement_id: id of the measurement to reprocess.
    :param monitor: if true, follow the processing with ``monitor_processing``
        after a successful request.
    """
    measurement, _ = self.get_measurement(measurement_id)
    if not measurement:
        return

    response = self.session.get(measurement.rerun_processing_url, stream=True)
    if response.status_code != 200:
        details = (measurement_id, response.status_code)
        logger.error("Could not rerun processing for %s. Status code: %s" % details)
        return

    if monitor:
        self.monitor_processing(measurement_id)
def rerun_all(self, measurement_id, monitor=True):
    """Ask the SCC to rerun all processing steps of a measurement.

    Nothing is requested if the measurement cannot be retrieved. If the
    rerun request does not answer with status 200 an error is logged.

    :param measurement_id: id of the measurement to reprocess.
    :param monitor: if true, follow the processing with ``monitor_processing``
        after a successful request.
    """
    logger.debug("Started rerun_all procedure.")

    logger.debug("Getting measurement %s" % measurement_id)
    measurement, _ = self.get_measurement(measurement_id)
    if not measurement:
        return

    logger.debug("Attempting to rerun all processing through %s." % measurement.rerun_all_url)
    response = self.session.get(measurement.rerun_all_url, stream=True)
    if response.status_code != 200:
        details = (measurement_id, response.status_code)
        logger.error("Could not rerun pre processing for %s. Status code: %s" % details)
        return

    if monitor:
        self.monitor_processing(measurement_id)
def process(self, filename, system_id, monitor, rs_filename=None, lr_filename=None, ov_filename=None):
    """Upload a file for processing and, optionally, follow the processing.

    :param filename: path of the measurement file to upload.
    :param system_id: id of the system configuration to process the file with.
    :param monitor: if true and the upload returned a measurement id, hand
        over to ``monitor_processing``.
    :param rs_filename: optional sounding file, passed on to ``upload_file``.
    :param lr_filename: optional lidar ratio file, passed on to ``upload_file``.
    :param ov_filename: optional overlap file, passed on to ``upload_file``.
    :return: the result of ``monitor_processing``, or ``None`` if the upload
        gave no measurement id or monitoring was not requested.
    """
    logger.info("--- Processing started on %s. ---" % datetime.datetime.now())

    # Upload file
    logger.info("--- Uploading file")
    ancillary = dict(rs_filename=rs_filename,
                     lr_filename=lr_filename,
                     ov_filename=ov_filename)
    measurement_id = self.upload_file(filename, system_id, **ancillary)

    if not (measurement_id and monitor):
        return None

    logger.info("--- Monitoring processing")
    return self.monitor_processing(measurement_id)
def monitor_processing(self, measurement_id):
    """Wait until a measurement has been processed, then download its products.

    The method first waits for the measurement to show up in the API, then
    polls it until it is no longer running, and finally downloads the
    products of every module whose status attribute equals 127.

    A failed status query while polling no longer crashes the method: the
    last known state is kept and the query is repeated, up to ``error_max``
    consecutive failures.

    :param measurement_id: id of the measurement to follow.
    :return: the last retrieved measurement object, or ``None`` if the API
        answered with status 200 but without a measurement.
    :raises SystemExit: (exit code 1) if the measurement does not appear in
        the API, or cannot be queried any more, after ``error_max`` attempts.
    """
    error_max = 6    # maximum number of consecutive failed queries
    time_sleep = 10  # seconds to wait between two queries

    # Try to wait for the measurement to appear in the API (deals with error 404).
    measurement = None
    error_count = 0
    logger.info("Looking for measurement %s in SCC", measurement_id)
    while error_count < error_max:
        time.sleep(time_sleep)
        measurement, status = self.get_measurement(measurement_id)
        if status == 200:
            break
        logger.error("Measurement not found. waiting %ds", time_sleep)
        error_count += 1

    if error_count == error_max:
        logger.critical("Measurement %s doesn't seem to exist", measurement_id)
        sys.exit(1)

    logger.info('Measurement %s found', measurement_id)

    if measurement is None:
        return None

    # Poll until the processing is over.
    error_count = 0
    while measurement.is_running:
        logger.info("Measurement is being processed. Please wait.")
        time.sleep(time_sleep)
        refreshed, status = self.get_measurement(measurement_id)
        if refreshed is None:
            # Keep the last known state instead of failing on None.is_running.
            error_count += 1
            if error_count >= error_max:
                logger.critical("Could not get the status of measurement %s any more (last status code %s)",
                                measurement_id, status)
                sys.exit(1)
            logger.warning("Could not get the status of measurement %s (status code %s). Retrying.",
                           measurement_id, status)
            continue
        error_count = 0
        measurement = refreshed

    logger.info("Measurement processing finished.")

    # NOTE(review): 127 is presumably the exit code of a successfully
    # finished module -- confirm against the SCC documentation.
    if measurement.hirelpp == 127:
        logger.info("Downloading hirelpp files.")
        self.download_hirelpp(measurement_id)
    if measurement.cloudmask == 127:
        logger.info("Downloading cloudmask files.")
        self.download_cloudmask(measurement_id)
    if measurement.elpp == 127:
        logger.info("Downloading preprocessed files.")
        self.download_preprocessed(measurement_id)
    if measurement.elda == 127:
        logger.info("Downloading optical files.")
        self.download_optical(measurement_id)
        logger.info("Downloading graphs.")
        self.download_graphs(measurement_id)
    if measurement.elic == 127:
        logger.info("Downloading elic files.")
        self.download_elic(measurement_id)
    logger.info("--- Processing finished. ---")

    return measurement
def get_measurement(self, measurement_id):
    """Retrieve a single measurement from the API.

    :param measurement_id: id inserted into the measurement API URL.
    :return: tuple ``(measurement, status_code)``; ``measurement`` is a
        ``Measurement`` object, or ``None`` if the request was not ok or the
        decoded response was empty.
    """
    measurement_url = self.api_measurement_pattern.format(measurement_id)
    logger.debug("Measurement API URL: %s" % measurement_url)

    response = self.session.get(measurement_url)
    if not response.ok:
        logger.error('Could not access API. Status code %s.' % response.status_code)
        return None, response.status_code

    payload = response.json()
    if not payload:
        logger.error("No measurement with id %s found on the SCC." % measurement_id)
        return None, response.status_code

    return Measurement(self.base_url, payload), response.status_code
def delete_measurement(self, measurement_id):
    """Delete the measurement with the provided measurement id.

    The user should have the appropriate permissions. The procedure is
    performed directly through the web interface and NOT through the API:
    the confirmation page is requested first and then answered with a POST
    that carries its ``csrftoken`` cookie.

    :param measurement_id: id of the measurement to delete.
    :return: ``True`` if the deletion request was ok, ``None`` otherwise.
    """
    # Make sure there is something to delete.
    measurement, _ = self.get_measurement(measurement_id)
    if measurement is None:
        logger.warning("Nothing to delete.")
        return None

    # Open the page that asks to confirm the deletion.
    delete_url = self.delete_measurement_pattern.format(measurement_id)
    confirm_page = self.session.get(delete_url)
    if confirm_page.status_code != 200:
        logger.warning("Could not open delete page. Status: {0}".format(confirm_page.status_code))
        return None

    # Confirm the deletion.
    csrf_headers = {'X-CSRFToken': confirm_page.cookies['csrftoken'],
                    'referer': delete_url}
    delete_page = self.session.post(delete_url, data={'post': 'yes'}, headers=csrf_headers)
    if delete_page.ok:
        logger.info("Deleted measurement {0}".format(measurement_id))
        return True

    logger.warning("Something went wrong. Delete page status: {0}".format(
        delete_page.status_code))
    return None
def available_measurements(self):
    """Get a list of the measurements available on the SCC.

    The HTTP status is checked before the body is decoded, in the same way
    as in ``get_measurement``, so that an error page is reported in the log
    instead of being parsed as a measurement list.

    :return: a list of ``Measurement`` objects, or ``None`` if the request
        was not ok or the decoded response was empty.
    """
    response = self.session.get(self.api_measurements_url)
    if not response.ok:
        logger.error('Could not access API. Status code %s.' % response.status_code)
        return None

    response_dict = response.json()
    if not response_dict:
        logger.warning("No response received from the SCC when asked for available measurements.")
        return None

    measurements = [Measurement(self.base_url, measurement_dict)
                    for measurement_dict in response_dict['objects']]
    logger.info("Found %s measurements on the SCC." % len(measurements))
    return measurements
def list_measurements(self, station=None, system=None, start=None, stop=None, upload_status=None,
processing_status=None, optical_processing=None):
# TODO: Change this to work through the API
# Need to set to empty string if not specified, we won't get any results
params = {
"station": station if station is not None else "",
"system": system if system is not None else "",
"stop": stop if stop is not None else "",
"start": start if start is not None else "",
"upload_status": upload_status if upload_status is not None else "",
"preprocessing_status": processing_status if processing_status is not None else "",
"optical_processing_status": optical_processing if optical_processing is not None else ""
}
response_txt = self.session.get(self.list_measurements_url, params=params).text
tbl_rgx = re.compile(r'', re.DOTALL)
entry_rgx = re.compile(r'(.*?)
', re.DOTALL)
measurement_rgx = re.compile(
r'.*?]*>(\w+).*? | .*? | ([\w-]+ [\w:]+) | .* |