# -*- coding:utf-8 -*-
"""Provides classes for the JSONPointer definition in
accordance to RFC6901 and relative pointers draft-1/2018.
"""
from __future__ import absolute_import
from __future__ import print_function
import re
import copy
from jsondata import V3K, ISSTR, JSONPointerError, JSONPointerTypeError, \
C_SHALLOW, C_DEEP, \
M_FIRST, M_LAST, \
rtypes2num, RT_LST, RT_JSONPOINTER, \
verify2num, V_FINAL, V_STEPS
__author__ = 'Arno-Can Uestuensoez'
__maintainer__ = 'Arno-Can Uestuensoez'
__license__ = "Artistic-License-2.0 + Forced-Fairplay-Constraints"
__copyright__ = "Copyright (C) 2015-2016 Arno-Can Uestuensoez" \
" @Ingenieurbuero Arno-Can Uestuensoez"
__version__ = '0.2.21'
__uuid__ = '63b597d6-4ada-4880-9f99-f5e0961351fb'
if V3K:
unicode = str
from urllib.parse import unquote as url_unquote # @UnresolvedImport @UnusedImport pylint: disable=import-error
else:
from urllib import unquote as url_unquote # @UnresolvedImport @Reimport pylint: disable=import-error
# Sets display for interactive JSON/JSONschema design.
_interactive = False
from jsondata import NOTATION_JSON, NOTATION_JSON_REL, NOTATION_HTTP_FRAGMENT
# Tuple of the entries accepted as in-memory JSON node types.
# NOTE(review): the last entry is the value ``None`` rather than
# ``type(None)``; how this tuple is consumed is not visible here -
# confirm before using it with ``isinstance`` or ``type(x) in ...``.
VALID_NODE_TYPE = (
    dict,
    list,
    str,
    unicode,
    int,
    float,
    bool,
    None,
)  #: Valid types of in-memory JSON node types.

CHARSET_UTF = 0  #: Unicode.
CHARSET_STR = 1  #: Python string.

#
# relative JSON Pointer - [RELPOINTER] draft-handrews-relative-json-pointer-01
#
# Capture groups of the expression below:
#
# 1: integer offset (no leading zeros)
# and 3: fetch key - a trailing '#'
# and 4: fetch node @relpath - a trailing '/...' path
# and 5: offset only - empty remainder
#
# else: error
#
# The pattern is compiled from a unicode string on Python2 and from a
# native string on Python3.
if V3K:
    _RELPOINTER = re.compile(r"(0|[1-9][0-9]*)(([#]$)|(/.*$)|($))")  # 1 +(3 | 4 | 5)
else:
    _RELPOINTER = re.compile(unicode(r"(0|[1-9][0-9]*)(([#]$)|(/.*$)|($))"))  # 1 +(3 | 4 | 5)

#: Unescaped character in reference-token [RFC6901]_
# Matches a '/' or a '~' that is not followed by '0' or '1'.
_unescaped = re.compile(r'/|~[^01]')

# Maps a public attribute name onto the key it is stored under in the
# instance dictionary.
_privattr = {
    'isfragment': '__isfragment',
    'isrel': '__isrel',
    'raw': '__raw',
    'relupidx': '__relupidx',
    'start': '__start',
    'startrel': '__startrel',
}  #: private attributes accessible by __setattr__ and __getattr__ only
def fetch_pointerpath(node, base, restype=M_FIRST):
    """Converts the address of *node* within the data structure
    *base* into a corresponding pointer path.

    The implementation is a recursive identity search (``is``), thus
    may cause performance issues when frequently applied,
    or processing very large structures.

    For example: ::

        nodex = {'a': 'pattern'}
        data = {0: nodex, 1: [{'x': [nodex]}]}

        res = fetch_pointerpath(nodex, data, <any value but M_FIRST/M_LAST>)

        res = [
            [0],
            [1, 0, 'x', 0]
        ]

    Args:
        **node**:
            Node object to be searched for, compared by identity.
            A falsy *node* (e.g. an empty container) is never searched.

        **base**:
            A tree top node to search the subtree for node.
            An object of type *JSONData* is replaced by its *data*.

        **restype**:
            Type of search. ::

                M_FIRST: The first match only.
                M_LAST: The last match only.
                any other value: All matches.

            The value is passed on to the recursive calls, so nested
            results are reduced the same way before they are merged.

    Returns:
        Returns a list of lists, where the contained
        lists are pointer pathlists for matched elements.

        * restype:=M_FIRST: '[[<first-match>]]',
        * restype:=M_LAST: '[[<last-match>]]',
        * else: '[[<first-match>],[<second-match>],...]'

        An empty list when nothing matches, or when *node* or *base*
        is falsy.

    Raises:
        JSONPointerError:
            When a *tuple* or *set* is found as *base* or as a
            direct member of a searched container.
    """
    if not node or not base:
        return []

    if isinstance(base, JSONData):
        base = base.data

    res = []

    if type(base) is list:  # first layer - list of elements
        if node is base:  # top node
            res.append([0])
        else:
            for kl, sx in enumerate(base):
                if node is sx:
                    res.append([kl])
                elif type(sx) in (dict, list):
                    for slx in fetch_pointerpath(node, sx, restype):
                        res.append([kl] + slx)
                elif type(sx) in (tuple, set,):
                    raise JSONPointerError(sx)

    elif type(base) is dict:  # first layer - dict of elements
        if node is base:  # top node
            res.append([''])
        else:
            for k, v in base.items():
                if node is v:
                    # A fresh list per match: sharing one path list
                    # between matches would merge the keys of all
                    # matches into every reported path.
                    res.append([k])
                elif type(v) in (list, dict):
                    for slx in fetch_pointerpath(node, v, restype):
                        if slx:
                            res.append([k] + slx)
                elif type(v) in (tuple, set,):
                    raise JSONPointerError(v)

    elif type(base) in (tuple, set,):
        raise JSONPointerError(base)

    if res and restype == M_FIRST:
        return [res[0]]
    elif res and restype == M_LAST:
        return [res[-1]]
    return res
