Source code for opslib.icsutils.jsondiff

"""
JsonDiff: Library for JSON DIFF
-------------------------------

+------------------------+-------------+
| This is the JsonDiff common library. |
+------------------------+-------------+
"""

try:
    import json
except ImportError:
    import simplejson as json

from opslib.icsexception import IcsException

import logging
log = logging.getLogger(__name__)


[docs]def is_scalar(value): """ Primitive version, relying on the fact that JSON cannot contain any more complicated data structures. """ return not isinstance(value, (list, tuple, dict))
[docs]class Comparator(object): """ Main workhorse for JSON Comparator """
[docs] def __init__(self, fp1=None, fp2=None, include=[], exclude=[], ignore_add=False): """ :type fp1: object :param fp1: file object (opened with read permission) :type fp2: object :param fp2: file object (opened with read permission) :type include: list :param include: a list of attributes to include in the comparison :type exclude: list :param exclude: a list of attributes to exclude in the comparison :type ignore_add: bool :param ignore_add: whether to ignore the added items in the comparison **Example:** >>> from opslib.icsutils.jsondiff import Comparator >>> import json >>> old_json = { ... "name": "opslib", ... "version": "1.2.0", ... "members": { ... "role": "ops", ... "group": [ "ops", "devops" ] ... } ... } >>> new_json = { ... "name": "opslib", ... "version": "1.3.0", ... "members": { ... "role": "devops", ... "group": [ "devops" ] ... } ... } >>> json.dump(old_json, open("old.json", "w")) >>> json.dump(new_json, open("new.json", "w")) >>> fp_old = open("old.json", "r") >>> fp_new = open("new.json", "r") >>> engine = Comparator(fp_old, fp_new) >>> res = engine.compare_dicts() >>> print json.dumps(res, sort_keys=True, indent=4) { "members": { "group": { "0": { "+++": "devops", "---": "ops" }, "1": { "---": "devops" } }, "role": { "+++": "devops", "---": "ops" } }, "version": { "+++": "1.3.0", "---": "1.2.0" } } """ self.obj1 = None self.obj2 = None if fp1: try: self.obj1 = json.load(fp1) except (TypeError, OverflowError, ValueError), exc: raise IcsException("Cannot decode object from JSON.\n%s" % unicode(exc)) if fp2: try: self.obj2 = json.load(fp2) except (TypeError, OverflowError, ValueError), exc: raise IcsException("Cannot decode object from JSON\n%s" % unicode(exc)) self.excluded_attributes = [] self.included_attributes = [] self.ignore_added = False if include: self.included_attributes = include or [] if exclude: self.excluded_attributes = exclude or [] if ignore_add: self.ignore_added = ignore_add or False
[docs] def _is_incex_key(self, key, value): """Is this key excluded or not among included ones? If yes, it should be ignored.""" key_out = ((self.included_attributes and (key not in self.included_attributes)) or (key in self.excluded_attributes)) value_out = True if isinstance(value, dict): for change_key in value: if isinstance(value[change_key], dict): for key in value[change_key]: if ((self.included_attributes and (key in self.included_attributes)) or (key not in self.excluded_attributes)): value_out = False return key_out and value_out
[docs] def _filter_results(self, result): """Whole -i or -x functionality. Rather than complicate logic while going through the object's tree we filter the result of plain comparison. Also clear out unused keys in result""" out_result = {} for change_type in result: temp_dict = {} for key in result[change_type]: log.debug("change_type = %s", change_type) if self.ignore_added and (change_type == "+++"): continue log.debug("result[change_type] = %s, key = %s", unicode(result[change_type]), key) log.debug("self._is_incex_key = %s", self._is_incex_key( key, result[change_type][key])) if not self._is_incex_key(key, result[change_type][key]): temp_dict[key] = result[change_type][key] if len(temp_dict) > 0: out_result[change_type] = temp_dict return out_result
[docs] def _compare_elements(self, old, new): """Unify decision making on the leaf node level.""" res = None # We want to go through the tree post-order if isinstance(old, dict): res_dict = self.compare_dicts(old, new) if (len(res_dict) > 0): res = res_dict # Now we are on the same level # different types, new value is new elif (type(old) != type(new)): res = {'---': old, '+++': new} # recursive arrays # we can be sure now, that both new and old are # of the same type elif (isinstance(old, list)): res_arr = self._compare_arrays(old, new) if (len(res_arr) > 0): res = res_arr # the only thing remaining are scalars else: scalar_diff = self._compare_scalars(old, new) if scalar_diff is not None: res = scalar_diff return res
[docs] def _compare_scalars(self, old, new, name=None): """ Be careful with the result of this function. Negative answer from this function is really None, not False, so deciding based on the return value like in if self._compare_scalars(...): leads to wrong answer (it should be if self._compare_scalars(...) is not None:) """ # Explicitly excluded arguments if old != new: return {'---': old, '+++': new} else: return None
[docs] def _compare_arrays(self, old_arr, new_arr): """ simpler version of compare_dicts; just an internal method, because it could never be called from outside. We have it guaranteed that both new_arr and old_arr are of type list. """ inters = min(len(old_arr), len(new_arr)) # this is the smaller length result = { u"+++": {}, u"---": {}, } for idx in range(inters): res = self._compare_elements(old_arr[idx], new_arr[idx]) if res is not None: result[idx] = res # the rest of the larger array if (inters == len(old_arr)): for idx in range(inters, len(new_arr)): result[idx] = {u'+++': new_arr[idx]} else: for idx in range(inters, len(old_arr)): result[idx] = {u'---': old_arr[idx]} # Clear out unused keys in result out_result = {} for key in result: if len(result[key]) > 0: out_result[key] = result[key] return self._filter_results(result)
[docs] def compare_dicts(self, old_obj=None, new_obj=None): """ The real workhorse """ if not old_obj and hasattr(self, "obj1"): old_obj = self.obj1 if not new_obj and hasattr(self, "obj2"): new_obj = self.obj2 old_keys = set() new_keys = set() if old_obj and len(old_obj) > 0: old_keys = set(old_obj.keys()) if new_obj and len(new_obj) > 0: new_keys = set(new_obj.keys()) keys = old_keys | new_keys result = { u"+++": {}, u"---": {}, } for name in keys: # old_obj is missing if name not in old_obj: result[u'+++'][name] = new_obj[name] # new_obj is missing elif name not in new_obj: result[u'---'][name] = old_obj[name] else: res = self._compare_elements(old_obj[name], new_obj[name]) if res is not None: result[name] = res return self._filter_results(result) # vim: tabstop=4 shiftwidth=4 softtabstop=4
Read the Docs v: latest
Versions
latest
Downloads
PDF
HTML
Epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.