Fri, 10 Oct 2014 00:16:55 +0200
Initial commit.
import fnmatch
import os

import numpy as np

try:
    import netCDF4 as netcdf
except ImportError:
    # The specification classes in this module only duck-type against an
    # opened dataset, so they stay importable without netCDF4. Opening a
    # real file still requires the package.
    netcdf = None

# Severity ranking of the result levels; a higher number is more severe.
ERROR_ORDER = {'notification': 1,
               'warning': 2,
               'error': 3,
               }

# Used when printing the report.
# NOTE(review): the template is reproduced as a single line after the leading
# newline; confirm whether the intended layout had further line breaks.
header_template = (
    "\n----- Report for file {0.filename} ----- "
    "Checked against specification: {0.specs[name]}. "
    "Output summary: "
    "Errors: {1[error]}, Warnings: {1[warning]}, "
    "Notifications: {1[notification]}"
)


# We first define the possible specification elements.
# At the end of this file they are combined into complete specifications.

class SpecGenericError(object):
    """Base class of a single check result (a message plus a severity level).

    Subclasses set ``level`` to one of the keys of ``ERROR_ORDER``.
    """

    level = None

    def __init__(self, message):
        self.message = message

    def __repr__(self):
        return "{0}: {1}".format(self.level.title(), self.message)


class SpecError(SpecGenericError):
    """Result of level 'error'."""

    def __init__(self, message):
        self.message = message
        self.level = 'error'


class SpecWarning(SpecGenericError):
    """Result of level 'warning'."""

    def __init__(self, message):
        self.message = message
        self.level = 'warning'


class SpecNotification(SpecGenericError):
    """Result of level 'notification'."""

    def __init__(self, message):
        self.message = message
        self.level = 'notification'


def _existence_result(kind, name, exists, is_mandatory):
    """Build the ``(check_passed, results)`` pair of an existence check.

    ``kind`` is the word used in the message ('dimension', 'variable' or
    'attribute'). A missing mandatory item fails with an error; a missing
    optional item passes but leaves a notification.
    """
    if exists:
        return True, []
    if is_mandatory:
        message = 'The {0} {1} is obligatory but was not found in the file.'
        return False, [SpecError(message.format(kind, name))]
    message = 'The optional {0} {1} was not found in the file.'
    return True, [SpecNotification(message.format(kind, name))]


def _python_type(values):
    """Return the built-in Python type of the first element of *values*.

    ``ndarray.item`` converts a NumPy scalar to the matching Python scalar,
    so e.g. float32 and float64 data both report ``float``.
    """
    array = np.asarray(values)
    if array.size == 0:
        # Nothing to sample: build one element of the same dtype instead of
        # failing with an IndexError on empty data.
        array = np.zeros(1, dtype=array.dtype)
    return type(array.item(0))


class GenericSpecification(object):
    """Base class of all specification elements.

    Every element offers ``check(netcdf_file, name)`` returning
    ``(check_passed, results)`` and the property ``continue_check`` telling
    the caller whether the remaining checks for the same name should run.
    """

    @property
    def continue_check(self):
        return True


class DimensionMandatory(GenericSpecification):
    """Check that a dimension exists in the file."""

    def __init__(self, is_mandatory=True):
        # If true, the next checks for this dimension are skipped when the
        # dimension is missing.
        self.block_next = True
        self.is_mandatory = is_mandatory

    def check(self, netcdf_file, dimension_name):
        """Look up ``dimension_name`` in ``netcdf_file.dimensions``."""
        the_dimension = netcdf_file.dimensions.get(dimension_name, None)
        # Compare against None: a dimension object may define a length, and
        # an empty dimension must not be mistaken for a missing one.
        self.dimension_exists = the_dimension is not None
        check_passed, error = _existence_result(
            'dimension', dimension_name, self.dimension_exists,
            self.is_mandatory)
        self.check_passed = check_passed
        self.error = error
        return check_passed, error

    @property
    def continue_check(self):
        return self.dimension_exists or not self.block_next