[docs]class JSONPointer(list):
"""Represents exactly one JSONPointer in compliance with
IETF RFC6901 and relative-pointer/draft-1/2018
"""
# The alternation is grouped so that the end anchor applies to both
# alternatives; ungrouped, a leading '0' alone satisfied the pattern
# and values such as '01' or '0abc' were accepted by match().
VALID_INDEX = re.compile(r'(?:0|[1-9][0-9]*)$')
"""Regular expression for valid numerical index: '0' or digits without leading zero."""
[docs]    def __init__(self, ptr, **kargs):
        """Normalizes and stores a JSONPointer. The internal
        representation depends on the type.

        * absolute path:
            A list of ordered items representing
            the path items.

        * relative path:
            Relative paths in addition provide
            a positive numeric offset of outer
            containers [RELPOINTER]_.

        Processes the ABNF of a JSON Pointer from RFC6901
        and/or a relative JSON Pointer(draft 2018).

        Attributes:
            For details see manuals.

            * *isfragment*
            * *isrel*
            * *raw*
            * *start*
            * *startrel*

        Args:
            **ptr**:
                A JSONPointer to be represented by this object. The
                supported formats are:

                .. parsed-literal::

                    ptr := (
                        JSONPointer                  # [RFC6901]_ or [RELPOINTER]_
                        | <rfc6901-string>           # [RFC6901]_
                        | <relative-pointer-string>  # [RELPOINTER]_
                        | <pointer-items-list>       # non-URI-fragment pointer path items of [RFC6901]_
                        )

                JSONPointer:
                    A valid object, is copied into this object,
                    see 'deep'. Supports *rfc6901* [RFC6901]_
                    and *relative* pointers [RELPOINTER]_.

                *rfc6901-string*:
                    A string in accordance to RFC6901 [RFC6901]_.

                *relative-pointer-string*:
                    Draft standard, currently
                    experimental [RELPOINTER]_.

                *pointer-items-list*:
                    A list or tuple of path items. The items are
                    taken over unchanged and the constructor returns
                    right after, so neither the *replace* processing
                    nor the conversion of digit strings at the end
                    of this method is applied to them.

                Numbers (*int*, *float*) are converted to a string
                first and then handled like a string.

                Containing:

                * absolute JSON Pointer
                * relative JSON Pointer, requires the
                  keyword argument *startrel*

        kargs:
            **debug**:
                Enable debugging.

            **deep**:
                Applies for copy operations on structured data
                'deep' when 'True', else 'shallow' only.
                Flat data types are copied by value in any case.

            **node**:
                Force to set the pointed node in the internal cache.

            **replace**:
                Replace masked characters, is applied onto the *ptr*
                parameter only. For the replacement of *startrel*
                create and pass an object *JSONPointer*. ::

                    replace := (
                        True    # replaces rfc6901 escape sequences: ~0 and ~1
                        | False # omit unescaping
                    )

                .. note::

                    Match operations on address strings are proceeded literally,
                    thus the escaped characters should be consistent,
                    see rfc6901, Section 3.

                default := False

            **startrel**:
                Start node for relative JSON Pointers. Is evaluated
                only in combination with a relative path, else
                ignored. ::

                    startrel := (
                        JSONPointer           # supports [RFC6901]_ and [RELPOINTER]_
                        | <rfc6901-string>    # supports [RFC6901]_
                        | <rel-pointer-string> # supports only relative to whole-document '0/...'
                    )

                default := "" # whole document

        Returns:
            None, this is the constructor. The path items are stored
            in the list part of the object itself.

        Raises:
            JSONPointerError:
                For syntax errors of relative pointers, an unsupported
                type of *startrel*, an integer prefix larger than the
                length of *startrel*, a key/index request on the whole
                document, and unsupported types of *ptr*.

            JSONPointerTypeError:
                When a string pointer does not start with a '/'
                after the optional '#'.
        """
        self.debug = kargs.get('debug', False)
        self.deep = deep = kargs.get('deep', False)
        self.node = kargs.get('node', None)  # cache for reuse
        self.__startrel = kargs.get('startrel', '')  # default whole document
        replace = kargs.get('replace', False)

        super(JSONPointer, self).__init__()

        # Store the raw input; the JSONPointer branch further below
        # overwrites this value again.
        if type(ptr) in (int, float):  # pointer are unicode only
            ptr = unicode(ptr)  # is relative pointer
            self.__raw = ptr
        elif deep:
            if type(ptr) in ISSTR:
                self.__raw = ptr[:]
            else:
                self.__raw = copy.deepcopy(ptr)
        else:
            if type(ptr) in ISSTR:
                self.__raw = ptr
            elif isinstance(ptr, JSONPointer):
                self.__raw = JSONPointer(ptr.get_raw(), startrel=ptr.get_startrel())
            else:
                self.__raw = copy.copy(ptr)

        self.__isrel = False  #: marks a relative pointer
        self.__start = None
        self.__isfragment = False  #: marks a uri fragment

        if type(ptr) in (list, tuple):  # no-fragment rfc6901
            # Early exit: the items are stored as given.
            # NOTE(review): this makes the later 'type(ptr) is list'
            # branch unreachable for plain lists - confirm intent.
            self.extend(ptr)
            return

        if ptr in('', '#'):  # shortcut for whole document, see RFC6901
            # '#' is URI fragments RFC6901 section 6
            return None

        elif ptr == '/':  # shortcut for empty tag at top-level, see RFC6901
            self.append('')
            return None

        elif isinstance(ptr, ISSTR):  # string in accordance to RFC6901
            if ptr == '#/':  # URI fragments RFC6901 section 6
                self.__isfragment = True
                self.append('')
                return None

            elif ptr.startswith('#'):  # URI fragments RFC6901 section 6
                self.__isfragment = True
                ptr = url_unquote(ptr[1:])

            elif ptr[0].isdigit():  # relative pointer - 2018/draft1 see doc [RELPOINTER]
                #
                # groups of _RELPOINTER:
                # 1: integer offset
                # 3: fetch key
                # 4: fetch node @relpath
                # 5: int
                #
                _m = _RELPOINTER.match(ptr)
                if _m is None:
                    raise JSONPointerError("Syntax:" + str(ptr))

                self.__isrel = True

                if type(self.__startrel) not in (JSONPointer, list, str, unicode):
                    # limit the length of the reported value
                    _sr = str(type(self.__startrel)) + " / " + str(self.__startrel)
                    if len(_sr) >200:
                        _sr = _sr[:200]
                    raise JSONPointerError("type not supported startrel=" + _sr)

                #
                # <int><jsonpointer> or <int>#
                #
                self.__relupidx = int(_m.group(1))  # <int> - upward increment
                # int() never returns None, so this guard cannot trigger.
                if self.__relupidx == None:
                    raise JSONPointerError("Cannot scan:" + str(ptr))

                if _m.group(4):  # <int><jsonpointer>
                    ptr = _m.group(4)  # downward path
                    self.__isrelpathrequest = True

                elif _m.group(3):  # fetch-key/fetch-index
                    ptr = None
                    self.__isrelpathrequest = False

                elif _m.group(5) != None:  # <int> - whole rel-sub-document
                    ptr = None
                    self.__isrelpathrequest = True

                else:
                    raise JSONPointerError("Cannot scan:" + str(ptr))

                # normalize the start node into a JSONPointer
                if type(self.__startrel) in ISSTR or type(self.__startrel) is list:
                    self.__startrel = JSONPointer(self.__startrel)

                if len(self.__startrel) < self.__relupidx:  # integer prefix-overflow
                    raise JSONPointerError(
                        "\ninteger prefix overflow:"
                        + "\n  prefix        = " + str(self.__relupidx)
                        + "\n  len(startrel) = " + str(len(self.__startrel))
                        + "\n  startrel      = " + str(self.__startrel)
                    )

                elif len(self.__startrel) == self.__relupidx:  # integer-prefix equal to offset
                    if self.__isrelpathrequest:  # anchor for a relative path
                        self.__start = JSONPointer(self.__startrel[:(len(self.__startrel)-self.__relupidx)])
                    else:  # request for key/index
                        raise JSONPointerError(
                            "\nkey/index request for <whole-document> prohibited by specification [RELPOINTER], see manuals:"
                            + "\n  prefix        = " + str(self.__relupidx)
                            + "\n  len(startrel) = " + str(len(self.__startrel))
                            + "\n  startrel      = " + str(self.__startrel)
                        )

                elif len(self.__startrel) > self.__relupidx:  # integer-prefix within offset
                    # the start node is startrel shortened by the upward offset
                    self.__start = JSONPointer(self.__startrel[:(len(self.__startrel)-self.__relupidx)])

                else:
                    # all three orderings are handled above
                    self.__start = JSONPointer('')

            if ptr is not None:
                # A valid pointer string starts with '/', so the first
                # item of the split has to be empty and is dropped.
                self.extend(ptr.split('/'))
                if len(self) == 1 or self[0] != '':
                    raise JSONPointerTypeError("requires a valid JSON pointer: " + str(ptr))
                self.pop(0)

        elif isinstance(ptr, JSONPointer):  # copy constructor
            if ptr.isrel():
                self.__isrel = ptr.isrel()
                self.__relupidx = ptr.get_relupidx()
                self.__isrelpathrequest = ptr.isrelpathrequest()

            if deep:
                self.__raw = ptr.get_raw()[:]
                if ptr.isrel():
                    self.__start = ptr.get_start().copy()
                    self.__startrel = ptr.get_startrel().copy()
                # NOTE(review): 'copy_path' is not defined in the visible
                # part of the class ('copy_path_list' is) - confirm.
                self.extend(ptr.copy_path())
            else:
                self.__raw = ptr.get_raw()
                if ptr.isrel():
                    self.__start = ptr.get_start().copy()
                    self.__startrel = ptr.get_startrel().copy()
                self.extend(ptr)

        elif type(ptr) is list:
            # list of entries in accordance to RFC6901, and JSONPointer
            def presolv(p0):
                if isinstance(p0, JSONPointer):  # copy constructor
                    return p0.ptr
                elif p0 in ('', '/'):
                    return p0
                elif type(p0) in (str, unicode):
                    return p0
                elif type(p0) in (int, float):
                    return str(p0)
                else:
                    raise JSONPointerError("Invalid nodepart:" + str(p0))
                return p0

            if deep:
                self.extend(map(lambda s: s[:], ptr))
            else:
                self.extend(map(presolv, ptr))
            self.__raw = '/' + '/'.join(self)

        else:
            if not ptr:
                self.__raw = None
                return None
            raise JSONPointerError("Pointer type not supported:",
                                   type(ptr))

        # Item normalization: digit-only strings become integers; with
        # 'replace' the remaining strings are url-unquoted and the
        # escapes '~1' and '~0' are resolved, in this order.
        def _rep0(x):
            if type(x) in (str, unicode):
                if x.isdigit():
                    return int(x)
                return url_unquote(x).replace('~1', '/').replace('~0', '~')
            return x

        def _rep1(x):
            if type(x) in (str, unicode):
                if x.isdigit():
                    return int(x)
            return x

        if replace:
            sx = [_rep0(x) for x in self]
        else:
            sx = [_rep1(x) for x in self]
        # replace the stored items in place by the normalized ones
        del self[:]
        self.extend(sx)
