# Source code for h5features.reader

# Copyright 2014-2016 Thomas Schatz, Mathieu Bernard, Roland Thiolliere
#
# This file is part of h5features.
#
# h5features is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# h5features is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with h5features.  If not, see <http://www.gnu.org/licenses/>.

"""Provides the Reader class to the h5features package."""

import h5py
import os
import numpy as np

from .data import Data
from .items import read_items
from .index import read_index
from .version import read_version


class Reader(object):
    """This class provides an interface for reading from h5features files.

    A `Reader` object wraps a h5features file. When created it loads
    items and index from file. The read() method then allows fast
    access to features and times data.

    A `Reader` can be used as a context manager, in which case the
    underlying file is closed when leaving the `with` block.

    :param str filename: Path to the HDF5 file to read from.

    :param str groupname: Name of the group to read from in the
        file. If None, guess there is one and only one group in
        `filename`.

    :raise IOError: if `filename` is not an existing HDF5 file or
        if `groupname` is not a valid group in `filename`.

    """
    def __init__(self, filename, groupname=None):
        # open the file for reading
        if not os.path.exists(filename) or not h5py.is_hdf5(filename):
            raise IOError('{} is not a HDF5 file'.format(filename))
        self.h5file = h5py.File(filename, 'r')

        # From here on the file is open: any failure while loading
        # the group must close it, else the handle would leak since
        # the caller never gets an instance to call close() on.
        try:
            self._load_group(filename, groupname)
        except Exception:
            self.h5file.close()
            raise

    def _load_group(self, filename, groupname):
        """Open the requested group and load its attributes and datasets."""
        # open the requested group in the file
        if groupname is None:
            # expect only one group in the file
            groups = list(self.h5file.keys())
            if len(groups) != 1:
                raise IOError(
                    'groupname not specified and several groups in {}.'
                    .format(filename))
            groupname = groups[0]
        elif groupname not in self.h5file:
            raise IOError('{} is not a valid group in {}'
                          .format(groupname, filename))
        self.group = self.h5file[groupname]

        # load h5features attributes and datasets
        self.version = read_version(self.group)
        self.items = read_items(self.group, self.version)
        self._index = read_index(self.group, self.version)

        # access to the labels group according to version.
        # NOTE(review): the version is compared against a string
        # literal, so the ordering is presumably lexicographic.
        self._labels_group = (self.group['labels'] if self.version >= '1.1'
                              else self.group['times'])

        self.dformat = self.group.attrs['format']
        if self.dformat == 'sparse':
            self.dim = self.group.attrs['dim']
            self.frames = (self.group['lines'] if self.version == '0.1'
                           else self.group['frames'])[...]

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        """Close the underlying HDF5 file."""
        self.h5file.close()

    def index_read(self, index):
        """Read data from its indexed coordinate

        :raise NotImplementedError: always, this method is a placeholder.

        """
        # TODO
        raise NotImplementedError(
            'h5features.Reader.index_read(index) not implemented ')

    def read(self, from_item=None, to_item=None,
             from_time=None, to_time=None):
        """Retrieve requested data coordinates from the h5features index.

        :param str from_item: Optional. Read the data starting from
            this item. (defaults to the first stored item)

        :param str to_item: Optional. Read the data until reaching the
            item. (defaults to from_item if it was specified and to
            the last stored item otherwise).

        :param float from_time: Optional. (defaults to the beginning
            time in from_item) The specified times are included in the
            output.

        :param float to_time: Optional. (defaults to the ending time
            in to_item) The specified times are included in the
            output.

        :return: An instance of h5features.Data read from the file.

        :raise IOError: if the items do not form a valid interval, or
            if `from_time` (resp. `to_time`) is greater (resp. lower)
            than every time stored for `from_item` (resp. `to_item`).

        :raise NotImplementedError: if the features are stored in
            sparse format.

        """
        # handling default arguments
        if to_item is None:
            to_item = self.items.data[-1] if from_item is None else from_item
        if from_item is None:
            from_item = self.items.data[0]

        # index coordinates of from/to_item. TODO optimize because we
        # have 4 accesses to list.index() where 2 are enough.
        if not self.items.is_valid_interval(from_item, to_item):
            raise IOError('cannot read items: not a valid interval')
        from_idx = self.items.data.index(from_item)
        to_idx = self.items.data.index(to_item)

        from_pos = self._get_item_position(from_idx)
        to_pos = self._get_item_position(to_idx)

        lower = self._get_from_time(from_time, from_pos)
        # upper included with +1
        upper = self._get_to_time(to_time, to_pos) + 1

        # Step 2: access actual data
        if self.dformat == 'sparse':
            raise NotImplementedError(
                'Reading sparse features not implemented')
        else:
            # in version 0.1 the features are sliced along their
            # second axis and transposed
            features = (self.group['features'][:, lower:upper].T
                        if self.version == '0.1'
                        else self.group['features'][lower:upper, ...])
            labels = self._labels_group[lower:upper]

        # If we read a single item
        if to_idx == from_idx:
            features = [features]
            labels = [labels]
        # Several items case: split them from the index
        else:
            item_ends = self._split_points(from_idx, to_idx, lower)
            features = np.split(features, item_ends, axis=0)
            labels = np.split(labels, item_ends, axis=0)

        items = self.items.data[from_idx:to_idx + 1]
        return Data(items, labels, features, check=False)

    def _split_points(self, from_idx, to_idx, lower):
        """Return where to split a chunk read from `lower` into items.

        The chunk starts at the absolute position `lower`, which is
        after the beginning of the first item when a `from_time` was
        requested. The split points must therefore be relative to
        `lower`, not to the beginning of the first item.

        """
        return self._index[from_idx:to_idx] - lower + 1

    def _get_item_position(self, idx):
        """Return a tuple of (start, end) indices of an item from its index."""
        start = 0 if idx == 0 else self._index[idx - 1] + 1
        end = self._index[idx]
        return start, end

    def _get_from_time(self, time, pos):
        """Return the position of the first label >= `time` in an item.

        :param time: The requested time, or None for the item start.
        :param pos: The (start, end) position of the item, end included.
        :raise IOError: if `time` is greater than all the item's labels.

        """
        if time is None:
            return pos[0]

        # assumes the labels of an item are a sorted 1D array, as
        # required by searchsorted -- TODO confirm for 2D labels
        times = self._labels_group[pos[0]:pos[1] + 1]

        # searchsorted never raises on out of range values, it returns
        # len(times) instead: this must be checked explicitly
        offset = times.searchsorted(time)
        if offset >= len(times):
            raise IOError('time {} is too large'.format(time))
        return pos[0] + offset

    def _get_to_time(self, time, pos):
        """Return the position of the last label <= `time` in an item.

        :param time: The requested time, or None for the item end.
        :param pos: The (start, end) position of the item, end included.
        :raise IOError: if `time` is lower than all the item's labels.

        """
        if time is None:
            return pos[1]

        # assumes the labels of an item are a sorted 1D array, as
        # required by searchsorted -- TODO confirm for 2D labels
        times = self._labels_group[pos[0]:pos[1] + 1]

        # searchsorted returns 0 when time precedes every label, which
        # would silently give a position before the item start
        offset = times.searchsorted(time, side='right') - 1
        if offset < 0:
            raise IOError('time {} is too small'.format(time))
        return pos[0] + offset