* Vendor ruamel.yaml v0.17.21 * Add unit test for whitespace regression * Add an abstraction layer in Spack to wrap ruamel.yaml All YAML operations are routed through spack.util.spack_yaml The custom classes have been adapted to the new ruamel.yaml class hierarchy. Fixed line annotation issue in "spack config blame"
1157 lines
43 KiB
Python
1157 lines
43 KiB
Python
# coding: utf-8
|
|
|
|
from ruamel.yaml.error import * # NOQA
|
|
from ruamel.yaml.nodes import * # NOQA
|
|
from ruamel.yaml.compat import ordereddict
|
|
from ruamel.yaml.compat import _F, nprint, nprintf # NOQA
|
|
from ruamel.yaml.scalarstring import (
|
|
LiteralScalarString,
|
|
FoldedScalarString,
|
|
SingleQuotedScalarString,
|
|
DoubleQuotedScalarString,
|
|
PlainScalarString,
|
|
)
|
|
from ruamel.yaml.comments import (
|
|
CommentedMap,
|
|
CommentedOrderedMap,
|
|
CommentedSeq,
|
|
CommentedKeySeq,
|
|
CommentedKeyMap,
|
|
CommentedSet,
|
|
comment_attrib,
|
|
merge_attrib,
|
|
TaggedScalar,
|
|
)
|
|
from ruamel.yaml.scalarint import ScalarInt, BinaryInt, OctalInt, HexInt, HexCapsInt
|
|
from ruamel.yaml.scalarfloat import ScalarFloat
|
|
from ruamel.yaml.scalarbool import ScalarBoolean
|
|
from ruamel.yaml.timestamp import TimeStamp
|
|
from ruamel.yaml.anchor import Anchor
|
|
|
|
import datetime
|
|
import sys
|
|
import types
|
|
|
|
import copyreg
|
|
import base64
|
|
|
|
if False: # MYPY
|
|
from typing import Dict, List, Any, Union, Text, Optional # NOQA
|
|
|
|
# fmt: off
|
|
__all__ = ['BaseRepresenter', 'SafeRepresenter', 'Representer',
|
|
'RepresenterError', 'RoundTripRepresenter']
|
|
# fmt: on
|
|
|
|
|
|
class RepresenterError(YAMLError):
|
|
pass
|
|
|
|
|
|
class BaseRepresenter:
|
|
|
|
yaml_representers = {} # type: Dict[Any, Any]
|
|
yaml_multi_representers = {} # type: Dict[Any, Any]
|
|
|
|
def __init__(self, default_style=None, default_flow_style=None, dumper=None):
|
|
# type: (Any, Any, Any, Any) -> None
|
|
self.dumper = dumper
|
|
if self.dumper is not None:
|
|
self.dumper._representer = self
|
|
self.default_style = default_style
|
|
self.default_flow_style = default_flow_style
|
|
self.represented_objects = {} # type: Dict[Any, Any]
|
|
self.object_keeper = [] # type: List[Any]
|
|
self.alias_key = None # type: Optional[int]
|
|
self.sort_base_mapping_type_on_output = True
|
|
|
|
@property
|
|
def serializer(self):
|
|
# type: () -> Any
|
|
try:
|
|
if hasattr(self.dumper, 'typ'):
|
|
return self.dumper.serializer
|
|
return self.dumper._serializer
|
|
except AttributeError:
|
|
return self # cyaml
|
|
|
|
def represent(self, data):
|
|
# type: (Any) -> None
|
|
node = self.represent_data(data)
|
|
self.serializer.serialize(node)
|
|
self.represented_objects = {}
|
|
self.object_keeper = []
|
|
self.alias_key = None
|
|
|
|
def represent_data(self, data):
|
|
# type: (Any) -> Any
|
|
if self.ignore_aliases(data):
|
|
self.alias_key = None
|
|
else:
|
|
self.alias_key = id(data)
|
|
if self.alias_key is not None:
|
|
if self.alias_key in self.represented_objects:
|
|
node = self.represented_objects[self.alias_key]
|
|
# if node is None:
|
|
# raise RepresenterError(
|
|
# f"recursive objects are not allowed: {data!r}")
|
|
return node
|
|
# self.represented_objects[alias_key] = None
|
|
self.object_keeper.append(data)
|
|
data_types = type(data).__mro__
|
|
if data_types[0] in self.yaml_representers:
|
|
node = self.yaml_representers[data_types[0]](self, data)
|
|
else:
|
|
for data_type in data_types:
|
|
if data_type in self.yaml_multi_representers:
|
|
node = self.yaml_multi_representers[data_type](self, data)
|
|
break
|
|
else:
|
|
if None in self.yaml_multi_representers:
|
|
node = self.yaml_multi_representers[None](self, data)
|
|
elif None in self.yaml_representers:
|
|
node = self.yaml_representers[None](self, data)
|
|
else:
|
|
node = ScalarNode(None, str(data))
|
|
# if alias_key is not None:
|
|
# self.represented_objects[alias_key] = node
|
|
return node
|
|
|
|
def represent_key(self, data):
|
|
# type: (Any) -> Any
|
|
"""
|
|
David Fraser: Extract a method to represent keys in mappings, so that
|
|
a subclass can choose not to quote them (for example)
|
|
used in represent_mapping
|
|
https://bitbucket.org/davidfraser/pyyaml/commits/d81df6eb95f20cac4a79eed95ae553b5c6f77b8c
|
|
"""
|
|
return self.represent_data(data)
|
|
|
|
@classmethod
|
|
def add_representer(cls, data_type, representer):
|
|
# type: (Any, Any) -> None
|
|
if 'yaml_representers' not in cls.__dict__:
|
|
cls.yaml_representers = cls.yaml_representers.copy()
|
|
cls.yaml_representers[data_type] = representer
|
|
|
|
@classmethod
|
|
def add_multi_representer(cls, data_type, representer):
|
|
# type: (Any, Any) -> None
|
|
if 'yaml_multi_representers' not in cls.__dict__:
|
|
cls.yaml_multi_representers = cls.yaml_multi_representers.copy()
|
|
cls.yaml_multi_representers[data_type] = representer
|
|
|
|
def represent_scalar(self, tag, value, style=None, anchor=None):
|
|
# type: (Any, Any, Any, Any) -> Any
|
|
if style is None:
|
|
style = self.default_style
|
|
comment = None
|
|
if style and style[0] in '|>':
|
|
comment = getattr(value, 'comment', None)
|
|
if comment:
|
|
comment = [None, [comment]]
|
|
node = ScalarNode(tag, value, style=style, comment=comment, anchor=anchor)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
return node
|
|
|
|
def represent_sequence(self, tag, sequence, flow_style=None):
|
|
# type: (Any, Any, Any) -> Any
|
|
value = [] # type: List[Any]
|
|
node = SequenceNode(tag, value, flow_style=flow_style)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
for item in sequence:
|
|
node_item = self.represent_data(item)
|
|
if not (isinstance(node_item, ScalarNode) and not node_item.style):
|
|
best_style = False
|
|
value.append(node_item)
|
|
if flow_style is None:
|
|
if self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
return node
|
|
|
|
def represent_omap(self, tag, omap, flow_style=None):
|
|
# type: (Any, Any, Any) -> Any
|
|
value = [] # type: List[Any]
|
|
node = SequenceNode(tag, value, flow_style=flow_style)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
for item_key in omap:
|
|
item_val = omap[item_key]
|
|
node_item = self.represent_data({item_key: item_val})
|
|
# if not (isinstance(node_item, ScalarNode) \
|
|
# and not node_item.style):
|
|
# best_style = False
|
|
value.append(node_item)
|
|
if flow_style is None:
|
|
if self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
return node
|
|
|
|
def represent_mapping(self, tag, mapping, flow_style=None):
|
|
# type: (Any, Any, Any) -> Any
|
|
value = [] # type: List[Any]
|
|
node = MappingNode(tag, value, flow_style=flow_style)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
if hasattr(mapping, 'items'):
|
|
mapping = list(mapping.items())
|
|
if self.sort_base_mapping_type_on_output:
|
|
try:
|
|
mapping = sorted(mapping)
|
|
except TypeError:
|
|
pass
|
|
for item_key, item_value in mapping:
|
|
node_key = self.represent_key(item_key)
|
|
node_value = self.represent_data(item_value)
|
|
if not (isinstance(node_key, ScalarNode) and not node_key.style):
|
|
best_style = False
|
|
if not (isinstance(node_value, ScalarNode) and not node_value.style):
|
|
best_style = False
|
|
value.append((node_key, node_value))
|
|
if flow_style is None:
|
|
if self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
return node
|
|
|
|
def ignore_aliases(self, data):
|
|
# type: (Any) -> bool
|
|
return False
|
|
|
|
|
|
class SafeRepresenter(BaseRepresenter):
|
|
def ignore_aliases(self, data):
|
|
# type: (Any) -> bool
|
|
# https://docs.python.org/3/reference/expressions.html#parenthesized-forms :
|
|
# "i.e. two occurrences of the empty tuple may or may not yield the same object"
|
|
# so "data is ()" should not be used
|
|
if data is None or (isinstance(data, tuple) and data == ()):
|
|
return True
|
|
if isinstance(data, (bytes, str, bool, int, float)):
|
|
return True
|
|
return False
|
|
|
|
def represent_none(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_scalar('tag:yaml.org,2002:null', 'null')
|
|
|
|
def represent_str(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_scalar('tag:yaml.org,2002:str', data)
|
|
|
|
def represent_binary(self, data):
|
|
# type: (Any) -> Any
|
|
if hasattr(base64, 'encodebytes'):
|
|
data = base64.encodebytes(data).decode('ascii')
|
|
else:
|
|
# check py2 only?
|
|
data = base64.encodestring(data).decode('ascii') # type: ignore
|
|
return self.represent_scalar('tag:yaml.org,2002:binary', data, style='|')
|
|
|
|
def represent_bool(self, data, anchor=None):
|
|
# type: (Any, Optional[Any]) -> Any
|
|
try:
|
|
value = self.dumper.boolean_representation[bool(data)]
|
|
except AttributeError:
|
|
if data:
|
|
value = 'true'
|
|
else:
|
|
value = 'false'
|
|
return self.represent_scalar('tag:yaml.org,2002:bool', value, anchor=anchor)
|
|
|
|
def represent_int(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_scalar('tag:yaml.org,2002:int', str(data))
|
|
|
|
inf_value = 1e300
|
|
while repr(inf_value) != repr(inf_value * inf_value):
|
|
inf_value *= inf_value
|
|
|
|
def represent_float(self, data):
|
|
# type: (Any) -> Any
|
|
if data != data or (data == 0.0 and data == 1.0):
|
|
value = '.nan'
|
|
elif data == self.inf_value:
|
|
value = '.inf'
|
|
elif data == -self.inf_value:
|
|
value = '-.inf'
|
|
else:
|
|
value = repr(data).lower()
|
|
if getattr(self.serializer, 'use_version', None) == (1, 1):
|
|
if '.' not in value and 'e' in value:
|
|
# Note that in some cases `repr(data)` represents a float number
|
|
# without the decimal parts. For instance:
|
|
# >>> repr(1e17)
|
|
# '1e17'
|
|
# Unfortunately, this is not a valid float representation according
|
|
# to the definition of the `!!float` tag in YAML 1.1. We fix
|
|
# this by adding '.0' before the 'e' symbol.
|
|
value = value.replace('e', '.0e', 1)
|
|
return self.represent_scalar('tag:yaml.org,2002:float', value)
|
|
|
|
def represent_list(self, data):
|
|
# type: (Any) -> Any
|
|
# pairs = (len(data) > 0 and isinstance(data, list))
|
|
# if pairs:
|
|
# for item in data:
|
|
# if not isinstance(item, tuple) or len(item) != 2:
|
|
# pairs = False
|
|
# break
|
|
# if not pairs:
|
|
return self.represent_sequence('tag:yaml.org,2002:seq', data)
|
|
|
|
# value = []
|
|
# for item_key, item_value in data:
|
|
# value.append(self.represent_mapping('tag:yaml.org,2002:map',
|
|
# [(item_key, item_value)]))
|
|
# return SequenceNode('tag:yaml.org,2002:pairs', value)
|
|
|
|
def represent_dict(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_mapping('tag:yaml.org,2002:map', data)
|
|
|
|
def represent_ordereddict(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_omap('tag:yaml.org,2002:omap', data)
|
|
|
|
def represent_set(self, data):
|
|
# type: (Any) -> Any
|
|
value = {} # type: Dict[Any, None]
|
|
for key in data:
|
|
value[key] = None
|
|
return self.represent_mapping('tag:yaml.org,2002:set', value)
|
|
|
|
def represent_date(self, data):
|
|
# type: (Any) -> Any
|
|
value = data.isoformat()
|
|
return self.represent_scalar('tag:yaml.org,2002:timestamp', value)
|
|
|
|
def represent_datetime(self, data):
|
|
# type: (Any) -> Any
|
|
value = data.isoformat(' ')
|
|
return self.represent_scalar('tag:yaml.org,2002:timestamp', value)
|
|
|
|
def represent_yaml_object(self, tag, data, cls, flow_style=None):
|
|
# type: (Any, Any, Any, Any) -> Any
|
|
if hasattr(data, '__getstate__'):
|
|
state = data.__getstate__()
|
|
else:
|
|
state = data.__dict__.copy()
|
|
return self.represent_mapping(tag, state, flow_style=flow_style)
|
|
|
|
def represent_undefined(self, data):
|
|
# type: (Any) -> None
|
|
raise RepresenterError(_F('cannot represent an object: {data!s}', data=data))
|
|
|
|
|
|
SafeRepresenter.add_representer(type(None), SafeRepresenter.represent_none)
|
|
|
|
SafeRepresenter.add_representer(str, SafeRepresenter.represent_str)
|
|
|
|
SafeRepresenter.add_representer(bytes, SafeRepresenter.represent_binary)
|
|
|
|
SafeRepresenter.add_representer(bool, SafeRepresenter.represent_bool)
|
|
|
|
SafeRepresenter.add_representer(int, SafeRepresenter.represent_int)
|
|
|
|
SafeRepresenter.add_representer(float, SafeRepresenter.represent_float)
|
|
|
|
SafeRepresenter.add_representer(list, SafeRepresenter.represent_list)
|
|
|
|
SafeRepresenter.add_representer(tuple, SafeRepresenter.represent_list)
|
|
|
|
SafeRepresenter.add_representer(dict, SafeRepresenter.represent_dict)
|
|
|
|
SafeRepresenter.add_representer(set, SafeRepresenter.represent_set)
|
|
|
|
SafeRepresenter.add_representer(ordereddict, SafeRepresenter.represent_ordereddict)
|
|
|
|
if sys.version_info >= (2, 7):
|
|
import collections
|
|
|
|
SafeRepresenter.add_representer(
|
|
collections.OrderedDict, SafeRepresenter.represent_ordereddict
|
|
)
|
|
|
|
SafeRepresenter.add_representer(datetime.date, SafeRepresenter.represent_date)
|
|
|
|
SafeRepresenter.add_representer(datetime.datetime, SafeRepresenter.represent_datetime)
|
|
|
|
SafeRepresenter.add_representer(None, SafeRepresenter.represent_undefined)
|
|
|
|
|
|
class Representer(SafeRepresenter):
|
|
def represent_complex(self, data):
|
|
# type: (Any) -> Any
|
|
if data.imag == 0.0:
|
|
data = repr(data.real)
|
|
elif data.real == 0.0:
|
|
data = _F('{data_imag!r}j', data_imag=data.imag)
|
|
elif data.imag > 0:
|
|
data = _F('{data_real!r}+{data_imag!r}j', data_real=data.real, data_imag=data.imag)
|
|
else:
|
|
data = _F('{data_real!r}{data_imag!r}j', data_real=data.real, data_imag=data.imag)
|
|
return self.represent_scalar('tag:yaml.org,2002:python/complex', data)
|
|
|
|
def represent_tuple(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_sequence('tag:yaml.org,2002:python/tuple', data)
|
|
|
|
def represent_name(self, data):
|
|
# type: (Any) -> Any
|
|
try:
|
|
name = _F(
|
|
'{modname!s}.{qualname!s}', modname=data.__module__, qualname=data.__qualname__
|
|
)
|
|
except AttributeError:
|
|
# ToDo: check if this can be reached in Py3
|
|
name = _F('{modname!s}.{name!s}', modname=data.__module__, name=data.__name__)
|
|
return self.represent_scalar('tag:yaml.org,2002:python/name:' + name, "")
|
|
|
|
def represent_module(self, data):
|
|
# type: (Any) -> Any
|
|
return self.represent_scalar('tag:yaml.org,2002:python/module:' + data.__name__, "")
|
|
|
|
def represent_object(self, data):
|
|
# type: (Any) -> Any
|
|
# We use __reduce__ API to save the data. data.__reduce__ returns
|
|
# a tuple of length 2-5:
|
|
# (function, args, state, listitems, dictitems)
|
|
|
|
# For reconstructing, we calls function(*args), then set its state,
|
|
# listitems, and dictitems if they are not None.
|
|
|
|
# A special case is when function.__name__ == '__newobj__'. In this
|
|
# case we create the object with args[0].__new__(*args).
|
|
|
|
# Another special case is when __reduce__ returns a string - we don't
|
|
# support it.
|
|
|
|
# We produce a !!python/object, !!python/object/new or
|
|
# !!python/object/apply node.
|
|
|
|
cls = type(data)
|
|
if cls in copyreg.dispatch_table: # type: ignore
|
|
reduce = copyreg.dispatch_table[cls](data) # type: ignore
|
|
elif hasattr(data, '__reduce_ex__'):
|
|
reduce = data.__reduce_ex__(2)
|
|
elif hasattr(data, '__reduce__'):
|
|
reduce = data.__reduce__()
|
|
else:
|
|
raise RepresenterError(_F('cannot represent object: {data!r}', data=data))
|
|
reduce = (list(reduce) + [None] * 5)[:5]
|
|
function, args, state, listitems, dictitems = reduce
|
|
args = list(args)
|
|
if state is None:
|
|
state = {}
|
|
if listitems is not None:
|
|
listitems = list(listitems)
|
|
if dictitems is not None:
|
|
dictitems = dict(dictitems)
|
|
if function.__name__ == '__newobj__':
|
|
function = args[0]
|
|
args = args[1:]
|
|
tag = 'tag:yaml.org,2002:python/object/new:'
|
|
newobj = True
|
|
else:
|
|
tag = 'tag:yaml.org,2002:python/object/apply:'
|
|
newobj = False
|
|
try:
|
|
function_name = _F(
|
|
'{fun!s}.{qualname!s}', fun=function.__module__, qualname=function.__qualname__
|
|
)
|
|
except AttributeError:
|
|
# ToDo: check if this can be reached in Py3
|
|
function_name = _F(
|
|
'{fun!s}.{name!s}', fun=function.__module__, name=function.__name__
|
|
)
|
|
if not args and not listitems and not dictitems and isinstance(state, dict) and newobj:
|
|
return self.represent_mapping(
|
|
'tag:yaml.org,2002:python/object:' + function_name, state
|
|
)
|
|
if not listitems and not dictitems and isinstance(state, dict) and not state:
|
|
return self.represent_sequence(tag + function_name, args)
|
|
value = {}
|
|
if args:
|
|
value['args'] = args
|
|
if state or not isinstance(state, dict):
|
|
value['state'] = state
|
|
if listitems:
|
|
value['listitems'] = listitems
|
|
if dictitems:
|
|
value['dictitems'] = dictitems
|
|
return self.represent_mapping(tag + function_name, value)
|
|
|
|
|
|
Representer.add_representer(complex, Representer.represent_complex)
|
|
|
|
Representer.add_representer(tuple, Representer.represent_tuple)
|
|
|
|
Representer.add_representer(type, Representer.represent_name)
|
|
|
|
Representer.add_representer(types.FunctionType, Representer.represent_name)
|
|
|
|
Representer.add_representer(types.BuiltinFunctionType, Representer.represent_name)
|
|
|
|
Representer.add_representer(types.ModuleType, Representer.represent_module)
|
|
|
|
Representer.add_multi_representer(object, Representer.represent_object)
|
|
|
|
Representer.add_multi_representer(type, Representer.represent_name)
|
|
|
|
|
|
class RoundTripRepresenter(SafeRepresenter):
|
|
# need to add type here and write out the .comment
|
|
# in serializer and emitter
|
|
|
|
def __init__(self, default_style=None, default_flow_style=None, dumper=None):
|
|
# type: (Any, Any, Any) -> None
|
|
if not hasattr(dumper, 'typ') and default_flow_style is None:
|
|
default_flow_style = False
|
|
SafeRepresenter.__init__(
|
|
self,
|
|
default_style=default_style,
|
|
default_flow_style=default_flow_style,
|
|
dumper=dumper,
|
|
)
|
|
|
|
def ignore_aliases(self, data):
|
|
# type: (Any) -> bool
|
|
try:
|
|
if data.anchor is not None and data.anchor.value is not None:
|
|
return False
|
|
except AttributeError:
|
|
pass
|
|
return SafeRepresenter.ignore_aliases(self, data)
|
|
|
|
def represent_none(self, data):
|
|
# type: (Any) -> Any
|
|
if len(self.represented_objects) == 0 and not self.serializer.use_explicit_start:
|
|
# this will be open ended (although it is not yet)
|
|
return self.represent_scalar('tag:yaml.org,2002:null', 'null')
|
|
return self.represent_scalar('tag:yaml.org,2002:null', "")
|
|
|
|
def represent_literal_scalarstring(self, data):
|
|
# type: (Any) -> Any
|
|
tag = None
|
|
style = '|'
|
|
anchor = data.yaml_anchor(any=True)
|
|
tag = 'tag:yaml.org,2002:str'
|
|
return self.represent_scalar(tag, data, style=style, anchor=anchor)
|
|
|
|
represent_preserved_scalarstring = represent_literal_scalarstring
|
|
|
|
def represent_folded_scalarstring(self, data):
|
|
# type: (Any) -> Any
|
|
tag = None
|
|
style = '>'
|
|
anchor = data.yaml_anchor(any=True)
|
|
for fold_pos in reversed(getattr(data, 'fold_pos', [])):
|
|
if (
|
|
data[fold_pos] == ' '
|
|
and (fold_pos > 0 and not data[fold_pos - 1].isspace())
|
|
and (fold_pos < len(data) and not data[fold_pos + 1].isspace())
|
|
):
|
|
data = data[:fold_pos] + '\a' + data[fold_pos:]
|
|
tag = 'tag:yaml.org,2002:str'
|
|
return self.represent_scalar(tag, data, style=style, anchor=anchor)
|
|
|
|
def represent_single_quoted_scalarstring(self, data):
|
|
# type: (Any) -> Any
|
|
tag = None
|
|
style = "'"
|
|
anchor = data.yaml_anchor(any=True)
|
|
tag = 'tag:yaml.org,2002:str'
|
|
return self.represent_scalar(tag, data, style=style, anchor=anchor)
|
|
|
|
def represent_double_quoted_scalarstring(self, data):
|
|
# type: (Any) -> Any
|
|
tag = None
|
|
style = '"'
|
|
anchor = data.yaml_anchor(any=True)
|
|
tag = 'tag:yaml.org,2002:str'
|
|
return self.represent_scalar(tag, data, style=style, anchor=anchor)
|
|
|
|
def represent_plain_scalarstring(self, data):
|
|
# type: (Any) -> Any
|
|
tag = None
|
|
style = ''
|
|
anchor = data.yaml_anchor(any=True)
|
|
tag = 'tag:yaml.org,2002:str'
|
|
return self.represent_scalar(tag, data, style=style, anchor=anchor)
|
|
|
|
def insert_underscore(self, prefix, s, underscore, anchor=None):
|
|
# type: (Any, Any, Any, Any) -> Any
|
|
if underscore is None:
|
|
return self.represent_scalar('tag:yaml.org,2002:int', prefix + s, anchor=anchor)
|
|
if underscore[0]:
|
|
sl = list(s)
|
|
pos = len(s) - underscore[0]
|
|
while pos > 0:
|
|
sl.insert(pos, '_')
|
|
pos -= underscore[0]
|
|
s = "".join(sl)
|
|
if underscore[1]:
|
|
s = '_' + s
|
|
if underscore[2]:
|
|
s += '_'
|
|
return self.represent_scalar('tag:yaml.org,2002:int', prefix + s, anchor=anchor)
|
|
|
|
def represent_scalar_int(self, data):
|
|
# type: (Any) -> Any
|
|
if data._width is not None:
|
|
s = '{:0{}d}'.format(data, data._width)
|
|
else:
|
|
s = format(data, 'd')
|
|
anchor = data.yaml_anchor(any=True)
|
|
return self.insert_underscore("", s, data._underscore, anchor=anchor)
|
|
|
|
def represent_binary_int(self, data):
|
|
# type: (Any) -> Any
|
|
if data._width is not None:
|
|
# cannot use '{:#0{}b}', that strips the zeros
|
|
s = '{:0{}b}'.format(data, data._width)
|
|
else:
|
|
s = format(data, 'b')
|
|
anchor = data.yaml_anchor(any=True)
|
|
return self.insert_underscore('0b', s, data._underscore, anchor=anchor)
|
|
|
|
def represent_octal_int(self, data):
|
|
# type: (Any) -> Any
|
|
if data._width is not None:
|
|
# cannot use '{:#0{}o}', that strips the zeros
|
|
s = '{:0{}o}'.format(data, data._width)
|
|
else:
|
|
s = format(data, 'o')
|
|
anchor = data.yaml_anchor(any=True)
|
|
return self.insert_underscore('0o', s, data._underscore, anchor=anchor)
|
|
|
|
def represent_hex_int(self, data):
|
|
# type: (Any) -> Any
|
|
if data._width is not None:
|
|
# cannot use '{:#0{}x}', that strips the zeros
|
|
s = '{:0{}x}'.format(data, data._width)
|
|
else:
|
|
s = format(data, 'x')
|
|
anchor = data.yaml_anchor(any=True)
|
|
return self.insert_underscore('0x', s, data._underscore, anchor=anchor)
|
|
|
|
def represent_hex_caps_int(self, data):
|
|
# type: (Any) -> Any
|
|
if data._width is not None:
|
|
# cannot use '{:#0{}X}', that strips the zeros
|
|
s = '{:0{}X}'.format(data, data._width)
|
|
else:
|
|
s = format(data, 'X')
|
|
anchor = data.yaml_anchor(any=True)
|
|
return self.insert_underscore('0x', s, data._underscore, anchor=anchor)
|
|
|
|
def represent_scalar_float(self, data):
|
|
# type: (Any) -> Any
|
|
""" this is way more complicated """
|
|
value = None
|
|
anchor = data.yaml_anchor(any=True)
|
|
if data != data or (data == 0.0 and data == 1.0):
|
|
value = '.nan'
|
|
elif data == self.inf_value:
|
|
value = '.inf'
|
|
elif data == -self.inf_value:
|
|
value = '-.inf'
|
|
if value:
|
|
return self.represent_scalar('tag:yaml.org,2002:float', value, anchor=anchor)
|
|
if data._exp is None and data._prec > 0 and data._prec == data._width - 1:
|
|
# no exponent, but trailing dot
|
|
value = '{}{:d}.'.format(data._m_sign if data._m_sign else "", abs(int(data)))
|
|
elif data._exp is None:
|
|
# no exponent, "normal" dot
|
|
prec = data._prec
|
|
ms = data._m_sign if data._m_sign else ""
|
|
# -1 for the dot
|
|
value = '{}{:0{}.{}f}'.format(
|
|
ms, abs(data), data._width - len(ms), data._width - prec - 1
|
|
)
|
|
if prec == 0 or (prec == 1 and ms != ""):
|
|
value = value.replace('0.', '.')
|
|
while len(value) < data._width:
|
|
value += '0'
|
|
else:
|
|
# exponent
|
|
m, es = '{:{}.{}e}'.format(
|
|
# data, data._width, data._width - data._prec + (1 if data._m_sign else 0)
|
|
data,
|
|
data._width,
|
|
data._width + (1 if data._m_sign else 0),
|
|
).split('e')
|
|
w = data._width if data._prec > 0 else (data._width + 1)
|
|
if data < 0:
|
|
w += 1
|
|
m = m[:w]
|
|
e = int(es)
|
|
m1, m2 = m.split('.') # always second?
|
|
while len(m1) + len(m2) < data._width - (1 if data._prec >= 0 else 0):
|
|
m2 += '0'
|
|
if data._m_sign and data > 0:
|
|
m1 = '+' + m1
|
|
esgn = '+' if data._e_sign else ""
|
|
if data._prec < 0: # mantissa without dot
|
|
if m2 != '0':
|
|
e -= len(m2)
|
|
else:
|
|
m2 = ""
|
|
while (len(m1) + len(m2) - (1 if data._m_sign else 0)) < data._width:
|
|
m2 += '0'
|
|
e -= 1
|
|
value = m1 + m2 + data._exp + '{:{}0{}d}'.format(e, esgn, data._e_width)
|
|
elif data._prec == 0: # mantissa with trailing dot
|
|
e -= len(m2)
|
|
value = m1 + m2 + '.' + data._exp + '{:{}0{}d}'.format(e, esgn, data._e_width)
|
|
else:
|
|
if data._m_lead0 > 0:
|
|
m2 = '0' * (data._m_lead0 - 1) + m1 + m2
|
|
m1 = '0'
|
|
m2 = m2[: -data._m_lead0] # these should be zeros
|
|
e += data._m_lead0
|
|
while len(m1) < data._prec:
|
|
m1 += m2[0]
|
|
m2 = m2[1:]
|
|
e -= 1
|
|
value = m1 + '.' + m2 + data._exp + '{:{}0{}d}'.format(e, esgn, data._e_width)
|
|
|
|
if value is None:
|
|
value = repr(data).lower()
|
|
return self.represent_scalar('tag:yaml.org,2002:float', value, anchor=anchor)
|
|
|
|
def represent_sequence(self, tag, sequence, flow_style=None):
|
|
# type: (Any, Any, Any) -> Any
|
|
value = [] # type: List[Any]
|
|
# if the flow_style is None, the flow style tacked on to the object
|
|
# explicitly will be taken. If that is None as well the default flow
|
|
# style rules
|
|
try:
|
|
flow_style = sequence.fa.flow_style(flow_style)
|
|
except AttributeError:
|
|
flow_style = flow_style
|
|
try:
|
|
anchor = sequence.yaml_anchor()
|
|
except AttributeError:
|
|
anchor = None
|
|
node = SequenceNode(tag, value, flow_style=flow_style, anchor=anchor)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
try:
|
|
comment = getattr(sequence, comment_attrib)
|
|
node.comment = comment.comment
|
|
# reset any comment already printed information
|
|
if node.comment and node.comment[1]:
|
|
for ct in node.comment[1]:
|
|
ct.reset()
|
|
item_comments = comment.items
|
|
for v in item_comments.values():
|
|
if v and v[1]:
|
|
for ct in v[1]:
|
|
ct.reset()
|
|
item_comments = comment.items
|
|
if node.comment is None:
|
|
node.comment = comment.comment
|
|
else:
|
|
# as we are potentially going to extend this, make a new list
|
|
node.comment = comment.comment[:]
|
|
try:
|
|
node.comment.append(comment.end)
|
|
except AttributeError:
|
|
pass
|
|
except AttributeError:
|
|
item_comments = {}
|
|
for idx, item in enumerate(sequence):
|
|
node_item = self.represent_data(item)
|
|
self.merge_comments(node_item, item_comments.get(idx))
|
|
if not (isinstance(node_item, ScalarNode) and not node_item.style):
|
|
best_style = False
|
|
value.append(node_item)
|
|
if flow_style is None:
|
|
if len(sequence) != 0 and self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
return node
|
|
|
|
def merge_comments(self, node, comments):
|
|
# type: (Any, Any) -> Any
|
|
if comments is None:
|
|
assert hasattr(node, 'comment')
|
|
return node
|
|
if getattr(node, 'comment', None) is not None:
|
|
for idx, val in enumerate(comments):
|
|
if idx >= len(node.comment):
|
|
continue
|
|
nc = node.comment[idx]
|
|
if nc is not None:
|
|
assert val is None or val == nc
|
|
comments[idx] = nc
|
|
node.comment = comments
|
|
return node
|
|
|
|
def represent_key(self, data):
|
|
# type: (Any) -> Any
|
|
if isinstance(data, CommentedKeySeq):
|
|
self.alias_key = None
|
|
return self.represent_sequence('tag:yaml.org,2002:seq', data, flow_style=True)
|
|
if isinstance(data, CommentedKeyMap):
|
|
self.alias_key = None
|
|
return self.represent_mapping('tag:yaml.org,2002:map', data, flow_style=True)
|
|
return SafeRepresenter.represent_key(self, data)
|
|
|
|
def represent_mapping(self, tag, mapping, flow_style=None):
|
|
# type: (Any, Any, Any) -> Any
|
|
value = [] # type: List[Any]
|
|
try:
|
|
flow_style = mapping.fa.flow_style(flow_style)
|
|
except AttributeError:
|
|
flow_style = flow_style
|
|
try:
|
|
anchor = mapping.yaml_anchor()
|
|
except AttributeError:
|
|
anchor = None
|
|
node = MappingNode(tag, value, flow_style=flow_style, anchor=anchor)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
# no sorting! !!
|
|
try:
|
|
comment = getattr(mapping, comment_attrib)
|
|
if node.comment is None:
|
|
node.comment = comment.comment
|
|
else:
|
|
# as we are potentially going to extend this, make a new list
|
|
node.comment = comment.comment[:]
|
|
if node.comment and node.comment[1]:
|
|
for ct in node.comment[1]:
|
|
ct.reset()
|
|
item_comments = comment.items
|
|
if self.dumper.comment_handling is None:
|
|
for v in item_comments.values():
|
|
if v and v[1]:
|
|
for ct in v[1]:
|
|
ct.reset()
|
|
try:
|
|
node.comment.append(comment.end)
|
|
except AttributeError:
|
|
pass
|
|
else:
|
|
# NEWCMNT
|
|
pass
|
|
except AttributeError:
|
|
item_comments = {}
|
|
merge_list = [m[1] for m in getattr(mapping, merge_attrib, [])]
|
|
try:
|
|
merge_pos = getattr(mapping, merge_attrib, [[0]])[0][0]
|
|
except IndexError:
|
|
merge_pos = 0
|
|
item_count = 0
|
|
if bool(merge_list):
|
|
items = mapping.non_merged_items()
|
|
else:
|
|
items = mapping.items()
|
|
for item_key, item_value in items:
|
|
item_count += 1
|
|
node_key = self.represent_key(item_key)
|
|
node_value = self.represent_data(item_value)
|
|
item_comment = item_comments.get(item_key)
|
|
if item_comment:
|
|
# assert getattr(node_key, 'comment', None) is None
|
|
# issue 351 did throw this because the comment from the list item was
|
|
# moved to the dict
|
|
node_key.comment = item_comment[:2]
|
|
nvc = getattr(node_value, 'comment', None)
|
|
if nvc is not None: # end comment already there
|
|
nvc[0] = item_comment[2]
|
|
nvc[1] = item_comment[3]
|
|
else:
|
|
node_value.comment = item_comment[2:]
|
|
if not (isinstance(node_key, ScalarNode) and not node_key.style):
|
|
best_style = False
|
|
if not (isinstance(node_value, ScalarNode) and not node_value.style):
|
|
best_style = False
|
|
value.append((node_key, node_value))
|
|
if flow_style is None:
|
|
if ((item_count != 0) or bool(merge_list)) and self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
if bool(merge_list):
|
|
# because of the call to represent_data here, the anchors
|
|
# are marked as being used and thereby created
|
|
if len(merge_list) == 1:
|
|
arg = self.represent_data(merge_list[0])
|
|
else:
|
|
arg = self.represent_data(merge_list)
|
|
arg.flow_style = True
|
|
value.insert(merge_pos, (ScalarNode('tag:yaml.org,2002:merge', '<<'), arg))
|
|
return node
|
|
|
|
def represent_omap(self, tag, omap, flow_style=None):
|
|
# type: (Any, Any, Any) -> Any
|
|
value = [] # type: List[Any]
|
|
try:
|
|
flow_style = omap.fa.flow_style(flow_style)
|
|
except AttributeError:
|
|
flow_style = flow_style
|
|
try:
|
|
anchor = omap.yaml_anchor()
|
|
except AttributeError:
|
|
anchor = None
|
|
node = SequenceNode(tag, value, flow_style=flow_style, anchor=anchor)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
try:
|
|
comment = getattr(omap, comment_attrib)
|
|
if node.comment is None:
|
|
node.comment = comment.comment
|
|
else:
|
|
# as we are potentially going to extend this, make a new list
|
|
node.comment = comment.comment[:]
|
|
if node.comment and node.comment[1]:
|
|
for ct in node.comment[1]:
|
|
ct.reset()
|
|
item_comments = comment.items
|
|
for v in item_comments.values():
|
|
if v and v[1]:
|
|
for ct in v[1]:
|
|
ct.reset()
|
|
try:
|
|
node.comment.append(comment.end)
|
|
except AttributeError:
|
|
pass
|
|
except AttributeError:
|
|
item_comments = {}
|
|
for item_key in omap:
|
|
item_val = omap[item_key]
|
|
node_item = self.represent_data({item_key: item_val})
|
|
# node_item.flow_style = False
|
|
# node item has two scalars in value: node_key and node_value
|
|
item_comment = item_comments.get(item_key)
|
|
if item_comment:
|
|
if item_comment[1]:
|
|
node_item.comment = [None, item_comment[1]]
|
|
assert getattr(node_item.value[0][0], 'comment', None) is None
|
|
node_item.value[0][0].comment = [item_comment[0], None]
|
|
nvc = getattr(node_item.value[0][1], 'comment', None)
|
|
if nvc is not None: # end comment already there
|
|
nvc[0] = item_comment[2]
|
|
nvc[1] = item_comment[3]
|
|
else:
|
|
node_item.value[0][1].comment = item_comment[2:]
|
|
# if not (isinstance(node_item, ScalarNode) \
|
|
# and not node_item.style):
|
|
# best_style = False
|
|
value.append(node_item)
|
|
if flow_style is None:
|
|
if self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
return node
|
|
|
|
def represent_set(self, setting):
|
|
# type: (Any) -> Any
|
|
flow_style = False
|
|
tag = 'tag:yaml.org,2002:set'
|
|
# return self.represent_mapping(tag, value)
|
|
value = [] # type: List[Any]
|
|
flow_style = setting.fa.flow_style(flow_style)
|
|
try:
|
|
anchor = setting.yaml_anchor()
|
|
except AttributeError:
|
|
anchor = None
|
|
node = MappingNode(tag, value, flow_style=flow_style, anchor=anchor)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
# no sorting! !!
|
|
try:
|
|
comment = getattr(setting, comment_attrib)
|
|
if node.comment is None:
|
|
node.comment = comment.comment
|
|
else:
|
|
# as we are potentially going to extend this, make a new list
|
|
node.comment = comment.comment[:]
|
|
if node.comment and node.comment[1]:
|
|
for ct in node.comment[1]:
|
|
ct.reset()
|
|
item_comments = comment.items
|
|
for v in item_comments.values():
|
|
if v and v[1]:
|
|
for ct in v[1]:
|
|
ct.reset()
|
|
try:
|
|
node.comment.append(comment.end)
|
|
except AttributeError:
|
|
pass
|
|
except AttributeError:
|
|
item_comments = {}
|
|
for item_key in setting.odict:
|
|
node_key = self.represent_key(item_key)
|
|
node_value = self.represent_data(None)
|
|
item_comment = item_comments.get(item_key)
|
|
if item_comment:
|
|
assert getattr(node_key, 'comment', None) is None
|
|
node_key.comment = item_comment[:2]
|
|
node_key.style = node_value.style = '?'
|
|
if not (isinstance(node_key, ScalarNode) and not node_key.style):
|
|
best_style = False
|
|
if not (isinstance(node_value, ScalarNode) and not node_value.style):
|
|
best_style = False
|
|
value.append((node_key, node_value))
|
|
best_style = best_style
|
|
return node
|
|
|
|
def represent_dict(self, data):
|
|
# type: (Any) -> Any
|
|
"""write out tag if saved on loading"""
|
|
try:
|
|
t = data.tag.value
|
|
except AttributeError:
|
|
t = None
|
|
if t:
|
|
if t.startswith('!!'):
|
|
tag = 'tag:yaml.org,2002:' + t[2:]
|
|
else:
|
|
tag = t
|
|
else:
|
|
tag = 'tag:yaml.org,2002:map'
|
|
return self.represent_mapping(tag, data)
|
|
|
|
def represent_list(self, data):
|
|
# type: (Any) -> Any
|
|
try:
|
|
t = data.tag.value
|
|
except AttributeError:
|
|
t = None
|
|
if t:
|
|
if t.startswith('!!'):
|
|
tag = 'tag:yaml.org,2002:' + t[2:]
|
|
else:
|
|
tag = t
|
|
else:
|
|
tag = 'tag:yaml.org,2002:seq'
|
|
return self.represent_sequence(tag, data)
|
|
|
|
def represent_datetime(self, data):
|
|
# type: (Any) -> Any
|
|
inter = 'T' if data._yaml['t'] else ' '
|
|
_yaml = data._yaml
|
|
if _yaml['delta']:
|
|
data += _yaml['delta']
|
|
value = data.isoformat(inter)
|
|
else:
|
|
value = data.isoformat(inter)
|
|
if _yaml['tz']:
|
|
value += _yaml['tz']
|
|
return self.represent_scalar('tag:yaml.org,2002:timestamp', value)
|
|
|
|
def represent_tagged_scalar(self, data):
|
|
# type: (Any) -> Any
|
|
try:
|
|
tag = data.tag.value
|
|
except AttributeError:
|
|
tag = None
|
|
try:
|
|
anchor = data.yaml_anchor()
|
|
except AttributeError:
|
|
anchor = None
|
|
return self.represent_scalar(tag, data.value, style=data.style, anchor=anchor)
|
|
|
|
def represent_scalar_bool(self, data):
|
|
# type: (Any) -> Any
|
|
try:
|
|
anchor = data.yaml_anchor()
|
|
except AttributeError:
|
|
anchor = None
|
|
return SafeRepresenter.represent_bool(self, data, anchor=anchor)
|
|
|
|
def represent_yaml_object(self, tag, data, cls, flow_style=None):
|
|
# type: (Any, Any, Any, Optional[Any]) -> Any
|
|
if hasattr(data, '__getstate__'):
|
|
state = data.__getstate__()
|
|
else:
|
|
state = data.__dict__.copy()
|
|
anchor = state.pop(Anchor.attrib, None)
|
|
res = self.represent_mapping(tag, state, flow_style=flow_style)
|
|
if anchor is not None:
|
|
res.anchor = anchor
|
|
return res
|
|
|
|
|
|
RoundTripRepresenter.add_representer(type(None), RoundTripRepresenter.represent_none)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
LiteralScalarString, RoundTripRepresenter.represent_literal_scalarstring
|
|
)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
FoldedScalarString, RoundTripRepresenter.represent_folded_scalarstring
|
|
)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
SingleQuotedScalarString, RoundTripRepresenter.represent_single_quoted_scalarstring
|
|
)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
DoubleQuotedScalarString, RoundTripRepresenter.represent_double_quoted_scalarstring
|
|
)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
PlainScalarString, RoundTripRepresenter.represent_plain_scalarstring
|
|
)
|
|
|
|
# RoundTripRepresenter.add_representer(tuple, Representer.represent_tuple)
|
|
|
|
RoundTripRepresenter.add_representer(ScalarInt, RoundTripRepresenter.represent_scalar_int)
|
|
|
|
RoundTripRepresenter.add_representer(BinaryInt, RoundTripRepresenter.represent_binary_int)
|
|
|
|
RoundTripRepresenter.add_representer(OctalInt, RoundTripRepresenter.represent_octal_int)
|
|
|
|
RoundTripRepresenter.add_representer(HexInt, RoundTripRepresenter.represent_hex_int)
|
|
|
|
RoundTripRepresenter.add_representer(HexCapsInt, RoundTripRepresenter.represent_hex_caps_int)
|
|
|
|
RoundTripRepresenter.add_representer(ScalarFloat, RoundTripRepresenter.represent_scalar_float)
|
|
|
|
RoundTripRepresenter.add_representer(ScalarBoolean, RoundTripRepresenter.represent_scalar_bool)
|
|
|
|
RoundTripRepresenter.add_representer(CommentedSeq, RoundTripRepresenter.represent_list)
|
|
|
|
RoundTripRepresenter.add_representer(CommentedMap, RoundTripRepresenter.represent_dict)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
CommentedOrderedMap, RoundTripRepresenter.represent_ordereddict
|
|
)
|
|
|
|
if sys.version_info >= (2, 7):
|
|
import collections
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
collections.OrderedDict, RoundTripRepresenter.represent_ordereddict
|
|
)
|
|
|
|
RoundTripRepresenter.add_representer(CommentedSet, RoundTripRepresenter.represent_set)
|
|
|
|
RoundTripRepresenter.add_representer(
|
|
TaggedScalar, RoundTripRepresenter.represent_tagged_scalar
|
|
)
|
|
|
|
RoundTripRepresenter.add_representer(TimeStamp, RoundTripRepresenter.represent_datetime)
|