def __add__(self, x):
    """Returns a new pointer made of self with *x* appended.

    Args:
        **x**:
            The part to append: ::

                x := (
                    JSONPointer
                    | <pointer-string>    # a leading '#' is dropped
                    | <list-or-tuple-of-path-items>
                    | int                 # a single index
                    | None                # nothing to append
                )

    Returns:
        A new object of JSONPointer, self is not modified.

    Raises:
        JSONPointerError:
            For any other type of *x*.
    """
    ret = JSONPointer(self)

    # pointer are unicode only, RFC6901/RFC3829
    # Compared by value; an identity test against a literal depends
    # on the string interning of the interpreter.
    if type(x) in (str, unicode) and x.startswith('#'):
        x = x[1:]

    if x == '':  # whole document, RFC6901
        pass
    elif x == u'/':  # empty tag
        ret.__raw += x
        ret.append('')
    elif isinstance(x, JSONPointer):
        ret.__raw += x.raw
        ret.extend(x)
    elif type(x) in (list, tuple,):
        # path items may be integers, convert them for the raw string
        ret.__raw += u'/' + u'/'.join(map(unicode, x))
        ret.extend(x)
    elif type(x) in (str, unicode):
        if x[0] == u'/':
            ret.extend(x[1:].split('/'))
            ret.__raw += x
        else:
            ret.extend(x.split('/'))
            ret.__raw += u'/' + x
    elif type(x) is int:
        ret.append(x)
        ret.__raw += u'/' + unicode(x)
    elif x is None:
        return ret
    else:
        raise JSONPointerError()
    return ret
[docs] def __call__(self, x, *args, **kargs):
"""Evaluates the pointer value on the document.
Args:
**x**:
A valid JSON document.
Returns:
The resulting pointer value.
Raises:
JSONPointerError
"""
return self.evaluate(x, *args, **kargs)
def __delattr__(self, name):
raise NotImplementedError("delete: " + name)
def __eq__(self, x):
    """Compares the JSON notation string of this pointer with *x*.

    Args:
        **x**:
            The value to compare with: ::

                x := (
                    JSONPointer    # compared by its JSON notation string
                    | list         # items joined by '/', empty list is u''
                    | str/unicode  # compared literally
                    | int          # compared as u'/<int>'
                    | None         # never equal
                )

    Returns:
        True or False

    Raises:
        JSONPointerError:
            For any other type of *x*.
    """
    if isinstance(x, JSONPointer):
        return self.get_pointer_str(NOTATION_JSON) == x.get_pointer_str(NOTATION_JSON)

    kind = type(x)

    if kind == list:
        if x:
            return self.get_pointer_str(NOTATION_JSON) == u'/' + u'/'.join(map(unicode, x))
        return self.get_pointer_str(NOTATION_JSON) == u''

    if kind in (str, unicode):
        return self.get_pointer_str(forcenotation=NOTATION_JSON) == x

    if kind is int:
        return self.get_pointer_str(forcenotation=NOTATION_JSON) == u'/' + unicode(x)

    if x is None:
        return False

    raise JSONPointerError()
[docs] def __ge__(self, x):
"""Checks containment(>=) of another pointer within this.
The weight of contained entries is the criteria, though
the shorter is the bigger. This is true only in case of
a containment relation.
The number of equal path pointer items is compared.
Args:
**x**:
A valid Pointer.
Returns:
True or False
Raises:
JSONPointerError:
"""
if isinstance(x, JSONPointer):
return super(JSONPointer, self).__le__(x)
elif type(x) in ISSTR:
return super(JSONPointer, self).__le__(JSONPointer(x))
else:
raise JSONPointerError()
[docs] def __gt__(self, x):
"""Checks containment(>) of another pointer or object within this.
The number of equal items is compared.
Args:
**x**:
A valid Pointer.
Returns:
True or False
Raises:
JSONPointerError:
"""
if isinstance(x, JSONPointer):
return super(JSONPointer, self).__gt__(x)
elif type(x) in ISSTR:
return super(JSONPointer, self).__lt__(JSONPointer(x))
else:
raise JSONPointerError()
def __iadd__(self, x):
    """Add in place x to self, appends a path.

    Args:
        **x**:
            The part to append: ::

                x := (
                    JSONPointer
                    | <pointer-string>
                    | <list-of-path-items>
                    | int     # a single index, stored as string
                    | None    # nothing to append
                )

    Returns:
        'self' with updated path items and raw string.

    Raises:
        JSONPointerError:
            For the empty string (whole document), and for any
            other type of *x*.
    """
    if type(x) == list:
        # path items may be integers, convert them for the raw string
        self.__raw += u'/' + u'/'.join(map(unicode, x))
        self.extend(x)
    elif isinstance(x, JSONPointer):
        _raw = x.raw
        if _raw:  # an empty raw value adds nothing
            if _raw[0] != u'/':
                self.__raw += u'/' + _raw
            else:
                # append, the present raw part must not get lost
                self.__raw += _raw
        self.extend(x)
    elif type(x) is int:
        self.append(unicode(x))
        self.__raw += u'/' + unicode(x)
    elif x == '':  # whole document, RFC6901
        raise JSONPointerError("Cannot add the whole document")
    elif x == u'/':  # empty tag
        self.__raw += x
        self.append('')
    elif type(x) in (str, unicode):
        if x[0] == u'/':
            self.extend(x[1:].split('/'))
            self.__raw += x
        else:
            self.extend(x.split('/'))
            self.__raw += u'/' + x
    elif x is None:
        return self
    else:
        raise JSONPointerError()
    return self
