Verified Commit 740d5989 authored by Vladimír Štill's avatar Vladimír Štill
Browse files

CFL: Rework word generation

parent de4bc741
Loading
Loading
Loading
Loading
+92 −68
Original line number Original line Diff line number Diff line
@@ -278,9 +278,7 @@ class CFG:
        for src, prod in self.productions():
        for src, prod in self.productions():
            for new_prod in drop(prod):
            for new_prod in drop(prod):
                if new_prod:
                if new_prod:
                    if src not in new_rules:
                    new_rules.setdefault(src, set()).add(new_prod)
                        new_rules[src] = set()
                    new_rules[src].add(new_prod)
        if self.init in erasable:
        if self.init in erasable:
            new_init = Nonterminal("S")
            new_init = Nonterminal("S")
            while new_init in self.nonterminals:
            while new_init in self.nonterminals:
@@ -308,9 +306,7 @@ class CFG:
            for esrc in simple_to[src]:
            for esrc in simple_to[src]:
                for prod in self.rules.get(esrc, []):
                for prod in self.rules.get(esrc, []):
                    if len(prod) != 1 or prod[0] in self.terminals:
                    if len(prod) != 1 or prod[0] in self.terminals:
                        if src not in new_rules:
                        new_rules.setdefault(src, set()).add(prod)
                            new_rules[src] = set()
                        new_rules[src].add(prod)
        return CFG(self.nonterminals, self.terminals, new_rules, self.init)
        return CFG(self.nonterminals, self.terminals, new_rules, self.init)


    def proper(self) -> CFG:
    def proper(self) -> CFG:
@@ -444,42 +440,6 @@ class CFG:
    def all_terminal(sentence: CFG.Sentence) -> bool:
    def all_terminal(sentence: CFG.Sentence) -> bool:
        return all_of(lambda x: isinstance(x, Terminal), sentence)
        return all_of(lambda x: isinstance(x, Terminal), sentence)


    def _generate(self, max_length: int) -> Iterable[CFG.Word]:
        """
        Yield every word of the grammar whose length is at most
        {max_length}, each exactly once. For a grammar in CNF
        (Chomsky normal form) shorter words are produced first.

        Requires a grammar in epsilon normal form; otherwise some
        words may never be generated.
        """
        explored = set()
        pending: Deque[CFG.Sentence] = deque([(self.init,)])

        # BFS over sentential forms. A word is emitted the moment a
        # fully-terminal sentence is dequeued, and in CNF a word of
        # length N needs exactly N + (N - 1) derivation steps, so the
        # breadth-first order means shorter words come out before
        # longer ones for CNF grammars.
        while pending:
            current = pending.popleft()
            if current in explored or len(current) > max_length:
                continue
            explored.add(current)

            if CFG.all_terminal(current):
                yield typing.cast(CFG.Word, current)
                continue

            # Expand only the leftmost nonterminal: leftmost
            # derivations alone already reach every word.
            for pos, symbol in enumerate(current):
                if isinstance(symbol, Nonterminal):
                    for rhs in self.rules.get(symbol, []):
                        pending.append(current[:pos] + rhs + current[pos + 1:])
                    break

    def _recursive_nonterminals(self) -> Set[Nonterminal]:
    def _recursive_nonterminals(self) -> Set[Nonterminal]:
        rewritable_to: Dict[Nonterminal, Set[Nonterminal]] \
        rewritable_to: Dict[Nonterminal, Set[Nonterminal]] \
            = {n: set() for n in self.nonterminals}
            = {n: set() for n in self.nonterminals}
@@ -507,13 +467,19 @@ class CFG:


    @staticmethod
    @staticmethod
    def is_equivalent_test(left_ : CFG, right_ : CFG,
    def is_equivalent_test(left_ : CFG, right_ : CFG,
                           full_cmp_len: Optional[int] = None,
                           full_cmp_cnt: Optional[int] = None,
                           max_cmp_len: Optional[int] = None,
                           max_cmp_len: Optional[int] = None,
                           random_samples: int = 1000
                           random_samples: int = 1000
                           ) -> IsEquivalentResult:
                           ) -> IsEquivalentResult:
        left = left_.cnf()
        left = left_.cnf()
        right = right_.cnf()
        right = right_.cnf()


        left_gen = WordGenerator(left)
        right_gen = WordGenerator(right)

        left_rnd = CFGRandom(left)
        right_rnd = CFGRandom(right)

        left_ce: Optional[CFG.Word] = None
        left_ce: Optional[CFG.Word] = None
        right_ce: Optional[CFG.Word] = None
        right_ce: Optional[CFG.Word] = None


