510 lines
16 KiB
Python
510 lines
16 KiB
Python
import sys
|
|
import re
|
|
import weakref
|
|
import gc
|
|
import ctypes
|
|
import unittest
|
|
|
|
|
|
import pygame
|
|
from pygame.bufferproxy import BufferProxy
|
|
from pygame.compat import as_bytes
|
|
|
|
# Reference the BufferError name; if that lookup raises NameError, bind
# pygame's BufferError to the same module-level name instead.
try:
    BufferError
except NameError:
    from pygame import BufferError
|
|
|
|
|
|
class BufferProxyTest(unittest.TestCase):
    """Tests for BufferProxy objects created from array-interface style dicts
    and, where available, from new/old buffer protocol exporters.

    Test methods carry ``#`` comments rather than docstrings so that the names
    printed by the unittest runner are not replaced by docstring text.
    """

    # Keyword dictionary handed to BufferProxy() by most tests below.  The
    # "data" entry is an (address, flag) pair; the tests using this dict only
    # inspect the metadata that is reported back.
    view_keywords = {
        "shape": (5, 4, 3),
        "typestr": "|u1",
        "data": (0, True),
        "strides": (4, 20, 1),
    }

    def test_module_name(self):
        self.assertEqual(pygame.bufferproxy.__name__, "pygame.bufferproxy")

    def test_class_name(self):
        self.assertEqual(BufferProxy.__name__, "BufferProxy")

    def test___array_struct___property(self):
        # The interface dict decoded by pygame.get_array_interface() must
        # echo back the values the proxy was built from.
        kwds = self.view_keywords
        v = BufferProxy(kwds)
        d = pygame.get_array_interface(v)
        self.assertEqual(len(d), 5)
        self.assertEqual(d["version"], 3)
        self.assertEqual(d["shape"], kwds["shape"])
        self.assertEqual(d["typestr"], kwds["typestr"])
        self.assertEqual(d["data"], kwds["data"])
        self.assertEqual(d["strides"], kwds["strides"])

    def test___array_interface___property(self):
        # Same expectations as above, read through __array_interface__.
        kwds = self.view_keywords
        v = BufferProxy(kwds)
        d = v.__array_interface__
        self.assertEqual(len(d), 5)
        self.assertEqual(d["version"], 3)
        self.assertEqual(d["shape"], kwds["shape"])
        self.assertEqual(d["typestr"], kwds["typestr"])
        self.assertEqual(d["data"], kwds["data"])
        self.assertEqual(d["strides"], kwds["strides"])

    def test_parent_property(self):
        kwds = dict(self.view_keywords)
        p = []
        kwds["parent"] = p
        v = BufferProxy(kwds)

        self.assertIs(v.parent, p)

    def test_before(self):
        # The "before" callback must fire exactly once, on first access to
        # the exported interface, and receive the parent object.
        def callback(parent):
            success.append(parent is p)

        class MyException(Exception):
            pass

        def raise_exception(parent):
            raise MyException("Just a test.")

        kwds = dict(self.view_keywords)
        p = []
        kwds["parent"] = p

        # For array interface
        success = []
        kwds["before"] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        d = v.__array_interface__
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])
        d = v.__array_interface__
        self.assertEqual(len(success), 1)
        d = v = None
        gc.collect()
        self.assertEqual(len(success), 1)

        # For array struct
        success = []
        kwds["before"] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        c = v.__array_struct__
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])
        c = v.__array_struct__
        self.assertEqual(len(success), 1)
        c = v = None
        gc.collect()
        self.assertEqual(len(success), 1)

        # Callback raises an exception
        kwds["before"] = raise_exception
        v = BufferProxy(kwds)
        self.assertRaises(MyException, lambda: v.__array_struct__)

    def test_after(self):
        # The "after" callback must not fire while the proxy (or anything
        # exported from it) is alive, and must fire once after release.
        def callback(parent):
            success.append(parent is p)

        kwds = dict(self.view_keywords)
        p = []
        kwds["parent"] = p

        # For array interface
        success = []
        kwds["after"] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        d = v.__array_interface__
        self.assertEqual(len(success), 0)
        d = v.__array_interface__
        self.assertEqual(len(success), 0)
        d = v = None
        gc.collect()
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])

        # For array struct
        success = []
        kwds["after"] = callback
        v = BufferProxy(kwds)
        self.assertEqual(len(success), 0)
        c = v.__array_struct__
        self.assertEqual(len(success), 0)
        c = v.__array_struct__
        self.assertEqual(len(success), 0)
        c = v = None
        gc.collect()
        self.assertEqual(len(success), 1)
        self.assertTrue(success[0])

    def test_attribute(self):
        # Arbitrary attributes can be set on, read from and deleted from a
        # proxy instance.
        v = BufferProxy(self.view_keywords)
        self.assertRaises(AttributeError, getattr, v, "undefined")
        v.undefined = 12
        self.assertEqual(v.undefined, 12)
        del v.undefined
        self.assertRaises(AttributeError, getattr, v, "undefined")

    def test_weakref(self):
        v = BufferProxy(self.view_keywords)
        weak_v = weakref.ref(v)

        self.assertIs(weak_v(), v)

        v = None
        gc.collect()

        self.assertIsNone(weak_v())

    def test_gc(self):
        """refcount agnostic check that contained objects are freed"""

        def before_callback(parent):
            return r[0]

        def after_callback(parent):
            return r[1]

        class Obj(object):
            pass

        p = Obj()
        a = Obj()
        r = [Obj(), Obj()]
        weak_p = weakref.ref(p)
        weak_a = weakref.ref(a)
        weak_r0 = weakref.ref(r[0])
        weak_r1 = weakref.ref(r[1])
        weak_before = weakref.ref(before_callback)
        weak_after = weakref.ref(after_callback)
        kwds = dict(self.view_keywords)
        kwds["parent"] = p
        kwds["before"] = before_callback
        kwds["after"] = after_callback
        v = BufferProxy(kwds)
        v.some_attribute = a
        weak_v = weakref.ref(v)

        # Drop every local reference; only the proxy keeps these alive now.
        kwds = p = a = before_callback = after_callback = None
        gc.collect()
        self.assertIsNotNone(weak_p())
        self.assertIsNotNone(weak_a())
        self.assertIsNotNone(weak_before())
        self.assertIsNotNone(weak_after())

        # Releasing the proxy must release everything it referenced.
        v = None
        for _ in range(4):
            gc.collect()
        self.assertIsNone(weak_v())
        self.assertIsNone(weak_p())
        self.assertIsNone(weak_a())
        self.assertIsNone(weak_before())
        self.assertIsNone(weak_after())

        # The callbacks' return values are still held by the list ``r``.
        self.assertIsNotNone(weak_r0())
        self.assertIsNotNone(weak_r1())
        r = None
        gc.collect()
        self.assertIsNone(weak_r0())
        self.assertIsNone(weak_r1())

        # Cycle removal
        kwds = dict(self.view_keywords)
        kwds["parent"] = []
        v = BufferProxy(kwds)
        v.some_attribute = v
        # any() is used instead of a for/else loop on purpose: a loop variable
        # would stay bound to ``v`` after ``break`` and keep the proxy alive,
        # so the ``v = None`` below would never release the cycle.
        tracked = any(o is v for o in gc.get_objects())
        self.assertTrue(tracked)
        kwds["parent"].append(v)
        kwds = None
        gc.collect()
        n1 = len(gc.garbage)
        v = None
        gc.collect()
        n2 = len(gc.garbage)
        self.assertEqual(n2, n1)

    def test_c_api(self):
        api = pygame.bufferproxy._PYGAME_C_API
        api_type = type(pygame.base._PYGAME_C_API)

        self.assertIsInstance(api, api_type)

    def test_repr(self):
        # repr() is expected to look like "<ClassName(length)>".
        v = BufferProxy(self.view_keywords)
        cname = BufferProxy.__name__
        oname, ovalue = re.findall(r"<([^)]+)\(([^)]+)\)>", repr(v))[0]
        self.assertEqual(oname, cname)
        self.assertEqual(v.length, int(ovalue))

    def test_subclassing(self):
        class MyBufferProxy(BufferProxy):
            def __repr__(self):
                return "*%s*" % (BufferProxy.__repr__(self),)

        kwds = dict(self.view_keywords)
        kwds["parent"] = 0
        v = MyBufferProxy(kwds)
        self.assertEqual(v.parent, 0)
        r = repr(v)
        self.assertEqual(r[:2], "*<")
        self.assertEqual(r[-2:], ">*")

    # unittest only collects methods whose names start with "test", so the
    # NEWBUF_ implementation needs this forwarder to run at all (the old
    # buffer test below uses the same arrangement).
    @unittest.skipIf(not pygame.HAVE_NEWBUF, "newbuf not implemented")
    def test_newbuf(self):
        self.NEWBUF_test_newbuf()

    @unittest.skipIf(not pygame.HAVE_NEWBUF, "newbuf not implemented")
    def NEWBUF_test_newbuf(self):
        from ctypes import string_at

        from pygame.tests.test_utils import buftools

        Exporter = buftools.Exporter
        Importer = buftools.Importer

        # Read-only exporter of unsigned bytes.
        exp = Exporter((10,), "B", readonly=True)
        b = BufferProxy(exp)
        self.assertEqual(b.length, exp.len)
        self.assertEqual(b.raw, string_at(exp.buf, exp.len))
        d = b.__array_interface__
        try:
            self.assertEqual(d["typestr"], "|u1")
            self.assertEqual(d["shape"], exp.shape)
            self.assertEqual(d["strides"], exp.strides)
            self.assertEqual(d["data"], (exp.buf, True))
        finally:
            d = None

        # Writable exporter of native-order shorts.
        exp = Exporter((3,), "=h")
        b = BufferProxy(exp)
        self.assertEqual(b.length, exp.len)
        self.assertEqual(b.raw, string_at(exp.buf, exp.len))
        d = b.__array_interface__
        try:
            lil_endian = pygame.get_sdl_byteorder() == pygame.LIL_ENDIAN
            f = "{}i{}".format("<" if lil_endian else ">", exp.itemsize)
            self.assertEqual(d["typestr"], f)
            self.assertEqual(d["shape"], exp.shape)
            self.assertEqual(d["strides"], exp.strides)
            self.assertEqual(d["data"], (exp.buf, False))
        finally:
            d = None

        # The proxy re-exports the buffer it wraps unchanged.
        exp = Exporter((10, 2), "=i")
        b = BufferProxy(exp)
        imp = Importer(b, buftools.PyBUF_RECORDS)
        self.assertIs(imp.obj, b)
        self.assertEqual(imp.buf, exp.buf)
        self.assertEqual(imp.ndim, exp.ndim)
        self.assertEqual(imp.format, exp.format)
        self.assertEqual(imp.readonly, exp.readonly)
        self.assertEqual(imp.itemsize, exp.itemsize)
        self.assertEqual(imp.len, exp.len)
        self.assertEqual(imp.shape, exp.shape)
        self.assertEqual(imp.strides, exp.strides)
        self.assertIsNone(imp.suboffsets)

        d = {
            "typestr": "|u1",
            "shape": (10,),
            "strides": (1,),
            "data": (9, True),
        }  # 9? The data will not be read anyway.
        b = BufferProxy(d)
        imp = Importer(b, buftools.PyBUF_SIMPLE)
        self.assertIs(imp.obj, b)
        self.assertEqual(imp.buf, 9)
        self.assertEqual(imp.len, 10)
        self.assertIsNone(imp.format)
        self.assertEqual(imp.itemsize, 1)
        self.assertEqual(imp.ndim, 0)
        self.assertTrue(imp.readonly)
        self.assertIsNone(imp.shape)
        self.assertIsNone(imp.strides)
        self.assertIsNone(imp.suboffsets)

    # Only expose the old buffer test when the bufferproxy module provides
    # the get_segcount helper it relies on.
    if hasattr(pygame.bufferproxy, "get_segcount"):

        def test_oldbuf_arg(self):
            self.OLDBUF_test_oldbuf_arg()

    def OLDBUF_test_oldbuf_arg(self):
        from pygame.bufferproxy import get_segcount, get_read_buffer, get_write_buffer

        content = as_bytes("\x01\x00\x00\x02") * 12
        memory = ctypes.create_string_buffer(content)
        memaddr = ctypes.addressof(memory)

        def raise_exception(o):
            raise ValueError("An exception")

        # Writable one-dimensional byte view.
        bf = BufferProxy(
            {
                "shape": (len(content),),
                "typestr": "|u1",
                "data": (memaddr, False),
                "strides": (1,),
            }
        )
        # Before get_segcount() has been called, both accessors are expected
        # to report an empty segment.
        seglen, segaddr = get_read_buffer(bf, 0)
        self.assertEqual(segaddr, 0)
        self.assertEqual(seglen, 0)
        seglen, segaddr = get_write_buffer(bf, 0)
        self.assertEqual(segaddr, 0)
        self.assertEqual(seglen, 0)
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 1)
        self.assertEqual(buflen, len(content))
        seglen, segaddr = get_read_buffer(bf, 0)
        self.assertEqual(segaddr, memaddr)
        self.assertEqual(seglen, len(content))
        seglen, segaddr = get_write_buffer(bf, 0)
        self.assertEqual(segaddr, memaddr)
        self.assertEqual(seglen, len(content))

        # Same view flagged read-only: reading works, writing is refused.
        bf = BufferProxy(
            {
                "shape": (len(content),),
                "typestr": "|u1",
                "data": (memaddr, True),
                "strides": (1,),
            }
        )
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 1)
        self.assertEqual(buflen, len(content))
        seglen, segaddr = get_read_buffer(bf, 0)
        self.assertEqual(segaddr, memaddr)
        self.assertEqual(seglen, len(content))
        self.assertRaises(ValueError, get_write_buffer, bf, 0)

        # A failing "before" callback is expected to surface as zero
        # segments rather than as an exception.
        bf = BufferProxy(
            {
                "shape": (len(content),),
                "typestr": "|u1",
                "data": (memaddr, True),
                "strides": (1,),
                "before": raise_exception,
            }
        )
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 0)
        self.assertEqual(buflen, 0)

        # Two-dimensional view: one segment per item.
        bf = BufferProxy(
            {
                "shape": (3, 4),
                "typestr": "|u4",
                "data": (memaddr, True),
                "strides": (12, 4),
            }
        )
        segcount, buflen = get_segcount(bf)
        self.assertEqual(segcount, 3 * 4)
        self.assertEqual(buflen, 3 * 4 * 4)
        for i in range(4):
            seglen, segaddr = get_read_buffer(bf, i)
            self.assertEqual(segaddr, memaddr + i * 4)
            self.assertEqual(seglen, 4)
