import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
import numpy.testing as npt
from caffe2.python import core, layer_model_instantiator, regularizer, schema, workspace
from caffe2.python.layer_test_util import LayersTestCase
from caffe2.python.optimizer import SgdOptimizer
from caffe2.python.regularizer import L1Norm, RegularizationBy
from caffe2.python.regularizer_context import RegularizerContext, UseRegularizer
from hypothesis import given


class TestRegularizerContext(LayersTestCase):
    @given(X=hu.arrays(dims=[2, 5]))
    def test_regularizer_context(self, X):
        weight_reg_out = L1Norm(0.2)
        bias_reg_out = L1Norm(0)
        regularizers = {"WEIGHT": weight_reg_out, "BIAS": bias_reg_out}

        output_dims = 2
        input_record = self.new_record(schema.Scalar((np.float32, (5,))))
        schema.FeedRecord(input_record, [X])

        with UseRegularizer(regularizers):
            weight_reg = RegularizerContext.current().get_regularizer("WEIGHT")
            bias_reg = RegularizerContext.current().get_regularizer("BIAS")
            optim = SgdOptimizer(0.15)

            assert (
                weight_reg == weight_reg_out
            ), "failed to get correct weight reg from context"
            assert bias_reg == bias_reg_out, "failed to get correct bias reg from context"
            fc_output = self.model.FC(
                input_record,
                output_dims,
                weight_optim=optim,
                bias_optim=optim,
                weight_reg=weight_reg,
                bias_reg=bias_reg,
            )
            # model.output_schema has to be a struct
            self.model.output_schema = schema.Struct(("fc_output", fc_output))

            self.assertEqual(schema.Scalar((np.float32, (output_dims,))), fc_output)

            _, train_net = layer_model_instantiator.generate_training_nets(self.model)
            ops = train_net.Proto().op
            ops_type_list = [op.type for op in ops]
            assert ops_type_list.count("LpNorm") == 2
            assert ops_type_list.count("Scale") == 4
            assert ops_type_list.count("LpNormGradient") == 2
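

# A minimal NumPy sketch (a hypothetical helper, not used by the tests) of the
# penalty that L1Norm contributes per parameter. Assuming L1Norm lowers to an
# LpNorm(p=1) op followed by scaling, regularizing both WEIGHT and BIAS above
# is what yields the two LpNorm / two LpNormGradient ops asserted in
# test_regularizer_context.
def _l1_penalty_reference(w, reg_lambda):
    # reg_lambda * sum_i |w_i|
    return reg_lambda * np.sum(np.abs(w))

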
class TestRegularizer(LayersTestCase):
    @given(X=hu.arrays(dims=[2, 5], elements=hu.floats(min_value=-1.0, max_value=1.0)))
    def test_log_barrier(self, X):
        param = core.BlobReference("X")
        workspace.FeedBlob(param, X)
        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.LogBarrier(1.0)
        output = reg(train_net, train_init_net, param, by=RegularizationBy.ON_LOSS)
        reg(
            train_net,
            train_init_net,
            param,
            grad=None,
            by=RegularizationBy.AFTER_OPTIMIZER,
        )
        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)

        def ref(X):
            return (
                np.array(np.sum(-np.log(np.clip(X, 1e-9, None))) * 0.5).astype(
                    np.float32
                ),
                np.clip(X, 1e-9, None),
            )

        for x, y in zip(workspace.FetchBlobs([output, param]), ref(X)):
            npt.assert_allclose(x, y, rtol=1e-3)

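    # Note on the reference above: the ON_LOSS call contributes the barrier
    # penalty -lambda * sum(log(x)) (the extra 0.5 factor appears to come from
    # the barrier's internal discounting -- an assumption, not verified against
    # the operator docs), while the AFTER_OPTIMIZER call projects the parameter
    # back into the feasible region by clipping it at 1e-9.
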
    @given(
        X=hu.arrays(dims=[2, 5], elements=hu.floats(min_value=-1.0, max_value=1.0)),
        left_open=st.booleans(),
        right_open=st.booleans(),
        eps=hu.floats(min_value=1e-6, max_value=1e-4),
        ub=hu.floats(min_value=-1.0, max_value=1.0),
        lb=hu.floats(min_value=-1.0, max_value=1.0),
        **hu.gcs_cpu_only
    )
    def test_bounded_grad_proj(self, X, left_open, right_open, eps, ub, lb, gc, dc):
        # Skip hypothesis draws where the effective interval is empty.
        if ub - (eps if right_open else 0.0) < lb + (eps if left_open else 0.0):
            return
        param = core.BlobReference("X")
        workspace.FeedBlob(param, X)
        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.BoundedGradientProjection(
            lb=lb, ub=ub, left_open=left_open, right_open=right_open, epsilon=eps
        )
        output = reg(train_net, train_init_net, param, by=RegularizationBy.ON_LOSS)
        reg(
            train_net,
            train_init_net,
            param,
            grad=None,
            by=RegularizationBy.AFTER_OPTIMIZER,
        )
        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)

        def ref(X):
            return np.clip(
                X, lb + (eps if left_open else 0.0), ub - (eps if right_open else 0.0)
            )

        # The projection adds no loss term, so ON_LOSS returns no output blob.
        assert output is None
        npt.assert_allclose(workspace.blobs[param], ref(X), atol=1e-7)

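    # Worked example for the projection above (illustrative values): with
    # lb=-0.5, ub=0.5, eps=1e-5, and both ends open, every entry of X is
    # clipped into [-0.5 + 1e-5, 0.5 - 1e-5]; with both ends closed, the
    # interval is simply [lb, ub].
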
    @given(
        output_dim=st.integers(1, 10),
        input_num=st.integers(3, 30),
        reg_weight=st.integers(0, 10)
    )
    def test_group_l1_norm(self, output_dim, input_num, reg_weight):
        """
        1. create a weight blob
        2. create random group splits
        3. run group_l1_norm with the weight blob
        4. run equivalent np operations to calculate group l1 norm
        5. check that the results from 3 and 4 are equal
        """
        def compare_reference(weight, group_boundaries, reg_lambda, output):
            group_splits = np.hsplit(weight, group_boundaries[1:-1])
            l2_reg = np.sqrt([np.sum(np.square(g)) for g in group_splits])
            l2_normalized = np.multiply(
                l2_reg, np.array([np.sqrt(g.shape[1]) for g in group_splits])
            )
            result = np.multiply(np.sum(l2_normalized), reg_lambda)
            npt.assert_almost_equal(result, workspace.blobs[output], decimal=2)

        weight = np.random.rand(output_dim, input_num).astype(np.float32)

        feature_num = np.random.randint(low=1, high=input_num - 1)
        group_boundaries = [0]
        group_boundaries = np.append(
            group_boundaries,
            np.sort(
                np.random.choice(range(1, input_num - 1), feature_num, replace=False)
            ),
        )
        group_boundaries = np.append(group_boundaries, [input_num])
        split_info = np.diff(group_boundaries)

        weight_blob = core.BlobReference("weight_blob")
        workspace.FeedBlob(weight_blob, weight)

        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.GroupL1Norm(reg_weight * 0.1, split_info.tolist())
        output = reg(
            train_net, train_init_net, weight_blob, by=RegularizationBy.ON_LOSS
        )

        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)
        compare_reference(weight, group_boundaries, reg_weight * 0.1, output)

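    # The penalty checked above is lambda * sum_g sqrt(|g|) * ||w_g||_2, where
    # |g| is the group width. For example (illustrative numbers), a (2, 5)
    # weight with boundaries [0, 2, 5] splits into groups of widths 2 and 3,
    # giving lambda * (sqrt(2) * ||w[:, :2]||_2 + sqrt(3) * ||w[:, 2:]||_2).
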
    @given(
        param_dim=st.integers(10, 30),
        k=st.integers(5, 9),
        reg_weight=st.integers(0, 10)
    )
    def test_l1_norm_trimmed(self, param_dim, k, reg_weight):
        weight = np.random.rand(param_dim).astype(np.float32)
        weight_blob = core.BlobReference("weight_blob")
        workspace.FeedBlob(weight_blob, weight)

        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.L1NormTrimmed(reg_weight * 0.1, k)
        output = reg(
            train_net, train_init_net, weight_blob, by=RegularizationBy.ON_LOSS
        )

        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)
        result = (
            np.sum(np.sort(np.absolute(weight))[: param_dim - k]) * reg_weight * 0.1
        )
        npt.assert_almost_equal(result, workspace.blobs[output], decimal=2)

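    # Trimmed L1 sums only the (param_dim - k) smallest |w_i|; the k largest
    # magnitudes are exempt from the penalty, which is exactly what the
    # sorted-slice reference above computes.
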
    @given(
        param_dim=st.integers(10, 30),
        k=st.integers(5, 9),
        l1=st.integers(0, 10),
        l2=st.integers(0, 10)
    )
    def test_elastic_l1_norm_trimmed(self, param_dim, k, l1, l2):
        weight = np.random.rand(param_dim).astype(np.float32)
        weight_blob = core.BlobReference("weight_blob")
        workspace.FeedBlob(weight_blob, weight)

        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.ElasticNetL1NormTrimmed(l1 * 0.1, l2 * 0.1, k)
        output = reg(
            train_net, train_init_net, weight_blob, by=RegularizationBy.ON_LOSS
        )

        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)
        l1_norm = np.sum(np.sort(np.absolute(weight))[: param_dim - k])
        l2_norm = np.sum(np.square(weight))
        result = l1_norm * l1 * 0.1 + l2_norm * l2 * 0.1
        npt.assert_almost_equal(result, workspace.blobs[output], decimal=2)
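

# A minimal NumPy sketch (a hypothetical helper, not used by the tests) of the
# elastic-net-with-trimmed-L1 penalty checked in test_elastic_l1_norm_trimmed:
# lambda_1 * (trimmed L1) + lambda_2 * ||w||_2^2.
def _elastic_trimmed_reference(w, k, lambda_1, lambda_2):
    # Sum the (len(w) - k) smallest magnitudes; the k largest go unpenalized.
    trimmed_l1 = np.sum(np.sort(np.abs(w))[: len(w) - k])
    return lambda_1 * trimmed_l1 + lambda_2 * np.sum(np.square(w))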