""" Common helper classes for flow (ofproto/dpif) parsing
"""
import re
import functools
from dataclasses import dataclass
class ParseError(RuntimeError):
    """Exception raised when an error occurs during parsing.

    Subclasses RuntimeError, so callers may catch either type.
    """
@dataclass
class KeyValue:
    """Class for keeping key-value data.

    The fields are declared as annotated dataclass fields so that the
    generated ``__init__``, ``__repr__`` and ``__eq__`` actually take them
    into account (a dataclass without annotated fields compares every
    instance as equal to every other one).

    Attributes:
        key (str): The key string.
        value (any): The value data.
        meta (KeyMetadata): The key metadata; None when not provided.
    """

    key: str
    value: object
    # Annotated as ``object`` because the metadata class is not visible from
    # this definition; the parser stores a KeyMetadata instance here.
    meta: object = None

    def __str__(self):
        """Return the pair formatted as ``key: value (meta)``."""
        return "{}: {} ({})".format(self.key, str(self.value), str(self.meta))
class KVDecoders:
    """KVDecoders class is used by KVParser to select how to decode the value
    of a specific keyword.

    A decoder is simply a function that accepts the keyword and value strings
    and returns the keyword and value objects to be stored. The returned
    keyword must be a string but may not be the same as the given keyword.
    The returned value may be of any type.

    Args:
        decoders (dict): Optional; A dictionary of decoders indexed by keyword.
            The dictionary is stored by reference (even when empty), so
            decoders added to it later are honored.
        default (callable): Optional; A decoder used if a match is not found in
            configured decoders. If not provided, the default behavior is to
            try to decode the value into an integer and, if that fails,
            just return the string as-is.
        default_free (callable): Optional; A decoder used if a match is not
            found in configured decoders and it's a free value (e.g: a keyword
            without a value). Defaults to returning the free value as keyword
            and "True" as value.
    """

    def __init__(self, decoders=None, default=None, default_free=None):
        # Test against None rather than truthiness: "decoders or dict()" would
        # replace a caller-supplied empty dict with a private one.
        self._decoders = {} if decoders is None else decoders
        self._default = default or self._default_decoder
        self._default_free = default_free or self._default_free_decoder

    def decode(self, keyword, value_str):
        """Decode a keyword and value.

        A decoder registered for the keyword takes precedence. Otherwise the
        default decoder is used for a non-empty value string and the
        free-keyword decoder for an empty one.

        Args:
            keyword (str): The keyword whose value is to be decoded.
            value_str (str): The value string.

        Returns:
            The key (str) and value(any) to be stored.
        """
        decoder = self._decoders.get(keyword)
        if decoder:
            return decoder(keyword, value_str)
        if value_str:
            return self._default(keyword, value_str)
        return self._default_free(keyword)

    @staticmethod
    def _default_free_decoder(key):
        """Default decoder for free keywords: the value is always True."""
        return key, True

    @staticmethod
    def _default_decoder(key, value):
        """Default decoder.

        It tries to convert into an integer value and, if it fails, just
        returns the string. Base 0 is used, so prefixed literals such as
        "0x10" are converted as well.
        """
        try:
            return key, int(value, 0)
        except ValueError:
            return key, value