[docs] def __le__(self, x):
"""Checks containment(<=) of this pointer within another.
The number of equal items is compared.
Args:
**x**:
A valid Pointer.
Returns:
True or False
Raises:
JSONPointerError:
"""
if isinstance(x, JSONPointer):
return super(JSONPointer, self).__ge__(x)
elif type(x) in ISSTR:
return super(JSONPointer, self).__ge__(JSONPointer(x))
else:
raise JSONPointerError()
[docs] def __lt__(self, x):
"""Checks containment(<) of this pointer within another.
The number of equal items is compared.
Args:
**x**:
A valid Pointer.
Returns:
True or False
Raises:
JSONPointerError:
"""
if isinstance(x, JSONPointer):
return super(JSONPointer, self).__gt__(x)
elif type(x) in ISSTR:
return super(JSONPointer, self).__gt__(JSONPointer(x))
else:
raise JSONPointerError()
[docs] def __ne__(self, x):
"""Compares this pointer with x.
Args:
**x**:
A valid Pointer.
Returns:
True or False
Raises:
JSONPointerError
"""
return not self.__eq__(x)
[docs] def __radd__(self, x):
"""Adds itself as the right-side-argument to the left.
This method appends 'self' to a path fragment on the left.
Therefore it adds the path separator on it's left side only.
The left side path fragment has to maintain to be in
accordance to RFC6901 by itself.
Once 'self' is added to the left side, it terminates it's
life cycle. Thus another simultaneous add operation is
handled by the resulting other element.
Args:
**x**:
A valid Pointer.
Returns:
The updated input of type 'x' as 'x+S(x)'
Raises:
JSONPointerError:
"""
if x == '': # whole document, RFC6901
return u'/' + u'/'.join(map(unicode, self))
elif x == u'/': # empty tag
return x + u'/' + u'/'.join(map(unicode, self))
elif type(x) is int:
return u'/' + unicode(x) + u'/' + u'/'.join(map(unicode, self))
elif type(x) in (str, unicode):
return x + u'/' + u'/'.join(map(unicode, self))
elif type(x) == list:
return x.extend(self)
else:
raise JSONPointerError()
return x
[docs] def __repr__(self):
"""Returns the attribute self.__raw, which is the raw input JSONPointer.
Args:
None
Attributes:
Evaluates *self.__isrel*
Returns:
For relative paths: ::
(<start-offset>, <pointer>)
start-offset := [<self.startrel>]
pointer := [<self>]
For RFC6901 paths: ::
<pointer>
pointer := [<self>]
Raises:
pass-through
"""
if self.__isrel:
ret = '(%s, %s)' % (
repr(self.__start),
unicode(super(JSONPointer, self).__repr__()),
)
else:
ret = super(JSONPointer, self).__repr__()
if ret == '':
return "''"
return ret
def __setattr__(self, name, value):
    """Stores *value* in the instance dictionary.

    Names listed in *_privattr* are stored under their mapped
    key, all other names under the name itself.
    """
    key = _privattr.get(name, name)
    self.__dict__[key] = value
[docs] def __str__(self):
"""Returns the string for the processed path.
Args:
None
Attributes:
Evaluates *self.__isrel*
Returns:
For relative paths: ::
(<start-offset>, <pointer>)
start-offset := [<self.startrel>]
pointer := [<self>]
For RFC6901 paths: ::
<pointer>
pointer := [<self>]
Raises:
pass-through
"""
if self.__isrel:
if self.__start:
ret = "%s" % (
'/' + '/'.join((str(x) for x in self.__start)) + '/' + '/'.join((str(x) for x in self))
)
elif self:
ret = "%s" % ('/' + '/'.join((str(x) for x in self)))
else:
ret = '""'
else:
if self:
ret = "%s" % ('/' + '/'.join((str(x) for x in self)))
else:
ret = '""'
if ret == '':
return "''"
return ret
def check_node_or_value(self, jsondata, parent=False):
    """Checks the existence of the corresponding node
    within the JSON document.

    A node with a falsy value (e.g. 0, '', [], {}) is reported
    as not existing. On success the resolved node is stored in
    *self.node*.

    Args:
        **jsondata**:
            A valid JSON data node.

        **parent**:
            If *True* checks the parent node of the pointed value.

    Returns:
        True or False

    Raises:
        JSONPointerError:
            When *jsondata* or a node on the path is rejected by
            *isvalid_nodetype*.

        pass-through
    """
    if self == []:  # special RFC6901, whole document
        return bool(jsondata)
    elif self == ['']:  # special RFC6901, '/' empty top-tag
        try:
            return bool(jsondata[''])
        except KeyError:
            return False

    if not self.isvalid_nodetype(jsondata):
        # concrete info for debugging for type mismatch
        raise JSONPointerError("Invalid nodetype parameter:" +
                               str(type(jsondata)))

    _s = self[:-1] if parent else self

    # NOTE(review): a node which is neither dict nor list is not
    # descended into, the remaining path items are skipped - confirm.
    for x in _s:
        if isinstance(jsondata, dict):
            jsondata = jsondata.get(x, False)
            if not jsondata:
                return False
        elif isinstance(jsondata, list):
            try:
                jsondata = jsondata[x]
            except IndexError:
                # a missing list item is reported like a missing key
                return False
            if not jsondata:
                return False

        if not self.isvalid_nodetype(jsondata):
            # concrete info for debugging for type mismatch
            raise JSONPointerError("Invalid path nodetype:" +
                                   str(type(jsondata)))

    self.node = jsondata  # cache for reuse
    return True
def copy(self, **kargs):
    """Creates a copy of self by the constructor of JSONPointer.

    Args:
        None

    kargs:
        **deep**:
            Copy mode, default is *C_DEEP*. The value is handed
            to the constructor as the keyword *copydata*.

            NOTE(review): the visible constructor reads the
            keyword *deep* and no *copydata* - confirm that
            this option has the intended effect.

    Returns:
        A copy of self.

    Raises:
        pass-through
    """
    mode = kargs.get('deep', C_DEEP)
    return JSONPointer(self, copydata=mode)
[docs] def copy_path_list(self, parent=False):
"""Returns a deep copy of the objects pointer path list.
Args:
**parent**:
The parent node of the pointer path.
Returns:
A copy of the path list.
Raises:
none
"""
if self == []: # special RFC6901, whole document
return []
if self == ['']: # special RFC6901, '/' empty top-tag
return ['']
if parent:
return [ s[:] for s in self[:-1]]
else:
return [ s[:] for s in self[:]]
# if parent:
# return map(lambda s: s[:], self[:-1])
# else:
# return map(lambda s: s[:], self[:])
def __deepcopy__(self, memo):
    """Hook for *copy.deepcopy*, creates a new JSONPointer
    from a copy of the list of path items.

    Args:
        **memo**:
            Required by the protocol, not used.

    Returns:
        A new JSONPointer.
    """
    path_items = self[:]
    return JSONPointer(path_items)
