# @package utils
# Module caffe2.python.utils


from caffe2.proto import caffe2_pb2
from future.utils import viewitems
from google.protobuf.message import DecodeError, Message
from google.protobuf import text_format

import sys
import collections.abc  # explicit submodule import for collections.abc.Iterable
import copy
import functools
import numpy as np

from six import integer_types, binary_type, text_type, string_types

OPTIMIZER_ITERATION_NAME = "optimizer_iteration"
ITERATION_MUTEX_NAME = "iteration_mutex"


def OpAlmostEqual(op_a, op_b, ignore_fields=None):
    '''
    Returns whether two ops are identical except for the fields listed in
    `ignore_fields`.
    '''
    ignore_fields = ignore_fields or []
    if not isinstance(ignore_fields, list):
        ignore_fields = [ignore_fields]

    assert all(isinstance(f, text_type) for f in ignore_fields), (
        'Expect each field to be text type, but got {}'.format(ignore_fields))

    def clean_op(op):
        op = copy.deepcopy(op)
        for field in ignore_fields:
            if op.HasField(field):
                op.ClearField(field)
        return op

    op_a = clean_op(op_a)
    op_b = clean_op(op_b)
    return op_a == op_b or str(op_a) == str(op_b)


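# Example (illustrative sketch; assumes `core` from caffe2.python is
# importable -- the comparison ignores the differing `name` field):
#
#   op_a = core.CreateOperator("Relu", ["x"], ["y"], name="a")
#   op_b = core.CreateOperator("Relu", ["x"], ["y"], name="b")
#   OpAlmostEqual(op_a, op_b, ignore_fields='name')  # -> True

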
def CaffeBlobToNumpyArray(blob):
    if blob.num != 0:
        # old style caffe blob: dimensions stored as num/channels/height/width.
        return (np.asarray(blob.data, dtype=np.float32)
                .reshape(blob.num, blob.channels, blob.height, blob.width))
    else:
        # new style caffe blob: dimensions stored in the `shape` field.
        return (np.asarray(blob.data, dtype=np.float32)
                .reshape(blob.shape.dim))


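# Example (illustrative): a legacy BlobProto with num=1, channels=3,
# height=2, width=2 and 12 entries in `data` yields a float32 array of
# shape (1, 3, 2, 2); a new-style blob is reshaped from `shape.dim` instead.

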
def Caffe2TensorToNumpyArray(tensor):
    if tensor.data_type == caffe2_pb2.TensorProto.FLOAT:
        return np.asarray(
            tensor.float_data, dtype=np.float32).reshape(tensor.dims)
    elif tensor.data_type == caffe2_pb2.TensorProto.DOUBLE:
        return np.asarray(
            tensor.double_data, dtype=np.float64).reshape(tensor.dims)
    elif tensor.data_type == caffe2_pb2.TensorProto.INT64:
        return np.asarray(
            tensor.int64_data, dtype=np.int64).reshape(tensor.dims)
    elif tensor.data_type == caffe2_pb2.TensorProto.INT32:
        return np.asarray(
            tensor.int32_data, dtype=np.int32).reshape(tensor.dims)  # pb.INT32=>np.int32 uses int32_data
    elif tensor.data_type == caffe2_pb2.TensorProto.INT16:
        return np.asarray(
            tensor.int32_data, dtype=np.int16).reshape(tensor.dims)  # pb.INT16=>np.int16 uses int32_data
    elif tensor.data_type == caffe2_pb2.TensorProto.UINT16:
        return np.asarray(
            tensor.int32_data, dtype=np.uint16).reshape(tensor.dims)  # pb.UINT16=>np.uint16 uses int32_data
    elif tensor.data_type == caffe2_pb2.TensorProto.INT8:
        return np.asarray(
            tensor.int32_data, dtype=np.int8).reshape(tensor.dims)  # pb.INT8=>np.int8 uses int32_data
    elif tensor.data_type == caffe2_pb2.TensorProto.UINT8:
        return np.asarray(
            tensor.int32_data, dtype=np.uint8).reshape(tensor.dims)  # pb.UINT8=>np.uint8 uses int32_data
    else:
        # TODO: complete the data types: bool, float16, byte, string
        raise RuntimeError(
            "Tensor data type not supported yet: " + str(tensor.data_type))


def NumpyArrayToCaffe2Tensor(arr, name=None):
    tensor = caffe2_pb2.TensorProto()
    tensor.dims.extend(arr.shape)
    if name:
        tensor.name = name
    if arr.dtype == np.float32:
        tensor.data_type = caffe2_pb2.TensorProto.FLOAT
        tensor.float_data.extend(list(arr.flatten().astype(float)))
    elif arr.dtype == np.float64:
        tensor.data_type = caffe2_pb2.TensorProto.DOUBLE
        tensor.double_data.extend(list(arr.flatten().astype(np.float64)))
    elif arr.dtype == np.int64:
        tensor.data_type = caffe2_pb2.TensorProto.INT64
        tensor.int64_data.extend(list(arr.flatten().astype(np.int64)))
    elif arr.dtype == np.int32:
        tensor.data_type = caffe2_pb2.TensorProto.INT32
        tensor.int32_data.extend(arr.flatten().astype(np.int32).tolist())
    elif arr.dtype == np.int16:
        tensor.data_type = caffe2_pb2.TensorProto.INT16
        tensor.int32_data.extend(list(arr.flatten().astype(np.int16)))  # np.int16=>pb.INT16 uses int32_data
    elif arr.dtype == np.uint16:
        tensor.data_type = caffe2_pb2.TensorProto.UINT16
        tensor.int32_data.extend(list(arr.flatten().astype(np.uint16)))  # np.uint16=>pb.UINT16 uses int32_data
    elif arr.dtype == np.int8:
        tensor.data_type = caffe2_pb2.TensorProto.INT8
        tensor.int32_data.extend(list(arr.flatten().astype(np.int8)))  # np.int8=>pb.INT8 uses int32_data
    elif arr.dtype == np.uint8:
        tensor.data_type = caffe2_pb2.TensorProto.UINT8
        tensor.int32_data.extend(list(arr.flatten().astype(np.uint8)))  # np.uint8=>pb.UINT8 uses int32_data
    else:
        # TODO: complete the data types: bool, float16, byte, string
        raise RuntimeError(
            "Numpy data type not supported yet: " + str(arr.dtype))
    return tensor


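# Example (illustrative round trip between the two converters above):
#
#   arr = np.arange(6, dtype=np.float32).reshape(2, 3)
#   tensor = NumpyArrayToCaffe2Tensor(arr, name="w")
#   back = Caffe2TensorToNumpyArray(tensor)  # shape (2, 3), dtype float32

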
def MakeArgument(key, value):
    """Makes an argument based on the value type."""
    argument = caffe2_pb2.Argument()
    argument.name = key
    iterable = isinstance(value, collections.abc.Iterable)

    # Fast-track the common case where a float32 array of tensor parameters
    # needs to be serialized. The entire array is guaranteed to have the same
    # dtype, so no per-element checking is necessary and there is no need to
    # convert each element separately.
    if isinstance(value, np.ndarray) and value.dtype.type is np.float32:
        argument.floats.extend(value.flatten().tolist())
        return argument

    if isinstance(value, np.ndarray):
        value = value.flatten().tolist()
    elif isinstance(value, np.generic):
        # Convert a numpy scalar to a native python type.
        # (np.asscalar was removed in NumPy 1.23; .item() is its replacement.)
        value = value.item()

    if type(value) is float:
        argument.f = value
    elif type(value) in integer_types or type(value) is bool:
        # We make a relaxation that a boolean variable will also be stored
        # as an int.
        argument.i = value
    elif isinstance(value, binary_type):
        argument.s = value
    elif isinstance(value, text_type):
        argument.s = value.encode('utf-8')
    elif isinstance(value, caffe2_pb2.NetDef):
        argument.n.CopyFrom(value)
    elif isinstance(value, Message):
        argument.s = value.SerializeToString()
    # np.float_ is an alias of np.float64 and was removed in NumPy 2.0,
    # so the float64 name is used below.
    elif iterable and all(type(v) in [float, np.float64] for v in value):
        argument.floats.extend(
            v.item() if type(v) is np.float64 else v for v in value
        )
    elif iterable and all(
        type(v) in integer_types or type(v) in [bool, np.int_] for v in value
    ):
        argument.ints.extend(
            v.item() if type(v) is np.int_ else v for v in value
        )
    elif iterable and all(
        isinstance(v, binary_type) or isinstance(v, text_type) for v in value
    ):
        argument.strings.extend(
            v.encode('utf-8') if isinstance(v, text_type) else v
            for v in value
        )
    elif iterable and all(isinstance(v, caffe2_pb2.NetDef) for v in value):
        argument.nets.extend(value)
    elif iterable and all(isinstance(v, Message) for v in value):
        argument.strings.extend(v.SerializeToString() for v in value)
    else:
        if iterable:
            raise ValueError(
                "Unknown iterable argument type: key={} value={}, value "
                "type={}[{}]".format(
                    key, value, type(value), set(type(v) for v in value)
                )
            )
        else:
            raise ValueError(
                "Unknown argument type: key={} value={}, value type={}".format(
                    key, value, type(value)
                )
            )
    return argument


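# Example (illustrative): the Argument proto field is chosen from the
# Python type of `value`:
#
#   MakeArgument("kernel", 3).i           # -> 3
#   MakeArgument("scale", 0.5).f          # -> 0.5
#   MakeArgument("order", "NCHW").s       # -> b"NCHW"
#   MakeArgument("strides", [1, 2]).ints  # -> [1, 2]

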
def TryReadProtoWithClass(cls, s):
    """Reads a protobuf with the given proto class.

    Inputs:
      cls: a protobuf class.
      s: a string of either binary or text protobuf content.

    Outputs:
      proto: the protobuf of class cls.

    Throws:
      google.protobuf.message.DecodeError: if we cannot decode the message.
    """
    obj = cls()
    try:
        # Try the text format first; fall back to binary parsing below.
        text_format.Parse(s, obj)
        return obj
    except (text_format.ParseError, UnicodeDecodeError):
        obj.ParseFromString(s)
        return obj


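# Example (illustrative): parse a NetDef from text-format content; binary
# content is handled by the same call through the fallback path.
#
#   net = TryReadProtoWithClass(caffe2_pb2.NetDef, 'name: "test_net"')
#   net.name  # -> 'test_net'

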
def GetContentFromProto(obj, function_map):
    """Gets a specific field from a protocol buffer that matches the given
    class. Returns None if no class in `function_map` matches.
    """
    for cls, func in viewitems(function_map):
        if type(obj) is cls:
            return func(obj)


def GetContentFromProtoString(s, function_map):
    for cls, func in viewitems(function_map):
        try:
            obj = TryReadProtoWithClass(cls, s)
            return func(obj)
        except DecodeError:
            continue
    raise DecodeError("Cannot find a fitting protobuf class.")


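# Example (illustrative): try several proto classes against one string and
# extract content from whichever one parses.
#
#   name = GetContentFromProtoString(
#       s, {caffe2_pb2.NetDef: lambda net: net.name})

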
def ConvertProtoToBinary(proto_class, filename, out_filename):
    """Convert a text file of the given protobuf class to binary."""
    with open(filename) as f:
        proto = TryReadProtoWithClass(proto_class, f.read())
    # SerializeToString() returns bytes, so the output file must be opened
    # in binary mode.
    with open(out_filename, 'wb') as fid:
        fid.write(proto.SerializeToString())


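# Example (illustrative; the file names are hypothetical):
#
#   ConvertProtoToBinary(caffe2_pb2.NetDef, "net.pbtxt", "net.pb")

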
def GetGPUMemoryUsageStats():
    """Get GPU memory usage stats from CUDAContext/HIPContext. This requires
    the flag --caffe2_gpu_memory_tracking to be enabled."""
    from caffe2.python import workspace, core
    workspace.RunOperatorOnce(
        core.CreateOperator(
            "GetGPUMemoryUsage",
            [],
            ["____mem____"],
            device_option=core.DeviceOption(workspace.GpuDeviceType, 0),
        ),
    )
    b = workspace.FetchBlob("____mem____")
    return {
        'total_by_gpu': b[0, :],
        'max_by_gpu': b[1, :],
        'total': np.sum(b[0, :]),
        'max_total': np.sum(b[1, :])
    }


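# Example (illustrative; requires a GPU build started with
# --caffe2_gpu_memory_tracking):
#
#   stats = GetGPUMemoryUsageStats()
#   print(stats['total'], stats['max_total'])  # current and peak bytes

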
def ResetBlobs(blobs):
    # Frees the contents of the given blobs in the current workspace via the
    # Free operator on CPU.
    from caffe2.python import workspace, core
    workspace.RunOperatorOnce(
        core.CreateOperator(
            "Free",
            list(blobs),
            list(blobs),
            device_option=core.DeviceOption(caffe2_pb2.CPU),
        ),
    )


class DebugMode(object):
    '''
    This class drops you into an interactive debugger
    if there is an unhandled exception in your python script.

    Example of usage:

    def main():
        # your code here
        pass

    if __name__ == '__main__':
        from caffe2.python.utils import DebugMode
        DebugMode.run(main)
    '''

    @classmethod
    def run(cls, func):
        try:
            return func()
        except KeyboardInterrupt:
            raise
        except Exception:
            import pdb

            print(
                'Entering interactive debugger. Type "bt" to print '
                'the full stacktrace. Type "help" to see command listing.')
            print(sys.exc_info()[1])
            print()

            pdb.post_mortem()
            sys.exit(1)
            raise


def raiseIfNotEqual(a, b, msg):
    if a != b:
        raise Exception("{}. {} != {}".format(msg, a, b))


def debug(f):
    '''
    Use this decorator to wrap your function with DebugMode's functionality.

    Example:

    @debug
    def test_foo(self):
        raise Exception("Bar")
    '''

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        def func():
            return f(*args, **kwargs)
        return DebugMode.run(func)

    return wrapper


def BuildUniqueMutexIter(
    init_net,
    net,
    iter=None,
    iter_mutex=None,
    iter_val=0
):
    '''
    Often, a mutex-guarded iteration counter is needed. This function creates
    the mutex-guarded iter blob in the net uniquely (if the iter already
    exists, it does nothing).

    This function returns the iter blob.
    '''
    iter = iter if iter is not None else OPTIMIZER_ITERATION_NAME
    iter_mutex = iter_mutex if iter_mutex is not None else ITERATION_MUTEX_NAME
    from caffe2.python import core
    if not init_net.BlobIsDefined(iter):
        # Add training operators.
        with core.DeviceScope(
            core.DeviceOption(caffe2_pb2.CPU,
                              extra_info=["device_type_override:cpu"])
        ):
            iteration = init_net.ConstantFill(
                [],
                iter,
                shape=[1],
                value=iter_val,
                dtype=core.DataType.INT64,
            )
            iter_mutex = init_net.CreateMutex([], [iter_mutex])
            net.AtomicIter([iter_mutex, iteration], [iteration])
    else:
        iteration = init_net.GetBlobRef(iter)
    return iteration


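# Example (illustrative sketch; net names are hypothetical):
#
#   from caffe2.python import core
#   init_net = core.Net("init")
#   train_net = core.Net("train")
#   iteration = BuildUniqueMutexIter(init_net, train_net)
#   # `train_net` now atomically increments `iteration` on each run.

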
def EnumClassKeyVals(cls):
    # cls can only be derived from object
    assert type(cls) == type
    # Enum attribute keys are all capitalized and values are strings
    enum = {}
    for k in dir(cls):
        if k == k.upper():
            v = getattr(cls, k)
            if isinstance(v, string_types):
                assert v not in enum.values(), (
                    "Failed to resolve {} as Enum: "
                    "duplicate entries {}={}, {}={}".format(
                        cls, k, v, [key for key in enum if enum[key] == v][0], v
                    )
                )
                enum[k] = v
    return enum


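# Example (illustrative):
#
#   class Color(object):
#       RED = "red"
#       BLUE = "blue"
#
#   EnumClassKeyVals(Color)  # -> {'RED': 'red', 'BLUE': 'blue'}

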
def ArgsToDict(args):
    """
    Convert a list of arguments to a name, value dictionary. Assumes that
    each argument has a name. Otherwise, the argument is skipped.
    """
    ans = {}
    for arg in args:
        if not arg.HasField("name"):
            continue
        for d in arg.DESCRIPTOR.fields:
            if d.name == "name":
                continue
            if d.label == d.LABEL_OPTIONAL and arg.HasField(d.name):
                ans[arg.name] = getattr(arg, d.name)
                break
            elif d.label == d.LABEL_REPEATED:
                list_ = getattr(arg, d.name)
                if len(list_) > 0:
                    ans[arg.name] = list_
                    break
        else:
            ans[arg.name] = None
    return ans


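# Example (illustrative; assumes `core` from caffe2.python is importable --
# this inverts the MakeArgument mapping for an operator's arguments):
#
#   op = core.CreateOperator("Conv", ["x", "w"], ["y"], kernel=3, order="NCHW")
#   ArgsToDict(op.arg)  # -> {'kernel': 3, 'order': b'NCHW'}

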
def NHWC2NCHW(tensor):
    assert tensor.ndim >= 1
    return tensor.transpose((0, tensor.ndim - 1) + tuple(range(1, tensor.ndim - 1)))


def NCHW2NHWC(tensor):
    assert tensor.ndim >= 2
    return tensor.transpose((0,) + tuple(range(2, tensor.ndim)) + (1,))

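
# Example (illustrative): for a 4D batch the channel axis moves between
# positions 3 and 1.
#
#   x = np.zeros((8, 32, 32, 3))   # NHWC
#   NHWC2NCHW(x).shape             # -> (8, 3, 32, 32)
#   NCHW2NHWC(NHWC2NCHW(x)).shape  # -> (8, 32, 32, 3)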