class DimensionUnlimited(GenericSpecification):
    """Check whether a dimension is (or is not) unlimited, as requested."""

    def __init__(self, is_unlimited):
        self.block_next = False
        self.is_unlimited = is_unlimited

    def check(self, netcdf_file, dimension_name):
        """Compare ``isunlimited()`` of the dimension with the expectation.

        Mismatches are reported as warnings; a missing dimension is an error
        and fails the check.
        """
        the_dimension = netcdf_file.dimensions.get(dimension_name, None)
        error = []
        if the_dimension is None:
            check_passed = False
            error.append(SpecError(
                'Dimension {0} should be checked for unlimited status, '
                'but was not found in the file.'.format(dimension_name)))
        else:
            unlimited = bool(the_dimension.isunlimited())
            expected = bool(self.is_unlimited)
            check_passed = unlimited == expected
            if unlimited and not expected:
                error.append(SpecWarning(
                    'Dimension {0} should not be unlimited but is.'.format(
                        dimension_name)))
            elif expected and not unlimited:
                error.append(SpecWarning(
                    'Dimension {0} should be unlimited but it is not.'.format(
                        dimension_name)))

        self.check_passed = check_passed
        self.error = error
        return check_passed, error


class VariableMandatory(GenericSpecification):
    """Check that a variable exists in the file."""

    def __init__(self, is_mandatory=True):
        # If true, the next checks for this variable are skipped when the
        # variable is missing.
        self.block_next = True
        self.is_mandatory = is_mandatory

    def check(self, netcdf_file, variable_name):
        """Look up ``variable_name`` in ``netcdf_file.variables``."""
        the_variable = netcdf_file.variables.get(variable_name, None)
        self.variable_exists = the_variable is not None
        check_passed, error = _existence_result(
            'variable', variable_name, self.variable_exists,
            self.is_mandatory)
        self.check_passed = check_passed
        self.error = error
        return check_passed, error

    @property
    def continue_check(self):
        return self.variable_exists or not self.block_next


class VariableDimensions(GenericSpecification):
    """Check that a variable has exactly the given dimensions, in order."""

    def __init__(self, dimensions):
        self.dimensions = dimensions

    def check(self, netcdf_file, variable_name):
        """Compare the variable's dimensions with the specified ones.

        Missing dimensions and a wrong order are errors; dimensions present
        in the variable but absent from the specification are warnings.
        """
        the_variable = netcdf_file.variables.get(variable_name, None)
        if the_variable is None:
            check_passed = False
            error = [SpecError(
                'Variable {0} should be checked for dimensions, but was not '
                'found in the file.'.format(variable_name))]
        else:
            expected = list(self.dimensions)
            found = list(the_variable.dimensions)

            error = [
                SpecError("Variable {0} does not have dimension {1}.".format(
                    variable_name, dimension))
                for dimension in expected if dimension not in found]
            check_passed = not error

            # The order is only meaningful once all dimensions are present.
            if check_passed and expected != found:
                check_passed = False
                error.append(SpecError(
                    "Variable {0} has wrong dimension order: {1} instead of "
                    "{2}.".format(variable_name, found, expected)))

            error.extend(
                SpecWarning(
                    'Dimension {0} found in variable {1} but is not defined '
                    'in the specifications'.format(dimension, variable_name))
                for dimension in found if dimension not in expected)

        self.check_passed = check_passed
        self.error = error
        return check_passed, error


class VariableType(GenericSpecification):
    """Check the Python type (e.g. ``float`` or ``int``) of a variable."""

    def __init__(self, dtype):
        self.dtype = dtype

    def check(self, netcdf_file, variable_name):
        """Read the variable and compare its element type with ``dtype``."""
        the_variable = netcdf_file.variables.get(variable_name, None)
        error = []
        if the_variable is None:
            check_passed = False
            error.append(SpecError(
                'Variable {0} should be checked for type, but was not found '
                'in the file.'.format(variable_name)))
        else:
            # Compare built-in Python types, not numpy dtypes.
            check_passed = _python_type(the_variable[:]) == self.dtype
            if not check_passed:
                error.append(SpecError(
                    'Variable {0} is of type {1} while it should be '
                    '{2}'.format(variable_name, the_variable.dtype,
                                 self.dtype)))

        self.check_passed = check_passed
        self.error = error
        return check_passed, error


