* Vendor ruamel.yaml v0.17.21
* Add unit test for whitespace regression
* Add an abstraction layer in Spack to wrap ruamel.yaml

All YAML operations are routed through spack.util.spack_yaml. The custom
classes have been adapted to the new ruamel.yaml class hierarchy. Fixed the
line annotation issue in "spack config blame".
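For context on what the vendored composer below does: it is the stage of ruamel.yaml's load pipeline that turns the parser's event stream into a node graph (ScalarNode, SequenceNode, MappingNode), which the constructor then converts to Python objects; in Spack these operations go through the spack.util.spack_yaml wrapper rather than being called directly. The following is a minimal sketch of driving just this stage through ruamel.yaml's public API; the sample document and variable names are illustrative only and are not taken from Spack's code.

import io

import ruamel.yaml

# Round-trip loader; its load pipeline is Reader -> Scanner -> Parser -> Composer -> Constructor.
yaml = ruamel.yaml.YAML()

# Stop after the composer stage: this returns the root MappingNode rather than a dict.
root = yaml.compose(io.StringIO('name: example\nversions: [1.0, 2.0]\n'))
print(type(root).__name__)                    # MappingNode
print([key.value for key, _ in root.value])   # ['name', 'versions']
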
# coding: utf-8

import warnings

from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning
from ruamel.yaml.compat import _F, nprint, nprintf  # NOQA

from ruamel.yaml.events import (
    StreamStartEvent,
    StreamEndEvent,
    MappingStartEvent,
    MappingEndEvent,
    SequenceStartEvent,
    SequenceEndEvent,
    AliasEvent,
    ScalarEvent,
)
from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode

if False:  # MYPY
    from typing import Any, Dict, Optional, List  # NOQA

__all__ = ['Composer', 'ComposerError']


class ComposerError(MarkedYAMLError):
    pass


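# The Composer sits between the Parser and the Constructor in the load
# pipeline: it consumes events (scalars, sequence/mapping start and end,
# aliases) and builds the corresponding node graph, resolving implicit tags
# via the resolver, remembering anchored nodes so that aliases can refer back
# to them, and carrying event comments onto nodes so that round-trip dumping
# can reproduce them.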
class Composer:
    def __init__(self, loader=None):
        # type: (Any) -> None
        self.loader = loader
        if self.loader is not None and getattr(self.loader, '_composer', None) is None:
            self.loader._composer = self
        self.anchors = {}  # type: Dict[Any, Any]

    @property
    def parser(self):
        # type: () -> Any
        if hasattr(self.loader, 'typ'):
            self.loader.parser
        return self.loader._parser

    @property
    def resolver(self):
        # type: () -> Any
        # assert self.loader._resolver is not None
        if hasattr(self.loader, 'typ'):
            self.loader.resolver
        return self.loader._resolver

    def check_node(self):
        # type: () -> Any
        # Drop the STREAM-START event.
        if self.parser.check_event(StreamStartEvent):
            self.parser.get_event()

        # Are there more documents available?
        return not self.parser.check_event(StreamEndEvent)

    def get_node(self):
        # type: () -> Any
        # Get the root node of the next document.
        if not self.parser.check_event(StreamEndEvent):
            return self.compose_document()

    def get_single_node(self):
        # type: () -> Any
        # Drop the STREAM-START event.
        self.parser.get_event()

        # Compose a document if the stream is not empty.
        document = None  # type: Any
        if not self.parser.check_event(StreamEndEvent):
            document = self.compose_document()

        # Ensure that the stream contains no more documents.
        if not self.parser.check_event(StreamEndEvent):
            event = self.parser.get_event()
            raise ComposerError(
                'expected a single document in the stream',
                document.start_mark,
                'but found another document',
                event.start_mark,
            )

        # Drop the STREAM-END event.
        self.parser.get_event()

        return document

    def compose_document(self):
        # type: (Any) -> Any
        # Drop the DOCUMENT-START event.
        self.parser.get_event()

        # Compose the root node.
        node = self.compose_node(None, None)

        # Drop the DOCUMENT-END event.
        self.parser.get_event()

        self.anchors = {}
        return node

    def return_alias(self, a):
        # type: (Any) -> Any
        return a

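    # compose_node dispatches on the next event: aliases are resolved from
    # self.anchors (an unknown alias is a ComposerError), a reused anchor only
    # triggers a ReusedAnchorWarning rather than an error, and the node itself
    # is built by the scalar/sequence/mapping helpers below.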
    def compose_node(self, parent, index):
        # type: (Any, Any) -> Any
        if self.parser.check_event(AliasEvent):
            event = self.parser.get_event()
            alias = event.anchor
            if alias not in self.anchors:
                raise ComposerError(
                    None,
                    None,
                    _F('found undefined alias {alias!r}', alias=alias),
                    event.start_mark,
                )
            return self.return_alias(self.anchors[alias])
        event = self.parser.peek_event()
        anchor = event.anchor
        if anchor is not None:  # have an anchor
            if anchor in self.anchors:
                # raise ComposerError(
                #     "found duplicate anchor %r; first occurrence"
                #     % (anchor), self.anchors[anchor].start_mark,
                #     "second occurrence", event.start_mark)
                ws = (
                    '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence '
                    '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark)
                )
                warnings.warn(ws, ReusedAnchorWarning)
        self.resolver.descend_resolver(parent, index)
        if self.parser.check_event(ScalarEvent):
            node = self.compose_scalar_node(anchor)
        elif self.parser.check_event(SequenceStartEvent):
            node = self.compose_sequence_node(anchor)
        elif self.parser.check_event(MappingStartEvent):
            node = self.compose_mapping_node(anchor)
        self.resolver.ascend_resolver()
        return node

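    # The three compose_*_node helpers share one pattern: resolve the tag
    # (explicit, or implicit via the resolver), build the node from the
    # event's marks, style/flow style and comment, and register it under its
    # anchor so later aliases can find it.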
    def compose_scalar_node(self, anchor):
        # type: (Any) -> Any
        event = self.parser.get_event()
        tag = event.tag
        if tag is None or tag == '!':
            tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
        node = ScalarNode(
            tag,
            event.value,
            event.start_mark,
            event.end_mark,
            style=event.style,
            comment=event.comment,
            anchor=anchor,
        )
        if anchor is not None:
            self.anchors[anchor] = node
        return node

    def compose_sequence_node(self, anchor):
        # type: (Any) -> Any
        start_event = self.parser.get_event()
        tag = start_event.tag
        if tag is None or tag == '!':
            tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
        node = SequenceNode(
            tag,
            [],
            start_event.start_mark,
            None,
            flow_style=start_event.flow_style,
            comment=start_event.comment,
            anchor=anchor,
        )
        if anchor is not None:
            self.anchors[anchor] = node
        index = 0
        while not self.parser.check_event(SequenceEndEvent):
            node.value.append(self.compose_node(node, index))
            index += 1
        end_event = self.parser.get_event()
        if node.flow_style is True and end_event.comment is not None:
            if node.comment is not None:
                nprint(
                    'Warning: unexpected end_event comment in sequence '
                    'node {}'.format(node.flow_style)
                )
            node.comment = end_event.comment
        node.end_mark = end_event.end_mark
        self.check_end_doc_comment(end_event, node)
        return node

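    # Mapping entries are stored as (key_node, value_node) pairs; duplicate
    # key detection is deliberately left to the constructor stage (see the
    # commented-out check below).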
    def compose_mapping_node(self, anchor):
        # type: (Any) -> Any
        start_event = self.parser.get_event()
        tag = start_event.tag
        if tag is None or tag == '!':
            tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
        node = MappingNode(
            tag,
            [],
            start_event.start_mark,
            None,
            flow_style=start_event.flow_style,
            comment=start_event.comment,
            anchor=anchor,
        )
        if anchor is not None:
            self.anchors[anchor] = node
        while not self.parser.check_event(MappingEndEvent):
            # key_event = self.parser.peek_event()
            item_key = self.compose_node(node, None)
            # if item_key in node.value:
            #     raise ComposerError("while composing a mapping",
            #             start_event.start_mark,
            #             "found duplicate key", key_event.start_mark)
            item_value = self.compose_node(node, item_key)
            # node.value[item_key] = item_value
            node.value.append((item_key, item_value))
        end_event = self.parser.get_event()
        if node.flow_style is True and end_event.comment is not None:
            node.comment = end_event.comment
        node.end_mark = end_event.end_mark
        self.check_end_doc_comment(end_event, node)
        return node

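    # Comments that the parser attaches to an end-of-document event have no
    # following node to move to, so they are appended to the last composed
    # node's comment list (as a third element) and survive a round trip.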
    def check_end_doc_comment(self, end_event, node):
        # type: (Any, Any) -> None
        if end_event.comment and end_event.comment[1]:
            # pre comments on an end_event, no following to move to
            if node.comment is None:
                node.comment = [None, None]
            assert not isinstance(node, ScalarNode)
            # this is a post comment on a mapping node, add as third element
            # in the list
            node.comment.append(end_event.comment[1])
            end_event.comment[1] = None