"""Module pygame.tests.test_utils.array Export the Exporter and Importer classes. Class Exporter has configurable shape and strides. Exporter objects provide a convient target for unit tests on Pygame objects and functions that import a new buffer interface. Class Importer imports a buffer interface with the given PyBUF_* flags. It returns NULL Py_buffer fields as None. The shape, strides, and suboffsets arrays are returned as tuples of ints. All Py_buffer field properties are read-only. This class is useful in comparing exported buffer interfaces with the actual request. The simular Python builtin memoryview currently does not support configurable PyBUF_* flags. This module contains its own unit tests. When Pygame is installed, these tests can be run with the following command line statement: python -m pygame.tests.test_utils.array """ import pygame if not pygame.HAVE_NEWBUF: emsg = "This Pygame build does not support the new buffer protocol" raise ImportError(emsg) import pygame.newbuffer from pygame.newbuffer import (PyBUF_SIMPLE, PyBUF_FORMAT, PyBUF_ND, PyBUF_WRITABLE, PyBUF_STRIDES, PyBUF_C_CONTIGUOUS, PyBUF_F_CONTIGUOUS, PyBUF_ANY_CONTIGUOUS, PyBUF_INDIRECT, PyBUF_STRIDED, PyBUF_STRIDED_RO, PyBUF_RECORDS, PyBUF_RECORDS_RO, PyBUF_FULL, PyBUF_FULL_RO, PyBUF_CONTIG, PyBUF_CONTIG_RO) import unittest import sys import ctypes import operator try: reduce except NameError: from functools import reduce __all__ = ["Exporter", "Importer"] try: ctypes.c_ssize_t except AttributeError: void_p_sz = ctypes.sizeof(ctypes.c_void_p) if ctypes.sizeof(ctypes.c_short) == void_p_sz: ctypes.c_ssize_t = ctypes.c_short elif ctypes.sizeof(ctypes.c_int) == void_p_sz: ctypes.c_ssize_t = ctypes.c_int elif ctypes.sizeof(ctypes.c_long) == void_p_sz: ctypes.c_ssize_t = ctypes.c_long elif ctypes.sizeof(ctypes.c_longlong) == void_p_sz: ctypes.c_ssize_t = ctypes.c_longlong else: raise RuntimeError("Cannot set c_ssize_t: sizeof(void *) is %i" % void_p_sz) def _prop_get(fn): return property(fn) class Exporter(pygame.newbuffer.BufferMixin): """An object that exports a multi-dimension new buffer interface The only array operation this type supports is to export a buffer. """ prefixes = {'@': '', '=': '=', '<': '=', '>': '=', '!': '=', '2': '2', '3': '3', '4': '4', '5': '5', '6': '6', '7': '7', '8': '8', '9': '9'} types = {'c': ctypes.c_char, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, '=c': ctypes.c_int8, '=b': ctypes.c_int8, '=B': ctypes.c_uint8, '?': ctypes.c_bool, '=?': ctypes.c_int8, 'h': ctypes.c_short, 'H': ctypes.c_ushort, '=h': ctypes.c_int16, '=H': ctypes.c_uint16, 'i': ctypes.c_int, 'I': ctypes.c_uint, '=i': ctypes.c_int32, '=I': ctypes.c_uint32, 'l': ctypes.c_long, 'L': ctypes.c_ulong, '=l': ctypes.c_int32, '=L': ctypes.c_uint32, 'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong, '=q': ctypes.c_int64, '=Q': ctypes.c_uint64, 'f': ctypes.c_float, 'd': ctypes.c_double, 'P': ctypes.c_void_p, 'x': ctypes.c_ubyte * 1, '2x': ctypes.c_ubyte * 2, '3x': ctypes.c_ubyte * 3, '4x': ctypes.c_ubyte * 4, '5x': ctypes.c_ubyte * 5, '6x': ctypes.c_ubyte * 6, '7x': ctypes.c_ubyte * 7, '8x': ctypes.c_ubyte * 8, '9x': ctypes.c_ubyte * 9} def __init__(self, shape, format=None, strides=None, readonly=None, itemsize=None): if format is None: format = 'B' if readonly is None: readonly = False prefix = '' typecode = '' i = 0 if i < len(format): try: prefix = self.prefixes[format[i]] i += 1 except LookupError: pass if i < len(format) and format[i] == '1': i += 1 if i == len(format) - 1: typecode = format[i] if itemsize is None: try: itemsize = ctypes.sizeof(self.types[prefix + typecode]) except KeyError: raise ValueError("Unknown item format '" + format + "'") self.readonly = bool(readonly) self.format = format self._format = ctypes.create_string_buffer(format.encode('latin_1')) self.ndim = len(shape) self.itemsize = itemsize self.len = reduce(operator.mul, shape, 1) * self.itemsize self.shape = tuple(shape) self._shape = (ctypes.c_ssize_t * self.ndim)(*self.shape) if strides is None: self._strides = (ctypes.c_ssize_t * self.ndim)() self._strides[self.ndim - 1] = itemsize for i in range(self.ndim - 1, 0, -1): self._strides[i - 1] = self.shape[i] * self._strides[i] self.strides = tuple(self._strides) elif len(strides) == self.ndim: self.strides = tuple(strides) self._strides = (ctypes.c_ssize_t * self.ndim)(*self.strides) else: raise ValueError("Mismatch in length of strides and shape") buflen = max(d * abs(s) for d, s in zip(self.shape, self.strides)) self.buflen = buflen self._buf = (ctypes.c_ubyte * buflen)() offset = sum((d - 1) * abs(s) for d, s in zip(self.shape, self.strides) if s < 0) self.buf = ctypes.addressof(self._buf) + offset def buffer_info(self): return (addressof(self.buffer), self.shape[0]) def tobytes(self): return cast(self.buffer, POINTER(c_char))[0:self._len] def __len__(self): return self.shape[0] def _get_buffer(self, view, flags): from ctypes import addressof if (flags & PyBUF_WRITABLE) == PyBUF_WRITABLE and self.readonly: raise BufferError("buffer is read-only") if ((flags & PyBUF_C_CONTIGUOUS) == PyBUF_C_CONTIGUOUS and not self.is_contiguous('C')): raise BufferError("data is not C contiguous") if ((flags & PyBUF_F_CONTIGUOUS) == PyBUF_F_CONTIGUOUS and not self.is_contiguous('F')): raise BufferError("data is not F contiguous") if ((flags & PyBUF_ANY_CONTIGUOUS) == PyBUF_ANY_CONTIGUOUS and not self.is_contiguous('A')): raise BufferError("data is not contiguous") view.buf = self.buf view.readonly = self.readonly view.len = self.len if flags | PyBUF_WRITABLE == PyBUF_WRITABLE: view.ndim = 0 else: view.ndim = self.ndim view.itemsize = self.itemsize if (flags & PyBUF_FORMAT) == PyBUF_FORMAT: view.format = addressof(self._format) else: view.format = None if (flags & PyBUF_ND) == PyBUF_ND: view.shape = addressof(self._shape) elif self.is_contiguous('C'): view.shape = None else: raise BufferError( "shape required for {} dimensional data".format(self.ndim)) if (flags & PyBUF_STRIDES) == PyBUF_STRIDES: view.strides = ctypes.addressof(self._strides) elif view.shape is None or self.is_contiguous('C'): view.strides = None else: raise BufferError("strides required for none C contiguous data") view.suboffsets = None view.internal = None view.obj = self def is_contiguous(self, fortran): if fortran in "CA": if self.strides[-1] == self.itemsize: for i in range(self.ndim - 1, 0, -1): if self.strides[i - 1] != self.shape[i] * self.strides[i]: break else: return True if fortran in "FA": if self.strides[0] == self.itemsize: for i in range(0, self.ndim - 1): if self.strides[i + 1] != self.shape[i] * self.strides[i]: break else: return True return False class Importer(object): """An object that imports a new buffer interface The fields of the Py_buffer C struct are exposed by identically named Importer read-only properties. """ def __init__(self, obj, flags): self._view = pygame.newbuffer.Py_buffer() self._view.get_buffer(obj, flags) @property def obj(self): """return object or None for NULL field""" return self._view.obj @property def buf(self): """return int or None for NULL field""" return self._view.buf @property def len(self): """return int""" return self._view.len @property def readonly(self): """return bool""" return self._view.readonly @property def format(self): """return bytes or None for NULL field""" format_addr = self._view.format if format_addr is None: return None return ctypes.cast(format_addr, ctypes.c_char_p).value.decode('ascii') @property def itemsize(self): """return int""" return self._view.itemsize @property def ndim(self): """return int""" return self._view.ndim @property def shape(self): """return int tuple or None for NULL field""" return self._to_ssize_tuple(self._view.shape) @property def strides(self): """return int tuple or None for NULL field""" return self._to_ssize_tuple(self._view.strides) @property def suboffsets(self): """return int tuple or None for NULL field""" return self._to_ssize_tuple(self._view.suboffsets) @property def internal(self): """return int or None for NULL field""" return self._view.internal def _to_ssize_tuple(self, addr): from ctypes import cast, POINTER, c_ssize_t if addr is None: return None return tuple(cast(addr, POINTER(c_ssize_t))[0:self._view.ndim]) class ExporterTest(unittest.TestCase): """Class Exporter unit tests""" def test_formats(self): char_sz = ctypes.sizeof(ctypes.c_char) short_sz = ctypes.sizeof(ctypes.c_short) int_sz = ctypes.sizeof(ctypes.c_int) long_sz = ctypes.sizeof(ctypes.c_long) longlong_sz = ctypes.sizeof(ctypes.c_longlong) float_sz = ctypes.sizeof(ctypes.c_float) double_sz = ctypes.sizeof(ctypes.c_double) voidp_sz = ctypes.sizeof(ctypes.c_void_p) bool_sz = ctypes.sizeof(ctypes.c_bool) self.check_args(0, (1,), 'B', (1,), 1, 1, 1) self.check_args(1, (1,), 'b', (1,), 1, 1, 1) self.check_args(1, (1,), 'B', (1,), 1, 1, 1) self.check_args(1, (1,), 'c', (char_sz,), char_sz, char_sz, char_sz) self.check_args(1, (1,), 'h', (short_sz,), short_sz, short_sz, short_sz) self.check_args(1, (1,), 'H', (short_sz,), short_sz, short_sz, short_sz) self.check_args(1, (1,), 'i', (int_sz,), int_sz, int_sz, int_sz) self.check_args(1, (1,), 'I', (int_sz,), int_sz, int_sz, int_sz) self.check_args(1, (1,), 'l', (long_sz,), long_sz, long_sz, long_sz) self.check_args(1, (1,), 'L', (long_sz,), long_sz, long_sz, long_sz) self.check_args(1, (1,), 'q', (longlong_sz,), longlong_sz, longlong_sz, longlong_sz) self.check_args(1, (1,), 'Q', (longlong_sz,), longlong_sz, longlong_sz, longlong_sz) self.check_args(1, (1,), 'f', (float_sz,), float_sz, float_sz, float_sz) self.check_args(1, (1,), 'd', (double_sz,), double_sz, double_sz, double_sz) self.check_args(1, (1,), 'x', (1,), 1, 1, 1) self.check_args(1, (1,), 'P', (voidp_sz,), voidp_sz, voidp_sz, voidp_sz) self.check_args(1, (1,), '?', (bool_sz,), bool_sz, bool_sz, bool_sz) self.check_args(1, (1,), '@b', (1,), 1, 1, 1) self.check_args(1, (1,), '@B', (1,), 1, 1, 1) self.check_args(1, (1,), '@c', (char_sz,), char_sz, char_sz, char_sz) self.check_args(1, (1,), '@h', (short_sz,), short_sz, short_sz, short_sz) self.check_args(1, (1,), '@H', (short_sz,), short_sz, short_sz, short_sz) self.check_args(1, (1,), '@i', (int_sz,), int_sz, int_sz, int_sz) self.check_args(1, (1,), '@I', (int_sz,), int_sz, int_sz, int_sz) self.check_args(1, (1,), '@l', (long_sz,), long_sz, long_sz, long_sz) self.check_args(1, (1,), '@L', (long_sz,), long_sz, long_sz, long_sz) self.check_args(1, (1,), '@q', (longlong_sz,), longlong_sz, longlong_sz, longlong_sz) self.check_args(1, (1,), '@Q', (longlong_sz,), longlong_sz, longlong_sz, longlong_sz) self.check_args(1, (1,), '@f', (float_sz,), float_sz, float_sz, float_sz) self.check_args(1, (1,), '@d', (double_sz,), double_sz, double_sz, double_sz) self.check_args(1, (1,), '@?', (bool_sz,), bool_sz, bool_sz, bool_sz) self.check_args(1, (1,), '=b', (1,), 1, 1, 1) self.check_args(1, (1,), '=B', (1,), 1, 1, 1) self.check_args(1, (1,), '=c', (1,), 1, 1, 1) self.check_args(1, (1,), '=h', (2,), 2, 2, 2) self.check_args(1, (1,), '=H', (2,), 2, 2, 2) self.check_args(1, (1,), '=i', (4,), 4, 4, 4) self.check_args(1, (1,), '=I', (4,), 4, 4, 4) self.check_args(1, (1,), '=l', (4,), 4, 4, 4) self.check_args(1, (1,), '=L', (4,), 4, 4, 4) self.check_args(1, (1,), '=q', (8,), 8, 8, 8) self.check_args(1, (1,), '=Q', (8,), 8, 8, 8) self.check_args(1, (1,), '=?', (1,), 1, 1, 1) self.check_args(1, (1,), '<h', (2,), 2, 2, 2) self.check_args(1, (1,), '>h', (2,), 2, 2, 2) self.check_args(1, (1,), '!h', (2,), 2, 2, 2) self.check_args(1, (1,), '<q', (8,), 8, 8, 8) self.check_args(1, (1,), '>q', (8,), 8, 8, 8) self.check_args(1, (1,), '!q', (8,), 8, 8, 8) self.check_args(1, (1,), '1x', (1,), 1, 1, 1) self.check_args(1, (1,), '2x', (2,), 2, 2, 2) self.check_args(1, (1,), '3x', (3,), 3, 3, 3) self.check_args(1, (1,), '4x', (4,), 4, 4, 4) self.check_args(1, (1,), '5x', (5,), 5, 5, 5) self.check_args(1, (1,), '6x', (6,), 6, 6, 6) self.check_args(1, (1,), '7x', (7,), 7, 7, 7) self.check_args(1, (1,), '8x', (8,), 8, 8, 8) self.check_args(1, (1,), '9x', (9,), 9, 9, 9) self.check_args(1, (1,), '1h', (2,), 2, 2, 2) self.check_args(1, (1,), '=1h', (2,), 2, 2, 2) self.assertRaises(ValueError, Exporter, (2, 1), '') self.assertRaises(ValueError, Exporter, (2, 1), 'W') self.assertRaises(ValueError, Exporter, (2, 1), '^Q') self.assertRaises(ValueError, Exporter, (2, 1), '=W') self.assertRaises(ValueError, Exporter, (2, 1), '=f') self.assertRaises(ValueError, Exporter, (2, 1), '=d') self.assertRaises(ValueError, Exporter, (2, 1), '<f') self.assertRaises(ValueError, Exporter, (2, 1), '<d') self.assertRaises(ValueError, Exporter, (2, 1), '>f') self.assertRaises(ValueError, Exporter, (2, 1), '>d') self.assertRaises(ValueError, Exporter, (2, 1), '!f') self.assertRaises(ValueError, Exporter, (2, 1), '!d') self.assertRaises(ValueError, Exporter, (2, 1), '0x') self.assertRaises(ValueError, Exporter, (2, 1), '11x') self.assertRaises(ValueError, Exporter, (2, 1), 'BB') def test_strides(self): self.check_args(1, (10,), '=h', (2,), 20, 20, 2) self.check_args(1, (5, 3), '=h', (6, 2), 30, 30, 2) self.check_args(1, (7, 3, 5), '=h', (30, 10, 2), 210, 210, 2) self.check_args(1, (13, 5, 11, 3), '=h', (330, 66, 6, 2), 4290, 4290, 2) self.check_args(3, (7, 3, 5), '=h', (2, 14, 42), 210, 210, 2) self.check_args(3, (7, 3, 5), '=h', (2, 16, 48), 210, 240, 2) self.check_args(3, (13, 5, 11, 3), '=h', (440, 88, 8, 2), 4290, 5720, 2) self.check_args(3, (7, 5), '3x', (15, 3), 105, 105, 3) self.check_args(3, (7, 5), '3x', (3, 21), 105, 105, 3) self.check_args(3, (7, 5), '3x', (3, 24), 105, 120, 3) def test_readonly(self): a = Exporter((2,), 'h', readonly=True) self.assertTrue(a.readonly) b = Importer(a, PyBUF_STRIDED_RO) self.assertRaises(BufferError, Importer, a, PyBUF_STRIDED) b = Importer(a, PyBUF_STRIDED_RO) def test_is_contiguous(self): a = Exporter((10,), '=h') self.assertTrue(a.is_contiguous('C')) self.assertTrue(a.is_contiguous('F')) self.assertTrue(a.is_contiguous('A')) a = Exporter((10, 4), '=h') self.assertTrue(a.is_contiguous('C')) self.assertTrue(a.is_contiguous('A')) self.assertFalse(a.is_contiguous('F')) a = Exporter((13, 5, 11, 3), '=h', (330, 66, 6, 2)) self.assertTrue(a.is_contiguous('C')) self.assertTrue(a.is_contiguous('A')) self.assertFalse(a.is_contiguous('F')) a = Exporter((10, 4), '=h', (2, 20)) self.assertTrue(a.is_contiguous('F')) self.assertTrue(a.is_contiguous('A')) self.assertFalse(a.is_contiguous('C')) a = Exporter((13, 5, 11, 3), '=h', (2, 26, 130, 1430)) self.assertTrue(a.is_contiguous('F')) self.assertTrue(a.is_contiguous('A')) self.assertFalse(a.is_contiguous('C')) a = Exporter((2, 11, 6, 4), '=h', (576, 48, 8, 2)) self.assertFalse(a.is_contiguous('A')) a = Exporter((2, 11, 6, 4), '=h', (2, 4, 48, 288)) self.assertFalse(a.is_contiguous('A')) a = Exporter((3, 2, 2), '=h', (16, 8, 4)) self.assertFalse(a.is_contiguous('A')) a = Exporter((3, 2, 2), '=h', (4, 12, 24)) self.assertFalse(a.is_contiguous('A')) def test_PyBUF_flags(self): a = Exporter((10, 2), 'd') b = Importer(a, PyBUF_SIMPLE) self.assertTrue(b.obj is a) self.assertTrue(b.format is None) self.assertEqual(b.len, a.len) self.assertEqual(b.itemsize, a.itemsize) self.assertTrue(b.shape is None) self.assertTrue(b.strides is None) self.assertTrue(b.suboffsets is None) self.assertTrue(b.internal is None) self.assertFalse(b.readonly) b = Importer(a, PyBUF_WRITABLE) self.assertTrue(b.obj is a) self.assertTrue(b.format is None) self.assertEqual(b.len, a.len) self.assertEqual(b.itemsize, a.itemsize) self.assertTrue(b.shape is None) self.assertTrue(b.strides is None) self.assertTrue(b.suboffsets is None) self.assertTrue(b.internal is None) self.assertFalse(b.readonly) b = Importer(a, PyBUF_ND) self.assertTrue(b.obj is a) self.assertTrue(b.format is None) self.assertEqual(b.len, a.len) self.assertEqual(b.itemsize, a.itemsize) self.assertEqual(b.shape, a.shape) self.assertTrue(b.strides is None) self.assertTrue(b.suboffsets is None) self.assertTrue(b.internal is None) self.assertFalse(b.readonly) a = Exporter((5, 10), '=h', (24, 2)) b = Importer(a, PyBUF_STRIDES) self.assertTrue(b.obj is a) self.assertTrue(b.format is None) self.assertEqual(b.len, a.len) self.assertEqual(b.itemsize, a.itemsize) self.assertEqual(b.shape, a.shape) self.assertEqual(b.strides, a.strides) self.assertTrue(b.suboffsets is None) self.assertTrue(b.internal is None) self.assertFalse(b.readonly) b = Importer(a, PyBUF_FULL) self.assertTrue(b.obj is a) self.assertEqual(b.format, '=h') self.assertEqual(b.len, a.len) self.assertEqual(b.itemsize, a.itemsize) self.assertEqual(b.shape, a.shape) self.assertEqual(b.strides, a.strides) self.assertTrue(b.suboffsets is None) self.assertTrue(b.internal is None) self.assertFalse(b.readonly) self.assertRaises(BufferError, Importer, a, PyBUF_SIMPLE) self.assertRaises(BufferError, Importer, a, PyBUF_WRITABLE) self.assertRaises(BufferError, Importer, a, PyBUF_ND) self.assertRaises(BufferError, Importer, a, PyBUF_C_CONTIGUOUS) self.assertRaises(BufferError, Importer, a, PyBUF_F_CONTIGUOUS) self.assertRaises(BufferError, Importer, a, PyBUF_ANY_CONTIGUOUS) self.assertRaises(BufferError, Importer, a, PyBUF_CONTIG) def test_negative_strides(self): self.check_args(3, (3, 5, 4), 'B', (20, 4, -1), 60, 60, 1, 3) self.check_args(3, (3, 5, 3), 'B', (20, 4, -1), 45, 60, 1, 2) self.check_args(3, (3, 5, 4), 'B', (20, -4, 1), 60, 60, 1, 16) self.check_args(3, (3, 5, 4), 'B', (-20, -4, -1), 60, 60, 1, 59) self.check_args(3, (3, 5, 3), 'B', (-20, -4, -1), 45, 60, 1, 58) def test_attributes(self): a = Exporter((13, 5, 11, 3), '=h', (440, 88, 8, 2)) self.assertEqual(a.ndim, 4) self.assertEqual(a.itemsize, 2) self.assertFalse(a.readonly) self.assertEqual(a.shape, (13, 5, 11, 3)) self.assertEqual(a.format, '=h') self.assertEqual(a.strides, (440, 88, 8, 2)) self.assertEqual(a.len, 4290) self.assertEqual(a.buflen, 5720) self.assertEqual(a.buf, ctypes.addressof(a._buf)) a = Exporter((8,)) self.assertEqual(a.ndim, 1) self.assertEqual(a.itemsize, 1) self.assertFalse(a.readonly) self.assertEqual(a.shape, (8,)) self.assertEqual(a.format, 'B') self.assertTrue(isinstance(a.strides, tuple)) self.assertEqual(a.strides, (1,)) self.assertEqual(a.len, 8) self.assertEqual(a.buflen, 8) a = Exporter([13, 5, 11, 3], '=h', [440, 88, 8, 2]) self.assertTrue(isinstance(a.shape, tuple)) self.assertTrue(isinstance(a.strides, tuple)) self.assertEqual(a.shape, (13, 5, 11, 3)) self.assertEqual(a.strides, (440, 88, 8, 2)) def test_itemsize(self): exp = Exporter((4, 5), format='B', itemsize=8) imp = Importer(exp, PyBUF_RECORDS) self.assertEqual(imp.itemsize, 8) self.assertEqual(imp.format, 'B') self.assertEqual(imp.strides, (40, 8)) exp = Exporter((4, 5), format='weird', itemsize=5) imp = Importer(exp, PyBUF_RECORDS) self.assertEqual(imp.itemsize, 5) self.assertEqual(imp.format, 'weird') self.assertEqual(imp.strides, (25, 5)) def check_args(self, call_flags, shape, format, strides, length, bufsize, itemsize, offset=0): format_arg = format if call_flags & 1 else None strides_arg = strides if call_flags & 2 else None a = Exporter(shape, format_arg, strides_arg) self.assertEqual(a.buflen, bufsize) self.assertEqual(a.buf, ctypes.addressof(a._buf) + offset) m = Importer(a, PyBUF_RECORDS_RO) self.assertEqual(m.buf, a.buf) self.assertEqual(m.len, length) self.assertEqual(m.format, format) self.assertEqual(m.itemsize, itemsize) self.assertEqual(m.shape, shape) self.assertEqual(m.strides, strides) if __name__ == '__main__': unittest.main()