3RNN/Lib/site-packages/scipy/stats/tests/test_axis_nan_policy.py
2024-05-26 19:49:15 +02:00

1189 lines
50 KiB
Python

# Many scipy.stats functions support `axis` and `nan_policy` parameters.
# When the two are combined, it can be tricky to get all the behavior just
# right. This file contains a suite of common tests for scipy.stats functions
# that support `axis` and `nan_policy` and additional tests for some associated
# functions in stats._util.
from itertools import product, combinations_with_replacement, permutations
import re
import pickle
import pytest
import numpy as np
from numpy.testing import assert_allclose, assert_equal, suppress_warnings
from scipy import stats
from scipy.stats import norm # type: ignore[attr-defined]
from scipy.stats._axis_nan_policy import _masked_arrays_2_sentinel_arrays
from scipy._lib._util import AxisError
def unpack_ttest_result(res):
low, high = res.confidence_interval()
return (res.statistic, res.pvalue, res.df, res._standard_error,
res._estimate, low, high)
def _get_ttest_ci(ttest):
# get a function that returns the CI bounds of provided `ttest`
def ttest_ci(*args, **kwargs):
res = ttest(*args, **kwargs)
return res.confidence_interval()
return ttest_ci
axis_nan_policy_cases = [
# function, args, kwds, number of samples, number of outputs,
# ... paired, unpacker function
# args, kwds typically aren't needed; just showing that they work
(stats.kruskal, tuple(), dict(), 3, 2, False, None), # 4 samples is slow
(stats.ranksums, ('less',), dict(), 2, 2, False, None),
(stats.mannwhitneyu, tuple(), {'method': 'asymptotic'}, 2, 2, False, None),
(stats.wilcoxon, ('pratt',), {'mode': 'auto'}, 2, 2, True,
lambda res: (res.statistic, res.pvalue)),
(stats.wilcoxon, tuple(), dict(), 1, 2, True,
lambda res: (res.statistic, res.pvalue)),
(stats.wilcoxon, tuple(), {'mode': 'approx'}, 1, 3, True,
lambda res: (res.statistic, res.pvalue, res.zstatistic)),
(stats.gmean, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.hmean, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.pmean, (1.42,), dict(), 1, 1, False, lambda x: (x,)),
(stats.sem, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.iqr, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.kurtosis, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.skew, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.kstat, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.kstatvar, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.moment, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.moment, tuple(), dict(order=[1, 2]), 1, 2, False, None),
(stats.jarque_bera, tuple(), dict(), 1, 2, False, None),
(stats.ttest_1samp, (np.array([0]),), dict(), 1, 7, False,
unpack_ttest_result),
(stats.ttest_rel, tuple(), dict(), 2, 7, True, unpack_ttest_result),
(stats.ttest_ind, tuple(), dict(), 2, 7, False, unpack_ttest_result),
(_get_ttest_ci(stats.ttest_1samp), (0,), dict(), 1, 2, False, None),
(_get_ttest_ci(stats.ttest_rel), tuple(), dict(), 2, 2, True, None),
(_get_ttest_ci(stats.ttest_ind), tuple(), dict(), 2, 2, False, None),
(stats.mode, tuple(), dict(), 1, 2, True, lambda x: (x.mode, x.count)),
(stats.differential_entropy, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.variation, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.friedmanchisquare, tuple(), dict(), 3, 2, True, None),
(stats.brunnermunzel, tuple(), dict(), 2, 2, False, None),
(stats.mood, tuple(), {}, 2, 2, False, None),
(stats.shapiro, tuple(), {}, 1, 2, False, None),
(stats.ks_1samp, (norm().cdf,), dict(), 1, 4, False,
lambda res: (*res, res.statistic_location, res.statistic_sign)),
(stats.ks_2samp, tuple(), dict(), 2, 4, False,
lambda res: (*res, res.statistic_location, res.statistic_sign)),
(stats.kstest, (norm().cdf,), dict(), 1, 4, False,
lambda res: (*res, res.statistic_location, res.statistic_sign)),
(stats.kstest, tuple(), dict(), 2, 4, False,
lambda res: (*res, res.statistic_location, res.statistic_sign)),
(stats.levene, tuple(), {}, 2, 2, False, None),
(stats.fligner, tuple(), {'center': 'trimmed', 'proportiontocut': 0.01},
2, 2, False, None),
(stats.ansari, tuple(), {}, 2, 2, False, None),
(stats.entropy, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.entropy, tuple(), dict(), 2, 1, True, lambda x: (x,)),
(stats.skewtest, tuple(), dict(), 1, 2, False, None),
(stats.kurtosistest, tuple(), dict(), 1, 2, False, None),
(stats.normaltest, tuple(), dict(), 1, 2, False, None),
(stats.cramervonmises, ("norm",), dict(), 1, 2, False,
lambda res: (res.statistic, res.pvalue)),
(stats.cramervonmises_2samp, tuple(), dict(), 2, 2, False,
lambda res: (res.statistic, res.pvalue)),
(stats.epps_singleton_2samp, tuple(), dict(), 2, 2, False, None),
(stats.bartlett, tuple(), {}, 2, 2, False, None),
(stats.tmean, tuple(), {}, 1, 1, False, lambda x: (x,)),
(stats.tvar, tuple(), {}, 1, 1, False, lambda x: (x,)),
(stats.tmin, tuple(), {}, 1, 1, False, lambda x: (x,)),
(stats.tmax, tuple(), {}, 1, 1, False, lambda x: (x,)),
(stats.tstd, tuple(), {}, 1, 1, False, lambda x: (x,)),
(stats.tsem, tuple(), {}, 1, 1, False, lambda x: (x,)),
(stats.circmean, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.circvar, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.circstd, tuple(), dict(), 1, 1, False, lambda x: (x,)),
(stats.f_oneway, tuple(), {}, 2, 2, False, None),
(stats.alexandergovern, tuple(), {}, 2, 2, False,
lambda res: (res.statistic, res.pvalue)),
(stats.combine_pvalues, tuple(), {}, 1, 2, False, None),
]
# If the message is one of those expected, put nans in
# appropriate places of `statistics` and `pvalues`
too_small_messages = {"The input contains nan", # for nan_policy="raise"
"Degrees of freedom <= 0 for slice",
"x and y should have at least 5 elements",
"Data must be at least length 3",
"The sample must contain at least two",
"x and y must contain at least two",
"division by zero",
"Mean of empty slice",
"Data passed to ks_2samp must not be empty",
"Not enough test observations",
"Not enough other observations",
"Not enough observations.",
"At least one observation is required",
"zero-size array to reduction operation maximum",
"`x` and `y` must be of nonzero size.",
"The exact distribution of the Wilcoxon test",
"Data input must not be empty",
"Window length (0) must be positive and less",
"Window length (1) must be positive and less",
"Window length (2) must be positive and less",
"skewtest is not valid with less than",
"kurtosistest requires at least 5",
"attempt to get argmax of an empty sequence",
"No array values within given limits",
"Input sample size must be greater than one.",}
# If the message is one of these, results of the function may be inaccurate,
# but NaNs are not to be placed
inaccuracy_messages = {"Precision loss occurred in moment calculation",
"Sample size too small for normal approximation."}
# For some functions, nan_policy='propagate' should not just return NaNs
override_propagate_funcs = {stats.mode}
# For some functions, empty arrays produce non-NaN results
empty_special_case_funcs = {stats.entropy}
def _mixed_data_generator(n_samples, n_repetitions, axis, rng,
paired=False):
# generate random samples to check the response of hypothesis tests to
# samples with different (but broadcastable) shapes and various
# nan patterns (e.g. all nans, some nans, no nans) along axis-slices
data = []
for i in range(n_samples):
n_patterns = 6 # number of distinct nan patterns
n_obs = 20 if paired else 20 + i # observations per axis-slice
x = np.ones((n_repetitions, n_patterns, n_obs)) * np.nan
for j in range(n_repetitions):
samples = x[j, :, :]
# case 0: axis-slice with all nans (0 reals)
# cases 1-3: axis-slice with 1-3 reals (the rest nans)
# case 4: axis-slice with mostly (all but two) reals
# case 5: axis slice with all reals
for k, n_reals in enumerate([0, 1, 2, 3, n_obs-2, n_obs]):
# for cases 1-3, need paired nansw to be in the same place
indices = rng.permutation(n_obs)[:n_reals]
samples[k, indices] = rng.random(size=n_reals)
# permute the axis-slices just to show that order doesn't matter
samples[:] = rng.permutation(samples, axis=0)
# For multi-sample tests, we want to test broadcasting and check
# that nan policy works correctly for each nan pattern for each input.
# This takes care of both simultaneously.
new_shape = [n_repetitions] + [1]*n_samples + [n_obs]
new_shape[1 + i] = 6
x = x.reshape(new_shape)
x = np.moveaxis(x, -1, axis)
data.append(x)
return data
def _homogeneous_data_generator(n_samples, n_repetitions, axis, rng,
paired=False, all_nans=True):
# generate random samples to check the response of hypothesis tests to
# samples with different (but broadcastable) shapes and homogeneous
# data (all nans or all finite)
data = []
for i in range(n_samples):
n_obs = 20 if paired else 20 + i # observations per axis-slice
shape = [n_repetitions] + [1]*n_samples + [n_obs]
shape[1 + i] = 2
x = np.ones(shape) * np.nan if all_nans else rng.random(shape)
x = np.moveaxis(x, -1, axis)
data.append(x)
return data
def nan_policy_1d(hypotest, data1d, unpacker, *args, n_outputs=2,
nan_policy='raise', paired=False, _no_deco=True, **kwds):
# Reference implementation for how `nan_policy` should work for 1d samples
if nan_policy == 'raise':
for sample in data1d:
if np.any(np.isnan(sample)):
raise ValueError("The input contains nan values")
elif (nan_policy == 'propagate'
and hypotest not in override_propagate_funcs):
# For all hypothesis tests tested, returning nans is the right thing.
# But many hypothesis tests don't propagate correctly (e.g. they treat
# np.nan the same as np.inf, which doesn't make sense when ranks are
# involved) so override that behavior here.
for sample in data1d:
if np.any(np.isnan(sample)):
return np.full(n_outputs, np.nan)
elif nan_policy == 'omit':
# manually omit nans (or pairs in which at least one element is nan)
if not paired:
data1d = [sample[~np.isnan(sample)] for sample in data1d]
else:
nan_mask = np.isnan(data1d[0])
for sample in data1d[1:]:
nan_mask = np.logical_or(nan_mask, np.isnan(sample))
data1d = [sample[~nan_mask] for sample in data1d]
return unpacker(hypotest(*data1d, *args, _no_deco=_no_deco, **kwds))
@pytest.mark.filterwarnings('ignore::RuntimeWarning')
@pytest.mark.filterwarnings('ignore::UserWarning')
@pytest.mark.parametrize(("hypotest", "args", "kwds", "n_samples", "n_outputs",
"paired", "unpacker"), axis_nan_policy_cases)
@pytest.mark.parametrize(("nan_policy"), ("propagate", "omit", "raise"))
@pytest.mark.parametrize(("axis"), (1,))
@pytest.mark.parametrize(("data_generator"), ("mixed",))
def test_axis_nan_policy_fast(hypotest, args, kwds, n_samples, n_outputs,
paired, unpacker, nan_policy, axis,
data_generator):
_axis_nan_policy_test(hypotest, args, kwds, n_samples, n_outputs, paired,
unpacker, nan_policy, axis, data_generator)
@pytest.mark.slow
@pytest.mark.filterwarnings('ignore::RuntimeWarning')
@pytest.mark.filterwarnings('ignore::UserWarning')
@pytest.mark.parametrize(("hypotest", "args", "kwds", "n_samples", "n_outputs",
"paired", "unpacker"), axis_nan_policy_cases)
@pytest.mark.parametrize(("nan_policy"), ("propagate", "omit", "raise"))
@pytest.mark.parametrize(("axis"), range(-3, 3))
@pytest.mark.parametrize(("data_generator"),
("all_nans", "all_finite", "mixed"))
def test_axis_nan_policy_full(hypotest, args, kwds, n_samples, n_outputs,
paired, unpacker, nan_policy, axis,
data_generator):
_axis_nan_policy_test(hypotest, args, kwds, n_samples, n_outputs, paired,
unpacker, nan_policy, axis, data_generator)
def _axis_nan_policy_test(hypotest, args, kwds, n_samples, n_outputs, paired,
unpacker, nan_policy, axis, data_generator):
# Tests the 1D and vectorized behavior of hypothesis tests against a
# reference implementation (nan_policy_1d with np.ndenumerate)
# Some hypothesis tests return a non-iterable that needs an `unpacker` to
# extract the statistic and p-value. For those that don't:
if not unpacker:
def unpacker(res):
return res
rng = np.random.default_rng(0)
# Generate multi-dimensional test data with all important combinations
# of patterns of nans along `axis`
n_repetitions = 3 # number of repetitions of each pattern
data_gen_kwds = {'n_samples': n_samples, 'n_repetitions': n_repetitions,
'axis': axis, 'rng': rng, 'paired': paired}
if data_generator == 'mixed':
inherent_size = 6 # number of distinct types of patterns
data = _mixed_data_generator(**data_gen_kwds)
elif data_generator == 'all_nans':
inherent_size = 2 # hard-coded in _homogeneous_data_generator
data_gen_kwds['all_nans'] = True
data = _homogeneous_data_generator(**data_gen_kwds)
elif data_generator == 'all_finite':
inherent_size = 2 # hard-coded in _homogeneous_data_generator
data_gen_kwds['all_nans'] = False
data = _homogeneous_data_generator(**data_gen_kwds)
output_shape = [n_repetitions] + [inherent_size]*n_samples
# To generate reference behavior to compare against, loop over the axis-
# slices in data. Make indexing easier by moving `axis` to the end and
# broadcasting all samples to the same shape.
data_b = [np.moveaxis(sample, axis, -1) for sample in data]
data_b = [np.broadcast_to(sample, output_shape + [sample.shape[-1]])
for sample in data_b]
statistics = np.zeros(output_shape)
pvalues = np.zeros(output_shape)
for i, _ in np.ndenumerate(statistics):
data1d = [sample[i] for sample in data_b]
with np.errstate(divide='ignore', invalid='ignore'):
try:
res1d = nan_policy_1d(hypotest, data1d, unpacker, *args,
n_outputs=n_outputs,
nan_policy=nan_policy,
paired=paired, _no_deco=True, **kwds)
# Eventually we'll check the results of a single, vectorized
# call of `hypotest` against the arrays `statistics` and
# `pvalues` populated using the reference `nan_policy_1d`.
# But while we're at it, check the results of a 1D call to
# `hypotest` against the reference `nan_policy_1d`.
res1db = unpacker(hypotest(*data1d, *args,
nan_policy=nan_policy, **kwds))
assert_equal(res1db[0], res1d[0])
if len(res1db) == 2:
assert_equal(res1db[1], res1d[1])
# When there is not enough data in 1D samples, many existing
# hypothesis tests raise errors instead of returning nans .
# For vectorized calls, we put nans in the corresponding elements
# of the output.
except (RuntimeWarning, UserWarning, ValueError,
ZeroDivisionError) as e:
# whatever it is, make sure same error is raised by both
# `nan_policy_1d` and `hypotest`
with pytest.raises(type(e), match=re.escape(str(e))):
nan_policy_1d(hypotest, data1d, unpacker, *args,
n_outputs=n_outputs, nan_policy=nan_policy,
paired=paired, _no_deco=True, **kwds)
with pytest.raises(type(e), match=re.escape(str(e))):
hypotest(*data1d, *args, nan_policy=nan_policy, **kwds)
if any([str(e).startswith(message)
for message in too_small_messages]):
res1d = np.full(n_outputs, np.nan)
elif any([str(e).startswith(message)
for message in inaccuracy_messages]):
with suppress_warnings() as sup:
sup.filter(RuntimeWarning)
sup.filter(UserWarning)
res1d = nan_policy_1d(hypotest, data1d, unpacker,
*args, n_outputs=n_outputs,
nan_policy=nan_policy,
paired=paired, _no_deco=True,
**kwds)
else:
raise e
statistics[i] = res1d[0]
if len(res1d) == 2:
pvalues[i] = res1d[1]
# Perform a vectorized call to the hypothesis test.
# If `nan_policy == 'raise'`, check that it raises the appropriate error.
# If not, compare against the output against `statistics` and `pvalues`
if nan_policy == 'raise' and not data_generator == "all_finite":
message = 'The input contains nan values'
with pytest.raises(ValueError, match=message):
hypotest(*data, axis=axis, nan_policy=nan_policy, *args, **kwds)
else:
with suppress_warnings() as sup, \
np.errstate(divide='ignore', invalid='ignore'):
sup.filter(RuntimeWarning, "Precision loss occurred in moment")
sup.filter(UserWarning, "Sample size too small for normal "
"approximation.")
res = unpacker(hypotest(*data, axis=axis, nan_policy=nan_policy,
*args, **kwds))
assert_allclose(res[0], statistics, rtol=1e-15)
assert_equal(res[0].dtype, statistics.dtype)
if len(res) == 2:
assert_allclose(res[1], pvalues, rtol=1e-15)
assert_equal(res[1].dtype, pvalues.dtype)
@pytest.mark.filterwarnings('ignore::RuntimeWarning')
@pytest.mark.parametrize(("hypotest", "args", "kwds", "n_samples", "n_outputs",
"paired", "unpacker"), axis_nan_policy_cases)
@pytest.mark.parametrize(("nan_policy"), ("propagate", "omit", "raise"))
@pytest.mark.parametrize(("data_generator"),
("all_nans", "all_finite", "mixed", "empty"))
def test_axis_nan_policy_axis_is_None(hypotest, args, kwds, n_samples,
n_outputs, paired, unpacker, nan_policy,
data_generator):
# check for correct behavior when `axis=None`
if not unpacker:
def unpacker(res):
return res
rng = np.random.default_rng(0)
if data_generator == "empty":
data = [rng.random((2, 0)) for i in range(n_samples)]
else:
data = [rng.random((2, 20)) for i in range(n_samples)]
if data_generator == "mixed":
masks = [rng.random((2, 20)) > 0.9 for i in range(n_samples)]
for sample, mask in zip(data, masks):
sample[mask] = np.nan
elif data_generator == "all_nans":
data = [sample * np.nan for sample in data]
data_raveled = [sample.ravel() for sample in data]
if nan_policy == 'raise' and data_generator not in {"all_finite", "empty"}:
message = 'The input contains nan values'
# check for correct behavior whether or not data is 1d to begin with
with pytest.raises(ValueError, match=message):
hypotest(*data, axis=None, nan_policy=nan_policy,
*args, **kwds)
with pytest.raises(ValueError, match=message):
hypotest(*data_raveled, axis=None, nan_policy=nan_policy,
*args, **kwds)
else:
# behavior of reference implementation with 1d input, hypotest with 1d
# input, and hypotest with Nd input should match, whether that means
# that outputs are equal or they raise the same exception
ea_str, eb_str, ec_str = None, None, None
with np.errstate(divide='ignore', invalid='ignore'):
try:
res1da = nan_policy_1d(hypotest, data_raveled, unpacker, *args,
n_outputs=n_outputs,
nan_policy=nan_policy, paired=paired,
_no_deco=True, **kwds)
except (RuntimeWarning, ValueError, ZeroDivisionError) as ea:
ea_str = str(ea)
try:
res1db = unpacker(hypotest(*data_raveled, *args,
nan_policy=nan_policy, **kwds))
except (RuntimeWarning, ValueError, ZeroDivisionError) as eb:
eb_str = str(eb)
try:
res1dc = unpacker(hypotest(*data, *args, axis=None,
nan_policy=nan_policy, **kwds))
except (RuntimeWarning, ValueError, ZeroDivisionError) as ec:
ec_str = str(ec)
if ea_str or eb_str or ec_str:
assert any([str(ea_str).startswith(message)
for message in too_small_messages])
assert ea_str == eb_str == ec_str
else:
assert_equal(res1db, res1da)
assert_equal(res1dc, res1da)
for item in list(res1da) + list(res1db) + list(res1dc):
# Most functions naturally return NumPy numbers, which
# are drop-in replacements for the Python versions but with
# desirable attributes. Make sure this is consistent.
assert np.issubdtype(item.dtype, np.number)
# Test keepdims for:
# - single-output and multi-output functions (gmean and mannwhitneyu)
# - Axis negative, positive, None, and tuple
# - 1D with no NaNs
# - 1D with NaN propagation
# - Zero-sized output
@pytest.mark.parametrize("nan_policy", ("omit", "propagate"))
@pytest.mark.parametrize(
("hypotest", "args", "kwds", "n_samples", "unpacker"),
((stats.gmean, tuple(), dict(), 1, lambda x: (x,)),
(stats.mannwhitneyu, tuple(), {'method': 'asymptotic'}, 2, None))
)
@pytest.mark.parametrize(
("sample_shape", "axis_cases"),
(((2, 3, 3, 4), (None, 0, -1, (0, 2), (1, -1), (3, 1, 2, 0))),
((10, ), (0, -1)),
((20, 0), (0, 1)))
)
def test_keepdims(hypotest, args, kwds, n_samples, unpacker,
sample_shape, axis_cases, nan_policy):
# test if keepdims parameter works correctly
if not unpacker:
def unpacker(res):
return res
rng = np.random.default_rng(0)
data = [rng.random(sample_shape) for _ in range(n_samples)]
nan_data = [sample.copy() for sample in data]
nan_mask = [rng.random(sample_shape) < 0.2 for _ in range(n_samples)]
for sample, mask in zip(nan_data, nan_mask):
sample[mask] = np.nan
for axis in axis_cases:
expected_shape = list(sample_shape)
if axis is None:
expected_shape = np.ones(len(sample_shape))
else:
if isinstance(axis, int):
expected_shape[axis] = 1
else:
for ax in axis:
expected_shape[ax] = 1
expected_shape = tuple(expected_shape)
res = unpacker(hypotest(*data, *args, axis=axis, keepdims=True,
**kwds))
res_base = unpacker(hypotest(*data, *args, axis=axis, keepdims=False,
**kwds))
nan_res = unpacker(hypotest(*nan_data, *args, axis=axis,
keepdims=True, nan_policy=nan_policy,
**kwds))
nan_res_base = unpacker(hypotest(*nan_data, *args, axis=axis,
keepdims=False,
nan_policy=nan_policy, **kwds))
for r, r_base, rn, rn_base in zip(res, res_base, nan_res,
nan_res_base):
assert r.shape == expected_shape
r = np.squeeze(r, axis=axis)
assert_equal(r, r_base)
assert rn.shape == expected_shape
rn = np.squeeze(rn, axis=axis)
assert_equal(rn, rn_base)
@pytest.mark.parametrize(("fun", "nsamp"),
[(stats.kstat, 1),
(stats.kstatvar, 1)])
def test_hypotest_back_compat_no_axis(fun, nsamp):
m, n = 8, 9
rng = np.random.default_rng(0)
x = rng.random((nsamp, m, n))
res = fun(*x)
res2 = fun(*x, _no_deco=True)
res3 = fun([xi.ravel() for xi in x])
assert_equal(res, res2)
assert_equal(res, res3)
@pytest.mark.parametrize(("axis"), (0, 1, 2))
def test_axis_nan_policy_decorated_positional_axis(axis):
# Test for correct behavior of function decorated with
# _axis_nan_policy_decorator whether `axis` is provided as positional or
# keyword argument
shape = (8, 9, 10)
rng = np.random.default_rng(0)
x = rng.random(shape)
y = rng.random(shape)
res1 = stats.mannwhitneyu(x, y, True, 'two-sided', axis)
res2 = stats.mannwhitneyu(x, y, True, 'two-sided', axis=axis)
assert_equal(res1, res2)
message = "mannwhitneyu() got multiple values for argument 'axis'"
with pytest.raises(TypeError, match=re.escape(message)):
stats.mannwhitneyu(x, y, True, 'two-sided', axis, axis=axis)
def test_axis_nan_policy_decorated_positional_args():
# Test for correct behavior of function decorated with
# _axis_nan_policy_decorator when function accepts *args
shape = (3, 8, 9, 10)
rng = np.random.default_rng(0)
x = rng.random(shape)
x[0, 0, 0, 0] = np.nan
stats.kruskal(*x)
message = "kruskal() got an unexpected keyword argument 'samples'"
with pytest.raises(TypeError, match=re.escape(message)):
stats.kruskal(samples=x)
with pytest.raises(TypeError, match=re.escape(message)):
stats.kruskal(*x, samples=x)
def test_axis_nan_policy_decorated_keyword_samples():
# Test for correct behavior of function decorated with
# _axis_nan_policy_decorator whether samples are provided as positional or
# keyword arguments
shape = (2, 8, 9, 10)
rng = np.random.default_rng(0)
x = rng.random(shape)
x[0, 0, 0, 0] = np.nan
res1 = stats.mannwhitneyu(*x)
res2 = stats.mannwhitneyu(x=x[0], y=x[1])
assert_equal(res1, res2)
message = "mannwhitneyu() got multiple values for argument"
with pytest.raises(TypeError, match=re.escape(message)):
stats.mannwhitneyu(*x, x=x[0], y=x[1])
@pytest.mark.parametrize(("hypotest", "args", "kwds", "n_samples", "n_outputs",
"paired", "unpacker"), axis_nan_policy_cases)
def test_axis_nan_policy_decorated_pickled(hypotest, args, kwds, n_samples,
n_outputs, paired, unpacker):
if "ttest_ci" in hypotest.__name__:
pytest.skip("Can't pickle functions defined within functions.")
rng = np.random.default_rng(0)
# Some hypothesis tests return a non-iterable that needs an `unpacker` to
# extract the statistic and p-value. For those that don't:
if not unpacker:
def unpacker(res):
return res
data = rng.uniform(size=(n_samples, 2, 30))
pickled_hypotest = pickle.dumps(hypotest)
unpickled_hypotest = pickle.loads(pickled_hypotest)
res1 = unpacker(hypotest(*data, *args, axis=-1, **kwds))
res2 = unpacker(unpickled_hypotest(*data, *args, axis=-1, **kwds))
assert_allclose(res1, res2, rtol=1e-12)
def test_check_empty_inputs():
# Test that _check_empty_inputs is doing its job, at least for single-
# sample inputs. (Multi-sample functionality is tested below.)
# If the input sample is not empty, it should return None.
# If the input sample is empty, it should return an array of NaNs or an
# empty array of appropriate shape. np.mean is used as a reference for the
# output because, like the statistics calculated by these functions,
# it works along and "consumes" `axis` but preserves the other axes.
for i in range(5):
for combo in combinations_with_replacement([0, 1, 2], i):
for axis in range(len(combo)):
samples = (np.zeros(combo),)
output = stats._axis_nan_policy._check_empty_inputs(samples,
axis)
if output is not None:
with np.testing.suppress_warnings() as sup:
sup.filter(RuntimeWarning, "Mean of empty slice.")
sup.filter(RuntimeWarning, "invalid value encountered")
reference = samples[0].mean(axis=axis)
np.testing.assert_equal(output, reference)
def _check_arrays_broadcastable(arrays, axis):
# https://numpy.org/doc/stable/user/basics.broadcasting.html
# "When operating on two arrays, NumPy compares their shapes element-wise.
# It starts with the trailing (i.e. rightmost) dimensions and works its
# way left.
# Two dimensions are compatible when
# 1. they are equal, or
# 2. one of them is 1
# ...
# Arrays do not need to have the same number of dimensions."
# (Clarification: if the arrays are compatible according to the criteria
# above and an array runs out of dimensions, it is still compatible.)
# Below, we follow the rules above except ignoring `axis`
n_dims = max([arr.ndim for arr in arrays])
if axis is not None:
# convert to negative axis
axis = (-n_dims + axis) if axis >= 0 else axis
for dim in range(1, n_dims+1): # we'll index from -1 to -n_dims, inclusive
if -dim == axis:
continue # ignore lengths along `axis`
dim_lengths = set()
for arr in arrays:
if dim <= arr.ndim and arr.shape[-dim] != 1:
dim_lengths.add(arr.shape[-dim])
if len(dim_lengths) > 1:
return False
return True
@pytest.mark.slow
@pytest.mark.parametrize(("hypotest", "args", "kwds", "n_samples", "n_outputs",
"paired", "unpacker"), axis_nan_policy_cases)
def test_empty(hypotest, args, kwds, n_samples, n_outputs, paired, unpacker):
# test for correct output shape when at least one input is empty
if hypotest in override_propagate_funcs:
reason = "Doesn't follow the usual pattern. Tested separately."
pytest.skip(reason=reason)
if unpacker is None:
unpacker = lambda res: (res[0], res[1]) # noqa: E731
def small_data_generator(n_samples, n_dims):
def small_sample_generator(n_dims):
# return all possible "small" arrays in up to n_dim dimensions
for i in n_dims:
# "small" means with size along dimension either 0 or 1
for combo in combinations_with_replacement([0, 1, 2], i):
yield np.zeros(combo)
# yield all possible combinations of small samples
gens = [small_sample_generator(n_dims) for i in range(n_samples)]
yield from product(*gens)
n_dims = [2, 3]
for samples in small_data_generator(n_samples, n_dims):
# this test is only for arrays of zero size
if not any(sample.size == 0 for sample in samples):
continue
max_axis = max(sample.ndim for sample in samples)
# need to test for all valid values of `axis` parameter, too
for axis in range(-max_axis, max_axis):
try:
# After broadcasting, all arrays are the same shape, so
# the shape of the output should be the same as a single-
# sample statistic. Use np.mean as a reference.
concat = stats._stats_py._broadcast_concatenate(samples, axis)
with np.testing.suppress_warnings() as sup:
sup.filter(RuntimeWarning, "Mean of empty slice.")
sup.filter(RuntimeWarning, "invalid value encountered")
expected = np.mean(concat, axis=axis) * np.nan
if hypotest in empty_special_case_funcs:
empty_val = hypotest(*([[]]*len(samples)), *args, **kwds)
mask = np.isnan(expected)
expected[mask] = empty_val
with np.testing.suppress_warnings() as sup:
# generated by f_oneway for too_small inputs
sup.filter(stats.DegenerateDataWarning)
res = hypotest(*samples, *args, axis=axis, **kwds)
res = unpacker(res)
for i in range(n_outputs):
assert_equal(res[i], expected)
except ValueError:
# confirm that the arrays truly are not broadcastable
assert not _check_arrays_broadcastable(samples,
None if paired else axis)
# confirm that _both_ `_broadcast_concatenate` and `hypotest`
# produce this information.
message = "Array shapes are incompatible for broadcasting."
with pytest.raises(ValueError, match=message):
stats._stats_py._broadcast_concatenate(samples, axis, paired)
with pytest.raises(ValueError, match=message):
hypotest(*samples, *args, axis=axis, **kwds)
def test_masked_array_2_sentinel_array():
# prepare arrays
np.random.seed(0)
A = np.random.rand(10, 11, 12)
B = np.random.rand(12)
mask = A < 0.5
A = np.ma.masked_array(A, mask)
# set arbitrary elements to special values
# (these values might have been considered for use as sentinel values)
max_float = np.finfo(np.float64).max
max_float2 = np.nextafter(max_float, -np.inf)
max_float3 = np.nextafter(max_float2, -np.inf)
A[3, 4, 1] = np.nan
A[4, 5, 2] = np.inf
A[5, 6, 3] = max_float
B[8] = np.nan
B[7] = np.inf
B[6] = max_float2
# convert masked A to array with sentinel value, don't modify B
out_arrays, sentinel = _masked_arrays_2_sentinel_arrays([A, B])
A_out, B_out = out_arrays
# check that good sentinel value was chosen (according to intended logic)
assert (sentinel != max_float) and (sentinel != max_float2)
assert sentinel == max_float3
# check that output arrays are as intended
A_reference = A.data
A_reference[A.mask] = sentinel
np.testing.assert_array_equal(A_out, A_reference)
assert B_out is B
def test_masked_dtype():
# When _masked_arrays_2_sentinel_arrays was first added, it always
# upcast the arrays to np.float64. After gh16662, check expected promotion
# and that the expected sentinel is found.
# these are important because the max of the promoted dtype is the first
# candidate to be the sentinel value
max16 = np.iinfo(np.int16).max
max128c = np.finfo(np.complex128).max
# a is a regular array, b has masked elements, and c has no masked elements
a = np.array([1, 2, max16], dtype=np.int16)
b = np.ma.array([1, 2, 1], dtype=np.int8, mask=[0, 1, 0])
c = np.ma.array([1, 2, 1], dtype=np.complex128, mask=[0, 0, 0])
# check integer masked -> sentinel conversion
out_arrays, sentinel = _masked_arrays_2_sentinel_arrays([a, b])
a_out, b_out = out_arrays
assert sentinel == max16-1 # not max16 because max16 was in the data
assert b_out.dtype == np.int16 # check expected promotion
assert_allclose(b_out, [b[0], sentinel, b[-1]]) # check sentinel placement
assert a_out is a # not a masked array, so left untouched
assert not isinstance(b_out, np.ma.MaskedArray) # b became regular array
# similarly with complex
out_arrays, sentinel = _masked_arrays_2_sentinel_arrays([b, c])
b_out, c_out = out_arrays
assert sentinel == max128c # max128c was not in the data
assert b_out.dtype == np.complex128 # b got promoted
assert_allclose(b_out, [b[0], sentinel, b[-1]]) # check sentinel placement
assert not isinstance(b_out, np.ma.MaskedArray) # b became regular array
assert not isinstance(c_out, np.ma.MaskedArray) # c became regular array
# Also, check edge case when a sentinel value cannot be found in the data
min8, max8 = np.iinfo(np.int8).min, np.iinfo(np.int8).max
a = np.arange(min8, max8+1, dtype=np.int8) # use all possible values
mask1 = np.zeros_like(a, dtype=bool)
mask0 = np.zeros_like(a, dtype=bool)
# a masked value can be used as the sentinel
mask1[1] = True
a1 = np.ma.array(a, mask=mask1)
out_arrays, sentinel = _masked_arrays_2_sentinel_arrays([a1])
assert sentinel == min8+1
# unless it's the smallest possible; skipped for simiplicity (see code)
mask0[0] = True
a0 = np.ma.array(a, mask=mask0)
message = "This function replaces masked elements with sentinel..."
with pytest.raises(ValueError, match=message):
_masked_arrays_2_sentinel_arrays([a0])
# test that dtype is preserved in functions
a = np.ma.array([1, 2, 3], mask=[0, 1, 0], dtype=np.float32)
assert stats.gmean(a).dtype == np.float32
def test_masked_stat_1d():
# basic test of _axis_nan_policy_factory with 1D masked sample
males = [19, 22, 16, 29, 24]
females = [20, 11, 17, 12]
res = stats.mannwhitneyu(males, females)
# same result when extra nan is omitted
females2 = [20, 11, 17, np.nan, 12]
res2 = stats.mannwhitneyu(males, females2, nan_policy='omit')
np.testing.assert_array_equal(res2, res)
# same result when extra element is masked
females3 = [20, 11, 17, 1000, 12]
mask3 = [False, False, False, True, False]
females3 = np.ma.masked_array(females3, mask=mask3)
res3 = stats.mannwhitneyu(males, females3)
np.testing.assert_array_equal(res3, res)
# same result when extra nan is omitted and additional element is masked
females4 = [20, 11, 17, np.nan, 1000, 12]
mask4 = [False, False, False, False, True, False]
females4 = np.ma.masked_array(females4, mask=mask4)
res4 = stats.mannwhitneyu(males, females4, nan_policy='omit')
np.testing.assert_array_equal(res4, res)
# same result when extra elements, including nan, are masked
females5 = [20, 11, 17, np.nan, 1000, 12]
mask5 = [False, False, False, True, True, False]
females5 = np.ma.masked_array(females5, mask=mask5)
res5 = stats.mannwhitneyu(males, females5, nan_policy='propagate')
res6 = stats.mannwhitneyu(males, females5, nan_policy='raise')
np.testing.assert_array_equal(res5, res)
np.testing.assert_array_equal(res6, res)
@pytest.mark.parametrize(("axis"), range(-3, 3))
def test_masked_stat_3d(axis):
# basic test of _axis_nan_policy_factory with 3D masked sample
np.random.seed(0)
a = np.random.rand(3, 4, 5)
b = np.random.rand(4, 5)
c = np.random.rand(4, 1)
mask_a = a < 0.1
mask_c = [False, False, False, True]
a_masked = np.ma.masked_array(a, mask=mask_a)
c_masked = np.ma.masked_array(c, mask=mask_c)
a_nans = a.copy()
a_nans[mask_a] = np.nan
c_nans = c.copy()
c_nans[mask_c] = np.nan
res = stats.kruskal(a_nans, b, c_nans, nan_policy='omit', axis=axis)
res2 = stats.kruskal(a_masked, b, c_masked, axis=axis)
np.testing.assert_array_equal(res, res2)
def test_mixed_mask_nan_1():
# targeted test of _axis_nan_policy_factory with 2D masked sample:
# omitting samples with masks and nan_policy='omit' are equivalent
# also checks paired-sample sentinel value removal
m, n = 3, 20
axis = -1
np.random.seed(0)
a = np.random.rand(m, n)
b = np.random.rand(m, n)
mask_a1 = np.random.rand(m, n) < 0.2
mask_a2 = np.random.rand(m, n) < 0.1
mask_b1 = np.random.rand(m, n) < 0.15
mask_b2 = np.random.rand(m, n) < 0.15
mask_a1[2, :] = True
a_nans = a.copy()
b_nans = b.copy()
a_nans[mask_a1 | mask_a2] = np.nan
b_nans[mask_b1 | mask_b2] = np.nan
a_masked1 = np.ma.masked_array(a, mask=mask_a1)
b_masked1 = np.ma.masked_array(b, mask=mask_b1)
a_masked1[mask_a2] = np.nan
b_masked1[mask_b2] = np.nan
a_masked2 = np.ma.masked_array(a, mask=mask_a2)
b_masked2 = np.ma.masked_array(b, mask=mask_b2)
a_masked2[mask_a1] = np.nan
b_masked2[mask_b1] = np.nan
a_masked3 = np.ma.masked_array(a, mask=(mask_a1 | mask_a2))
b_masked3 = np.ma.masked_array(b, mask=(mask_b1 | mask_b2))
res = stats.wilcoxon(a_nans, b_nans, nan_policy='omit', axis=axis)
res1 = stats.wilcoxon(a_masked1, b_masked1, nan_policy='omit', axis=axis)
res2 = stats.wilcoxon(a_masked2, b_masked2, nan_policy='omit', axis=axis)
res3 = stats.wilcoxon(a_masked3, b_masked3, nan_policy='raise', axis=axis)
res4 = stats.wilcoxon(a_masked3, b_masked3,
nan_policy='propagate', axis=axis)
np.testing.assert_array_equal(res1, res)
np.testing.assert_array_equal(res2, res)
np.testing.assert_array_equal(res3, res)
np.testing.assert_array_equal(res4, res)
def test_mixed_mask_nan_2():
# targeted test of _axis_nan_policy_factory with 2D masked sample:
# check for expected interaction between masks and nans
# Cases here are
# [mixed nan/mask, all nans, all masked,
# unmasked nan, masked nan, unmasked non-nan]
a = [[1, np.nan, 2], [np.nan, np.nan, np.nan], [1, 2, 3],
[1, np.nan, 3], [1, np.nan, 3], [1, 2, 3]]
mask = [[1, 0, 1], [0, 0, 0], [1, 1, 1],
[0, 0, 0], [0, 1, 0], [0, 0, 0]]
a_masked = np.ma.masked_array(a, mask=mask)
b = [[4, 5, 6]]
ref1 = stats.ranksums([1, 3], [4, 5, 6])
ref2 = stats.ranksums([1, 2, 3], [4, 5, 6])
# nan_policy = 'omit'
# all elements are removed from first three rows
# middle element is removed from fourth and fifth rows
# no elements removed from last row
res = stats.ranksums(a_masked, b, nan_policy='omit', axis=-1)
stat_ref = [np.nan, np.nan, np.nan,
ref1.statistic, ref1.statistic, ref2.statistic]
p_ref = [np.nan, np.nan, np.nan,
ref1.pvalue, ref1.pvalue, ref2.pvalue]
np.testing.assert_array_equal(res.statistic, stat_ref)
np.testing.assert_array_equal(res.pvalue, p_ref)
# nan_policy = 'propagate'
# nans propagate in first, second, and fourth row
# all elements are removed by mask from third row
# middle element is removed from fifth row
# no elements removed from last row
res = stats.ranksums(a_masked, b, nan_policy='propagate', axis=-1)
stat_ref = [np.nan, np.nan, np.nan,
np.nan, ref1.statistic, ref2.statistic]
p_ref = [np.nan, np.nan, np.nan,
np.nan, ref1.pvalue, ref2.pvalue]
np.testing.assert_array_equal(res.statistic, stat_ref)
np.testing.assert_array_equal(res.pvalue, p_ref)
def test_axis_None_vs_tuple():
# `axis` `None` should be equivalent to tuple with all axes
shape = (3, 8, 9, 10)
rng = np.random.default_rng(0)
x = rng.random(shape)
res = stats.kruskal(*x, axis=None)
res2 = stats.kruskal(*x, axis=(0, 1, 2))
np.testing.assert_array_equal(res, res2)
def test_axis_None_vs_tuple_with_broadcasting():
# `axis` `None` should be equivalent to tuple with all axes,
# which should be equivalent to raveling the arrays before passing them
rng = np.random.default_rng(0)
x = rng.random((5, 1))
y = rng.random((1, 5))
x2, y2 = np.broadcast_arrays(x, y)
res0 = stats.mannwhitneyu(x.ravel(), y.ravel())
res1 = stats.mannwhitneyu(x, y, axis=None)
res2 = stats.mannwhitneyu(x, y, axis=(0, 1))
res3 = stats.mannwhitneyu(x2.ravel(), y2.ravel())
assert res1 == res0
assert res2 == res0
assert res3 != res0
@pytest.mark.parametrize(("axis"),
list(permutations(range(-3, 3), 2)) + [(-4, 1)])
def test_other_axis_tuples(axis):
# Check that _axis_nan_policy_factory treats all `axis` tuples as expected
rng = np.random.default_rng(0)
shape_x = (4, 5, 6)
shape_y = (1, 6)
x = rng.random(shape_x)
y = rng.random(shape_y)
axis_original = axis
# convert axis elements to positive
axis = tuple([(i if i >= 0 else 3 + i) for i in axis])
axis = sorted(axis)
if len(set(axis)) != len(axis):
message = "`axis` must contain only distinct elements"
with pytest.raises(AxisError, match=re.escape(message)):
stats.mannwhitneyu(x, y, axis=axis_original)
return
if axis[0] < 0 or axis[-1] > 2:
message = "`axis` is out of bounds for array of dimension 3"
with pytest.raises(AxisError, match=re.escape(message)):
stats.mannwhitneyu(x, y, axis=axis_original)
return
res = stats.mannwhitneyu(x, y, axis=axis_original)
# reference behavior
not_axis = {0, 1, 2} - set(axis) # which axis is not part of `axis`
not_axis = next(iter(not_axis)) # take it out of the set
x2 = x
shape_y_broadcasted = [1, 1, 6]
shape_y_broadcasted[not_axis] = shape_x[not_axis]
y2 = np.broadcast_to(y, shape_y_broadcasted)
m = x2.shape[not_axis]
x2 = np.moveaxis(x2, axis, (1, 2))
y2 = np.moveaxis(y2, axis, (1, 2))
x2 = np.reshape(x2, (m, -1))
y2 = np.reshape(y2, (m, -1))
res2 = stats.mannwhitneyu(x2, y2, axis=1)
np.testing.assert_array_equal(res, res2)
@pytest.mark.parametrize(
("weighted_fun_name, unpacker"),
[
("gmean", lambda x: x),
("hmean", lambda x: x),
("pmean", lambda x: x),
("combine_pvalues", lambda x: (x.pvalue, x.statistic)),
],
)
def test_mean_mixed_mask_nan_weights(weighted_fun_name, unpacker):
# targeted test of _axis_nan_policy_factory with 2D masked sample:
# omitting samples with masks and nan_policy='omit' are equivalent
# also checks paired-sample sentinel value removal
if weighted_fun_name == 'pmean':
def weighted_fun(a, **kwargs):
return stats.pmean(a, p=0.42, **kwargs)
else:
weighted_fun = getattr(stats, weighted_fun_name)
def func(*args, **kwargs):
return unpacker(weighted_fun(*args, **kwargs))
m, n = 3, 20
axis = -1
rng = np.random.default_rng(6541968121)
a = rng.uniform(size=(m, n))
b = rng.uniform(size=(m, n))
mask_a1 = rng.uniform(size=(m, n)) < 0.2
mask_a2 = rng.uniform(size=(m, n)) < 0.1
mask_b1 = rng.uniform(size=(m, n)) < 0.15
mask_b2 = rng.uniform(size=(m, n)) < 0.15
mask_a1[2, :] = True
a_nans = a.copy()
b_nans = b.copy()
a_nans[mask_a1 | mask_a2] = np.nan
b_nans[mask_b1 | mask_b2] = np.nan
a_masked1 = np.ma.masked_array(a, mask=mask_a1)
b_masked1 = np.ma.masked_array(b, mask=mask_b1)
a_masked1[mask_a2] = np.nan
b_masked1[mask_b2] = np.nan
a_masked2 = np.ma.masked_array(a, mask=mask_a2)
b_masked2 = np.ma.masked_array(b, mask=mask_b2)
a_masked2[mask_a1] = np.nan
b_masked2[mask_b1] = np.nan
a_masked3 = np.ma.masked_array(a, mask=(mask_a1 | mask_a2))
b_masked3 = np.ma.masked_array(b, mask=(mask_b1 | mask_b2))
mask_all = (mask_a1 | mask_a2 | mask_b1 | mask_b2)
a_masked4 = np.ma.masked_array(a, mask=mask_all)
b_masked4 = np.ma.masked_array(b, mask=mask_all)
with np.testing.suppress_warnings() as sup:
message = 'invalid value encountered'
sup.filter(RuntimeWarning, message)
res = func(a_nans, weights=b_nans, nan_policy="omit", axis=axis)
res1 = func(a_masked1, weights=b_masked1, nan_policy="omit", axis=axis)
res2 = func(a_masked2, weights=b_masked2, nan_policy="omit", axis=axis)
res3 = func(a_masked3, weights=b_masked3, nan_policy="raise", axis=axis)
res4 = func(a_masked3, weights=b_masked3, nan_policy="propagate", axis=axis)
# Would test with a_masked3/b_masked3, but there is a bug in np.average
# that causes a bug in _no_deco mean with masked weights. Would use
# np.ma.average, but that causes other problems. See numpy/numpy#7330.
if weighted_fun_name in {"hmean"}:
weighted_fun_ma = getattr(stats.mstats, weighted_fun_name)
res5 = weighted_fun_ma(a_masked4, weights=b_masked4,
axis=axis, _no_deco=True)
np.testing.assert_array_equal(res1, res)
np.testing.assert_array_equal(res2, res)
np.testing.assert_array_equal(res3, res)
np.testing.assert_array_equal(res4, res)
if weighted_fun_name in {"hmean"}:
# _no_deco mean returns masked array, last element was masked
np.testing.assert_allclose(res5.compressed(), res[~np.isnan(res)])
def test_raise_invalid_args_g17713():
# other cases are handled in:
# test_axis_nan_policy_decorated_positional_axis - multiple values for arg
# test_axis_nan_policy_decorated_positional_args - unexpected kwd arg
message = "got an unexpected keyword argument"
with pytest.raises(TypeError, match=message):
stats.gmean([1, 2, 3], invalid_arg=True)
message = " got multiple values for argument"
with pytest.raises(TypeError, match=message):
stats.gmean([1, 2, 3], a=True)
message = "missing 1 required positional argument"
with pytest.raises(TypeError, match=message):
stats.gmean()
message = "takes from 1 to 4 positional arguments but 5 were given"
with pytest.raises(TypeError, match=message):
stats.gmean([1, 2, 3], 0, float, [1, 1, 1], 10)
@pytest.mark.parametrize('dtype', [np.int16, np.float32, np.complex128])
def test_array_like_input(dtype):
# Check that `_axis_nan_policy`-decorated functions work with custom
# containers that are coercible to numeric arrays
class ArrLike:
def __init__(self, x, dtype):
self._x = x
self._dtype = dtype
def __array__(self, dtype=None, copy=None):
return np.asarray(x, dtype=self._dtype)
x = [1]*2 + [3, 4, 5]
res = stats.mode(ArrLike(x, dtype=dtype))
assert res.mode == 1
assert res.count == 2