class AttributeMandatory(GenericSpecification):
    """Check that a global attribute exists in the file."""

    def __init__(self, is_mandatory=True):
        # If true, the next checks for this attribute are skipped when the
        # attribute is missing.
        self.block_next = True
        self.is_mandatory = is_mandatory

    def check(self, netcdf_file, attribute_name):
        """Look up ``attribute_name`` as an attribute of ``netcdf_file``."""
        the_attribute = getattr(netcdf_file, attribute_name, None)
        # Compare against None so that falsy values such as 0.0 or an empty
        # string still count as present.
        self.attribute_exists = the_attribute is not None
        check_passed, error = _existence_result(
            'attribute', attribute_name, self.attribute_exists,
            self.is_mandatory)
        self.check_passed = check_passed
        self.error = error
        return check_passed, error

    @property
    def continue_check(self):
        return self.attribute_exists or not self.block_next


class AttributeType(GenericSpecification):
    """Check the Python type of a global attribute."""

    def __init__(self, dtype, block_next=True):
        # If true, the next checks for this attribute are skipped when the
        # type check fails.
        self.block_next = block_next
        self.dtype = dtype

    def check(self, netcdf_file, attribute_name):
        """Compare the attribute's (first element's) type with ``dtype``."""
        the_attribute = getattr(netcdf_file, attribute_name, None)
        error = []
        if the_attribute is None:
            check_passed = False
            error.append(SpecError(
                'Attribute {0} should be checked for type, but was not found '
                'in the file.'.format(attribute_name)))
        else:
            # Compare built-in Python types, not numpy dtypes.
            check_passed = _python_type(the_attribute) == self.dtype
            if not check_passed:
                error.append(SpecError(
                    'Attribute {0} is of type {1} while it should be '
                    '{2}'.format(attribute_name,
                                 type(the_attribute).__name__,
                                 self.dtype.__name__)))

        self.check_passed = check_passed
        self.error = error
        return check_passed, error

    @property
    def continue_check(self):
        return self.check_passed or not self.block_next


class AttributeStrLength(GenericSpecification):
    """Check that a global attribute has exactly the given length."""

    def __init__(self, length):
        self.length = length

    def check(self, netcdf_file, attribute_name):
        """Compare ``len()`` of the attribute with the expected length."""
        the_attribute = getattr(netcdf_file, attribute_name, None)
        error = []
        if the_attribute is None:
            check_passed = False
            error.append(SpecError(
                'Attribute {0} should be checked for length, but was not '
                'found in the file.'.format(attribute_name)))
        else:
            actual_length = len(the_attribute)
            check_passed = actual_length == self.length
            if not check_passed:
                error.append(SpecError(
                    'Attribute {0} should be of length {1} while it has '
                    'length {2}'.format(attribute_name, self.length,
                                        actual_length)))

        self.check_passed = check_passed
        self.error = error
        return check_passed, error


class FilenameShellPattern(GenericSpecification):
    """Check the file name against a shell-style pattern (``fnmatch``)."""

    def __init__(self, shell_pattern):
        self.pattern = shell_pattern

    def check(self, netcdf_file, filename):
        """Match ``filename`` against the pattern; the file is not used."""
        error = []
        check_passed = fnmatch.fnmatch(filename, self.pattern)
        if not check_passed:
            error.append(SpecError(
                'Filename {0} does not match pattern {1}'.format(
                    filename, self.pattern)))

        self.check_passed = check_passed
        self.error = error
        return check_passed, error