def __getattr__(self, name):
try:
return self.__dict__[name]
except KeyError as e:
if V3K:
raise JSONPointerError(
"Unknown attribute: " + repr(e)
) # from None
else:
raise JSONPointerError(
"Unknown attribute: " + repr(e)
)
def get_key(self):
"""Get the resulting key for the pointer. In case
of a relative pointer as resulting from the processing
of the relative pointer and the starting node.
"""
if not self.__isrel: # pointer in accordance to rfc6901
if self == None:
# data is not initialized at all - basically impossible - anyhow
return None
elif not self:
# whole document - RFC6901
return ''
return self[-1]
#
# relative draft-1/2018
#
if self.__isrelpathrequest: # is get relpath request
if not self: # special - whole rel document - integer only
if not self.__start: # no offset
return None
return str(self.__start[-1]) # resulting offset only
return self[-1] # a valid pointer
else: # is get-key/index - request
# self is None in any case
if not self.__start: # the whole document
return None
elif len(self.__start) - 1 - self.__relupidx <= 0: # the whole document - fs-like for index-overflow
return None
return str(self.__start[len(self.__start) - 1 - self.__relupidx])
[docs] def get_node_and_child(self, jsondata):
"""Returns a tuple containing the parent node and self as the child.
Args:
**jsondata**:
A valid JSON data node.
Returns:
The the tuple: ::
(p, c):
p: Node reference to parent container.
c: Node reference to self as the child.
Raises:
JSONPointerError:
pass=through
"""
n = self(jsondata, True) # get parent
if len(self) == 1:
return n, None
try:
return n, self(jsondata, False)
except (IndexError, KeyError):
return n, None
raise JSONPointerError(self.__raw)
[docs] def get_node_and_key(self, jsondata):
"""Returns a tuple containing the parent node and the key of current.
Args:
**jsondata**:
A valid JSON data node.
Returns:
The the tuple: ::
(n, k):
n: Node reference to parent container.
k: Key for self as the child entry:
k := (
<list-index>
| <dict-key>
| None
)
list-index: 'int'
dict-key: 'UTF-8'
None: "for root-node"
Raises:
JSONPointerError:
pass-through
"""
n = self(jsondata, True)
if len(self) == 1 and self[0] == '':
return n, None
try:
return n, self[-1],
except (IndexError, KeyError):
return n, None
raise JSONPointerError(self.__raw)
def get_node_value(self, jsondata, cp=C_SHALLOW, **kargs):
    """Gets the copy of the corresponding node.

    The path to resolve is fetched by
    *self.get_pointer(jsondata, superpose=True)*.

    Args:
        **jsondata**:
            A valid JSON data node.

        **cp**:
            Type of returned copy for *dict* and *list*
            nodes. ::

                cp := (
                    C_DEEP
                    | C_REF
                    | C_SHALLOW
                )

            Paths with less than two items return the
            node without copying it.

    kargs:
        **valtype**:
            Type of requested value. A digit-only string value
            is converted for *int* and *float* before the type
            is checked. Evaluated for paths with two or more
            items only.

    Returns:
        The node, copied as selected by *cp*.

    Raises:
        JSONPointerError:
            When a path item cannot be resolved, the type of
            the value does not match *valtype*, or a node is
            rejected by *isvalid_nodetype*.

        pass-through
    """
    valtype = kargs.get('valtype', None)

    _resroot = self.get_pointer(jsondata, superpose=True)
    if _resroot is None:  # basically impossible - anyhow
        return None

    if not _resroot:  # == [] : special RFC6901, whole document
        return jsondata

    if len(_resroot) == 1:
        if _resroot[0] == '':
            return jsondata  # the whole document
        try:
            return jsondata[_resroot[0]]
        except (KeyError, IndexError) as e:
            # Position and item are taken from the resolved path; a
            # search of the item within self can fail on its own.
            raise JSONPointerError(
                "Node(0):" + str(_resroot[0]) + " of " + str(self) + ":" + str(e)
            )

    if not self.isvalid_nodetype(jsondata):
        raise JSONPointerError("Invalid nodetype parameter:" +
                               str(type(jsondata)))

    pos = 0
    x = None
    try:
        for pos, x in enumerate(_resroot):
            # NOTE(review): a scalar node ends the walk, the remaining
            # path items are ignored - confirm.
            if not isinstance(jsondata, (list, dict, JSONData)):
                break
            try:
                jsondata = jsondata[x]  # want the exception
            except TypeError:
                jsondata = jsondata[int(x)]  # still want the exception
    except Exception as e:
        raise JSONPointerError(
            "Node(" + str(pos) + "):" + str(x) + " of " + str(self) + ":" + str(e)
        )

    if valtype:  # requested value type
        # fix type ambiguity for numeric, applies to string values only
        if isinstance(jsondata, ISSTR) and jsondata.isdigit():
            if valtype is int:
                jsondata = int(jsondata)
            elif valtype is float:
                jsondata = float(jsondata)

        if not type(jsondata) is valtype:
            raise JSONPointerError("Invalid path value type:" + str(
                valtype) + " != " + str(type(jsondata)))

    else:  # in general valid value types - RFC4729,RFC7951
        if not self.isvalid_nodetype(jsondata):
            raise JSONPointerError(
                "Invalid path nodetype:"
                + str(type(jsondata)))

    if type(jsondata) in (dict, list,):
        if cp == C_SHALLOW:  # default
            return copy.copy(jsondata)
        elif cp == C_DEEP:
            return copy.deepcopy(jsondata)

    # C_REF, and all flat values
    return jsondata
def get_node(self, jsondata):
    """Resolves this pointer on the provided JSON data.

    Convenience wrapper, the call is delegated unchanged to
    *JSONPointer.__call__* with the default options of that method.

    Args:
        **jsondata**:
            A valid JSON data node.

    Returns:
        The result of *JSONPointer.__call__* for the pointed node.

    Raises:
        JSONPointerError:
            pass-through
    """
    resolve = self.__call__
    return resolve(jsondata)
def get_node_exist(self, jsondata, parent=False):
    """Returns the existing node for the valid part of the pointer,
    together with the remaining part of the pointer for the
    non-existing sub-path.

    This method works similar to the 'evaluate' method, whereas it
    accepts partially valid path pointers. The resolution stops at the
    first path component which cannot be resolved - e.g. a missing
    key, or a '-' in accordance to RFC6902 - and the unresolved
    components are returned for a new creation.

    Thus this method is in particular foreseen to support the
    creation of new sub data structures.

    The resolved node is cached in *self.node*.

    Args:
        **jsondata**:
            A valid JSON data node, one of *dict*, *list*,
            or *JSONData*.

        **parent**:
            Resolve the path to the parent of the pointed value only.
            The last path component is neither resolved, nor
            contained in the remaining part.

    Returns:
        The node reference, and the remaining part. ::

            ret := (node, <remaining-path-components> | None)

        The remaining part is *None* for a completely resolved
        pointer, else the slice of the pointer which begins at
        the first unresolved component.

    Raises:
        JSONPointerError:
            When *jsondata* is not a container type, or when the
            last resolved node is neither a *dict* nor a *list*.

        KeyError:
            For the pointer '/' when the document has no
            empty top-tag.
    """
    if super(JSONPointer, self).__eq__([]):  # special RFC6901, whole document
        return (jsondata, None)
    if super(JSONPointer, self).__eq__(['']):  # special RFC6901, '/' empty top-tag
        return (jsondata[''], None)

    if type(jsondata) not in (dict, list, JSONData):
        # concrete info for debugging for type mismatch
        raise JSONPointerError("Invalid nodetype parameter:" +
                               str(type(jsondata)))

    steps = self[:-1] if parent else self
    remaining = None
    for pos, key in enumerate(steps):
        try:
            # want the exception, the keys within the process has to match
            jsondata = jsondata[key]
        except Exception:
            # The position is tracked by the loop counter, a lookup
            # by value would hit the first occurrence of a token which
            # is contained more than once in the pointer.
            if parent:
                remaining = self[pos:-1]
            else:
                remaining = self[pos:]
            break

    if type(jsondata) not in (dict, list):
        # concrete info for debugging for type mismatch
        raise JSONPointerError("Invalid path nodetype:" +
                               str(type(jsondata)))

    self.node = jsondata  # cache for reuse
    return (jsondata, remaining,)
def get_path_list(self):
    """Gets the path components of the pointer as a plain list for
    in-memory access on the data of the 'json' package.

    In debug mode of the interpreter, and with the attribute
    *debug* of the pointer set, the representation of the pointer
    is printed in addition.

    Args:
        none

    Returns:
        A new list containing the path components.

    Raises:
        none
    """
    if __debug__ and self.debug:
        print(repr(self))
    return list(self)
def get_path_list_and_key(self):
    """Gets the path list of the parent and the final key of the
    object pointer for in-memory access on the data of the
    'json' package.

    Args:
        none

    Returns:
        The tuple of the parent path and the last path component. ::

            ret := (<parent-path-list>, <key>)

        A pointer with one component only results in an empty
        parent path, the empty pointer results in *([], None)*.

    Raises:
        none
    """
    # '> 1' - a pointer of exactly two components has a parent path
    # of one component, and must not fall through without a result
    if len(self) > 1:
        return self[:-1], self[-1]
    if len(self) == 1:
        return [], self[-1]
    return [], None