@@ -545,12 +511,9 @@ class CFG:
                return None
                return None
            return word
            return word


        if full_cmp_len is None:
        if full_cmp_cnt is None:
            alphabet_size = max(len(left.terminals), len(right.terminals))
            full_cmp_cnt = pow(2, 12)
            max_full_compare = pow(2, 16)
            print(f"full_cmp_cnt = {full_cmp_cnt}")
            full_cmp_len = math.floor(math.log(max_full_compare,
                                               alphabet_size))
            print(f"full_cmp_len = {full_cmp_len}")


        if max_cmp_len is None:
        if max_cmp_len is None:
            max_cmp_len = min(max(pow(2, len(left.nonterminals) + 1),
            max_cmp_len = min(max(pow(2, len(left.nonterminals) + 1),
@@ -558,39 +521,58 @@ class CFG:
                              25)
                              25)
            print(f"max_cmp_len = {max_cmp_len}")
            print(f"max_cmp_len = {max_cmp_len}")


        if full_cmp_len > 0:
        last_checked_len = 0
            lenmap: Dict[int, Set[CFG.Word]] \
        if full_cmp_cnt > 0:
                = {n: set() for n in range(full_cmp_len + 1)}
            Lenmap = Dict[int, Set[CFG.Word]]
            left_words = deepcopy(lenmap)
            left_words: Lenmap = dict()
            right_words = lenmap
            right_words: Lenmap = dict()
            last_min_size = 0
            last_min_size = 0


            for lword, rword in zip_fill(left._generate(full_cmp_len),
            def total(words: Dict[int, Set[CFG.Word]]) -> int:
                                         right._generate(full_cmp_len),
                return sum(len(x) for k, x in words.items()
                                         sentinel=True):
                           if k <= last_min_size)

            def nxderivations(words: Dict[int, Set[CFG.Word]], rng: CFGRandom,
                              new_size: int) -> int:
                return total(words) + \
                    sum(rng.derivations_count(l) for l in
                        range(last_min_size + 1, new_size + 1))

            while True:
                lword = left_gen.next()
                rword = right_gen.next()

                if lword is not None:
                if lword is not None:
                    left_words[len(lword)].add(lword)
                    left_words.setdefault(len(lword), set()).add(lword)
                if rword is not None:
                if rword is not None:
                    right_words[len(rword)].add(rword)
                    right_words.setdefault(len(rword), set()).add(rword)


                min_size = min((len(w) for w in [lword, rword]
                min_size = min((len(w) for w in [lword, rword]
                                if w is not None),
                                if w is not None),
                               default=last_min_size + 1)
                               default=last_min_size + 1)
                if last_min_size < min_size:
                if last_min_size < min_size:
                    for sz in range(last_min_size, min_size):
                    for sz in range(last_min_size, min_size):
                        fill_ces(left_words[sz], right_words[sz])
                        fill_ces(left_words.get(sz, set()),
                                 right_words.get(sz, set()))
                        last_checked_len = sz
                    nxl = nxderivations(left_words, left_rnd, min_size)
                    nxr = nxderivations(right_words, right_rnd, min_size)

                    last_min_size = min_size
                    last_min_size = min_size
                    print(f"Full comparison for {last_min_size} done…")
                    print(f"Full comparison up to {last_checked_len} done…")

                    print(f"{min_size} {nxl}, {nxr}")
                    if max(nxl, nxr) > full_cmp_cnt:
                        print(f"Not going to generate {nxl}, {nxr} words")
                        break


                if left_ce is not None and right_ce is not None:
                if left_ce is not None and right_ce is not None:
                    return mkres()
                    return mkres()


        left_rnd = CFGRandom(left)
        left_cyk = CachedCYK(left)
        left_cyk = CachedCYK(left)
        right_rnd = CFGRandom(right)
        right_cyk = CachedCYK(right)
        right_cyk = CachedCYK(right)


        for length in range(full_cmp_len + 1, max_cmp_len + 1):
        for length in range(last_checked_len + 1, max_cmp_len + 1):
            for _ in range(random_samples):
            for _ in range(random_samples):
                left_ce = try_word(left_ce, left_rnd, right_cyk, length)
                left_ce = try_word(left_ce, left_rnd, right_cyk, length)
                right_ce = try_word(right_ce, right_rnd, left_cyk, length)
                right_ce = try_word(right_ce, right_rnd, left_cyk, length)
@@ -717,10 +699,7 @@ class CachedCYK:
        for src, dst in self.cfg.productions():
        for src, dst in self.cfg.productions():
            if len(dst) <= 1:
            if len(dst) <= 1:
                dst = typing.cast(CFG.Word, dst)
                dst = typing.cast(CFG.Word, dst)
                if dst not in self.cache:
                self.cache.setdefault(dst, set()).add(src)
                    self.cache[dst] = {src}
                else:
                    self.cache[dst].add(src)


    def generates(self, word: Union[str, Iterable[Terminal]]) -> bool:
    def generates(self, word: Union[str, Iterable[Terminal]]) -> bool:
        if isinstance(word, str):
        if isinstance(word, str):
@@ -751,3 +730,48 @@ class CachedCYK:


        self.cache[word] = out
        self.cache[word] = out
        return out
        return out


class WordGenerator:
    """
    Enumerates the words of a grammar, each exactly once, in
    ascending order of length.
    """

    def __init__(self, cfg: CFG):
        # Work on the CNF form of the grammar so that the BFS below
        # reaches shorter words before longer ones.
        self.cfg = cfg.cnf()
        self.seen: Set[CFG.Sentence] = set()
        self.queue: Deque[CFG.Sentence] = deque([(self.cfg.init,)])
        self.last: Optional[CFG.Word] = None

    def get(self) -> Optional[CFG.Word]:
        """Return the most recently produced word (advances on first use)."""
        return self.next() if self.last is None else self.last

    def next(self) -> Optional[CFG.Word]:
        """Produce the next word, or None once the language is exhausted."""
        # BFS over sentential forms. A word is returned the moment a
        # fully-terminal sentence is dequeued; since a word of length N
        # needs exactly N + (N - 1) derivation steps in CNF, the
        # breadth-first order yields shorter words before longer ones.
        while self.queue:
            form = self.queue.popleft()
            if form in self.seen:
                continue
            self.seen.add(form)

            if CFG.all_terminal(form):
                word = typing.cast(CFG.Word, form)
                self.last = word
                return word

            # Expand only the leftmost nonterminal: leftmost
            # derivations alone already reach every word.
            for pos, symbol in enumerate(form):
                if isinstance(symbol, Nonterminal):
                    for rhs in self.cfg.rules.get(symbol, []):
                        self.queue.append(form[:pos] + rhs + form[pos + 1:])
                    break
        return None