class FileChecker(object):
    """Check a netCDF file against a specification dictionary.

    ``specs`` may contain the keys 'file', 'attributes', 'dimensions' and
    'variables' (each optional) plus 'name', which is used in the report.
    Results are collected per category in ``check_results``.

    It can be used with the 'with' statement. For example:

        with FileChecker(filename, specs) as file_checker:
            file_checker.run_checks()
            file_checker.print_report('error')
    """

    def __init__(self, filepath, specs):
        self.file = None
        self.checks_run = False
        self.filepath = filepath
        self.filename = os.path.basename(filepath)
        self.specs = specs
        self.check_results = {'general': []}

    def __enter__(self):
        self.open_file()
        return self

    def __exit__(self, type, value, traceback):
        self.close_file()

    def open_file(self):
        """Open the file; a failure is recorded as a 'general' error."""
        try:
            self.file = netcdf.Dataset(self.filepath)
        except (IOError, OSError, RuntimeError):
            # Only failures to open/read the file are turned into a result;
            # anything else is a programming error and should surface.
            self.check_results['general'].append(
                SpecError('Could not open file {0}.'.format(self.filename)))

    def close_file(self):
        """Close the file if it is open; safe to call more than once."""
        if self.file is not None:
            self.file.close()
            self.file = None

    def run_checks(self):
        """Run all checks. Nothing is checked if the file is not open."""
        if self.file is not None:
            self.check_file()
            self.check_attributes()
            self.check_dimensions()
            self.check_variables()
            self.checks_run = True

    def _run_spec_list(self, category, name, spec_list):
        """Run the checks of one name, stopping at a blocking failure."""
        for spec in spec_list:
            check_passed, error = spec.check(self.file, name)
            if error:
                self.check_results[category].extend(error)
            if not spec.continue_check:
                # Don't continue checking if a blocking check failed.
                break

    def _check_category(self, category, names_in_file, unknown_message):
        """Check every specified name of ``category``.

        Afterwards, every name found in the file but missing from the
        specifications is reported as a warning using ``unknown_message``.
        """
        self.check_results[category] = []
        category_specs = self.specs.get(category, {})

        for name, spec_list in category_specs.items():
            self._run_spec_list(category, name, spec_list)

        for name in names_in_file:
            if name not in category_specs:
                self.check_results[category].append(
                    SpecWarning(unknown_message.format(name)))

    def check_file(self):
        """ Check file-level specifications (e.g. the file name) """
        self.check_results['file'] = []
        self._run_spec_list('file', self.filename, self.specs.get('file', []))

    def check_attributes(self):
        """ Check if attributes are according to specs """
        self._check_category(
            'attributes', self.file.ncattrs(),
            'Attribute {0} found in the file but is not defined in the '
            'specifications')

    def check_dimensions(self):
        """ Check if dimension are according to specs """
        self._check_category(
            'dimensions', self.file.dimensions,
            'Dimension {0} found in the file but is not defined in the '
            'specifications')

    def check_variables(self):
        """ Check if variables are according to specs """
        self._check_category(
            'variables', self.file.variables,
            'Variable {0} found in the file but is not defined in the '
            'specifications')

    def _results_at_level(self, level):
        """Collect all recorded results of exactly ``level``."""
        wanted = ERROR_ORDER[level]
        return [result
                for result_list in self.check_results.values()
                for result in result_list
                if ERROR_ORDER[result.level] == wanted]

    def file_ok(self, level='error'):
        """ Check if the file checked is ok.

        The file is ok when no result of ``level`` or higher was recorded.
        Returns None if the checks have not been run.
        """
        if not self.checks_run:
            return None
        threshold = ERROR_ORDER[level]
        return not any(ERROR_ORDER[result.level] >= threshold
                       for result_list in self.check_results.values()
                       for result in result_list)

    def results_for_level(self, level):
        """ Returns all the results of a specific level.

        Returns None if the checks have not been run.
        """
        if not self.checks_run:
            return None
        return self._results_at_level(level)

    def results_by_level(self):
        """ Returns a dictionary with the results by level. """
        return dict((level, self.results_for_level(level))
                    for level in ERROR_ORDER)

    def result_count(self):
        """ Returns a dictionary with the number of results per level. """
        return dict((level, 0 if found is None else len(found))
                    for level, found in self.results_by_level().items())

    def print_report(self, level):
        """ Print a report for the given level.

        Results are read directly from ``check_results`` so that a failure
        to open the file is reported even though no checks could be run.
        """
        results = dict((name, self._results_at_level(name))
                       for name in ERROR_ORDER)
        counts = dict((name, len(found)) for name, found in results.items())

        print(header_template.format(self, counts))

        for result_level in ['error', 'warning', 'notification']:
            if ERROR_ORDER[result_level] >= ERROR_ORDER[level]:
                print("\n{0} details".format(result_level.capitalize()))
                print("----------------")
                for result in results[result_level]:
                    print(result)