class KVParser:
    """KVParser parses a string looking for key-value pairs.

    Parsed pairs are accumulated in an internal list; calling parse() more
    than once appends to the results of the earlier calls.

    Args:
        decoders (KVDecoders): Optional; the KVDecoders instance to use.
    """

    # Characters that may terminate a keyword.
    _KEY_DELIMITER = re.compile(r"(\(|=|:|,|\n|\r|\t)")
    # Characters that terminate a value introduced by "=" or ":".
    _VALUE_END = re.compile(r"( |,|\n|\r|\t)")
    # Characters skipped between key-value pairs.
    _SEPARATORS = (",", " ", "\t", "\r")

    def __init__(self, decoders=None):
        """Constructor"""
        self._decoders = decoders or KVDecoders()
        self._keyval = list()

    def keys(self):
        """Return the list of keys parsed so far, in parsing order."""
        return [kv.key for kv in self._keyval]

    def kv(self):
        """Return the internal list of parsed KeyValue objects."""
        return self._keyval

    def __iter__(self):
        return iter(self._keyval)

    def parse(self, string):
        """Parse the key-value pairs in string.

        Pairs are separated by commas or whitespace and may use different
        delimiters, e.g: "key1:value1,key2=value2,key3(value3)". A keyword
        that is not followed by "=", ":" or "(" is a free keyword and is
        decoded with an empty value string. Parsing stops at the end of the
        string or at the first newline found at a pair boundary.

        Args:
            string (str): the string to parse.

        Raises:
            ParseError if any parsing error occurs.
        """
        kpos = 0
        while kpos < len(string) and string[kpos] != "\n":
            # Skip separators. Tabs and carriage returns must be skipped here:
            # they are keyword delimiters, so leaving one at kpos would yield
            # an empty keyword and the position would never advance.
            if string[kpos] in self._SEPARATORS:
                kpos += 1
                continue

            # With a capturing group and maxsplit=1 the split returns
            # [keyword, delimiter, rest] on a match and [remainder] otherwise.
            split_parts = self._KEY_DELIMITER.split(string[kpos:], 1)
            if len(split_parts) == 3:
                keyword, delimiter, rest = split_parts
            else:
                # No delimiter left: the remainder is a trailing free keyword.
                keyword, delimiter, rest = split_parts[0], "", ""

            value_str = ""
            vpos = kpos + len(keyword) + 1
            end_delimiter = False

            if delimiter in ("=", ":"):
                # The value ends at the next separator or at the end of the
                # string.
                value_str = self._VALUE_END.split(rest, 1)[0]
                next_kpos = vpos + len(value_str)
            elif delimiter == "(":
                value_str = self._paren_value(string, rest)
                # Extra +1 to step over the closing ')'.
                next_kpos = vpos + len(value_str) + 1
                end_delimiter = True
            else:
                # Key with no value (free keyword).
                next_kpos = kpos + len(keyword)
                vpos = -1

            key, val = self._decoders.decode(keyword, value_str)
            # KeyMetadata is defined outside this class; it receives the
            # positions and raw strings of the key and the value.
            meta = KeyMetadata(
                kpos=kpos,
                vpos=vpos,
                kstring=keyword,
                vstring=value_str,
                end_del=end_delimiter,
            )
            self._keyval.append(KeyValue(key, val, meta))
            kpos = next_kpos

    @staticmethod
    def _paren_value(string, rest):
        """Return the part of rest that precedes the matching ')'.

        rest is the text following an already consumed '('. Nested
        parentheses are tracked so that the returned value ends right before
        the ')' that closes the initial one.

        Raises:
            ParseError if the closing ')' is not found.
        """
        level = 1
        index = 0
        for part in re.split(r"(\(|\))", rest):
            if part == "(":
                level += 1
            elif part == ")":
                level -= 1
            index += len(part)
            if level == 0:
                break
        if level != 0:
            raise ParseError(
                "Error parsing string {}: "
                "Failed to find matching ')' in {}".format(string, rest)
            )
        # index points just past the closing ')'; exclude it from the value.
        return rest[: index - 1]
def _kv_decoder(decoders, key, value):
    """Decode a value string that itself holds key-value pairs.

    The value is parsed with a new KVParser and the resulting pairs are
    returned as a dictionary. If a key appears more than once, the last
    occurrence wins.

    Args:
        decoders (KVDecoders): the KVDecoders to use.
        key (str): the keyword we're decoding.
        value (str): the value string to decode.

    Returns:
        The unchanged key and a dict mapping each nested key to its value.
    """
    nested_parser = KVParser(decoders)
    nested_parser.parse(value)
    nested = {}
    for pair in nested_parser.kv():
        nested[pair.key] = pair.value
    return key, nested


# Nested key-value decoder bound to no explicit decoders (None is passed
# through to KVParser).
default_kv_decoder = functools.partial(_kv_decoder, None)