# This file is part of h5py, a Python interface to the HDF5 library.
#
# http://www.h5py.org
#
# Copyright 2008-2013 Andrew Collette and contributors
#
# License: Standard 3-clause BSD; see "license.txt" for full license terms
# and contributor agreement.

"""
    Dataset testing operations.

    Tests all dataset operations, including creation, with the exception of:

    1. Slicing operations for read and write, handled by module test_slicing
    2. Type conversion for read and write (currently untested)
"""

import pathlib
import os
import sys
import numpy as np
import platform
import pytest
import warnings

from .common import ut, TestCase
from .data_files import get_data_file_path
from h5py import File, Group, Dataset
from h5py._hl.base import is_empty_dataspace, product
from h5py import h5f, h5t
from h5py.h5py_warnings import H5pyDeprecationWarning
from h5py import version
import h5py
import h5py._hl.selections as sel


class BaseDataset(TestCase):
    def setUp(self):
        self.f = File(self.mktemp(), 'w')

    def tearDown(self):
        if self.f:
            self.f.close()


class TestRepr(BaseDataset):
    """
    Feature: repr(Dataset) behaves sensibly
    """

    def test_repr_open(self):
        """ repr() works on live and dead datasets """
        ds = self.f.create_dataset('foo', (4,))
        self.assertIsInstance(repr(ds), str)
        self.f.close()
        self.assertIsInstance(repr(ds), str)


class TestCreateShape(BaseDataset):
    """
    Feature: Datasets can be created from a shape only
    """

    def test_create_scalar(self):
        """ Create a scalar dataset """
        dset = self.f.create_dataset('foo', ())
        self.assertEqual(dset.shape, ())

    def test_create_simple(self):
        """ Create a size-1 dataset """
        dset = self.f.create_dataset('foo', (1,))
        self.assertEqual(dset.shape, (1,))

    def test_create_integer(self):
        """ Create a size-1 dataset with integer shape """
        dset = self.f.create_dataset('foo', 1)
        self.assertEqual(dset.shape, (1,))

    def test_create_extended(self):
        """ Create an extended dataset """
        dset = self.f.create_dataset('foo', (63,))
        self.assertEqual(dset.shape, (63,))
        self.assertEqual(dset.size, 63)

        dset = self.f.create_dataset('bar', (6, 10))
        self.assertEqual(dset.shape, (6, 10))
        self.assertEqual(dset.size, 60)

    def test_create_integer_extended(self):
        """ Create an extended dataset from an integer shape """
        dset = self.f.create_dataset('foo', 63)
        self.assertEqual(dset.shape, (63,))
        self.assertEqual(dset.size, 63)

        dset = self.f.create_dataset('bar', (6, 10))
        self.assertEqual(dset.shape, (6, 10))
        self.assertEqual(dset.size, 60)

    def test_default_dtype(self):
        """ Confirm that the default dtype is float """
        dset = self.f.create_dataset('foo', (63,))
        self.assertEqual(dset.dtype, np.dtype('=f4'))

    def test_missing_shape(self):
        """ Missing shape raises TypeError """
        with self.assertRaises(TypeError):
            self.f.create_dataset('foo')

    def test_long_double(self):
        """ Create a dataset with the long double dtype """
        dset = self.f.create_dataset('foo', (63,), dtype=np.longdouble)
        if platform.machine() in ['ppc64le']:
            pytest.xfail("Storage of long double deactivated on %s"
                         % platform.machine())
        self.assertEqual(dset.dtype, np.longdouble)

    @ut.skipIf(not hasattr(np, "complex256"), "No support for complex256")
    def test_complex256(self):
        """ Create a dataset with the complex256 dtype """
        dset = self.f.create_dataset('foo', (63,),
                                     dtype=np.dtype('complex256'))
        self.assertEqual(dset.dtype, np.dtype('complex256'))

    def test_name_bytes(self):
        dset = self.f.create_dataset(b'foo', (1,))
        self.assertEqual(dset.shape, (1,))

        dset2 = self.f.create_dataset(b'bar/baz', (2,))
        self.assertEqual(dset2.shape, (2,))
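
# A minimal usage sketch of the creation defaults pinned down above; the
# helper name and file argument are illustrative only, not part of the suite.
def _example_shape_only_creation(path):
    """Illustrative only: shape-only creation defaults to float32."""
    with File(path, 'w') as f:
        dset = f.create_dataset('example', (4, 5))  # shape only, no dtype
        assert dset.dtype == np.dtype('=f4')        # default dtype is '=f4'
        assert dset.shape == (4, 5)                 # integer shapes become tuples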

class TestCreateData(BaseDataset):
    """
    Feature: Datasets can be created from existing data
    """

    def test_create_scalar(self):
        """ Create a scalar dataset from existing array """
        data = np.ones((), 'f')
        dset = self.f.create_dataset('foo', data=data)
        self.assertEqual(dset.shape, data.shape)

    def test_create_extended(self):
        """ Create an extended dataset from existing data """
        data = np.ones((63,), 'f')
        dset = self.f.create_dataset('foo', data=data)
        self.assertEqual(dset.shape, data.shape)

    def test_dataset_intermediate_group(self):
        """ Create dataset with missing intermediate groups """
        ds = self.f.create_dataset("/foo/bar/baz", shape=(10, 10), dtype='<i4')
        self.assertIsInstance(ds, Dataset)
        self.assertTrue("/foo/bar/baz" in self.f)


class TestExternal(BaseDataset):
    """
    Feature: Datasets with the external storage property
    """

    def test_efile_prefix(self):
        """ External dataset created with an efile prefix round-trips """
        shape = (6, 100)
        testdata = np.random.random(shape)

        # create a dataset backed by an external file, resolved via the prefix
        ext_file = self.mktemp()
        dset = self.f.create_dataset(
            'foo', shape, dtype=testdata.dtype,
            external=[(os.path.basename(ext_file), 0, h5f.UNLIMITED)],
            efile_prefix=os.path.dirname(ext_file))
        dset[...] = testdata
        assert dset.external is not None

        if h5py.version.hdf5_version_tuple >= (1, 10, 0):
            efile_prefix = pathlib.Path(
                dset.id.get_access_plist().get_efile_prefix().decode()).as_posix()
            parent = pathlib.Path(ext_file).parent.as_posix()
            assert efile_prefix == parent

        dset2 = self.f.require_dataset('foo', shape, testdata.dtype,
                                       efile_prefix=os.path.dirname(ext_file))
        assert dset2.external is not None
        assert np.all(dset2[()] == testdata)

    def test_name_str(self):
        """ External argument may be a file name str only """
        self.f.create_dataset('foo', (6, 100), external=self.mktemp())

    def test_name_path(self):
        """ External argument may be a file name path only """
        self.f.create_dataset('foo', (6, 100),
                              external=pathlib.Path(self.mktemp()))

    def test_iter_multi(self):
        """ External argument may be an iterable of multiple tuples """
        ext_file = self.mktemp()
        N = 100
        external = iter((ext_file, x * 1000, 1000) for x in range(N))
        dset = self.f.create_dataset('poo', (6, 100), external=external)
        assert len(dset.external) == N

    def test_invalid(self):
        """ Test with invalid external lists """
        shape = (6, 100)
        ext_file = self.mktemp()

        for exc_type, external in [
            (TypeError, [ext_file]),
            (TypeError, [ext_file, 0]),
            (TypeError, [ext_file, 0, h5f.UNLIMITED]),
            (ValueError, [(ext_file,)]),
            (ValueError, [(ext_file, 0)]),
            (ValueError, [(ext_file, 0, h5f.UNLIMITED, 0)]),
            (TypeError, [(ext_file, 0, "h5f.UNLIMITED")]),
        ]:
            with self.assertRaises(exc_type):
                self.f.create_dataset('foo', shape, external=external)

    def test_create_expandable(self):
        """ Create expandable external dataset """
        ext_file = self.mktemp()
        shape = (128, 64)
        maxshape = (None, 64)
        exp_dset = self.f.create_dataset('foo', shape=shape,
                                         maxshape=maxshape,
                                         external=ext_file)
        assert exp_dset.chunks is None
        assert exp_dset.shape == shape
        assert exp_dset.maxshape == maxshape
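
# Each ``external=`` entry normalizes to a (name, offset, size) tuple, with
# h5f.UNLIMITED meaning "the rest of the file". A minimal sketch; the helper
# and both path arguments are illustrative, not part of the suite.
def _example_external_layout(h5path, raw_path):
    with File(h5path, 'w') as f:
        dset = f.create_dataset('ext', (100,), dtype='f4',
                                external=[(raw_path, 0, h5f.UNLIMITED)])
        name, offset, size = dset.external[0]  # normalized tuples echo back
        assert offset == 0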

class TestAutoCreate(BaseDataset):
    """
    Feature: Datasets auto-created from data produce the correct types
    """

    def assert_string_type(self, ds, cset, variable=True):
        tid = ds.id.get_type()
        self.assertEqual(type(tid), h5py.h5t.TypeStringID)
        self.assertEqual(tid.get_cset(), cset)
        if variable:
            assert tid.is_variable_str()

    def test_vlen_bytes(self):
        """ Assigning byte strings produces a vlen string ASCII dataset """
        self.f['x'] = b"Hello there"
        self.assert_string_type(self.f['x'], h5py.h5t.CSET_ASCII)

        self.f['y'] = [b"a", b"bc"]
        self.assert_string_type(self.f['y'], h5py.h5t.CSET_ASCII)

        self.f['z'] = np.array([b"a", b"bc"], dtype=np.object_)
        self.assert_string_type(self.f['z'], h5py.h5t.CSET_ASCII)

    def test_vlen_unicode(self):
        """ Assigning unicode strings produces a vlen string UTF-8 dataset """
        self.f['x'] = "Hello there" + chr(0x2034)
        self.assert_string_type(self.f['x'], h5py.h5t.CSET_UTF8)

        self.f['y'] = ["a", "bc"]
        self.assert_string_type(self.f['y'], h5py.h5t.CSET_UTF8)

        # 2D array; this only works with an array, not nested lists
        self.f['z'] = np.array([["a", "bc"]], dtype=np.object_)
        self.assert_string_type(self.f['z'], h5py.h5t.CSET_UTF8)

    def test_string_fixed(self):
        """ Assignment of a fixed-length byte string produces a fixed-length
        ascii dataset """
        self.f['x'] = np.bytes_("Hello there")
        ds = self.f['x']
        self.assert_string_type(ds, h5py.h5t.CSET_ASCII, variable=False)
        self.assertEqual(ds.id.get_type().get_size(), 11)


class TestCreateLike(BaseDataset):
    def test_no_chunks(self):
        self.f['lol'] = np.arange(25).reshape(5, 5)
        self.f.create_dataset_like('like_lol', self.f['lol'])
        dslike = self.f['like_lol']
        self.assertEqual(dslike.shape, (5, 5))
        self.assertIs(dslike.chunks, None)

    def test_track_times(self):
        orig = self.f.create_dataset('honda', data=np.arange(12),
                                     track_times=True)
        self.assertNotEqual(0, h5py.h5g.get_objinfo(orig._id).mtime)
        similar = self.f.create_dataset_like('hyundai', orig)
        self.assertNotEqual(0, h5py.h5g.get_objinfo(similar._id).mtime)

        orig = self.f.create_dataset('ibm', data=np.arange(12),
                                     track_times=False)
        self.assertEqual(0, h5py.h5g.get_objinfo(orig._id).mtime)
        similar = self.f.create_dataset_like('lenovo', orig)
        self.assertEqual(0, h5py.h5g.get_objinfo(similar._id).mtime)

    def test_maxshape(self):
        """ Test when other.maxshape != other.shape """
        other = self.f.create_dataset('other', (10,), maxshape=20)
        similar = self.f.create_dataset_like('sim', other)
        self.assertEqual(similar.shape, (10,))
        self.assertEqual(similar.maxshape, (20,))


class TestChunkIterator(BaseDataset):
    def test_no_chunks(self):
        dset = self.f.create_dataset("foo", ())
        with self.assertRaises(TypeError):
            dset.iter_chunks()

    def test_1d(self):
        dset = self.f.create_dataset("foo", (100,), chunks=(32,))
        expected = ((slice(0, 32, 1),), (slice(32, 64, 1),),
                    (slice(64, 96, 1),), (slice(96, 100, 1),))
        self.assertEqual(list(dset.iter_chunks()), list(expected))

        expected = ((slice(50, 64, 1),), (slice(64, 96, 1),),
                    (slice(96, 97, 1),))
        self.assertEqual(list(dset.iter_chunks(np.s_[50:97])), list(expected))

    def test_2d(self):
        dset = self.f.create_dataset("foo", (100, 100), chunks=(32, 64))
        expected = ((slice(0, 32, 1), slice(0, 64, 1)),
                    (slice(0, 32, 1), slice(64, 100, 1)),
                    (slice(32, 64, 1), slice(0, 64, 1)),
                    (slice(32, 64, 1), slice(64, 100, 1)),
                    (slice(64, 96, 1), slice(0, 64, 1)),
                    (slice(64, 96, 1), slice(64, 100, 1)),
                    (slice(96, 100, 1), slice(0, 64, 1)),
                    (slice(96, 100, 1), slice(64, 100, 1)))
        self.assertEqual(list(dset.iter_chunks()), list(expected))

        expected = ((slice(48, 52, 1), slice(40, 50, 1)),)
        self.assertEqual(list(dset.iter_chunks(np.s_[48:52, 40:50])),
                         list(expected))

    def test_2d_partial_slice(self):
        dset = self.f.create_dataset("foo", (5, 5), chunks=(2, 2))
        expected = ((slice(3, 4, 1), slice(3, 4, 1)),
                    (slice(3, 4, 1), slice(4, 5, 1)),
                    (slice(4, 5, 1), slice(3, 4, 1)),
                    (slice(4, 5, 1), slice(4, 5, 1)))
        sel = slice(3, 5)
        self.assertEqual(list(dset.iter_chunks((sel, sel))), list(expected))
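
# iter_chunks yields one tuple of slices per chunk that intersects the
# (optional) selection, clipped to the dataset shape, as verified above.
# A minimal consumption sketch; the helper and file name are illustrative.
def _example_iter_chunks(path):
    with File(path, 'w') as f:
        dset = f.create_dataset('x', (100,), chunks=(32,), dtype='i4')
        for chunk_sel in dset.iter_chunks():
            dset[chunk_sel] = 1  # touch exactly one chunk per iteration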

class TestResize(BaseDataset):
    """
    Feature: Datasets created with "maxshape" may be resized
    """

    def test_create(self):
        """ Create dataset with "maxshape" """
        dset = self.f.create_dataset('foo', (20, 30), maxshape=(20, 60))
        self.assertIsNot(dset.chunks, None)
        self.assertEqual(dset.maxshape, (20, 60))

    def test_create_1D(self):
        """ Create dataset with "maxshape" using integer maxshape """
        dset = self.f.create_dataset('foo', (20,), maxshape=20)
        self.assertIsNot(dset.chunks, None)
        self.assertEqual(dset.maxshape, (20,))

        dset = self.f.create_dataset('bar', 20, maxshape=20)
        self.assertEqual(dset.maxshape, (20,))

    def test_resize(self):
        """ Datasets may be resized up to maxshape """
        dset = self.f.create_dataset('foo', (20, 30), maxshape=(20, 60))
        self.assertEqual(dset.shape, (20, 30))
        dset.resize((20, 50))
        self.assertEqual(dset.shape, (20, 50))
        dset.resize((20, 60))
        self.assertEqual(dset.shape, (20, 60))

    def test_resize_1D(self):
        """ Datasets may be resized up to maxshape using integer maxshape """
        dset = self.f.create_dataset('foo', 20, maxshape=40)
        self.assertEqual(dset.shape, (20,))
        dset.resize((30,))
        self.assertEqual(dset.shape, (30,))

    def test_resize_over(self):
        """ Resizing past maxshape triggers an exception """
        dset = self.f.create_dataset('foo', (20, 30), maxshape=(20, 60))
        with self.assertRaises(Exception):
            dset.resize((20, 70))

    def test_resize_nonchunked(self):
        """ Resizing non-chunked dataset raises TypeError """
        dset = self.f.create_dataset("foo", (20, 30))
        with self.assertRaises(TypeError):
            dset.resize((20, 60))

    def test_resize_axis(self):
        """ Resize specified axis """
        dset = self.f.create_dataset('foo', (20, 30), maxshape=(20, 60))
        dset.resize(50, axis=1)
        self.assertEqual(dset.shape, (20, 50))

    def test_axis_exc(self):
        """ Illegal axis raises ValueError """
        dset = self.f.create_dataset('foo', (20, 30), maxshape=(20, 60))
        with self.assertRaises(ValueError):
            dset.resize(50, axis=2)

    def test_zero_dim(self):
        """ Allow zero-length initial dims for unlimited axes (issue 111) """
        dset = self.f.create_dataset('foo', (15, 0), maxshape=(15, None))
        self.assertEqual(dset.shape, (15, 0))
        self.assertEqual(dset.maxshape, (15, None))
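
# A maxshape entry of None marks an unlimited axis, which resize() may grow
# without bound; fixed entries cap growth, as tested above. Minimal sketch;
# the helper and file name are illustrative, not part of the suite.
def _example_unlimited_axis(path):
    with File(path, 'w') as f:
        dset = f.create_dataset('grow', (0, 3), maxshape=(None, 3), dtype='i8')
        dset.resize((10, 3))  # allowed: the first axis is unlimited
        assert dset.shape == (10, 3)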

class TestDtype(BaseDataset):
    """
    Feature: Dataset dtype is available as .dtype property
    """

    def test_dtype(self):
        """ Retrieve dtype from dataset """
        dset = self.f.create_dataset('foo', (5,), '|S10')
        self.assertEqual(dset.dtype, np.dtype('|S10'))

    def test_dtype_complex32(self):
        """ Retrieve dtype from complex float16 dataset (gh-2156) """
        # No native support in numpy as of v1.23.3, so expect a compound type.
        complex32 = np.dtype([('r', np.float16), ('i', np.float16)])
        dset = self.f.create_dataset('foo', (5,), complex32)
        self.assertEqual(dset.dtype, complex32)


class TestLen(BaseDataset):
    """
    Feature: Size of first axis is available via Python's len
    """

    def test_len(self):
        """ Python len() (under 32 bits) """
        dset = self.f.create_dataset('foo', (312, 15))
        self.assertEqual(len(dset), 312)

    def test_len_big(self):
        """ Python len() vs Dataset.len() """
        dset = self.f.create_dataset('foo', (2 ** 33, 15))
        self.assertEqual(dset.shape, (2 ** 33, 15))
        if sys.maxsize == 2 ** 31 - 1:
            with self.assertRaises(OverflowError):
                len(dset)
        else:
            self.assertEqual(len(dset), 2 ** 33)
        self.assertEqual(dset.len(), 2 ** 33)


class TestIter(BaseDataset):
    """
    Feature: Iterating over a dataset yields rows
    """

    def test_iter(self):
        """ Iterating over a dataset yields rows """
        data = np.arange(30, dtype='f').reshape((10, 3))
        dset = self.f.create_dataset('foo', data=data)
        for x, y in zip(dset, data):
            self.assertEqual(len(x), 3)
            self.assertArrayEqual(x, y)

    def test_iter_scalar(self):
        """ Iterating over scalar dataset raises TypeError """
        dset = self.f.create_dataset('foo', shape=())
        with self.assertRaises(TypeError):
            [x for x in dset]


class TestStrings(BaseDataset):
    """
    Feature: Datasets created with vlen and fixed datatypes correctly
    translate to and from HDF5
    """

    def test_vlen_bytes(self):
        """ Vlen bytes dataset maps to vlen ascii in the file """
        dt = h5py.string_dtype(encoding='ascii')
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        tid = ds.id.get_type()
        self.assertEqual(type(tid), h5py.h5t.TypeStringID)
        self.assertEqual(tid.get_cset(), h5py.h5t.CSET_ASCII)
        string_info = h5py.check_string_dtype(ds.dtype)
        self.assertEqual(string_info.encoding, 'ascii')

    def test_vlen_bytes_fillvalue(self):
        """ Vlen bytes dataset handles fillvalue """
        dt = h5py.string_dtype(encoding='ascii')
        fill_value = b'bar'
        ds = self.f.create_dataset('x', (100,), dtype=dt, fillvalue=fill_value)
        self.assertEqual(self.f['x'][0], fill_value)
        self.assertEqual(self.f['x'].asstr()[0], fill_value.decode())
        self.assertEqual(self.f['x'].fillvalue, fill_value)

    def test_vlen_unicode(self):
        """ Vlen unicode dataset maps to vlen utf-8 in the file """
        dt = h5py.string_dtype()
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        tid = ds.id.get_type()
        self.assertEqual(type(tid), h5py.h5t.TypeStringID)
        self.assertEqual(tid.get_cset(), h5py.h5t.CSET_UTF8)
        string_info = h5py.check_string_dtype(ds.dtype)
        self.assertEqual(string_info.encoding, 'utf-8')

    def test_vlen_unicode_fillvalue(self):
        """ Vlen unicode dataset handles fillvalue """
        dt = h5py.string_dtype()
        fill_value = 'bár'
        ds = self.f.create_dataset('x', (100,), dtype=dt, fillvalue=fill_value)
        self.assertEqual(self.f['x'][0], fill_value.encode("utf-8"))
        self.assertEqual(self.f['x'].asstr()[0], fill_value)
        self.assertEqual(self.f['x'].fillvalue, fill_value.encode("utf-8"))

    def test_fixed_ascii(self):
        """ Fixed-length bytes dataset maps to fixed-length ascii in the file """
        dt = np.dtype("|S10")
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        tid = ds.id.get_type()
        self.assertEqual(type(tid), h5py.h5t.TypeStringID)
        self.assertFalse(tid.is_variable_str())
        self.assertEqual(tid.get_size(), 10)
        self.assertEqual(tid.get_cset(), h5py.h5t.CSET_ASCII)
        string_info = h5py.check_string_dtype(ds.dtype)
        self.assertEqual(string_info.encoding, 'ascii')
        self.assertEqual(string_info.length, 10)

    def test_fixed_bytes_fillvalue(self):
        """ Fixed-length bytes dataset handles fillvalue """
        dt = h5py.string_dtype(encoding='ascii', length=10)
        fill_value = b'bar'
        ds = self.f.create_dataset('x', (100,), dtype=dt, fillvalue=fill_value)
        self.assertEqual(self.f['x'][0], fill_value)
        self.assertEqual(self.f['x'].asstr()[0], fill_value.decode())
        self.assertEqual(self.f['x'].fillvalue, fill_value)

    def test_fixed_utf8(self):
        dt = h5py.string_dtype(encoding='utf-8', length=5)
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        tid = ds.id.get_type()
        self.assertEqual(tid.get_cset(), h5py.h5t.CSET_UTF8)

        s = 'cù'
        ds[0] = s.encode('utf-8')
        ds[1] = s
        ds[2:4] = [s, s]
        ds[4:6] = np.array([s, s], dtype=object)
        ds[6:8] = np.array([s.encode('utf-8')] * 2, dtype=dt)
        with self.assertRaises(TypeError):
            ds[8:10] = np.array([s, s], dtype='U')

        np.testing.assert_array_equal(
            ds[:8], np.array([s.encode('utf-8')] * 8, dtype='S'))

    def test_fixed_utf_8_fillvalue(self):
        """ Fixed-length utf-8 dataset handles fillvalue """
        dt = h5py.string_dtype(encoding='utf-8', length=10)
        fill_value = 'bár'.encode("utf-8")
        ds = self.f.create_dataset('x', (100,), dtype=dt, fillvalue=fill_value)
        self.assertEqual(self.f['x'][0], fill_value)
        self.assertEqual(self.f['x'].asstr()[0], fill_value.decode("utf-8"))
        self.assertEqual(self.f['x'].fillvalue, fill_value)

    def test_fixed_unicode(self):
        """ Fixed-length unicode datasets are unsupported (raise TypeError) """
        dt = np.dtype("|U10")
        with self.assertRaises(TypeError):
            ds = self.f.create_dataset('x', (100,), dtype=dt)

    def test_roundtrip_vlen_bytes(self):
        """ Writing and reading a vlen bytes dataset preserves type and
        content """
        dt = h5py.string_dtype(encoding='ascii')
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        data = b"Hello\xef"
        ds[0] = data
        out = ds[0]
        self.assertEqual(type(out), bytes)
        self.assertEqual(out, data)
    def test_roundtrip_fixed_bytes(self):
        """ Writing to and reading from a fixed-length bytes dataset
        preserves type and content """
        dt = np.dtype("|S10")
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        data = b"Hello\xef"
        ds[0] = data
        out = ds[0]
        self.assertEqual(type(out), np.bytes_)
        self.assertEqual(out, data)

    def test_retrieve_vlen_unicode(self):
        dt = h5py.string_dtype()
        ds = self.f.create_dataset('x', (10,), dtype=dt)
        data = "fàilte"
        ds[0] = data
        self.assertIsInstance(ds[0], bytes)
        out = ds.asstr()[0]
        self.assertIsInstance(out, str)
        self.assertEqual(out, data)

    def test_asstr(self):
        ds = self.f.create_dataset('x', (10,), dtype=h5py.string_dtype())
        data = "fàilte"
        ds[0] = data

        strwrap1 = ds.asstr('ascii')
        with self.assertRaises(UnicodeDecodeError):
            out = strwrap1[0]

        # Different errors parameter
        self.assertEqual(ds.asstr('ascii', 'ignore')[0], 'filte')

        # latin-1 will decode it but give the wrong text
        self.assertNotEqual(ds.asstr('latin-1')[0], data)

        # len of ds
        self.assertEqual(10, len(ds.asstr()))

        # Array output
        np.testing.assert_array_equal(
            ds.asstr()[:1], np.array([data], dtype=object)
        )
        np.testing.assert_array_equal(
            np.asarray(ds.asstr())[:1], np.array([data], dtype=object)
        )

    def test_asstr_fixed(self):
        dt = h5py.string_dtype(length=5)
        ds = self.f.create_dataset('x', (10,), dtype=dt)
        data = 'cù'
        ds[0] = np.array(data.encode('utf-8'), dtype=dt)

        self.assertIsInstance(ds[0], np.bytes_)
        out = ds.asstr()[0]
        self.assertIsInstance(out, str)
        self.assertEqual(out, data)

        # Different errors parameter
        self.assertEqual(ds.asstr('ascii', 'ignore')[0], 'c')

        # latin-1 will decode it but give the wrong text
        self.assertNotEqual(ds.asstr('latin-1')[0], data)

        # Array output
        np.testing.assert_array_equal(
            ds.asstr()[:1], np.array([data], dtype=object)
        )

    def test_unicode_write_error(self):
        """ Encoding error when writing a non-ASCII string to an ASCII vlen
        dataset """
        dt = h5py.string_dtype('ascii')
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        data = "fàilte"
        with self.assertRaises(UnicodeEncodeError):
            ds[0] = data

    def test_unicode_write_bytes(self):
        """ Writing valid utf-8 byte strings to a unicode vlen dataset is OK """
        dt = h5py.string_dtype()
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        data = ("Hello there" + chr(0x2034)).encode('utf8')
        ds[0] = data
        out = ds[0]
        self.assertEqual(type(out), bytes)
        self.assertEqual(out, data)

    def test_vlen_bytes_write_ascii_str(self):
        """ Writing an ascii str to an ascii vlen dataset is OK """
        dt = h5py.string_dtype('ascii')
        ds = self.f.create_dataset('x', (100,), dtype=dt)
        data = "ASCII string"
        ds[0] = data
        out = ds[0]
        self.assertEqual(type(out), bytes)
        self.assertEqual(out, data.encode('ascii'))
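
# Strings land on disk as bytes; .asstr() wraps a dataset so reads decode to
# str with a chosen codec and error policy, as the tests above pin down.
# A minimal sketch; the helper and file name are illustrative.
def _example_asstr(path):
    with File(path, 'w') as f:
        ds = f.create_dataset('s', (1,), dtype=h5py.string_dtype())
        ds[0] = "fàilte"
        assert ds[0] == "fàilte".encode('utf-8')  # raw read returns bytes
        assert ds.asstr()[0] == "fàilte"          # wrapped read returns str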

class TestCompound(BaseDataset):
    """
    Feature: Compound types correctly round-trip
    """

    def test_rt(self):
        """ Compound types are read back in correct order (issue 236) """
        dt = np.dtype([
            ('weight', np.float64),
            ('cputime', np.float64),
            ('walltime', np.float64),
            ('parents_offset', np.uint32),
            ('n_parents', np.uint32),
            ('status', np.uint8),
            ('endpoint_type', np.uint8),
        ])
        testdata = np.ndarray((16,), dtype=dt)
        for key in dt.fields:
            testdata[key] = np.random.random((16,)) * 100

        self.f['test'] = testdata
        outdata = self.f['test'][...]
        self.assertTrue(np.all(outdata == testdata))
        self.assertEqual(outdata.dtype, testdata.dtype)

    def test_assign(self):
        dt = np.dtype([
            ('weight', (np.float64, 3)),
            ('endpoint_type', np.uint8),
        ])
        testdata = np.ndarray((16,), dtype=dt)
        for key in dt.fields:
            testdata[key] = np.random.random(size=testdata[key].shape) * 100

        ds = self.f.create_dataset('test', (16,), dtype=dt)
        for key in dt.fields:
            ds[key] = testdata[key]

        outdata = self.f['test'][...]
        self.assertTrue(np.all(outdata == testdata))
        self.assertEqual(outdata.dtype, testdata.dtype)

    def test_fields(self):
        dt = np.dtype([
            ('x', np.float64),
            ('y', np.float64),
            ('z', np.float64),
        ])
        testdata = np.ndarray((16,), dtype=dt)
        for key in dt.fields:
            testdata[key] = np.random.random((16,)) * 100

        self.f['test'] = testdata

        # Extract multiple fields
        np.testing.assert_array_equal(
            self.f['test'].fields(['x', 'y'])[:], testdata[['x', 'y']]
        )
        # Extract single field
        np.testing.assert_array_equal(
            self.f['test'].fields('x')[:], testdata['x']
        )
        # Check __array__() method of fields wrapper
        np.testing.assert_array_equal(
            np.asarray(self.f['test'].fields(['x', 'y'])), testdata[['x', 'y']]
        )
        # Check type conversion of __array__() method
        dt_int = np.dtype([('x', np.int32)])
        np.testing.assert_array_equal(
            np.asarray(self.f['test'].fields(['x']), dtype=dt_int),
            testdata[['x']].astype(dt_int)
        )
        # Check len() on fields wrapper
        assert len(self.f['test'].fields('x')) == 16

    def test_nested_compound_vlen(self):
        dt_inner = np.dtype([('a', h5py.vlen_dtype(np.int32)),
                             ('b', h5py.vlen_dtype(np.int32))])
        dt = np.dtype([('f1', h5py.vlen_dtype(dt_inner)),
                       ('f2', np.int64)])
        inner1 = (np.array(range(1, 3), dtype=np.int32),
                  np.array(range(6, 9), dtype=np.int32))
        inner2 = (np.array(range(10, 14), dtype=np.int32),
                  np.array(range(16, 21), dtype=np.int32))
        data = np.array([(np.array([inner1, inner2], dtype=dt_inner), 2),
                         (np.array([inner1], dtype=dt_inner), 3)],
                        dtype=dt)
        self.f["ds"] = data
        out = self.f["ds"]
        # Specify check_alignment=False: vlen fields carry 8 bytes of padding,
        # since the vlen datatype in HDF5 occupies 16 bytes
        self.assertArrayEqual(out, data, check_alignment=False)


class TestSubarray(BaseDataset):
    def test_write_list(self):
        ds = self.f.create_dataset("a", (1,), dtype="3int8")
        ds[0] = [1, 2, 3]
        np.testing.assert_array_equal(ds[:], [[1, 2, 3]])

        ds[:] = [[4, 5, 6]]
        np.testing.assert_array_equal(ds[:], [[4, 5, 6]])

    def test_write_array(self):
        ds = self.f.create_dataset("a", (1,), dtype="3int8")
        ds[0] = np.array([1, 2, 3])
        np.testing.assert_array_equal(ds[:], [[1, 2, 3]])

        ds[:] = np.array([[4, 5, 6]])
        np.testing.assert_array_equal(ds[:], [[4, 5, 6]])


class TestEnum(BaseDataset):
    """
    Feature: Enum datatype info is preserved, read/write as integer
    """

    EDICT = {'RED': 0, 'GREEN': 1, 'BLUE': 42}

    def test_create(self):
        """ Enum datasets can be created and type correctly round-trips """
        dt = h5py.enum_dtype(self.EDICT, basetype='i')
        ds = self.f.create_dataset('x', (100, 100), dtype=dt)
        dt2 = ds.dtype
        dict2 = h5py.check_enum_dtype(dt2)
        self.assertEqual(dict2, self.EDICT)

    def test_readwrite(self):
        """ Enum datasets can be read/written as integers """
        dt = h5py.enum_dtype(self.EDICT, basetype='i4')
        ds = self.f.create_dataset('x', (100, 100), dtype=dt)
        ds[35, 37] = 42
        ds[1, :] = 1
        self.assertEqual(ds[35, 37], 42)
        self.assertArrayEqual(ds[1, :], np.array((1,) * 100, dtype='i4'))
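
# check_enum_dtype recovers the name -> value mapping attached to the dtype,
# while element reads come back as the plain integer base type, as tested
# above. A minimal sketch; the helper and file name are illustrative.
def _example_enum_roundtrip(path):
    with File(path, 'w') as f:
        dt = h5py.enum_dtype({'OFF': 0, 'ON': 1}, basetype='i1')
        ds = f.create_dataset('e', (4,), dtype=dt)
        ds[0] = 1
        assert h5py.check_enum_dtype(ds.dtype) == {'OFF': 0, 'ON': 1}
        assert ds[0] == 1  # reads back as a plain integer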

class TestFloats(BaseDataset):
    """
    Test support for mini and extended-precision floats
    """

    def _exectest(self, dt):
        dset = self.f.create_dataset('x', (100,), dtype=dt)
        self.assertEqual(dset.dtype, dt)
        data = np.ones((100,), dtype=dt)
        dset[...] = data
        self.assertArrayEqual(dset[...], data)

    @ut.skipUnless(hasattr(np, 'float16'), "NumPy float16 support required")
    def test_mini(self):
        """ Mini-floats round trip """
        self._exectest(np.dtype('float16'))

    # TODO: move these tests to test_h5t
    def test_mini_mapping(self):
        """ Test mapping for float16 """
        if hasattr(np, 'float16'):
            self.assertEqual(h5t.IEEE_F16LE.dtype, np.dtype('<f2'))


@ut.skipUnless(h5py.version.hdf5_version_tuple >= (1, 10, 5),
               "chunk info requires HDF5 >= 1.10.5")
def test_get_chunk_details():
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as fout:
        fout.create_dataset('test', shape=(100, 100), chunks=(10, 10),
                            dtype='i4')
        fout['test'][:] = 1

    buf.seek(0)
    with h5py.File(buf, 'r') as fin:
        ds = fin['test'].id

        assert ds.get_num_chunks() == 100
        for j in range(100):
            offset = tuple(np.array(np.unravel_index(j, (10, 10))) * 10)

            si = ds.get_chunk_info(j)
            assert si.chunk_offset == offset
            assert si.filter_mask == 0
            assert si.byte_offset is not None
            assert si.size > 0

        si = ds.get_chunk_info_by_coord((0, 0))
        assert si.chunk_offset == (0, 0)
        assert si.filter_mask == 0
        assert si.byte_offset is not None
        assert si.size > 0


@ut.skipUnless(h5py.version.hdf5_version_tuple >= (1, 12, 3)
               or (h5py.version.hdf5_version_tuple >= (1, 10, 10)
                   and h5py.version.hdf5_version_tuple < (1, 10, 99)),
               "chunk iteration requires HDF5 1.10.10+ (1.10 series) or 1.12.3+")
def test_chunk_iter():
    """H5Dchunk_iter() for chunk information"""
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as f:
        f.create_dataset('test', shape=(100, 100), chunks=(10, 10), dtype='i4')
        f['test'][:] = 1

    buf.seek(0)
    with h5py.File(buf, 'r') as f:
        dsid = f['test'].id

        num_chunks = dsid.get_num_chunks()
        assert num_chunks == 100
        ci = {}
        for j in range(num_chunks):
            si = dsid.get_chunk_info(j)
            ci[si.chunk_offset] = si

        def callback(chunk_info):
            known = ci[chunk_info.chunk_offset]
            assert chunk_info.chunk_offset == known.chunk_offset
            assert chunk_info.filter_mask == known.filter_mask
            assert chunk_info.byte_offset == known.byte_offset
            assert chunk_info.size == known.size

        dsid.chunk_iter(callback)


def test_empty_shape(writable_file):
    ds = writable_file.create_dataset('empty', dtype='int32')
    assert ds.shape is None
    assert ds.maxshape is None


def test_zero_storage_size():
    # https://github.com/h5py/h5py/issues/1475
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as fout:
        fout.create_dataset('empty', dtype='uint8')

    buf.seek(0)
    with h5py.File(buf, 'r') as fin:
        assert fin['empty'].chunks is None
        assert fin['empty'].id.get_offset() is None
        assert fin['empty'].id.get_storage_size() == 0


def test_python_int_uint64(writable_file):
    # https://github.com/h5py/h5py/issues/1547
    data = [np.iinfo(np.int64).max, np.iinfo(np.int64).max + 1]

    # Check creating a new dataset
    ds = writable_file.create_dataset('x', data=data, dtype=np.uint64)
    assert ds.dtype == np.dtype(np.uint64)
    np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64))

    # Check writing to an existing dataset
    ds[:] = data
    np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64))


def test_setitem_fancy_indexing(writable_file):
    # https://github.com/h5py/h5py/issues/1593
    arr = writable_file.create_dataset('data', (5, 1000, 2), dtype=np.uint8)
    block = np.random.randint(255, size=(5, 3, 2))
    arr[:, [0, 2, 4], ...] = block
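
# Fancy indexing accepts one list/array of indices per selection (a
# coordinate selection along that axis); the regression above writes a whole
# block through such a selection. A minimal sketch with an explicit block;
# the helper and file name are illustrative, not part of the suite.
def _example_fancy_write(path):
    with h5py.File(path, 'w') as f:
        d = f.create_dataset('d', (4, 10), dtype='u1')
        d[:, [1, 3, 5]] = np.full((4, 3), 7, dtype='u1')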

def test_vlen_spacepad():
    with File(get_data_file_path("vlen_string_dset.h5")) as f:
        assert f["DS1"][0] == b"Parting"


def test_vlen_nullterm():
    with File(get_data_file_path("vlen_string_dset_utc.h5")) as f:
        assert f["ds1"][0] == b"2009-12-20T10:16:18.662409Z"


def test_allow_unknown_filter(writable_file):
    # apparently filter ids 256-511 are reserved for testing purposes
    fake_filter_id = 256
    ds = writable_file.create_dataset(
        'data', shape=(10, 10), dtype=np.uint8,
        compression=fake_filter_id, allow_unknown_filter=True
    )
    assert str(fake_filter_id) in ds._filters


def test_dset_chunk_cache():
    """Chunk cache configuration for individual datasets."""
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as fout:
        ds = fout.create_dataset(
            'x', shape=(10, 20), chunks=(5, 4), dtype='i4',
            rdcc_nbytes=2 * 1024 * 1024, rdcc_w0=0.2, rdcc_nslots=997)
        ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache()
        assert fout.id.get_access_plist().get_cache()[1:] != ds_chunk_cache
        assert ds_chunk_cache == (997, 2 * 1024 * 1024, 0.2)

    buf.seek(0)
    with h5py.File(buf, 'r') as fin:
        ds = fin.require_dataset(
            'x', shape=(10, 20), dtype='i4',
            rdcc_nbytes=3 * 1024 * 1024, rdcc_w0=0.67, rdcc_nslots=709)
        ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache()
        assert fin.id.get_access_plist().get_cache()[1:] != ds_chunk_cache
        assert ds_chunk_cache == (709, 3 * 1024 * 1024, 0.67)
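
# get_chunk_cache() reports (nslots, nbytes, w0) in that order, matching the
# rdcc_* keyword arguments above. A minimal sizing sketch; the helper, file
# name, and numbers are illustrative, not part of the suite.
def _example_chunk_cache(path):
    with h5py.File(path, 'w') as f:
        chunk_bytes = 5 * 4 * 4  # one 5x4 chunk of int32
        ds = f.create_dataset('x', (10, 20), chunks=(5, 4), dtype='i4',
                              rdcc_nbytes=10 * chunk_bytes,
                              rdcc_w0=0.75, rdcc_nslots=401)
        nslots, nbytes, w0 = ds.id.get_access_plist().get_chunk_cache()
        assert (nslots, nbytes) == (401, 10 * chunk_bytes)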

class TestCommutative(BaseDataset):
    """
    Test the symmetry of operators, at least with the numpy types.
    Issue: https://github.com/h5py/h5py/issues/1947
    """

    def test_numpy_commutative(self):
        """ Create an h5py dataset, extract one element and convert it to a
        numpy scalar; check that == and != respond symmetrically. """
        shape = (100, 1)
        dset = self.f.create_dataset("test", shape, dtype=float,
                                     data=np.random.rand(*shape))

        # grab a value from the elements, i.e. dset[0, 0];
        # check that mask arrays are commutative wrt ==, !=
        val = np.float64(dset[0, 0])

        assert np.all((val == dset) == (dset == val))
        assert np.all((val != dset) == (dset != val))

        # generate a sample not in the dset, i.e. max(dset) + delta;
        # check that mask arrays are commutative wrt ==, !=
        delta = 0.001
        nval = np.nanmax(dset) + delta

        assert np.all((nval == dset) == (dset == nval))
        assert np.all((nval != dset) == (dset != nval))

    def test_basetype_commutative(self):
        """ Create an h5py dataset and check basetype compatibility.
        Check that the operation is symmetric, even if it is potentially
        not meaningful. """
        shape = (100, 1)
        dset = self.f.create_dataset("test", shape, dtype=float,
                                     data=np.random.rand(*shape))

        # generate the float type, sample float(0.);
        # check that the operation is symmetric (but potentially meaningless)
        val = float(0.)
        assert (val == dset) == (dset == val)
        assert (val != dset) == (dset != val)


class TestVirtualPrefix(BaseDataset):
    """
    Test setting the virtual prefix
    """

    def test_virtual_prefix_create(self):
        shape = (100, 1)
        virtual_prefix = "/path/to/virtual"
        dset = self.f.create_dataset("test", shape, dtype=float,
                                     data=np.random.rand(*shape),
                                     virtual_prefix=virtual_prefix)
        virtual_prefix_readback = pathlib.Path(
            dset.id.get_access_plist().get_virtual_prefix().decode()).as_posix()
        assert virtual_prefix_readback == virtual_prefix

    def test_virtual_prefix_require(self):
        virtual_prefix = "/path/to/virtual"
        dset = self.f.require_dataset('foo', (10, 3), 'f',
                                      virtual_prefix=virtual_prefix)
        virtual_prefix_readback = pathlib.Path(
            dset.id.get_access_plist().get_virtual_prefix().decode()).as_posix()
        self.assertEqual(virtual_prefix, virtual_prefix_readback)
        self.assertIsInstance(dset, Dataset)
        self.assertEqual(dset.shape, (10, 3))