# Type of text attributes: ``unicode`` on Python 2, ``str`` on Python 3.
try:
    _text_type = unicode
except NameError:
    _text_type = str


def _dimension(mandatory, unlimited):
    """Checks for one dimension: existence, then unlimited status."""
    return [DimensionMandatory(mandatory), DimensionUnlimited(unlimited)]


def _variable(mandatory, dimensions, dtype):
    """Checks for one variable: existence, dimensions, then type."""
    return [VariableMandatory(mandatory),
            VariableDimensions(dimensions),
            VariableType(dtype)]


def _attribute(mandatory, dtype, length=None):
    """Checks for one attribute: existence, type and optionally length."""
    checks = [AttributeMandatory(mandatory),
              AttributeType(dtype, block_next=True)]
    if length is not None:
        checks.append(AttributeStrLength(length))
    return checks


# Sounding file specifications
sounding_specs = {
    'file': [FilenameShellPattern('rs_*.nc'), ],
    'dimensions': {
        'points': _dimension(True, False),
    },
    'variables': {
        'Altitude': _variable(True, ['points'], float),
        'Temperature': _variable(True, ['points'], float),
        'Pressure': _variable(True, ['points'], float),
        'RelativeHumidity': _variable(False, ['points'], float),
    },
    'attributes': {
        'Latitude_degrees_north': _attribute(True, float),
        'Longitude_degrees_east': _attribute(True, float),
        'Altitude_meter_asl': _attribute(True, float),
        'Location': _attribute(False, _text_type),
        'Sounding_Station_Name': _attribute(False, _text_type),
        'WMO_Station_Number': _attribute(False, _text_type),
        'WBAN_Station_Number': _attribute(False, _text_type),
        'Sounding_Start_Date': _attribute(True, _text_type, 8),
        'Sounding_Start_Time_UT': _attribute(True, _text_type, 6),
        'Sounding_Stop_Time_UT': _attribute(False, _text_type, 6),
    },
    'name': "SCC Sounding file",
}

# Lidar ratio file specifications
lidar_ratio_specs = {
    'file': [FilenameShellPattern('*.nc'), ],
    'dimensions': {
        'points': _dimension(True, False),
        'products': _dimension(True, False),
    },
    'variables': {
        'Altitude': _variable(True, ['points'], float),
        'Lidar_Ratio': _variable(True, ['points', 'products'], float),
        'product_ID': _variable(True, ['products'], int),
    },
    'attributes': {
        'Lidar_Station_Name': _attribute(True, _text_type),
    },
    'name': "SCC Lidar ratio file",
}

# Overlap file specifications
overlap_specs = {
    'file': [FilenameShellPattern('ov_*.nc'), ],
    'dimensions': {
        'points': _dimension(True, False),
        'channels': _dimension(True, False),
    },
    'variables': {
        'Altitude': _variable(True, ['points'], float),
        'Overlap_Function': _variable(True, ['points', 'channels'], float),
        'channel_ID': _variable(True, ['channels'], int),
    },
    'attributes': {
        'Lidar_Station_Name': _attribute(True, _text_type, 2),
        'Overlap_Measurement_Date': _attribute(True, _text_type, 8),
    },
    'name': "SCC Overlap file",
}

