Source code for API

"""
    Copyright 2020 Simon Vandevelde, Joost Vennekens
    This code is licensed under GNU GPLv3 license (see LICENSE) for more
    information.
    This file is part of the cDMN solver.
"""
from cdmn.parse_xml import XMLparser
from cdmn.glossary import Glossary
from cdmn.interpret import VariableInterpreter
from cdmn.idply import Parser
from cdmn.table_operations import (fill_in_merged, identify_tables,
                                   find_glossary, find_execute_method,
                                   replace_with_error_check,
                                   create_voc, create_main, create_struct,
                                   create_theory, find_auxiliary_variables,
                                   create_display, create_dependency_graph,
                                   get_dependencies)
import sys
from io import StringIO
import copy
from typing import Dict, List
from contextlib import redirect_stdout

try:
    from idp_engine import IDP
except ImportError:
    print("You need to install the 'idp_engine' package to use the API.")
    import sys
    sys.exit(1)


[docs] class DMNError(Exception): """ Base class for all DMN-related exceptions """ pass
[docs] class NotSatisfiableError(DMNError): """ Error thrown when unsat """
[docs] class Variable(): """ Class representing (c)DMN variables. On top of variable name, type, logical type and value, we also gather what dependencies (both upstream and downstream) a variable has. :param name: the name of the variable :type name: str :param type: if the variable is a constant, the type can be string, integer, float. If the variable is a boolean, the type is None. :type type: str :param logical_type: the variable type from a logical viewpoint. This can be boolean, constant, predicate or function. :type logical_type: str :param value: the value that a symbol has. :type value: str, int :param possible_values: a list containing possible values. This list is only relevant for symbols with type string. :type value: List[str] :param dependent_on: a Dict of symbols on which this variable depends, together with their 'dependency level' :type dependent_on: List[str] :param dependency_of: a Dict of symbols that depend on this variable, together with their 'dependency level' :type dependency_of: List[str] """ def __init__(self, name: str, var_type: str, logical_type: str, possible_values: List, dependent_on: Dict, dependency_of: Dict): self.name = name self.type = var_type self.logical_type = logical_type self._value = None self.possible_values = possible_values self.dependent_on = dependent_on self.dependency_of = dependency_of def __str__(self): return f"{self.name} {self.logical_type}, with value {self.value}" @property def value(self): return self._value @value.setter def value(self, new_value): if (self.possible_values and new_value and new_value not in self.possible_values): raise DMNError(f'Value {new_value} not in possible values for' f' {self.name}.\n Possible values:' f' {self.possible_values}') self._value = new_value def is_input(self): if len(self.dependent_on) == 0: return True else: return False def is_output(self): if len(self.dependency_of) == 0: return True else: return False
[docs] class DMN(): """ A class representing DMN objects. :param _specification: the xml specification :type _specification: str :param variables: a dict containing all the variables found in the specification :type variables: Dict[str]: :class:`cdmn.API.Variable` :param prop_variables: a dict containing the values of propagated variables :param dependency_tree: a dictionary containing for every variable what other variables define it :type dependency_tree: Dict[str]: List[str] :param _idp: a dictionary containing the IDP blocks :type _idp: Dict[str]: str """ def __init__(self, path: str = None, xml: str = None, auto_propagate: bool = False): """ Initializes the DMN object. There are two ways to input the DMN: * Using `path` by supplying a path to the DMN file. * Using `xml` by supplying the XML directly. By setting the specification, we also invoke _update_variables(). :param path: the path to a .dmn file :type path: str, optional :param xml: xml representing a DMN model :type xml: str, optional """ self._specification = '' self.variables: Dict[str, Variable] = {} self.prop_variables: Dict[str, Variable] = {} self.dependency_tree = '' self.auto_propagate = auto_propagate self._idp = {'voc': '', 'struct': '', 'theory': '', 'main': ''} if path and not xml: with open(path, 'r') as fp: self.specification = fp.read() elif xml and not path: self.specification = xml elif xml and path: raise DMNError('Cannot init DMN with both path and xml.') def __str__(self): msg = "Values of DMN specification: \n" for name, val in self.get_all_values().items(): msg += f"{name} = {val}\n" return msg @property def specification(self): """Getter for specification. :returns: the specification :rtype: str """ return self._specification @specification.setter def specification(self, spec: str): """ Setter method to set the specification. Additionally, this invokes the _update_variables() method. :param spec: the DMN specification in XML :type spec: str :returns: None """ self._specification = spec self._update_variables() @property def idp(self): """Getter for idp. :returns: the idp code :rtype: str """ return "".join(self._idp.values()) def _update_variables(self): """ Method to update the list of variables and meta-info. This method sets the following attributes: * _idp * variables It first parses the XML, then parses the resulting cDMN, after which it creates a variable for every symbol in the glossary. :returns: None """ # Parse XML, parse cDMN. xml_parser = XMLparser(self._specification) tables = xml_parser.get_tables() glossary = Glossary(find_glossary(tables)) i = VariableInterpreter(glossary) cdmn_parser = Parser(i) dep_graph = create_dependency_graph(tables, cdmn_parser) for symb in glossary.predicates: # Create new variable. var_type = symb.super_type if symb.super_type is not None else None if var_type: logical_type = 'constant' var_type = var_type.name else: logical_type = 'boolean' var_type = None if symb.super_type is not None and symb.super_type.possible_values: possible_values = symb.super_type.possible_values else: possible_values = None up_deps = get_dependencies(symb.name, dep_graph, downstream=False) down_deps = get_dependencies(symb.name, dep_graph, downstream=True) var = Variable(symb.name, var_type, logical_type, possible_values, up_deps, down_deps) self.variables[symb.name] = var # We don't set the main, as it depends on the inference method used. self._idp['voc'] = create_voc(glossary) self._idp['theory'] = create_theory(tables, cdmn_parser) self.prop_variables = self.variables if self.auto_propagate: self.propagate()
[docs] def set_value(self, variable: str, value): """ Set a variable's value. :param variable: the name of the variable :type variable: str :param value: the value for the variable :type value: str or int """ if value is True: value = 'true' elif value is False: value = 'false' self.variables[variable].value = value if self.auto_propagate: self.propagate() pass
[docs] def update_structure(self): """ Method to generate the structure. If a variable has been assigned a value, it should be included in the structure. This method also updates the struct value in _idp. :returns: the structure :rtype: str """ struct = "structure S: V{\n" for name, var in self.variables.items(): # If a value has been set, add it to the structure. if var.value: struct += f"{name} := {var.value}.\n" struct += "}\n" self._idp['struct'] = struct
[docs] def model_expand(self, models=10): """ Method to model expand the current system """ self.update_structure() idp = ''.join(self._idp.values()) idp += f'procedure main() {{\n\t pretty_print(model_expand(T, S, max={models}))}}' idp = IDP.from_str(idp) # We need to capture the idp output, which has the terminal as default # stdout. with open('/tmp/idp_temp.txt', mode='w', encoding='utf-8') \ as buf, redirect_stdout(buf): idp.execute() with open('/tmp/idp_temp.txt', mode='r', encoding='utf-8') as fp: output = fp.read() if "No models" in output: raise NotSatisfiableError('DMN model has no models.') return output
[docs] def minimize(self, term): """ Method to optimize """ if not term.endswith('()'): term += '()' self.update_structure() idp = ''.join(self._idp.values()) idp += (f'procedure main() {{\n\t pretty_print(Theory(T, S)' f'.optimize("{term}", True).assignments)}}') idp = IDP.from_str(idp) # We need to capture the idp output, which has the terminal as default # stdout. with open('/tmp/idp_temp.txt', mode='w', encoding='utf-8') \ as buf, redirect_stdout(buf): try: idp.execute() except Exception as e: if 'unbounded objectives' in str(e): return "-inf" else: raise e with open('/tmp/idp_temp.txt', mode='r', encoding='utf-8') as fp: output = fp.read() if "No models" in output: raise NotSatisfiableError('DMN model has no models.') return output
[docs] def maximize(self, term): """ Method to optimize """ if not term.endswith('()'): term += '()' self.update_structure() idp = ''.join(self._idp.values()) idp += (f'procedure main() {{\n\t pretty_print(Theory(T, S)' f'.optimize("{term}", False).assignments)}}') idp = IDP.from_str(idp) # We need to capture the idp output, which has the terminal as default # stdout. with open('/tmp/idp_temp.txt', mode='w', encoding='utf-8') \ as buf, redirect_stdout(buf): try: idp.execute() except Exception as e: if 'unbounded objectives' in str(e): return "+inf" else: raise e with open('/tmp/idp_temp.txt', mode='r', encoding='utf-8') as fp: output = fp.read() if "No models" in output: raise NotSatisfiableError('DMN model has no models.') return output
[docs] def propagate(self): """ Method to propagate. :returns: None :rtype: None :throws NotSatisfiableError: thrown when model resulted in unsat """ self.update_structure() idp = ''.join(self._idp.values()) idp += 'procedure main() {\n\t pretty_print(model_propagate(T, S))}' # We need to capture the idp output, which has the terminal as default # stdout. idp = IDP.from_str(idp) # We need to capture the idp output, which has the terminal as default # stdout. with open('/tmp/idp_temp.txt', mode='w', encoding='utf-8') \ as buf, redirect_stdout(buf): idp.execute() with open('/tmp/idp_temp.txt', mode='r', encoding='utf-8') as fp: output = fp.read() if "Not satisfiable" in output: raise NotSatisfiableError('DMN model not satisfiable') # Now that we have propagated, we check if any propagations happened. self.prop_variables = copy.deepcopy(self.variables) props = output.split('\n') for prop in props: # Constants. if '->' in prop: variable, value = prop.split(' -> ') variable = variable.strip('()') if variable in self.prop_variables: self.prop_variables[variable].value = value else: # If the variable is not in the variables list, it might be # because we changed the key to cDMN format. try: self.prop_variables[variable.replace('_', ' ')].value = value except KeyError: raise Exception('Internal error. Please report') # Booleans. elif '()' in prop: value = True if "Not " == prop[:4]: prop = prop[4:] value = False variable = prop.strip('()') if variable in self.prop_variables: self.prop_variables[variable].value = value else: # If the variable is not in the variables list, it might be # because we changed the key to cDMN format. try: self.prop_variables[variable.replace('_', ' ')].value = value except KeyError: raise Exception('Internal error. Please report')
[docs] def dependencies_of(self, var: str): """ Returns the list of dependencies of a variable. :param var: the name of the variable :type var: str :returns: list of variable dependencies :rtype: List[str] """ return self.variables[var].dependent_on
def type_of(self, var: str): return self.variables[var].type
[docs] def possible_values_of(self, var: str) -> List: """ Returns the possible values of a variable. Only strings have a set of possible values in DMN. :param var: the variable :type var: Variable :returns: list of possible values :rtype: List[str] """ return self.variables[var].possible_values
def value_of(self, var: str, prop: bool = True): if prop: return self.prop_variables[var].value else: return self.variables[var].value
[docs] def is_certain(self, var: str): """ Method to check if a variable is certain. A variable is certain if it has been given a value using `set_value`, or if it has been propagated a value (i.e. all of the symbols it depends on are also certain) :param var: the variable :type var: str :returns: whether the variable's value is certain :rtype: bool """ if self.prop_variables[var].value is None: return False else: return True
[docs] def get_inputs(self): """ Get a list of the input variables. :returns: the list of inputs :rtype: List[str] """ variables = [x for x, y in self.variables.items() if y.is_input()] return variables
[docs] def get_outputs(self): """ Get a list of the output variables. :returns: list of the output variables :rtype: List[str] """ return [x for x, y in self.variables.items() if y.is_output()]
[docs] def get_intermediary(self): """ Get a list of intermediary variables. An intermediary variable is a variable that is not :returns: list of intermediary variables :rtype: List[str] """ return [x for x, y in self.variables.items() if not y.is_input() and not y.is_output()]
[docs] def get_unknown_variables(self): """ Get a list of all variables with unknown values :returns: the list of variables that are still missing :rtype: List[str] """ return [x for x, y in self.prop_variables.items() if not y.value]
[docs] def get_certain_variables(self): """ Get a list of all the variables for which the value is known. :returns: list of variables for which the value is known :rtype: List[str] """ return [x for x, y in self.prop_variables.items() if y.value]
[docs] def missing_for(self, variable: str, ): """ Get a list of dependencies of varuable without known value. :param variable: :type variable: str :returns: list of variables needed that are still unknown :rtype: List[str] """ return [x for x in self.prop_variables[variable].dependent_on if self.prop_variables[x].value is None]
[docs] def get_all_values(self, propagated: bool = True): """ Get a dictionary mapping every variable on their value. If the value is not known, it is None. If propagated is True, then we also include the values of propagated variables. Else, we exclude them, and set them as None. :param propagated: True if propagated variables are included. :type propagated: bool :returns: dict mapping variables on their (propagated) values. :rtype: Dict[str, str] """ values = {} if propagated: values = {x: y.value for x, y in self.prop_variables.items()} else: values = {x: y.value for x, y in self.variables.items()} return values
[docs] def get_variable_names(self) -> List: """ Get all variable names. :returns: a list containing all variable names. :rtype: List[str] """ return list(self.variables.keys())
[docs] def clear(self): """ Reset all variables' values back to None. :returns: None :rtype: Nono """ for var in self.variables.keys(): self.variables[var].value = None self.prop_variables = copy.deepcopy(self.variables)