# -*- coding: utf-8 -*-
'''
Top-down recursive descent parser for the Fuzzy Control Language (FCL).

This is a bare-bones parser that just collects things as it goes through,
then returns the file contents as a tuple - really a simple AST.

I'm working from the draft IEC 61131-7 standard, but I have widened the
grammar slightly in places, as usage seems to be more liberal.
In particular, in blocks (variables, rules) I'm not fussy about the order
of decls where it doesn't matter.  Also, I've made the terminating
semi-colon optional in most places (again reflecting usage).

References:
    https://en.wikipedia.org/wiki/Fuzzy_Control_Language

@author: james.power@mu.ie, Created on Tue Aug 14 09:58:10 2018
'''

import os
import sys
import codecs

import numpy as np
import skfuzzy.control as ctrl
import skfuzzy.control.term as fuzzterm

from fcl_scanner import BufferedFCLLexer
from fcl_symbols import NameMapper, SymbolTable


# A universe is given this no. of points unless specified:
_DEFAULT_UNIVERSE_SIZE = 1000


class ParsingError(Exception):
    '''The parser raises this to flag an error while parsing an FCL file.'''

    def __init__(self, pos, error_kind, msg):
        Exception.__init__(self, '{} {}: {}'.format(pos, error_kind, msg))
        self.pos = pos                # filename[line,col]
        self.error_kind = error_kind  # e.g. 'lexical error', 'syntax error'


class FCLParser(NameMapper, SymbolTable):
    '''
    A top-down parser for the Fuzzy Control Language (FCL).
    The main entry point is read_fcl_file with a filename,
    or you can call any non-terminal with a string.
    The relationship with NameMapper and SymbolTable should really be
    "has-a" rather than "is-a", but it's simpler this way.
    '''

    def __init__(self, vars=None):
        '''
        Set up the parser by initialising the symbol table and lexer.
        Optionally supply an initial list of variables (or add them later).
        '''
        NameMapper.__init__(self)
        self.load_ieee_names()
        self.load_fcl_names_too()
        self.load_jfl_names()
        SymbolTable.__init__(self, vars)
        self.lex = BufferedFCLLexer(self._report_error)

    def _report_error(self, msg, error_kind='syntax error', pos=None):
        '''
        Raise an error; report the current position if none given.
        All errors (lexical, syntax, scope) go through this method.
        '''
        if not pos:  # No user-supplied position, get it from the lexer:
            tok = self.lex.token()
            pos = self.lex.get_pos(tok)
            got = tok.value if tok else '[EOF]'
            msg += ' while reading token "{}"'.format(got)
        raise ParsingError(pos, error_kind, msg)

    def _calc_universe(self, start, stop, step=None):
        '''
        Return an np array corresponding to the given RANGE bounds.
        Optionally specify the step, otherwise we guess.
        '''
        if start >= stop:
            self._report_error('invalid range bounds ({}, {})'
                               .format(start, stop))
        if not step:  # Guess some "reasonable" step:
            urange = 1 + (stop - start)
            scale_by = urange / _DEFAULT_UNIVERSE_SIZE
            step = np.power(10, np.round(np.log10(scale_by), 0))
        universe = np.arange(start, stop, step)
        return universe
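
    # Worked example of the step guess above (illustrative, not from the
    # original code): for bounds (0, 100) with no step given,
    # urange = 101, scale_by = 101 / 1000 = 0.101, and
    # 10 ** round(log10(0.101)) = 10 ** -1 = 0.1, so np.arange(0, 100, 0.1)
    # gives 1000 points -- roughly _DEFAULT_UNIVERSE_SIZE, as intended.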

    def _make_mf(self, universe, mfunc, params):
        '''
        Given a function name and parameters, make a membership function.
        '''
        assert len(universe) > 0,\
            'No current universe has been set for this mf'
        skfunc, split_params = self.translate_mf(mfunc)
        if split_params:
            return skfunc(universe, *params)
        else:  # Takes parameters as an array
            return skfunc(universe, params)

    def _finalise_ante_var(self, universe, varname):
        '''
        Have just finished an input var definition, so add it to the list.
        '''
        fuzzyvar = ctrl.Antecedent(universe, varname)
        self.add_vars([fuzzyvar])
        return fuzzyvar

    def _finalise_cons_var(self, universe, varname, options):
        '''
        Have just finished an output var definition, so add it to the list.
        Make sure any declared options (e.g. defuzz method) are registered.
        Default values are ignored at the moment.
        '''
        fuzzyvar = ctrl.Consequent(universe, varname)
        for key, val in options.items():
            key = key.upper()
            if key == 'METHOD':
                fuzzyvar.defuzzify_method = self.translate_defuzz(val)
            elif key == 'ACCU':
                fuzzyvar.accumulation_method = self.translate_accu(val)
            elif key == 'DEFAULT':
                pass
        self.add_vars([fuzzyvar])
        return fuzzyvar

    def _finalise_terms(self, fuzzyvar, termlist):
        '''
        Propagate range values to any terms declared before the range.
        That is, make sure all term definitions are skfuzzy Term objects.
        '''
        universe = fuzzyvar.universe
        for term in termlist:
            if not isinstance(term, fuzzterm.Term):
                (term_name, fname, params) = term
                mf_def = self._make_mf(universe, fname, params)
                term = fuzzterm.Term(term_name, mf_def)
            self.add_term_to_var(fuzzyvar, term)

    def _add_hedges(self, fvar, hedges, membfun):
        '''
        Apply one or more hedge functions to the variable's member func.
        Create a new mf for the overall result, and add it to the variable.
        Return the term corresponding to this new membership function.
        '''
        if len(hedges) == 0:
            return membfun
        mf_name = '_{}_{}'.format('_'.join(hedges), membfun)
        if mf_name in fvar.terms:  # Already done it (some previous rule)
            return fvar[mf_name]
        mf_vals = fvar[membfun].mf
        # Now apply each hedge in turn, starting at the last one:
        for hedge_name in hedges[::-1]:
            hedge_func = self.translate_hedge(hedge_name)
            mf_vals = hedge_func(mf_vals)
        # All the hedges processed, so add this as a new mf to the variable:
        fvar[mf_name] = mf_vals
        return fvar[mf_name]

    def _finalise_rules(self, rbname, rulelist, options):
        '''
        Prefix the rule labels by the ruleblock name (if any).
        Propagate any ruleblock AND/OR option-values to individual rules.
        Ignoring any ACCU option here, since skfuzzy does this at the
        variable level & could have same variable in different rule-blocks.
        '''
        and_key = options.get('AND', None)
        or_key = options.get('OR', None)
        fam = self.translate_and_or(and_key, or_key)
        for rule in rulelist:
            if rbname:
                self.set_rule_label(rule, '{}.{}'.format(rbname, rule.label))
            rule.and_func = fam.and_func
            rule.or_func = fam.or_func
        return rulelist

    def read_fcl_file(self, filename):
        '''
        Read the given FCL file and parse it.
        Returns the parser object, to facilitate create-and-call.
        '''
        self.lex.reset_lineno(filename)
        self.flag_error_on_redefine()
        with codecs.open(filename, 'r',
                         encoding='utf-8', errors='ignore') as fileh:
            try:
                self.lex.input(fileh.read())
                self.function_block()
                return self
            except ParsingError as parsing_error:
                raise parsing_error
            except Exception as other_error:
                # Show all errors as parser errors so we get line,col ref:
                self._report_error(str(other_error), 'internal error')
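
    # Create-and-call usage sketch (the filename and FCL fragment below are
    # made up for illustration; exact whitespace/number formats are whatever
    # the scanner accepts):
    #
    #   fp = FCLParser().read_fcl_file('tipper.fcl')
    #   # ...or parse a single non-terminal directly from a string:
    #   universe = FCLParser().range_def('RANGE := (0 .. 10);')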

    # ########################################## #
    # ### FCL grammar definition starts here ### #
    # ########################################## #

    # All of these parsing routines correspond to a grammar non-terminal,
    # all can be called with a string (and will parse that string),
    # and (nearly) all return a corresponding fuzzy object.

    # ################################# #
    # 1. Overall FCL program structure: #
    # ################################# #

    def function_block(self, input_string=None):
        '''
        This is the grammar's start symbol.
            function_block_declaration ::=
                'FUNCTION_BLOCK' function_block_name
                    {fb_io_var_declarations}
                    {fuzzify_block}
                    {defuzzify_block}
                    {rule_block}
                    {option_block}
                'END_FUNCTION_BLOCK'
        Actually, I take these contents in any order.
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('FUNCTION_BLOCK')
        self.fb_name = self.lex.recognise_if_there('IDENTIFIER')
        while self.lex.peek_not(['END_FUNCTION_BLOCK']):
            if self.lex.peek_some(['VAR_INPUT', 'VAR_OUTPUT']):
                self.var_decls()
            elif self.lex.peek('FUZZIFY'):
                self.fuzzify_block()
            elif self.lex.peek('DEFUZZIFY'):
                self.defuzzify_block()
            elif self.lex.peek('RULEBLOCK'):
                self.rule_block()
            elif self.lex.peek('OPTION'):
                self.option_block()
            else:
                self._report_error('Unknown element in function block')
        self.lex.recognise('END_FUNCTION_BLOCK')
        return None
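
    # An illustrative (made-up) input skeleton for this start symbol; the
    # names are invented and block bodies are elided with '...':
    #
    #   FUNCTION_BLOCK tipper
    #       VAR_INPUT   service : REAL;  END_VAR
    #       VAR_OUTPUT  tip : REAL;      END_VAR
    #       FUZZIFY service     ...   END_FUZZIFY
    #       DEFUZZIFY tip       ...   END_DEFUZZIFY
    #       RULEBLOCK rules     ...   END_RULEBLOCK
    #   END_FUNCTION_BLOCK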

    def var_decls(self, input_string=None):
        '''
            fb_io_var_declarations ::=
                'VAR_INPUT'  {IDENTIFIER ':' IDENTIFIER ';'} 'END_VAR'
              | 'VAR_OUTPUT' {IDENTIFIER ':' IDENTIFIER ';'} 'END_VAR'
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise_some(['VAR_INPUT', 'VAR_OUTPUT'])
        decls = []
        while self.lex.peek_not(['END_VAR']):
            vname = self.lex.recognise('IDENTIFIER')
            self.lex.recognise('COLON')
            vtype = self.lex.recognise('IDENTIFIER')
            self.lex.recognise_if_there('SEMICOLON')
            decls.append((vname, vtype))
        self.lex.recognise('END_VAR')
        return decls

    def option_block(self, input_string=None):
        '''
            option_block ::= 'OPTION' any-old-stuff 'END_OPTION'
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('OPTION')
        while self.lex.peek_not(['END_OPTION']):
            self.lex.recognise_anything()  # Chuck away any contents...
        self.lex.recognise('END_OPTION')
        return None  # Just for emphasis

    # ################### #
    # 2. Fuzzy variables: #
    # ################### #

    def _option_def(self, keyword):
        '''
        Options in variable or rule-block definitions:
            an_option ::= keyword ':' IDENTIFIER ';'
        '''
        key = self.lex.recognise(keyword)
        self.lex.recognise('COLON')
        value = self.lex.recognise('IDENTIFIER')
        self.lex.recognise_if_there('SEMICOLON')
        return {key: value}

    def fuzzify_block(self, input_string=None):
        '''
            fuzzify_block ::=
                'FUZZIFY' variable_name
                    {linguistic_term}
                    [range]
                'END_FUZZIFY'
        The range can occur at beginning or end (or anywhere in between).
        Don't add the terms until you have the range.
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('FUZZIFY')
        varname = self.lex.recognise('IDENTIFIER')
        termlist = []
        universe = ()
        while self.lex.peek_not(['END_FUZZIFY']):
            if self.lex.peek('TERM'):
                termlist.append(self.term_def())
            elif self.lex.peek('RANGE'):
                universe = self.range_def()
            else:
                self._report_error('Unknown element in fuzzify block')
        self.lex.recognise('END_FUZZIFY')
        if len(universe) == 0:
            self._report_error('No universe for variable "{}"'
                               .format(varname), 'range error')
        fuzzyvar = self._finalise_ante_var(universe, varname)
        self._finalise_terms(fuzzyvar, termlist)
        return fuzzyvar

    def defuzzify_block(self, input_string=None):
        '''
            defuzzify_block ::=
                'DEFUZZIFY' variable_name
                    {linguistic_term}
                    'ACCU' ':' accumulation_method ';'
                    'METHOD' ':' defuzzification_method ';'
                    default_value
                    [range]
                'END_DEFUZZIFY'
            defuzzification_method ::= IDENTIFIER
            accumulation_method ::= IDENTIFIER
            default_value ::= 'DEFAULT' ':=' (numeric_literal | 'NC') ';'
        I'm not fussy about the order of the block contents, and I accept
        any identifier as a defuzz/accu method, and worry about it later.
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('DEFUZZIFY')
        varname = self.lex.recognise('IDENTIFIER')
        options = {}
        termlist = []
        universe = ()
        while self.lex.peek_not(['END_DEFUZZIFY']):
            toktype = self.lex.peek_type()
            if toktype == 'TERM':
                termlist.append(self.term_def())
            elif toktype == 'RANGE':
                universe = self.range_def()
            elif toktype in ['METHOD', 'ACCU']:
                options.update(self._option_def(toktype))
            elif self.lex.recognise_if_there('DEFAULT'):
                self.lex.recognise_some(['ASSIGN', 'COLON'])
                if self.lex.recognise_if_there('NC'):
                    default_val = 'NC'
                elif self.lex.recognise_if_there('NAN'):
                    default_val = 'NAN'
                else:
                    default_val = self.number()
                self.lex.recognise_if_there('SEMICOLON')
                options['DEFAULT'] = default_val
            else:
                self._report_error('Unknown element in defuzzify block')
        self.lex.recognise('END_DEFUZZIFY')
        if len(universe) == 0:
            self._report_error('No universe for variable "{}"'
                               .format(varname), 'range error')
        fuzzyvar = self._finalise_cons_var(universe, varname, options)
        self._finalise_terms(fuzzyvar, termlist)
        return fuzzyvar

    def range_def(self, input_string=None):
        '''
            range ::= 'RANGE' ':=' '(' numeric_literal '..' numeric_literal ')'
                      ['WITH' numeric_literal] ';'
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('RANGE')
        self.lex.recognise('ASSIGN')
        self.lex.recognise('LPAREN')
        rmin = self.number()  # originally ident_or_number()
        self.lex.recognise('DOTDOT')
        rmax = self.number()
        self.lex.recognise('RPAREN')
        numpoints = None
        if self.lex.recognise_if_there('WITH'):
            numpoints = self.number()
        self.lex.recognise_if_there('SEMICOLON')
        return self._calc_universe(rmin, rmax, numpoints)
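
    # Illustrative (made-up) variable blocks in the syntax above; the
    # variable, term and method names are assumptions, not from the original:
    #
    #   FUZZIFY service
    #       TERM poor := (0, 1) (4, 0);
    #       RANGE := (0 .. 10);
    #   END_FUZZIFY
    #
    #   DEFUZZIFY tip
    #       TERM cheap := (0, 0) (5, 1) (10, 0);
    #       METHOD : COG;
    #       DEFAULT := 0;
    #       RANGE := (0 .. 25);
    #   END_DEFUZZIFY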

    # ###################################### #
    # 3. Fuzzy terms (membership functions): #
    # ###################################### #

    def term_def(self, input_string=None):
        '''
            linguistic_term ::= term_header membership_function ';'
        '''
        self.lex.maybe_set_input(input_string)
        name = self.term_header()
        body = self.mf()
        self.lex.recognise_if_there('SEMICOLON')
        if body[0] == 'MF':  # No universe defined yet
            body[0] = name
            return body
        else:  # Have a universe, so make a term:
            return fuzzterm.Term(name, body)

    def term_header(self, input_string=None):
        '''
            term_header ::= 'TERM' term_name ':='
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('TERM')
        name = self.lex.recognise('IDENTIFIER')
        self.lex.recognise('ASSIGN')
        return str(name)

    def mf(self, input_string=None, universe=[]):
        '''
            membership_function ::= singleton | points | funcall
            singleton ::= numeric_literal
            funcall ::= IDENTIFIER {numeric_literal}
        '''
        self.lex.maybe_set_input(input_string)
        if self.lex.peek('LPAREN'):
            fname, params = 'pointlist', self.point_list()
        elif self.lex.peek('IDENTIFIER'):
            fname = self.lex.recognise('IDENTIFIER')
            # Possible list of parameter values now follows:
            params = []
            while self.lex.peek_some(['INT_CONST', 'FLOAT_CONST']):
                params.append(self.number())
        else:  # Must be a singleton value
            fname, params = 'singleton', [self.number()]
        # Make a term if we have a universe:
        if len(universe) > 0:
            mf_def = self._make_mf(universe, fname, params)
        else:  # No universe defined yet, return items for the moment:
            mf_def = ['MF', fname, params]
        return mf_def

    def point_list(self, input_string=None):
        '''
            points ::= {'(' numeric_literal ',' numeric_literal ')'}
        The original allowed an ident for the first point; not sure why.
        '''
        self.lex.maybe_set_input(input_string)
        plist = []
        while self.lex.recognise_if_there('LPAREN'):
            x_val = self.number()
            self.lex.recognise('COMMA')
            y_val = self.number()
            self.lex.recognise('RPAREN')
            plist.append((x_val, y_val))
        return plist
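
    # The three membership-function forms above, as illustrative TERM lines
    # (the 'trian' function name is an assumption about what the name
    # tables map to an skfuzzy membership function):
    #
    #   TERM ok    := 5;                      (singleton)
    #   TERM cheap := (0, 0) (5, 1) (10, 0);  (point list)
    #   TERM good  := trian 4 7 10;           (funcall with parameters)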

    # ####################### #
    # ### 4. Fuzzy rules: ### #
    # ####################### #

    def rule_block(self, input_string=None):
        '''
            rule_block ::=
                'RULEBLOCK' [rule_block_name]
                    'AND' ':' operator_definition ';'
                    'OR' ':' operator_definition ';'
                    'ACT' ':' activation_method ';'
                    'ACCU' ':' accumulation_method ';'
                    {rule}
                'END_RULEBLOCK'
            operator_definition ::= IDENTIFIER
            activation_method ::= IDENTIFIER
            accumulation_method ::= IDENTIFIER
        I'm not fussy about the order of the block contents,
        and I've made its name optional.
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('RULEBLOCK')
        rbname = self.lex.recognise_if_there('IDENTIFIER')
        rules = []
        options = {}
        while self.lex.peek_not(['END_RULEBLOCK']):
            toktype = self.lex.peek_type()
            if toktype == 'RULE':
                rules.append(self.rule_def())
            elif toktype in ['AND', 'OR', 'ACT', 'ACCU']:
                options.update(self._option_def(toktype))
            else:
                self._report_error('Unknown element in rule block')
        self.lex.recognise('END_RULEBLOCK')
        return self._finalise_rules(rbname, rules, options)

    def rule_def(self, input_string=None):
        '''
            rule ::= rule_header rule 'SEMICOLON'
        '''
        self.lex.maybe_set_input(input_string)
        name = self.rule_header()
        body = self.rule()
        self.lex.recognise_if_there('SEMICOLON')
        self.set_rule_label(body, name)
        return body

    def rule_header(self, input_string=None):
        '''
            rule_header ::= 'RULE' integer_literal ':'
        I allow an identifier (or a number) as a rule name.
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('RULE')
        name = self.ident_or_number()
        self.lex.recognise('COLON')
        return str(name)

    def rule(self, input_string=None):
        '''
            rule ::= 'IF' antecedent 'THEN' consequent
                     ['WITH' weighting_factor]
            weighting_factor ::= variable | numeric_literal
        '''
        self.lex.maybe_set_input(input_string)
        self.lex.recognise('IF')
        ant = self.antecedent()
        self.lex.recognise('THEN')
        con = self.consequent()
        # Recognise a weighting_factor if there is one:
        if self.lex.recognise_if_there('WITH'):
            weight = self.ident_or_number()
            con = [fuzzterm.WeightedTerm(c, weight) for c in con]
        return self.add_rule(ctrl.Rule(ant, con))
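
    # Illustrative (made-up) rules in the syntax above; the variable and
    # term names are invented, and 'very' is assumed to be a hedge that the
    # name tables recognise:
    #
    #   RULE 1 : IF service IS poor OR food IS rancid THEN tip IS cheap;
    #   RULE 2 : IF service IS NOT poor AND food IS very good
    #            THEN tip IS generous WITH 0.8;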

    def antecedent(self, input_string=None):
        '''
            condition ::= clause {('AND' | 'OR') clause}
        I need to enforce precedence, so this is actually:
            condition ::= _condition_and {'OR' _condition_and}
        '''
        self.lex.maybe_set_input(input_string)
        left = self._antecedent_and()
        while self.lex.recognise_if_there('OR'):
            right = self._antecedent_and()
            left = fuzzterm.TermAggregate(left, right, 'or')
        return left

    def _antecedent_and(self):
        '''
            condition_and ::= clause {('COMMA' | 'AND') clause}
        Assuming 'COMMA' is just another way of saying 'AND'.
        '''
        left = self.clause(parent_rule=self.antecedent)
        while self.lex.peek_some(['COMMA', 'AND']):
            self.lex.recognise_some(['COMMA', 'AND'])
            right = self.clause(parent_rule=self.antecedent)
            left = fuzzterm.TermAggregate(left, right, 'and')
        return left

    def consequent(self, input_string=None):
        '''
            condition ::= clause {'AND' clause}
        Return a list of these.
        '''
        self.lex.maybe_set_input(input_string)
        clist = [self.clause(parent_rule=self.consequent)]
        while self.lex.peek_some(['COMMA', 'AND']):
            self.lex.recognise_some(['COMMA', 'AND'])
            clist.append(self.clause(parent_rule=self.consequent))
        return clist

    def clause(self, input_string=None, parent_rule=None):
        '''
            clause ::=
                | 'NOT' condition()
                | '(' condition() ')'   # Allow extra parentheses
                | atomic_clause
        The syntax has been loosened to permit more flexible expressions;
        these are the same: 'NOT v IS t', 'v IS NOT t', 'NOT (v IS t)'.
        '''
        # Note that the parent (caller) might be antecedent or consequent.
        # We pass it as a parameter so we can call it for sub-clauses.
        self.lex.maybe_set_input(input_string)
        if self.lex.recognise_if_there('NOT'):
            subclause = self.clause(parent_rule=parent_rule)
            return fuzzterm.TermAggregate(subclause, None, 'not')
        elif self.lex.recognise_if_there('LPAREN'):
            subclause = parent_rule() if parent_rule else self.clause()
            self.lex.recognise('RPAREN')
            return subclause
        else:
            in_consequent = (parent_rule == self.consequent)
            return self.atomic_clause(in_consequent=in_consequent)

    def atomic_clause(self, input_string=None, in_consequent=False):
        '''
            atomic_clause ::=
                | variable_name                          # Not doing this!
                | variable_name 'IS' {hedge} term_name
        The optional hedges are: any identifier or 'NOT'.
        '''
        varname = self.lex.recognise('IDENTIFIER')
        hedges = []
        self.lex.recognise('IS')
        while self.lex.peek_some(['IDENTIFIER', 'NOT']):
            hedges.append(self.lex.recognise_some(['IDENTIFIER', 'NOT']))
        # Actually, the last one was the member function name:
        membfun = hedges.pop()
        fvar = self.get_var_defn(varname)
        this_clause = fvar[membfun]
        # Special case when the only hedge is 'not':
        if len(hedges) == 1 and hedges[0] == 'NOT':
            this_clause = fuzzterm.TermAggregate(this_clause, None, 'not')
        # Otherwise apply the hedge functions, if there are any:
        elif len(hedges) > 0:
            this_clause = self._add_hedges(fvar, hedges, membfun)
        return this_clause

    def ident_or_number(self, input_string=None):
        '''
            ident_or_number ::= identifier | integer_literal | real_literal
        '''
        self.lex.maybe_set_input(input_string)
        if self.lex.peek('IDENTIFIER'):
            return self.lex.recognise('IDENTIFIER')
        if self.lex.peek('INT_CONST'):
            return int(self.lex.recognise('INT_CONST'))
        if self.lex.peek('FLOAT_CONST'):
            return float(self.lex.recognise('FLOAT_CONST'))
        self._report_error('expected ident/num')

    def number(self, input_string=None):
        '''
            numeric_literal ::= integer_literal | real_literal
        '''
        self.lex.maybe_set_input(input_string)
        if self.lex.peek('INT_CONST'):
            return int(self.lex.recognise('INT_CONST'))
        if self.lex.peek('FLOAT_CONST'):
            return float(self.lex.recognise('FLOAT_CONST'))
        self._report_error('expected numeric literal')

    # ######################################### #
    # ### FCL grammar definition ends here  ### #
    # ######################################### #


_FCL_SUFFIX = '.fcl'


def parse_dir(parser, rootdir, want_output=False):
    '''
    Scan all the .fcl files in rootdir and its subdirs.
    Print any errors and the number of files parsed.
    '''
    files_tot, files_err = 0, 0
    for rootpath, _, files in os.walk(rootdir):
        for filename in files:
            if filename.endswith(_FCL_SUFFIX):
                filepath = os.path.join(rootpath, filename)
                print('===', filepath)
                try:
                    files_tot += 1
                    parser.clear()
                    parser.read_fcl_file(filepath)
                    if want_output:
                        print(parser)
                except Exception as exc:
                    files_err += 1
                    print(exc)
    print('Parsed %d files (%d had errors).'
          % (files_tot, files_err))


if __name__ == '__main__':
    _parser = FCLParser()
    if len(sys.argv) == 1:  # No args, scan all examples
        parse_dir(_parser, 'Examples')
    else:  # Parse the given files:
        for fcl_filename in sys.argv[1:]:
            _parser.read_fcl_file(fcl_filename)
            print(_parser)