297 lines
8.7 KiB
Python
297 lines
8.7 KiB
Python
|
#
|
||
|
# This file is part of pyasn1 software.
|
||
|
#
|
||
|
# Copyright (c) 2005-2018, Ilya Etingof <etingof@gmail.com>
|
||
|
# License: http://snmplabs.com/pyasn1/license.html
|
||
|
#
|
||
|
from pyasn1 import error
|
||
|
from pyasn1.codec.ber import encoder
|
||
|
from pyasn1.compat.octets import str2octs, null
|
||
|
from pyasn1.type import univ
|
||
|
from pyasn1.type import useful
|
||
|
|
||
|
__all__ = ['encode']
|
||
|
|
||
|
|
||
|
class BooleanEncoder(encoder.IntegerEncoder):
|
||
|
def encodeValue(self, value, asn1Spec, encodeFun, **options):
|
||
|
if value == 0:
|
||
|
substrate = (0,)
|
||
|
else:
|
||
|
substrate = (255,)
|
||
|
return substrate, False, False
|
||
|
|
||
|
|
||
|
class RealEncoder(encoder.RealEncoder):
|
||
|
def _chooseEncBase(self, value):
|
||
|
m, b, e = value
|
||
|
return self._dropFloatingPoint(m, b, e)
|
||
|
|
||
|
|
||
|
# specialized GeneralStringEncoder here
|
||
|
|
||
|
class TimeEncoderMixIn(object):
|
||
|
zchar, = str2octs('Z')
|
||
|
pluschar, = str2octs('+')
|
||
|
minuschar, = str2octs('-')
|
||
|
commachar, = str2octs(',')
|
||
|
minLength = 12
|
||
|
maxLength = 19
|
||
|
|
||
|
def encodeValue(self, value, asn1Spec, encodeFun, **options):
|
||
|
# Encoding constraints:
|
||
|
# - minutes are mandatory, seconds are optional
|
||
|
# - subseconds must NOT be zero
|
||
|
# - no hanging fraction dot
|
||
|
# - time in UTC (Z)
|
||
|
# - only dot is allowed for fractions
|
||
|
|
||
|
if asn1Spec is not None:
|
||
|
value = asn1Spec.clone(value)
|
||
|
|
||
|
octets = value.asOctets()
|
||
|
|
||
|
if not self.minLength < len(octets) < self.maxLength:
|
||
|
raise error.PyAsn1Error('Length constraint violated: %r' % value)
|
||
|
|
||
|
if self.pluschar in octets or self.minuschar in octets:
|
||
|
raise error.PyAsn1Error('Must be UTC time: %r' % octets)
|
||
|
|
||
|
if octets[-1] != self.zchar:
|
||
|
raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % octets)
|
||
|
|
||
|
if self.commachar in octets:
|
||
|
raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)
|
||
|
|
||
|
options.update(maxChunkSize=1000)
|
||
|
|
||
|
return encoder.OctetStringEncoder.encodeValue(
|
||
|
self, value, asn1Spec, encodeFun, **options
|
||
|
)
|
||
|
|
||
|
|
||
|
class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
|
||
|
minLength = 12
|
||
|
maxLength = 19
|
||
|
|
||
|
|
||
|
class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
|
||
|
minLength = 10
|
||
|
maxLength = 14
|
||
|
|
||
|
|
||
|
class SetEncoder(encoder.SequenceEncoder):
|
||
|
@staticmethod
|
||
|
def _componentSortKey(componentAndType):
|
||
|
"""Sort SET components by tag
|
||
|
|
||
|
Sort regardless of the Choice value (static sort)
|
||
|
"""
|
||
|
component, asn1Spec = componentAndType
|
||
|
|
||
|
if asn1Spec is None:
|
||
|
asn1Spec = component
|
||
|
|
||
|
if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
|
||
|
if asn1Spec.tagSet:
|
||
|
return asn1Spec.tagSet
|
||
|
else:
|
||
|
return asn1Spec.componentType.minTagSet
|
||
|
else:
|
||
|
return asn1Spec.tagSet
|
||
|
|
||
|
def encodeValue(self, value, asn1Spec, encodeFun, **options):
|
||
|
|
||
|
substrate = null
|
||
|
|
||
|
comps = []
|
||
|
compsMap = {}
|
||
|
|
||
|
if asn1Spec is None:
|
||
|
# instance of ASN.1 schema
|
||
|
value.verifySizeSpec()
|
||
|
|
||
|
namedTypes = value.componentType
|
||
|
|
||
|
for idx, component in enumerate(value.values()):
|
||
|
if namedTypes:
|
||
|
namedType = namedTypes[idx]
|
||
|
|
||
|
if namedType.isOptional and not component.isValue:
|
||
|
continue
|
||
|
|
||
|
if namedType.isDefaulted and component == namedType.asn1Object:
|
||
|
continue
|
||
|
|
||
|
compsMap[id(component)] = namedType
|
||
|
|
||
|
else:
|
||
|
compsMap[id(component)] = None
|
||
|
|
||
|
comps.append((component, asn1Spec))
|
||
|
|
||
|
else:
|
||
|
# bare Python value + ASN.1 schema
|
||
|
for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):
|
||
|
|
||
|
try:
|
||
|
component = value[namedType.name]
|
||
|
|
||
|
except KeyError:
|
||
|
raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))
|
||
|
|
||
|
if namedType.isOptional and namedType.name not in value:
|
||
|
continue
|
||
|
|
||
|
if namedType.isDefaulted and component == namedType.asn1Object:
|
||
|
continue
|
||
|
|
||
|
compsMap[id(component)] = namedType
|
||
|
comps.append((component, asn1Spec[idx]))
|
||
|
|
||
|
for comp, compType in sorted(comps, key=self._componentSortKey):
|
||
|
namedType = compsMap[id(comp)]
|
||
|
|
||
|
if namedType:
|
||
|
options.update(ifNotEmpty=namedType.isOptional)
|
||
|
|
||
|
chunk = encodeFun(comp, compType, **options)
|
||
|
|
||
|
# wrap open type blob if needed
|
||
|
if namedType and namedType.openType:
|
||
|
wrapType = namedType.asn1Object
|
||
|
if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
|
||
|
chunk = encodeFun(chunk, wrapType, **options)
|
||
|
|
||
|
substrate += chunk
|
||
|
|
||
|
return substrate, True, True
|
||
|
|
||
|
|
||
|
class SetOfEncoder(encoder.SequenceOfEncoder):
|
||
|
def encodeValue(self, value, asn1Spec, encodeFun, **options):
|
||
|
if asn1Spec is None:
|
||
|
value.verifySizeSpec()
|
||
|
else:
|
||
|
asn1Spec = asn1Spec.componentType
|
||
|
|
||
|
components = [encodeFun(x, asn1Spec, **options)
|
||
|
for x in value]
|
||
|
|
||
|
# sort by serialised and padded components
|
||
|
if len(components) > 1:
|
||
|
zero = str2octs('\x00')
|
||
|
maxLen = max(map(len, components))
|
||
|
paddedComponents = [
|
||
|
(x.ljust(maxLen, zero), x) for x in components
|
||
|
]
|
||
|
paddedComponents.sort(key=lambda x: x[0])
|
||
|
|
||
|
components = [x[1] for x in paddedComponents]
|
||
|
|
||
|
substrate = null.join(components)
|
||
|
|
||
|
return substrate, True, True
|
||
|
|
||
|
|
||
|
class SequenceEncoder(encoder.SequenceEncoder):
|
||
|
omitEmptyOptionals = True
|
||
|
|
||
|
|
||
|
class SequenceOfEncoder(encoder.SequenceOfEncoder):
|
||
|
def encodeValue(self, value, asn1Spec, encodeFun, **options):
|
||
|
|
||
|
if options.get('ifNotEmpty', False) and not len(value):
|
||
|
return null, True, True
|
||
|
|
||
|
if asn1Spec is None:
|
||
|
value.verifySizeSpec()
|
||
|
else:
|
||
|
asn1Spec = asn1Spec.componentType
|
||
|
|
||
|
substrate = null
|
||
|
|
||
|
for idx, component in enumerate(value):
|
||
|
substrate += encodeFun(value[idx], asn1Spec, **options)
|
||
|
|
||
|
return substrate, True, True
|
||
|
|
||
|
|
||
|
tagMap = encoder.tagMap.copy()
|
||
|
tagMap.update({
|
||
|
univ.Boolean.tagSet: BooleanEncoder(),
|
||
|
univ.Real.tagSet: RealEncoder(),
|
||
|
useful.GeneralizedTime.tagSet: GeneralizedTimeEncoder(),
|
||
|
useful.UTCTime.tagSet: UTCTimeEncoder(),
|
||
|
# Sequence & Set have same tags as SequenceOf & SetOf
|
||
|
univ.SetOf.tagSet: SetOfEncoder(),
|
||
|
univ.Sequence.typeId: SequenceEncoder()
|
||
|
})
|
||
|
|
||
|
typeMap = encoder.typeMap.copy()
|
||
|
typeMap.update({
|
||
|
univ.Boolean.typeId: BooleanEncoder(),
|
||
|
univ.Real.typeId: RealEncoder(),
|
||
|
useful.GeneralizedTime.typeId: GeneralizedTimeEncoder(),
|
||
|
useful.UTCTime.typeId: UTCTimeEncoder(),
|
||
|
# Sequence & Set have same tags as SequenceOf & SetOf
|
||
|
univ.Set.typeId: SetEncoder(),
|
||
|
univ.SetOf.typeId: SetOfEncoder(),
|
||
|
univ.Sequence.typeId: SequenceEncoder(),
|
||
|
univ.SequenceOf.typeId: SequenceOfEncoder()
|
||
|
})
|
||
|
|
||
|
|
||
|
class Encoder(encoder.Encoder):
|
||
|
fixedDefLengthMode = False
|
||
|
fixedChunkSize = 1000
|
||
|
|
||
|
#: Turns ASN.1 object into CER octet stream.
|
||
|
#:
|
||
|
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
|
||
|
#: walks all its components recursively and produces a CER octet stream.
|
||
|
#:
|
||
|
#: Parameters
|
||
|
#: ----------
|
||
|
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
|
||
|
#: A Python or pyasn1 object to encode. If Python object is given, `asnSpec`
|
||
|
#: parameter is required to guide the encoding process.
|
||
|
#:
|
||
|
#: Keyword Args
|
||
|
#: ------------
|
||
|
#: asn1Spec:
|
||
|
#: Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
|
||
|
#:
|
||
|
#: Returns
|
||
|
#: -------
|
||
|
#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
|
||
|
#: Given ASN.1 object encoded into BER octet-stream
|
||
|
#:
|
||
|
#: Raises
|
||
|
#: ------
|
||
|
#: :py:class:`~pyasn1.error.PyAsn1Error`
|
||
|
#: On encoding errors
|
||
|
#:
|
||
|
#: Examples
|
||
|
#: --------
|
||
|
#: Encode Python value into CER with ASN.1 schema
|
||
|
#:
|
||
|
#: .. code-block:: pycon
|
||
|
#:
|
||
|
#: >>> seq = SequenceOf(componentType=Integer())
|
||
|
#: >>> encode([1, 2, 3], asn1Spec=seq)
|
||
|
#: b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
|
||
|
#:
|
||
|
#: Encode ASN.1 value object into CER
|
||
|
#:
|
||
|
#: .. code-block:: pycon
|
||
|
#:
|
||
|
#: >>> seq = SequenceOf(componentType=Integer())
|
||
|
#: >>> seq.extend([1, 2, 3])
|
||
|
#: >>> encode(seq)
|
||
|
#: b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
|
||
|
#:
|
||
|
encode = Encoder(tagMap, typeMap)
|
||
|
|
||
|
# EncoderFactory queries class instance and builds a map of tags -> encoders
|