def get_pointer(self, jsondata=None, **kargs):
    """Gets the object pointer in compliance to RFC6901
    or relative pointer/draft-01/2018.

    The result is by default the assigned pointer itself without
    verification. In case of a relative pointer the start offset is
    superposed by default, while no verification is performed.
    The following options modify this behaviour:

    * superpose - superposes the *startrel* offset with the pointer
    * verify - verifies the actual existence of the nodes and/or
      intermediate nodes

    The options could be applied combined.

    Args:
        **jsondata**:
            The JSON document, used for the verification only.

        kargs:
            **returntype**:
                Defines the return type. ::

                    returntype := (
                          RT_DEFAULT | 'default'
                        | RT_LST | 'list' | list
                        | RT_JSONPOINTER | 'jpointer'
                    )

            **superpose**:
                Is only relevant for relative paths. Superposes the offset
                *startrel* with the pointer into the resulting final pointer.
                By default nodes are not verified, see *verify* parameter.

                default := True

            **verify**:
                Is only relevant for relative paths. Verifies the "road"
                of the superposed pointers. ::

                    verify := (
                          V_DEFAULT | 'default'
                        | V_NONE | 'none' | None  # no checks at all
                        | V_FINAL | 'final'       # checks final result only
                        | V_STEPS | 'steps'       # checks each intermediate directory
                    )

                default := None

    Returns:
        The pointer in the chosen format, see *returntype*.
        For a relative pointer with an integer prefix equal to the
        length of *startrel* the returned pointer is the object itself,
        not a copy.

    Raises:
        JSONPointerError:
            When the integer prefix of a relative pointer is larger
            than the length of *startrel*, for an unknown return type,
            and forwarded from the requested verification.
    """
    returntype = rtypes2num[kargs.get('returntype')]
    verify = verify2num[kargs.get('verify')]
    superpose = kargs.get('superpose', True)

    # returned pointer
    _ptr = JSONPointer(self)

    if self.__isrel:
        if superpose:
            # - superpose the rfc6901 and the startrel pointers
            #
            # first calculate by ignoring jsondata,
            # but checks consistency of integer-prefix
            # and start-node-pointer
            #
            # depth from pointer root to start with __startrel,
            # calc whether int-prefix larger than the length of the offset
            ix = len(self.__startrel) - self.__relupidx
            if ix < 0:
                raise JSONPointerError(
                    "offset is shorter than integer-index startrel:%s - int-idx=%s" % (
                        len(self.__startrel), self.__relupidx)
                )
            if ix == 0:
                _ptr = self
            else:
                _ptr = self.__startrel[:ix]
                _ptr.extend(self)

        #
        # now perform optional verification, the calls are done for
        # their exceptions only, the resulting nodes are dropped
        #
        if verify & V_FINAL:
            # check final only
            JSONPointer(_ptr)(jsondata)
        elif verify & V_STEPS:
            #
            # check intermediate anchors by stepwise verification
            #
            # check offset pointer , and get node
            _chk = JSONPointer(self.__startrel)(jsondata)
            # check int-prefix ends within document, and get node;
            # the integer prefix is a number, thus has to be converted
            # before the '#' is appended
            _chk = JSONPointer(str(self.__relupidx) + '#')(_chk)
            # check pointer ends within document
            JSONPointer(self)(_chk)

    #
    # cast return type
    #
    if returntype & RT_JSONPOINTER:
        return _ptr
    elif returntype & RT_LST:
        return list(_ptr)
    else:
        raise JSONPointerError("Unknown return type: " + str(returntype))
def get_pointer_and_key(self, jsondata=None, **kargs):
    """Gets the resulting pointer to the parent and the final key from
    the processing of the pointer and the optional starting
    node *startrel*.

    Args:
        **jsondata**:
            Passed through to *get_pointer*.

        kargs:
            Passed through to *get_pointer*.

    Returns:
        The tuple of the pointer without its last component,
        and the last component. ::

            ret := (<pointer-to-parent>, <key>)

    Raises:
        IndexError:
            For an empty resulting pointer.

        JSONPointerError:
            pass-through from *get_pointer*
    """
    ret = self.get_pointer(jsondata, **kargs)
    if ret is self:
        # get_pointer may hand out the pointer object itself, in that
        # case the key is split off from a slice in order to
        # keep this pointer unmodified
        return (ret[:-1], ret[-1],)
    k = ret.pop()
    return (ret, k,)
[docs] def get_pointer_str(self, jsondata=None, **kargs):
"""Gets the objects pointer string in compliance to RFC6901
or relative pointer/draft-01/2018.
The result is by default the assigned pointer itself without
verification. Similar in case of a relative pointer the start
offset is ignored by default and no verification is performed.
The following option modifies this behaviour:
* superpose - superposes the *startrel* offset with the pointer
Args:
**jsondata**:
Accepted for a common call interface, the parameter is not
referenced by this method.
kargs:
**forcenotation**:
Force the output notation for string representation to: ::
forcenotation := (
NOTATION_NATIVE # original format with unescape
| NOTATION_JSON # transform to resulting pointer
| NOTATION_HTTP_FRAGMENT # return a fragment with encoding
| NOTATION_JSON_REL # resulting relative pointer
| NOTATION_RAW # raw input
)
default := NOTATION_NATIVE
Any value other than NOTATION_JSON, NOTATION_HTTP_FRAGMENT,
and NOTATION_JSON_REL is processed as NOTATION_NATIVE.
**superpose**:
Is only relevant for relative path requests in native notation.
Superposes the offset *startrel* with the pointer into the
resulting final pointer. Nodes are not verified.
default := False
Returns:
The pointer in the chosen notation.
Raises:
JSONPointerError:
When *superpose* is set and the integer prefix of a relative
pointer is larger than the length of *startrel*.
"""
forcenotation = kargs.get('forcenotation')
superpose = kargs.get('superpose', False)
#
# notation is only relevant for string representation,
# while the internal technical representation is almost neutral
#
if forcenotation == NOTATION_JSON:
if self.__isrel:
if self.__isrelpathrequest:
# NOTE(review): *ret* is concatenated with a string by the final
# return statement, while *__start* is used as a pointer object by
# evaluate() - confirm the type of *__start* for this branch
ret = self.__start.copy()
else:
# NOTE(review): requires *__start* to support '+' with a string - confirm
return self.__start + '#'
else:
if super(JSONPointer,self).__eq__([]):
return '""'
ret = '/'
elif forcenotation == NOTATION_HTTP_FRAGMENT:
# NOTE(review): for relative pointers *__start* is passed on without
# a conversion, for absolute pointers the constant '/' is returned -
# confirm whether a fragment encoding is intended here
if self.__isrel:
if self.__isrelpathrequest:
ret = self.__start
else:
return self.__start
else:
return '/'
elif forcenotation == NOTATION_JSON_REL:
# NOTE(review): same statements as for NOTATION_HTTP_FRAGMENT - confirm
if self.__isrel:
if self.__isrelpathrequest:
ret = self.__start
else:
return self.__start
else:
return '/'
else: # NOTATION_NATIVE
#
# set the appropriate prefix:
# rfc6901-pointer, rfc6901-uri-fragment-pointer,
# relpath-draft-1
#
if self.__isfragment: # rfc6901 - uri-fragment
if not self: # special - whole document
ret = '#'
else: # a subpath
ret = '#/'
elif self.__isrel: # relpointer
if self.__isrelpathrequest: # is get relpath request
if superpose:
# number of leading components of startrel which are kept
# after the application of the integer prefix
ix = len(self.__startrel) - self.__relupidx
if ix < 0:
raise JSONPointerError(
"offset is shorter than integer-index startrel:%s - int-idx=%s" % (
len(self.__startrel), self.__relupidx)
)
if ix == 0:
_ptr = self
else:
_ptr = self.__startrel[:ix]
_ptr.extend(self)
else:
_ptr = self
# the pointer part itself is empty - integer prefix only
if super(JSONPointer, self).__eq__([]):
if _ptr:
return unicode('/' + '/'.join(map(unicode, _ptr)) + '/')
else:
return unicode('""')
else:
return unicode('/' + '/'.join(map(unicode, _ptr)))
else: # is get-key/index - request
return str(self.__relupidx) + '#'
else: # rfc6901 - in-document-absolute pointer
if not self: # ==[] : special RFC6901, whole document
return ''
ret = '/'
# common tail for all branches which have set the prefix *ret* only
if len(self) == 1 and self[0] == '': # special RFC6901, '/' empty top-tag
return '/'
return unicode(ret + '/'.join(map(unicode, self)))
def get_raw(self):
    """Gets the raw input of the pointer as stored in the
    private attribute *__raw*.

    Args:
        none

    Returns:
        The raw path, or *None* when the attribute access
        fails with a *KeyError*.

    Raises:
        none
    """
    try:
        raw = self.__raw
    except KeyError:
        raw = None
    return raw