# Raw data file specifications
data_specs = {
    'file': [FilenameShellPattern('*.nc'), ],
    'dimensions': {
        'points': _dimension(True, False),
        'channels': _dimension(True, False),
        'nb_of_time_scales': _dimension(True, False),
        'time': _dimension(True, True),
        'time_bck': _dimension(False, False),
        'scan_angles': _dimension(True, False),
    },
    'variables': {
        'channel_ID': _variable(True, ['channels'], int),
        'Laser_Repetition_Rate': _variable(False, ['channels'], int),
        'Laser_Pointing_Angle': _variable(True, ['scan_angles'], float),
        'ID_Range': _variable(False, ['channels'], int),
        'Scattering_Mechanism': _variable(False, ['channels'], int),
        'Emitted_Wavelength': _variable(False, ['channels'], float),
        'Detected_Wavelength': _variable(False, ['channels'], float),
        'Raw_Data_Range_Resolution': _variable(False, ['channels'], float),
        'Background_Mode': _variable(False, ['channels'], int),
        'Background_Low': _variable(True, ['channels'], float),
        'Background_High': _variable(True, ['channels'], float),
        'Molecular_Calc': _variable(True, [], int),
        'id_timescale': _variable(True, ['channels'], int),
        'Dead_Time_Corr_Type': _variable(False, ['channels'], int),
        'Dead_Time': _variable(False, ['channels'], float),
        'Acquisition_Mode': _variable(False, ['channels'], int),
        'Trigger_Delay': _variable(False, ['channels'], float),
        'Laser_Pointing_Angle_of_Profiles': _variable(
            True, ['time', 'nb_of_time_scales'], int),
        'Raw_Data_Start_Time': _variable(
            True, ['time', 'nb_of_time_scales'], int),
        'Raw_Data_Stop_Time': _variable(
            True, ['time', 'nb_of_time_scales'], int),
        'Laser_Shots': _variable(True, ['time', 'channels'], int),
        'Raw_Lidar_Data': _variable(
            False, ['time', 'channels', 'points'], float),
        'Depolarization_Factor': _variable(False, ['channels'], float),
        'LR_Input': _variable(False, ['channels'], int),
        'DAQ_Range': _variable(False, ['channels'], float),
        'Pressure_at_Lidar_Station': _variable(False, [], float),
        'Temperature_at_Lidar_Station': _variable(False, [], float),
        'Background_Profile': _variable(
            False, ['time_bck', 'channels', 'points'], float),
        'Raw_Bck_Start_Time': _variable(
            False, ['time_bck', 'nb_of_time_scales'], int),
        'Raw_Bck_Stop_Time': _variable(
            False, ['time_bck', 'nb_of_time_scales'], int),
        'Error_On_Raw_Lidar_Data': _variable(
            False, ['time', 'channels', 'points'], float),
        'First_Signal_Rangebin': _variable(False, ['channels'], int),
    },
    'attributes': {
        'Measurement_ID': _attribute(True, _text_type, 12),
        'RawData_Start_Date': _attribute(True, _text_type, 8),
        'RawData_Start_Time_UT': _attribute(True, _text_type, 6),
        'RawData_Stop_Time_UT': _attribute(True, _text_type, 6),
        'RawBck_Start_Date': _attribute(False, _text_type, 8),
        'RawBck_Start_Time_UT': _attribute(False, _text_type, 6),
        'RawBck_Stop_Time_UT': _attribute(False, _text_type, 6),
        'Sounding_File_Name': _attribute(False, _text_type),
        'LR_File_Name': _attribute(False, _text_type),
        'Overlap_File_Name': _attribute(False, _text_type),
        'Location': _attribute(False, _text_type),
        'System': _attribute(False, _text_type),
        'Latitude_degrees_north': _attribute(False, float),
        'Longitude_degrees_east': _attribute(False, float),
        'Altitude_meter_asl': _attribute(False, float),
    },
    'name': "SCC Raw input file",
}

# Used for the command line arguments
spec_shorthands = {'sounding': sounding_specs,
                   'lidar_ratio': lidar_ratio_specs,
                   'overlap': overlap_specs,
                   'data': data_specs, }


if __name__ == "__main__":

    # For use from a terminal
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("file", help="The path of the file to be checked")
    parser.add_argument("-s", "--specs", default='data',
                        help="The specifications to use",
                        choices=['data', 'overlap', 'lidar_ratio', 'sounding'])
    parser.add_argument("-l", "--level", default='warning',
                        help="The output level",
                        choices=['error', 'warning', 'notification'])

    # Check the arguments
    args = parser.parse_args()

    specs = spec_shorthands[args.specs]

    with FileChecker(args.file, specs) as file_checker:
        file_checker.run_checks()
        file_checker.print_report(args.level)