26 def import_file(self, filename): |
26 def import_file(self, filename): |
27 """Imports a licel file. |
27 """Imports a licel file. |
28 Input: filename |
28 Input: filename |
29 Output: object """ |
29 Output: object """ |
30 |
30 |
31 raw_info = {} |
|
32 channels = {} |
31 channels = {} |
33 channel_info = [] |
32 |
34 |
33 with open(filename, 'rb') as f: |
35 f = open(filename, 'rb') |
34 |
36 |
35 self.read_header(f) |
|
36 |
|
37 # Check the complete header is read |
|
38 a = f.readline() |
|
39 |
|
40 # Import the data |
|
41 for current_channel_info in self.channel_info: |
|
42 raw_data = np.fromfile(f, 'i4', int(current_channel_info['DataPoints'])) |
|
43 a = np.fromfile(f, 'b', 1) |
|
44 b = np.fromfile(f, 'b', 1) |
|
45 |
|
46 if (a[0] != 13) | (b[0] != 10): |
|
47 print "Warning: No end of line found after record. File could be corrupt" |
|
48 channel = LicelFileChannel(current_channel_info, raw_data, self.duration()) |
|
49 |
|
50 channel_name = channel.channel_name |
|
51 |
|
52 if channel_name in channels.keys(): |
|
53 # If the analog/photon naming scheme is not enough, find a new one! |
|
54 raise IOError('Trying to import two channels with the same name') |
|
55 |
|
56 channels[channel_name] = channel |
|
57 |
|
58 self.channels = channels |
|
59 |
|
60 |
|
61 def read_header(self, f): |
|
62 """ Read the header of a open file f. |
|
63 |
|
64 Returns raw_info and channel_info. Updates some object properties. """ |
|
65 |
37 #Read the first 3 lines of the header |
66 #Read the first 3 lines of the header |
38 raw_info = {} |
67 raw_info = {} |
39 for c1 in range(3): |
68 channel_info = [] |
40 raw_info.update(match_lines(f.readline(), licel_file_header_format[c1])) |
69 |
41 |
70 # Read first line |
|
71 raw_info['Filename'] = f.readline().strip() |
|
72 |
|
73 # Read second line |
|
74 second_line = f.readline() |
|
75 |
|
76 # Many licel files don't follow the licel standard. Specifically, the |
|
77 # measurement site is not always 8 characters, and can include white |
|
78 # spaces. For this, the site name is detect everything before the first |
|
79 # date. For efficiency, the first date is found by the first '/'. |
|
80 # e.g. assuming a string like 'Site name 01/01/2010 ...' |
|
81 |
|
82 site_name = second_line.split('/')[0][:-2] |
|
83 clean_site_name = site_name.strip() |
|
84 raw_info['Site'] = clean_site_name |
|
85 raw_info.update(match_lines(second_line[len(clean_site_name) + 1:], licel_file_header_format[1])) |
|
86 |
|
87 # Read third line |
|
88 third_line = f.readline() |
|
89 raw_info.update(match_lines(third_line, licel_file_header_format[2])) |
|
90 |
|
91 # Update the object properties based on the raw info |
42 start_string = '%s %s' % (raw_info['StartDate'], raw_info['StartTime']) |
92 start_string = '%s %s' % (raw_info['StartDate'], raw_info['StartTime']) |
43 stop_string = '%s %s' % (raw_info['EndDate'], raw_info['EndTime']) |
93 stop_string = '%s %s' % (raw_info['EndDate'], raw_info['EndTime']) |
44 date_format = '%d/%m/%Y %H:%M:%S' |
94 date_format = '%d/%m/%Y %H:%M:%S' |
|
95 |
45 self.start_time = datetime.datetime.strptime(start_string, date_format) |
96 self.start_time = datetime.datetime.strptime(start_string, date_format) |
46 self.stop_time = datetime.datetime.strptime(stop_string, date_format) |
97 self.stop_time = datetime.datetime.strptime(stop_string, date_format) |
47 self.latitude = float(raw_info['Latitude']) |
98 self.latitude = float(raw_info['Latitude']) |
48 self.longitude = float(raw_info['Longtitude']) |
99 self.longitude = float(raw_info['Longtitude']) |
49 |
100 |
50 # Read the rest of the header. |
101 # Read the rest of the header. |
51 for c1 in range(int(raw_info['DataSets'])): |
102 for c1 in range(int(raw_info['DataSets'])): |
52 channel_info.append(match_lines(f.readline(), licel_file_channel_format)) |
103 channel_info.append(match_lines(f.readline(), licel_file_channel_format)) |
53 |
104 |
54 # Check the complete header is read |
|
55 a = f.readline() |
|
56 |
|
57 # Import the data |
|
58 for current_channel_info in channel_info: |
|
59 raw_data = np.fromfile(f, 'i4', int(current_channel_info['DataPoints'])) |
|
60 a = np.fromfile(f, 'b', 1) |
|
61 b = np.fromfile(f, 'b', 1) |
|
62 |
|
63 if (a[0] != 13) | (b[0] != 10): |
|
64 print "Warning: No end of line found after record. File could be corrupt" |
|
65 channel = LicelFileChannel(current_channel_info, raw_data, self.duration()) |
|
66 |
|
67 channel_name = channel.channel_name |
|
68 if channel_name in channels.keys(): |
|
69 # If the analog/photon naming scheme is not enough, find a new one! |
|
70 raise IOError('Trying to import two channels with the same name') |
|
71 |
|
72 channels[channel_name] = channel |
|
73 f.close() |
|
74 |
|
75 self.raw_info = raw_info |
105 self.raw_info = raw_info |
76 self.channels = channels |
106 self.channel_info = channel_info |
77 |
107 |
78 def duration(self): |
108 def duration(self): |
79 """ Return the duration of the file. """ |
109 """ Return the duration of the file. """ |
80 dt = self.stop_time - self.start_time |
110 dt = self.stop_time - self.start_time |
81 return dt.seconds |
111 return dt.seconds |
82 |
112 |