# AL-2020/venv/Lib/site-packages/pygame/tests/bufferproxy_test.py
#
# 484 lines
# 16 KiB
# Python

import sys
import re
import weakref
import gc
import ctypes
import unittest
import pygame
from pygame.bufferproxy import BufferProxy
from pygame.compat import as_bytes
# Make sure the name BufferError is bound at module level: if looking it up
# raises NameError, import the exception of the same name from pygame.
try:
    BufferError
except NameError:
    from pygame import BufferError
class BufferProxyTest(unittest.TestCase):
    """Tests for BufferProxy objects wrapping array-interface descriptions."""

    # Array-interface style description shared by most tests.  'data' is an
    # (address, read-only flag) pair; address 0 is presumably a placeholder,
    # since no test in this class reads memory through these keywords.
    view_keywords = {'shape': (5, 4, 3),
                     'typestr': '|u1',
                     'data': (0, True),
                     'strides': (4, 20, 1)}

    def _assert_interface_matches(self, interface, kwds):
        # Shared check: the exported dict has exactly five entries -- the
        # version (3) plus the four fields copied from kwds.
        self.assertEqual(len(interface), 5)
        self.assertEqual(interface['version'], 3)
        for key in ('shape', 'typestr', 'data', 'strides'):
            self.assertEqual(interface[key], kwds[key])

    def test_module_name(self):
        # The extension module reports its fully qualified name.
        self.assertEqual(pygame.bufferproxy.__name__,
                         "pygame.bufferproxy")

    def test_class_name(self):
        # The class name carries no module prefix.
        self.assertEqual(BufferProxy.__name__, "BufferProxy")

    def test___array_struct___property(self):
        # The dict returned by pygame.get_array_interface(v) must echo the
        # construction keywords.
        kwds = self.view_keywords
        v = BufferProxy(kwds)
        d = pygame.get_array_interface(v)
        self._assert_interface_matches(d, kwds)

    def test___array_interface___property(self):
        # The __array_interface__ dict must echo the construction keywords.
        kwds = self.view_keywords
        v = BufferProxy(kwds)
        d = v.__array_interface__
        self._assert_interface_matches(d, kwds)

    def test_parent_property(self):
        # The object passed as 'parent' is returned unchanged (same identity).
        kwds = dict(self.view_keywords)
        p = []
        kwds['parent'] = p
        v = BufferProxy(kwds)
        self.assertIs(v.parent, p)

    def test_before(self):
        # The 'before' callback is called once, with the parent, when the
        # first export is taken: not at construction, not for a second export
        # taken while the first is still held, and not on release.
        # Statement order matters here; the checks depend on which
        # references are alive at each step.
        def callback(parent):
            success.append(parent is p)

        class MyException(Exception):
            pass

        def raise_exception(parent):
            raise MyException("Just a test.")

        kwds = dict(self.view_keywords)
        p = []
        kwds['parent'] = p

        # For array interface
        success = []
        kwds['before'] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        d = v.__array_interface__
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])
        d = v.__array_interface__
        self.assertEqual(len(success), 1)
        d = v = None
        gc.collect()
        self.assertEqual(len(success), 1)

        # For array struct
        success = []
        kwds['before'] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        c = v.__array_struct__
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])
        c = v.__array_struct__
        self.assertEqual(len(success), 1)
        c = v = None
        gc.collect()
        self.assertEqual(len(success), 1)

        # Callback raises an exception: it must propagate to the caller.
        kwds['before'] = raise_exception
        v = BufferProxy(kwds)
        self.assertRaises(MyException, getattr, v, '__array_struct__')

    def test_after(self):
        # The 'after' callback is called once, with the parent, only after
        # the exports and the proxy have been released and collected.
        # Statement order matters here; the checks depend on which
        # references are alive at each step.
        def callback(parent):
            success.append(parent is p)

        kwds = dict(self.view_keywords)
        p = []
        kwds['parent'] = p

        # For array interface
        success = []
        kwds['after'] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        d = v.__array_interface__
        self.assertEqual(len(success), 0)
        d = v.__array_interface__
        self.assertEqual(len(success), 0)
        d = v = None
        gc.collect()
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])

        # For array struct
        success = []
        kwds['after'] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        c = v.__array_struct__
        self.assertEqual(len(success), 0)
        c = v.__array_struct__
        self.assertEqual(len(success), 0)
        c = v = None
        gc.collect()
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])

    def test_attribute(self):
        # Instances accept, return and delete arbitrary attributes.
        v = BufferProxy(self.view_keywords)
        self.assertRaises(AttributeError, getattr, v, 'undefined')
        v.undefined = 12
        self.assertEqual(v.undefined, 12)
        del v.undefined
        self.assertRaises(AttributeError, getattr, v, 'undefined')

    def test_weakref(self):
        # Instances are weakly referenceable; the weak reference dies with
        # the last strong reference.
        v = BufferProxy(self.view_keywords)
        weak_v = weakref.ref(v)
        self.assertIs(weak_v(), v)
        v = None
        gc.collect()
        self.assertIsNone(weak_v())

    def test_gc(self):
        """refcount agnostic check that contained objects are freed"""
        def before_callback(parent):
            return r[0]

        def after_callback(parent):
            return r[1]

        class Obj(object):
            pass

        p = Obj()
        a = Obj()
        r = [Obj(), Obj()]
        weak_p = weakref.ref(p)
        weak_a = weakref.ref(a)
        weak_r0 = weakref.ref(r[0])
        weak_r1 = weakref.ref(r[1])
        weak_before = weakref.ref(before_callback)
        weak_after = weakref.ref(after_callback)
        kwds = dict(self.view_keywords)
        kwds['parent'] = p
        kwds['before'] = before_callback
        kwds['after'] = after_callback
        v = BufferProxy(kwds)
        v.some_attribute = a
        weak_v = weakref.ref(v)

        # Only the proxy keeps these objects alive from here on.
        kwds = p = a = before_callback = after_callback = None
        gc.collect()
        self.assertIsNotNone(weak_p())
        self.assertIsNotNone(weak_a())
        self.assertIsNotNone(weak_before())
        self.assertIsNotNone(weak_after())

        # Dropping the proxy must free everything it referenced, but not the
        # objects in r, which this frame still holds through the list.
        v = None
        for _ in range(4):
            gc.collect()
        self.assertIsNone(weak_v())
        self.assertIsNone(weak_p())
        self.assertIsNone(weak_a())
        self.assertIsNone(weak_before())
        self.assertIsNone(weak_after())
        self.assertIsNotNone(weak_r0())
        self.assertIsNotNone(weak_r1())
        r = None
        gc.collect()
        self.assertIsNone(weak_r0())
        self.assertIsNone(weak_r1())

        # Cycle removal
        kwds = dict(self.view_keywords)
        kwds['parent'] = []
        v = BufferProxy(kwds)
        v.some_attribute = v
        # gc.is_tracked() is used instead of scanning gc.get_objects() with a
        # loop: the loop variable would stay bound to v afterwards and keep
        # the proxy alive, so the cycle below would never be collected.
        self.assertTrue(gc.is_tracked(v))
        kwds['parent'].append(v)
        kwds = None
        gc.collect()
        n1 = len(gc.garbage)
        v = None
        gc.collect()
        n2 = len(gc.garbage)
        # Collecting the self-referencing proxy must not add to gc.garbage.
        self.assertEqual(n2, n1)

    def test_c_api(self):
        # The module's _PYGAME_C_API object has the same type as pygame.base's.
        api = pygame.bufferproxy._PYGAME_C_API
        api_type = type(pygame.base._PYGAME_C_API)
        self.assertIsInstance(api, api_type)

    def test_repr(self):
        # repr() has the form '<ClassName(length)>'.
        v = BufferProxy(self.view_keywords)
        cname = BufferProxy.__name__
        oname, ovalue = re.findall(r"<([^)]+)\(([^)]+)\)>", repr(v))[0]
        self.assertEqual(oname, cname)
        self.assertEqual(v.length, int(ovalue))

    def test_subclassing(self):
        # A Python subclass can override __repr__ and still delegate to the
        # base implementation.
        class MyBufferProxy(BufferProxy):
            def __repr__(self):
                return "*%s*" % (BufferProxy.__repr__(self),)

        kwds = dict(self.view_keywords)
        kwds['parent'] = 0
        v = MyBufferProxy(kwds)
        self.assertEqual(v.parent, 0)
        r = repr(v)
        self.assertEqual(r[:2], '*<')
        self.assertEqual(r[-2:], '>*')

    @unittest.skipIf(not pygame.HAVE_NEWBUF, 'newbuf not implemented')
    def test_newbuf(self):
        # Collected entry point.  unittest only discovers methods whose names
        # start with 'test', so NEWBUF_test_newbuf is never run on its own;
        # this mirrors the test_oldbuf_arg / OLDBUF_test_oldbuf_arg pairing.
        self.NEWBUF_test_newbuf()

    @unittest.skipIf(not pygame.HAVE_NEWBUF, 'newbuf not implemented')
    def NEWBUF_test_newbuf(self):
        # Round-trips buffers through BufferProxy using the buftools
        # Exporter/Importer helpers from pygame's test utilities.
        from ctypes import string_at
        from pygame.tests.test_utils import buftools
        Exporter = buftools.Exporter
        Importer = buftools.Importer

        # Read-only buffer of ten unsigned bytes.
        exp = Exporter((10,), 'B', readonly=True)
        b = BufferProxy(exp)
        self.assertEqual(b.length, exp.len)
        self.assertEqual(b.raw, string_at(exp.buf, exp.len))
        d = b.__array_interface__
        try:
            self.assertEqual(d['typestr'], '|u1')
            self.assertEqual(d['shape'], exp.shape)
            self.assertEqual(d['strides'], exp.strides)
            self.assertEqual(d['data'], (exp.buf, True))
        finally:
            # Drop the interface dict before moving on to the next exporter.
            d = None

        # Writable buffer of three '=h' items; the expected typestr carries
        # the byte-order character matching pygame.get_sdl_byteorder().
        exp = Exporter((3,), '=h')
        b = BufferProxy(exp)
        self.assertEqual(b.length, exp.len)
        self.assertEqual(b.raw, string_at(exp.buf, exp.len))
        d = b.__array_interface__
        try:
            lil_endian = pygame.get_sdl_byteorder() == pygame.LIL_ENDIAN
            f = '{}i{}'.format('<' if lil_endian else '>', exp.itemsize)
            self.assertEqual(d['typestr'], f)
            self.assertEqual(d['shape'], exp.shape)
            self.assertEqual(d['strides'], exp.strides)
            self.assertEqual(d['data'], (exp.buf, False))
        finally:
            d = None

        # Re-exporting with PyBUF_RECORDS must reproduce the exporter's fields.
        exp = Exporter((10, 2), '=i')
        b = BufferProxy(exp)
        imp = Importer(b, buftools.PyBUF_RECORDS)
        self.assertIs(imp.obj, b)
        self.assertEqual(imp.buf, exp.buf)
        self.assertEqual(imp.ndim, exp.ndim)
        self.assertEqual(imp.format, exp.format)
        self.assertEqual(imp.readonly, exp.readonly)
        self.assertEqual(imp.itemsize, exp.itemsize)
        self.assertEqual(imp.len, exp.len)
        self.assertEqual(imp.shape, exp.shape)
        self.assertEqual(imp.strides, exp.strides)
        self.assertIsNone(imp.suboffsets)

        # A PyBUF_SIMPLE request on a dict-described proxy: no format, shape
        # or strides are reported.
        d = {'typestr': '|u1',
             'shape': (10,),
             'strides': (1,),
             'data': (9, True)}  # Address 9 is arbitrary; the data is never read.
        b = BufferProxy(d)
        imp = Importer(b, buftools.PyBUF_SIMPLE)
        self.assertIs(imp.obj, b)
        self.assertEqual(imp.buf, 9)
        self.assertEqual(imp.len, 10)
        self.assertEqual(imp.format, None)
        self.assertEqual(imp.itemsize, 1)
        self.assertEqual(imp.ndim, 0)
        self.assertTrue(imp.readonly)
        self.assertIsNone(imp.shape)
        self.assertIsNone(imp.strides)
        self.assertIsNone(imp.suboffsets)

    # Define the collected wrapper only when pygame.bufferproxy provides
    # get_segcount; otherwise the old-buffer test is left out entirely.
    try:
        pygame.bufferproxy.get_segcount
    except AttributeError:
        pass
    else:
        def test_oldbuf_arg(self):
            self.OLDBUF_test_oldbuf_arg()

    def OLDBUF_test_oldbuf_arg(self):
        # Exercises get_segcount / get_read_buffer / get_write_buffer.  The
        # order of the calls is part of the test.
        from pygame.bufferproxy import (get_segcount, get_read_buffer,
                                        get_write_buffer)

        content = as_bytes('\x01\x00\x00\x02') * 12
        memory = ctypes.create_string_buffer(content)
        memaddr = ctypes.addressof(memory)

        def raise_exception(o):
            raise ValueError("An exception")

        # Writable one-dimensional proxy.
        bf = BufferProxy({'shape': (len(content),),
                          'typestr': '|u1',
                          'data': (memaddr, False),
                          'strides': (1,)})
        # Before get_segcount() is called, both getters report an empty
        # segment at address 0.
        seglen, segaddr = get_read_buffer(bf, 0)
        self.assertEqual(segaddr, 0)
        self.assertEqual(seglen, 0)
        seglen, segaddr = get_write_buffer(bf, 0)
        self.assertEqual(segaddr, 0)
        self.assertEqual(seglen, 0)
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 1)
        self.assertEqual(buflen, len(content))
        # Afterwards the single segment covers the whole buffer.
        seglen, segaddr = get_read_buffer(bf, 0)
        self.assertEqual(segaddr, memaddr)
        self.assertEqual(seglen, len(content))
        seglen, segaddr = get_write_buffer(bf, 0)
        self.assertEqual(segaddr, memaddr)
        self.assertEqual(seglen, len(content))

        # Read-only proxy: reading works, asking for a write buffer raises.
        bf = BufferProxy({'shape': (len(content),),
                          'typestr': '|u1',
                          'data': (memaddr, True),
                          'strides': (1,)})
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 1)
        self.assertEqual(buflen, len(content))
        seglen, segaddr = get_read_buffer(bf, 0)
        self.assertEqual(segaddr, memaddr)
        self.assertEqual(seglen, len(content))
        self.assertRaises(ValueError, get_write_buffer, bf, 0)

        # A 'before' callback that raises: get_segcount reports no segments.
        bf = BufferProxy({'shape': (len(content),),
                          'typestr': '|u1',
                          'data': (memaddr, True),
                          'strides': (1,),
                          'before': raise_exception})
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 0)
        self.assertEqual(buflen, 0)

        # Two-dimensional proxy: one segment per 4-byte item.  Only the first
        # four segments are checked.
        bf = BufferProxy({'shape': (3, 4),
                          'typestr': '|u4',
                          'data': (memaddr, True),
                          'strides': (12, 4)})
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 3 * 4)
        self.assertEqual(buflen, 3 * 4 * 4)
        for i in range(4):
            seglen, segaddr = get_read_buffer(bf, i)
            self.assertEqual(segaddr, memaddr + i * 4)
            self.assertEqual(seglen, 4)