def get_relupidx(self):
    """Returns the integer prefix of a relative pointer as stored in
    the private attribute *__relupidx*.

    Returns:
        The integer prefix, or *None* when the attribute access
        fails with a *KeyError*.
    """
    try:
        prefix = self.__relupidx
    except KeyError:
        prefix = None
    return prefix
def get_start(self):
    """Returns the resulting start pointer after the
    application of the integer prefix, as stored in the
    private attribute *__start*.

    Returns:
        The start pointer, or *None* when the attribute access
        fails with a *KeyError*.
    """
    try:
        start = self.__start
    except KeyError:
        start = None
    return start
def get_startrel(self):
    """Returns the raw start pointer as stored in the
    private attribute *__startrel*.

    Returns:
        The raw start pointer, or *None* when the attribute access
        fails with a *KeyError*.
    """
    try:
        startrel = self.__startrel
    except KeyError:
        startrel = None
    return startrel
def evaluate(self, jsondata, parent=False):
    """Gets the value resulting from the current pointer.

    The resolved node is cached in *self.node*, except for
    the special cases which return early: the key request of a
    relative pointer, the whole document, and the empty top-tag.

    Args:
        **jsondata**:
            A JSON data node. ::

                jsondata := (
                      JSONData
                    | list
                    | dict
                )

        **parent**:
            Return the parent node of the pointed value.
            When parent is selected, the pointed child node
            is not verified.

    Returns:
        The referenced value. For a relative pointer which
        requests the key/index the result of *get_key()* of the
        start pointer is returned.

    Raises:
        JSONPointerError:
            When the start node of a relative pointer does not exist,
            the start node is not a container type, or a path
            component cannot be resolved.
    """
    #
    # calc starting node
    #
    if self.__isrel:
        # relative draft-1/2018
        # check whether pointed to a valid node
        if not self.__startrel.check_node_or_value(jsondata):
            raise JSONPointerError('node does not exist:"' + str(self.__startrel) + '"')

        if not self.__isrelpathrequest:
            # pointer is get-key/index - request
            return self.__start.get_key()

        # pointer is getrelpath request
        if self.__start:  # already pre-calculated
            _startnode = self.__start.evaluate(jsondata)
        else:
            _startnode = jsondata
        if not self:  # special - whole rel document - integer only
            return _startnode

    elif self == []:
        # special RFC6901, whole document
        return jsondata

    #
    # REMEMBER: special RFC6901, '/""' empty top-tag
    # section 5 / pg. 5
    #
    elif len(self) == 1 and self[0] == '':
        try:
            return jsondata['']
        except KeyError:
            raise JSONPointerError("""Non-existent node(see RFC6901 - chap 5): '/""'""")

    else:
        # a standard rfc6901 pointer
        _startnode = jsondata

    if type(_startnode) not in (dict, list, JSONData):
        # concrete info for debugging for type mismatch
        if self.__isrel:
            raise JSONPointerError(
                "Invalid nodetype parameter: %s"
                "\n pointer = %s"
                "\n startrel = %s"
                "\n start = %s"
                "\n jsondata = %s"
                "\n startnode = %s"
                % (
                    str(type(_startnode)),
                    str(self),
                    str(self.__startrel),
                    str(self.__start),
                    str(jsondata),
                    str(_startnode),
                ))
        else:
            raise JSONPointerError(
                "Invalid nodetype parameter:" +
                str(type(_startnode)))

    # the parent request resolves all components except the last
    steps = self[:-1] if parent else self

    # The position is tracked by the loop counter for the error message,
    # a lookup by value would report the first occurrence of a token
    # which is contained more than once in the pointer.
    pos = 0
    try:
        for pos, token in enumerate(steps):
            try:
                # standard index 0<=x<infinite
                # want the exception, the keys within the process has to match
                _startnode = _startnode[token]
            except TypeError as e:
                if token == '-':
                    # special rfc6901
                    _startnode = _startnode[-1]
                else:
                    try:
                        # special python - reverse index
                        _startnode = _startnode[int(token)]
                    except ValueError:
                        # now safe to give it up
                        raise JSONPointerError(repr(e))

    except Exception as e:
        if V3K:
            # display the available keys/indexes only
            if isinstance(_startnode, JSONData):
                _startnode = _startnode.data
            if type(_startnode) is dict:
                _jdat = _startnode.keys()
            elif type(_startnode) is list:
                _jdat = '[0...' + str(len(_startnode) - 1) + ']'
            else:
                _jdat = _startnode
            raise JSONPointerError(
                "Requires existing Node(jsondata[" + str(pos) + "]):"
                + '"' + str(self) + '":' + repr(e) + ':jsondata(keys/indexes)=' + str(_jdat)
                #
                # TODO: update doctool for python3 introspection before re-activation
                #
            )  # from None
        else:
            raise JSONPointerError(
                "Requires existing Node(jsondata[" + str(pos) + "]):"
                + '"' + str(self) + '":' + repr(e) + ':jsondata=' + str(_startnode)
            )

    self.node = _startnode  # cache for reuse
    return _startnode
def isfragment(self):
    """Checks whether the pointer is a http fragment.

    Returns:
        The value of the private attribute *__isfragment*.
    """
    fragment = self.__isfragment
    return fragment
def isrelpathrequest(self):
    """Checks whether the pointer is a relative pointer which
    requests a path.

    Returns:
        The value of *__isrelpathrequest* when this is false, else
        the value of *__isrel*. *False* when the attribute access
        fails with a *KeyError*.
    """
    try:
        path_request = self.__isrelpathrequest
        if not path_request:
            return path_request
        return self.__isrel
    except KeyError:
        return False
def isrel(self):
    """Checks whether the pointer is a relative pointer.

    Returns:
        The value of the private attribute *__isrel*.
    """
    relative = self.__isrel
    return relative
def isvalid_nodetype(self, x):
    """Checks valid node types of in-memory JSON data.

    Args:
        **x**:
            The node to be checked.

    Returns:
        *True* when *x* is *None*, or when the exact type of *x*
        is contained in *VALID_NODE_TYPE_X*, else *False*.
        Subclasses of the valid types are not accepted.
    """
    # The identity check avoids the call of a custom '__eq__' of the
    # node, and assures a boolean result.
    return x is None or type(x) in VALID_NODE_TYPE_X
def isvalrequest(self):
    """Checks whether the pointer is a relative pointer which
    requests a value.

    Returns:
        *False* when the private attribute *__isvalrequest* is true,
        else the value of *__isrel*.
    """
    # NOTE(review): the sibling check isrelpathrequest() reads the
    # attribute '__isrelpathrequest' - confirm that '__isvalrequest'
    # is actually maintained by the class, and that the negation
    # is intended.
    if self.__isvalrequest:
        return False
    return self.__isrel