|
|
|
|
|
|
class BufferProxyLegacyTest(unittest.TestCase):
    """Checks of the ``length``, ``raw`` and ``write`` members of BufferProxy
    against a 48-byte ctypes buffer shared by the whole class.
    """

    content = as_bytes("\x01\x00\x00\x02") * 12
    buffer = ctypes.create_string_buffer(content)
    data = (ctypes.addressof(buffer), True)

    def test_length(self):
        # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.length:

        # The size of the buffer data in bytes.
        whole_view = {
            "shape": (3, 4),
            "typestr": "|u4",
            "data": self.data,
            "strides": (12, 4),
        }
        proxy = BufferProxy(whole_view)
        self.assertEqual(proxy.length, len(self.content))

        partial_view = {
            "shape": (3, 3),
            "typestr": "|u4",
            "data": self.data,
            "strides": (12, 4),
        }
        proxy = BufferProxy(partial_view)
        self.assertEqual(proxy.length, 3 * 3 * 4)

    def test_raw(self):
        # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.raw:

        # The raw buffer data as string. The string may contain NUL bytes.

        flat_view = {
            "shape": (len(self.content),),
            "typestr": "|u1",
            "data": self.data,
        }
        proxy = BufferProxy(flat_view)
        self.assertEqual(proxy.raw, self.content)

        grid_view = {
            "shape": (3, 4),
            "typestr": "|u4",
            "data": self.data,
            "strides": (4, 12),
        }
        proxy = BufferProxy(grid_view)
        self.assertEqual(proxy.raw, self.content)

        # With these strides the raw bytes cannot be returned.
        gapped_view = {
            "shape": (3, 4),
            "typestr": "|u1",
            "data": self.data,
            "strides": (16, 4),
        }
        proxy = BufferProxy(gapped_view)
        with self.assertRaises(ValueError):
            proxy.raw

    def test_write(self):
        # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.write:

        # B.write (bufferproxy, buffer, offset) -> None
        #
        # Writes raw data to the bufferproxy.
        #
        # Writes the raw data from buffer to the BufferProxy object, starting
        # at the specified offset within the BufferProxy.
        # If the length of the passed buffer exceeds the length of the
        # BufferProxy (reduced by the offset), an IndexError will be raised.
        zero = "\x00".encode("latin_1")
        TenBytes = ctypes.c_byte * 10
        size = ctypes.sizeof(TenBytes)

        # Source bytes 1, 4, 7, ... used as the payload to write.
        source = TenBytes(*range(1, 3 * size + 1, 3))
        payload = ctypes.string_at(source, ctypes.sizeof(source))

        # Zero-initialised destination wrapped by a writable proxy.
        target = TenBytes()
        writable_view = {
            "typestr": "|u1",
            "shape": (ctypes.sizeof(target),),
            "data": (ctypes.addressof(target), False),
        }
        proxy = BufferProxy(writable_view)
        try:
            self.assertEqual(proxy.raw, zero * size)

            # Full overwrite.
            proxy.write(payload)
            self.assertEqual(proxy.raw, payload)

            # Three bytes at offset 2 of a cleared buffer.
            ctypes.memset(target, 0, ctypes.sizeof(target))
            proxy.write(payload[:3], 2)
            snapshot = proxy.raw
            self.assertEqual(snapshot[:2], zero * 2)
            self.assertEqual(snapshot[2:5], payload[:3])
            self.assertEqual(snapshot[5:], zero * (size - 5))

            # Three bytes flush against the end.
            proxy.write(payload[:3], proxy.length - 3)
            snapshot = proxy.raw
            self.assertEqual(snapshot[-3:], payload[:3])

            # Writes that do not fit, and a non-buffer argument.
            out_of_range = (
                (payload, 1),
                (payload[:5], -1),
                (payload[:5], proxy.length),
            )
            for bad_args in out_of_range:
                self.assertRaises(IndexError, proxy.write, *bad_args)
            self.assertRaises(TypeError, proxy.write, 12)

            # A proxy flagged read-only must refuse writes.
            readonly_view = {
                "typestr": "|u1",
                "shape": (ctypes.sizeof(target),),
                "data": (ctypes.addressof(target), True),
            }
            proxy = BufferProxy(readonly_view)
            self.assertRaises(pygame.BufferError, proxy.write, "123".encode("latin_1"))
        finally:
            # Make sure the proxy is garbage collected before the buffer
            proxy = None
            gc.collect()
|
|
|
|
|
|
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|