class BufferProxyLegacyTest(unittest.TestCase):
    """Checks for the length, raw and write members of BufferProxy.

    The read-only proxies built here view a ctypes buffer initialised from
    ``content``.
    """

    content = as_bytes('\x01\x00\x00\x02') * 12
    buffer = ctypes.create_string_buffer(content)
    data = (ctypes.addressof(buffer), True)

    def test_length(self):
        # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.length:
        #   The size of the buffer data in bytes.
        #
        # Each case pairs a shape with the byte length expected for it.
        cases = (
            ((3, 4), len(self.content)),
            ((3, 3), 3*3*4),
        )
        for shape, expected in cases:
            proxy = BufferProxy({'shape': shape,
                                 'typestr': '|u4',
                                 'data': self.data,
                                 'strides': (12, 4)})
            self.assertEqual(proxy.length, expected)

    def test_raw(self):
        # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.raw:
        #   The raw buffer data as string. The string may contain NUL bytes.

        # One-dimensional view without explicit strides.
        flat = {'shape': (len(self.content),),
                'typestr': '|u1',
                'data': self.data}
        proxy = BufferProxy(flat)
        self.assertEqual(proxy.raw, self.content)

        # Two-dimensional view of 4-byte items with strides (4, 12).
        grid = {'shape': (3, 4),
                'typestr': '|u4',
                'data': self.data,
                'strides': (4, 12)}
        proxy = BufferProxy(grid)
        self.assertEqual(proxy.raw, self.content)

        # Single-byte items spaced 4 bytes apart: reading raw must fail.
        sparse = {'shape': (3, 4),
                  'typestr': '|u1',
                  'data': self.data,
                  'strides': (16, 4)}
        proxy = BufferProxy(sparse)
        self.assertRaises(ValueError, getattr, proxy, 'raw')

    def test_write(self):
        # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.write:
        #   B.write (bufferproxy, buffer, offset) -> None
        #
        #   Writes raw data to the bufferproxy.
        #
        #   Writes the raw data from buffer to the BufferProxy object,
        #   starting at the specified offset within the BufferProxy.
        #   If the length of the passed buffer exceeds the length of the
        #   BufferProxy (reduced by the offset), an IndexError will be raised.
        zero = '\x00'.encode('latin_1')
        TenBytes = ctypes.c_byte * 10
        size = ctypes.sizeof(TenBytes)

        # Source bytes 1, 4, 7, ... and a zero-filled target of equal size.
        source = TenBytes(*range(1, 3 * size + 1, 3))
        payload = ctypes.string_at(source, ctypes.sizeof(source))
        target = TenBytes()
        bp = BufferProxy({'typestr': '|u1',
                          'shape': (ctypes.sizeof(target),),
                          'data': (ctypes.addressof(target), False)})
        try:
            # Whole-buffer write at the default offset.
            self.assertEqual(bp.raw, zero * size)
            bp.write(payload)
            self.assertEqual(bp.raw, payload)

            # Three bytes written at offset 2 leave the rest zeroed.
            ctypes.memset(target, 0, ctypes.sizeof(target))
            bp.write(payload[:3], 2)
            snapshot = bp.raw
            self.assertEqual(snapshot[:2], zero * 2)
            self.assertEqual(snapshot[2:5], payload[:3])
            self.assertEqual(snapshot[5:], zero * (size - 5))

            # Three bytes written so that they end exactly at the buffer end.
            bp.write(payload[:3], bp.length - 3)
            snapshot = bp.raw
            self.assertEqual(snapshot[-3:], payload[:3])

            # Out-of-range offsets and a non-buffer argument are rejected.
            self.assertRaises(IndexError, bp.write, payload, 1)
            self.assertRaises(IndexError, bp.write, payload[:5], -1)
            self.assertRaises(IndexError, bp.write, payload[:5], bp.length)
            self.assertRaises(TypeError, bp.write, 12)

            # Writing through a read-only proxy raises pygame.BufferError.
            bp = BufferProxy({'typestr': '|u1',
                              'shape': (ctypes.sizeof(target),),
                              'data': (ctypes.addressof(target), True)})
            self.assertRaises(pygame.BufferError, bp.write, '123'.encode('latin_1'))
        finally:
            # Make sure bp is garbage collected before the ctypes buffer.
            bp = None
            gc.collect()
# Run the test cases defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()