def iter_path(self, jsondata=None, **kargs):
    """Iterator for the elements of the path pointer itself.

    Args:
        **jsondata**:
            If provided a valid JSON data node, the
            path components are successively verified on
            the provided document. Any value other than
            *None* activates the verification, including
            empty containers.

        kargs:
            **parent**:
                Uses the path pointer to parent node.

            **rev**:
                Reverse the order, start with last.

            **superpose**:
                Is only relevant for relative paths, for *rfc6901* defined
                paths the parameter is ignored. When *True* superposes
                the offset *startrel* with the pointer into the resulting
                final pointer. ::

                    superpose := (
                          True   # iterates resulting paths from *startrel*
                        | False  # iterates the path only
                    )

                default := True

    Returns:
        Yields the iterator for the current path pointer
        components. The whole document pointer yields the
        single item '', the empty top-tag pointer the single
        item '/'. When the iteration is completed the last
        verified node is cached in *self.node*.

    Raises:
        JSONPointerError:
            When a path component cannot be resolved on the
            provided document.
    """
    parent = kargs.get('parent', False)
    rev = kargs.get('rev', False)
    superpose = kargs.get('superpose', True)

    _ptr = self.get_pointer(jsondata, superpose=superpose)

    if self == []:  # special RFC6901, whole document
        yield ''
    elif self == ['']:  # special RFC6901, '/' empty top-tag
        yield '/'
    else:
        # select the path first, than reverse - the slice '[:-1:-1]'
        # results for any sequence in an empty path
        ptrpath = _ptr[:-1] if parent else _ptr
        if rev:
            ptrpath = ptrpath[::-1]

        # NOTE(review): the document is walked in the order of the
        # iteration, thus for *rev* in the reversed order of the keys
        # - confirm the intended use of *rev* combined with *jsondata*.
        verify = jsondata is not None

        # the loop counter provides the position for the error message,
        # also for tokens which are contained more than once
        for pos, x in enumerate(ptrpath):
            if verify:
                try:
                    # want the exception, the keys within the process has to match
                    jsondata = jsondata[x]
                except Exception as e:
                    raise JSONPointerError(
                        "Node(" + str(pos) + "):" + str(x) + " of " +
                        str(self) + ":" + repr(e)
                    )
            yield x

        self.node = jsondata  # cache for reuse
def iter_path_nodes(self, jsondata, parent=False, rev=False):
    """Iterator for the elements the path pointer points to.

    Args:
        **jsondata**:
            A valid JSON data node.

        **parent**:
            Uses the path pointer to parent node.

        **rev**:
            Reverse the order, start with last.

    Returns:
        Yields the iterator of the current node reference.
        The whole document pointer yields the document itself,
        the empty top-tag pointer the node of the key ''.
        When the iteration is completed the last node is
        cached in *self.node*.

    Raises:
        JSONPointerError:
            When a path component cannot be resolved on the
            provided document.
    """
    if self == []:  # special RFC6901, whole document
        yield jsondata
    elif self == ['']:  # special RFC6901, '/' empty top-tag
        yield jsondata['']
    else:
        # select the path first, than reverse - the slice '[:-1:-1]'
        # results for any sequence in an empty path
        ptrpath = self[:-1] if parent else self
        if rev:
            ptrpath = ptrpath[::-1]

        # NOTE(review): the document is walked in the order of the
        # iteration, thus for *rev* in the reversed order of the keys
        # - confirm the intended semantics of *rev*.

        # the loop counter provides the position for the error message,
        # also for tokens which are contained more than once
        for pos, x in enumerate(ptrpath):
            try:
                # want the exception, the keys within the process has to match
                jsondata = jsondata[x]
            except Exception as e:
                raise JSONPointerError(
                    "Node(" + str(pos) + "):" + str(x) + " of " +
                    str(self) + ":" + repr(e)
                )
            yield jsondata

        self.node = jsondata  # cache for reuse
def iter_path_subpaths(self, jsondata=None, parent=False, rev=False):
    """Successive iterator for the resulting sub-paths of the
    path pointer itself.

    Args:
        **jsondata**:
            If provided a valid JSON data node, the
            path components are successively verified on
            the provided document. Any value other than
            *None* activates the verification, including
            empty containers.

        **parent**:
            Uses the path pointer to parent node.

        **rev**:
            Reverse the order, start with last.

    Returns:
        Yields the iterator for the copy of the current
        slice of the path pointer, which grows by one component
        for each step. The whole document pointer yields the
        single item '', the empty top-tag pointer the single
        item '/'.

    Raises:
        JSONPointerError:
            When a path component cannot be resolved on the
            provided document.
    """
    if self == []:  # special RFC6901, whole document
        yield ''
    elif self == ['']:  # special RFC6901, '/' empty top-tag
        yield '/'
    else:
        # select the path first, than reverse - the slice '[:-1:-1]'
        # results for any sequence in an empty path
        ptrpath = self[:-1] if parent else self
        if rev:
            ptrpath = ptrpath[::-1]

        # NOTE(review): the document is walked in the order of the
        # iteration, thus for *rev* in the reversed order of the keys
        # - confirm the intended use of *rev* combined with *jsondata*.
        verify = jsondata is not None

        curpath = []
        # the loop counter provides the position for the error message,
        # also for tokens which are contained more than once
        for pos, x in enumerate(ptrpath):
            if verify:
                try:
                    # want the exception, the keys within the process has to match
                    jsondata = jsondata[x]
                except Exception as e:
                    raise JSONPointerError(
                        "Node(" + str(pos) + "):" + str(x) + " of " +
                        str(self) + ":" + repr(e)
                    )
            curpath.append(x)
            yield curpath[:]
def iter_path_subpathdata(self, jsondata=None, parent=False, rev=False):
    """Successive iterator for the resulting sub-paths and the
    corresponding nodes.

    Args:
        **jsondata**:
            If provided a valid JSON data node, the
            path components are successively verified on
            the provided document. Any value other than
            *None* activates the verification, including
            empty containers.

        **parent**:
            Uses the path pointer to parent node.

        **rev**:
            Reverse the order, start with last.

    Returns:
        Yields the iterator for the tuple of the current
        slice of the path pointer and the reference of the
        corresponding node. ::

            (<path-item>, <sub-path>, <node>)

            path-item: copy of the item
            sub-path:  copy of the current subpath
            node:      reference to the corresponding node,
                       *None* when no document is provided

        The whole document pointer yields the single string '',
        the empty top-tag pointer the single string '/' instead
        of a tuple. When the iteration is completed the last
        node is cached in *self.node*.

    Raises:
        JSONPointerError:
            When a path component cannot be resolved on the
            provided document.
    """
    if self == []:  # special RFC6901, whole document
        yield ''
    elif self == ['']:  # special RFC6901, '/' empty top-tag
        yield '/'
    else:
        # select the path first, than reverse - the slice '[:-1:-1]'
        # results for any sequence in an empty path
        ptrpath = self[:-1] if parent else self
        if rev:
            ptrpath = ptrpath[::-1]

        # NOTE(review): the document is walked in the order of the
        # iteration, thus for *rev* in the reversed order of the keys
        # - confirm the intended use of *rev* combined with *jsondata*.
        verify = jsondata is not None

        curpath = []
        # the loop counter provides the position for the error message,
        # also for tokens which are contained more than once
        for pos, x in enumerate(ptrpath):
            if verify:
                try:
                    # want the exception, the keys within the process has to match
                    jsondata = jsondata[x]
                except Exception as e:
                    raise JSONPointerError(
                        "Node(" + str(pos) + "):" + str(x) + " of " +
                        str(self) + ":" + repr(e)
                    )
            curpath.append(x)
            yield (x, curpath[:], jsondata)

        self.node = jsondata  # cache for reuse
from jsondata.jsondata import JSONData
# The types for processing are the plain in-memory node types of
# VALID_NODE_TYPE, extended by the container class JSONData.
VALID_NODE_TYPE_X = VALID_NODE_TYPE + (JSONData,)
"""Valid types of in-memory JSON node types for processing."""