Commit ee950806 authored by Dávid Smolka's avatar Dávid Smolka
Browse files

Merge branch 'revert-98620a6b' into 'main'

Revert "Started typing and need to change machines..."

See merge request !1
parents 98620a6b 08afac4b
Loading
Loading
Loading
Loading
+7 −9
Original line number Diff line number Diff line
@@ -5,12 +5,11 @@ import spot
from subfiles.pruning import pruning
from subfiles.quotienting import quotienting
from subfiles.modal_pruning import modal_pruning
from subfiles.my_types import Automaton, Namespace

spot.setup()


def main() -> None:
def main():
    parser = argparse.ArgumentParser(description="""Reduce MTBA by pruning or
                                                    quotienting, utilizing
                                                    XY-simulations.""")
@@ -28,27 +27,26 @@ def main() -> None:
    parser.add_argument("-n", default=0, type=int,
                        help="""specify the 'n' in simulation
                                (default value is 0)""")
    args: Namespace = parser.parse_args()
    args = parser.parse_args()

    hoaf: str = ""
    line: str
    hoaf = ""
    for line in sys.stdin:
        hoaf += line
    automaton: Automaton = spot.automaton(hoaf)
    reduced: Automaton = reduced_automaton(automaton, args)
    automaton = spot.automaton(hoaf)
    reduced = reduced_automaton(automaton, args)
    reduced.purge_dead_states()
    reduced.prop_reset()
    print(reduced.to_str("hoa", "1.1"))
    return


def reduced_automaton(automaton: Automaton, args: Namespace) -> Automaton:
def reduced_automaton(automaton, args):
    if args.technique == "prune":
        return pruning(automaton, args)
    elif args.technique == "quotient":
        return quotienting(automaton, args)
    elif args.technique == "modal_prune":
        return modal_pruning(automaton, args)
    return None


if __name__ == '__main__':
+15 −19
Original line number Diff line number Diff line
@@ -2,17 +2,14 @@ import buddy
from bidict import bidict
import numpy as np
import spot
from typing import List, Set, Tuple

from subfiles.utilities import implies
from subfiles.my_types import Automaton, PGArena, StateBidict, EdgeDict, StateNum, EdgeNum, StateRel

spot.setup()


class PGData:
    def __init__(self, automaton: Automaton, arena: PGArena, s_maps: StateBidict, e_maps: EdgeDict, owners: List[int], odd_states: Set[StateNum],
                 low: Set[EdgeNum], match_initial: bool) -> None:
    def __init__(self, automaton, arena, s_maps, e_maps, owners, odd_states,
                 low, match_initial):
        self.automaton = automaton
        self.arena = arena
        self.s_maps = s_maps
@@ -23,7 +20,7 @@ class PGData:
        self.match_initial = match_initial


def lw_hw_sim(automaton: Automaton, aut_hell: StateNum, low: Set[EdgeNum], match_initial: bool =False) -> Tuple[StateRel, PGArena, StateBidict, EdgeDict]:
def lw_hw_sim(automaton, aut_hell, low, match_initial=False):
    arena, s_maps, e_maps, pg_eden = create_parity_arena(automaton, low,
                                                         match_initial)
    sim_rel = np.identity(automaton.num_states(), dtype=bool)
@@ -41,8 +38,8 @@ def lw_hw_sim(automaton: Automaton, aut_hell: StateNum, low: Set[EdgeNum], match
    return sim_rel, arena, s_maps, e_maps


def add_eden(pgdata: PGData) -> StateNum:
    eden: StateNum = pgdata.arena.new_state()
def add_eden(pgdata):
    eden = pgdata.arena.new_state()
    pgdata.owners.append(True)
    for i in range(pgdata.arena.num_states()):
        if i != eden:
@@ -50,15 +47,15 @@ def add_eden(pgdata: PGData) -> StateNum:
    return eden


def create_parity_arena(automaton: Automaton, low: Set[EdgeNum], match_initial: bool) -> Tuple[PGArena, StateBidict, EdgeDict, StateNum]:
    s_maps: StateBidict = bidict(dict())
    e_maps: EdgeDict = dict()
    owners: List[int] = []
def create_parity_arena(automaton, low, match_initial):
    s_maps = bidict(dict())
    e_maps = dict()
    owners = []
    dic = spot.bdd_dict()
    arena = spot.make_twa_graph(dic)
    acc = spot.acc_code("parity max odd 1")
    arena.set_acceptance(1, acc)
    odd_states: Set[StateNum] = set()
    odd_states = set()
    pgdata = PGData(automaton, arena, s_maps, e_maps, owners, odd_states, low,
                    match_initial)
    create_even_states(pgdata)
@@ -66,11 +63,10 @@ def create_parity_arena(automaton: Automaton, low: Set[EdgeNum], match_initial:
    eden = add_eden(pgdata)
    arena.set_init_state(eden)
    spot.set_state_players(arena, owners)
    print(arena.num_states(), arena.num_edges())
    return arena, s_maps, e_maps, eden


def create_even_states(pgdata: PGData) -> None:
def create_even_states(pgdata):
    num_states = pgdata.automaton.num_states()
    init_state_n = pgdata.automaton.get_init_state_number()
    for p in range(num_states):
@@ -83,7 +79,7 @@ def create_even_states(pgdata: PGData) -> None:
                create_even_to_odd_edges(n_state, p, q, pgdata)


def create_even_to_odd_edges(e_state: StateNum, p: StateNum, q: StateNum, pgdata: PGData) -> None:
def create_even_to_odd_edges(e_state, p, q, pgdata):
    for edge in pgdata.automaton.out(p):
        edge_num = pgdata.automaton.edge_number(edge)
        if edge_num not in pgdata.low:
@@ -98,15 +94,15 @@ def create_even_to_odd_edges(e_state: StateNum, p: StateNum, q: StateNum, pgdata
        pgdata.e_maps[arena_en] = edge_num


def create_odd_state(p: StateNum, q: StateNum, cond: buddy.bdd, acc: spot.impl.mark_t, pgdata: PGData) -> StateNum:
    n_state: StateNum = pgdata.arena.new_state()
def create_odd_state(p, q, cond, acc, pgdata):
    n_state = pgdata.arena.new_state()
    pgdata.odd_states.add(n_state)
    pgdata.s_maps[(p, q, cond, acc.has(0))] = n_state
    pgdata.owners.append(True)
    return n_state


def create_odd_to_even_edges(pgdata: PGData) -> None:
def create_odd_to_even_edges(pgdata):
    for s in pgdata.odd_states:
        (p, q, cond, bacc) = pgdata.s_maps.inverse[s]
        acc = spot.mark_t([0]) if bacc else spot.mark_t([])
+57 −138
Original line number Diff line number Diff line
import numpy as np
import buddy
import spot
from typing import Tuple, Dict
from itertools import product

from subfiles.sims import lw_hw_fsim, lw_hw_bsim
from subfiles.utilities import is_may_edge, make_must_edge, remove_edge
from subfiles.utilities import Direction
from subfiles.my_types import Automaton, PGArena, Namespace, Edge, StateRel, EdgeRel, EdgeNum, EdgeDict, StateBidict, StateNum

spot.setup()


def modal_pruning(automaton: Automaton, args: Namespace) -> Automaton:
    direction = Direction.FORWARD if args.direction == "fw" else Direction.BACKWARD
    return modal_pruning_iter(automaton, dir)
def modal_pruning(automaton, args):
    if args.direction == "fw":
        return forward_modal_pruning(automaton)
    elif args.direction == "bw":
        return automaton
        # return backward_modal_pruning(automaton)


def modal_pruning_iter(automaton: Automaton, dir: Direction) -> Automaton:
def forward_modal_pruning(automaton):
    check = True
    while check:
        check, automaton = modal_pruning_one_step(automaton, dir)
        check, automaton = modal_pruning_one_step(automaton, True)
    return automaton


def modal_pruning_one_step(automaton: Automaton, dir: Direction) -> Tuple[bool, Automaton]:
    fsim: StateRel
    bsim: StateRel
    if dir is Direction.FORWARD:
# def backward_modal_pruning(automaton):
#     check = True
#     while check:
#         check, automaton = modal_pruning_one_step(automaton, False)
#         return automaton


def modal_pruning_one_step(automaton, fw=True):
    if fw:
        fsim, arena, s_maps, e_maps = lw_hw_fsim(automaton)
        bsim = np.identity(automaton.num_states(), dtype=bool)
    else:
        fsim = np.identity(automaton.num_states(), dtype=bool)
        bsim, arena, s_maps, e_maps = lw_hw_bsim(automaton)
    edge_rel: EdgeRel = create_edge_rel(automaton, fsim, bsim)
    sub_e: EdgeNum
    dom_e: EdgeNum
    edge_rel = create_edge_rel(automaton, fsim, bsim)
    sub_e, dom_e = choose_edge_pair(automaton, edge_rel)
    if sub_e == -1 or dom_e == -1:
    if sub_e == -1 and dom_e == -1:
        return False, automaton
    check, sub_e2, dom_e2 = choose_edge_pair_second(automaton, edge_rel, sub_e, dom_e)
    if not check:
        update_dominating_traces(automaton, arena, s_maps, e_maps, sub_e, dom_e, dir)
        remove_edge(automaton, sub_e)
        automaton.purge_dead_states()
        return True, automaton
    print("a")
    update_dominating_traces(automaton, arena, s_maps, e_maps, sub_e, dom_e, dir)
    update_dominating_traces(automaton, arena, s_maps, e_maps, sub_e2, dom_e2, dir)
    if fw:
        update_dominating_forward_traces(automaton, arena, s_maps, e_maps,
                                         sub_e, dom_e)
    else:
        update_dominating_backward_traces(automaton, arena, s_maps, e_maps,
                                          sub_e, dom_e)
    remove_edge(automaton, sub_e)
    remove_edge(automaton, sub_e2)
    automaton.purge_dead_states()
    return True, automaton


# def modal_pruning_one_step(automaton: Automaton, fw: bool =True) -> Tuple[bool, Automaton]:
#     fsim: StateRel
#     bsim: StateRel
#     if fw:
#         fsim, arena, s_maps, e_maps = lw_hw_fsim(automaton)
#         bsim = np.identity(automaton.num_states(), dtype=bool)
#     else:
#         fsim = np.identity(automaton.num_states(), dtype=bool)
#         bsim, arena, s_maps, e_maps = lw_hw_bsim(automaton)
#     edge_rel: EdgeRel = create_edge_rel(automaton, fsim, bsim)
#     tups = choose_edge_pairs(automaton, edge_rel)
#     if len(tups) == 0:
#         return False, automaton
#     for s, d in tups:
#         update_dominating_traces(automaton, arena, s_maps, e_maps, s, d, fw)
#     for s, d in tups:
#         remove_edge(automaton, s)
#     automaton.purge_dead_states()
#     return True, automaton


def update_dominating_traces(automaton: Automaton, arena: PGArena, s_maps: StateBidict, e_maps: EdgeDict, sub_e: EdgeNum, dom_e: EdgeNum, dir: Direction) -> None:
    if dir is Direction.FORWARD:
        update_dominating_forward_traces(automaton, arena, s_maps, e_maps, sub_e, dom_e)
    else:
        update_dominating_backward_traces(automaton, arena, s_maps, e_maps, sub_e, dom_e)


def update_dominating_forward_traces(automaton: Automaton, arena: PGArena, s_maps: StateBidict, e_maps: EdgeDict,
                                     sub_e: EdgeNum, dom_e: EdgeNum) -> None:
def update_dominating_forward_traces(automaton, arena, s_maps, e_maps,
                                     sub_e, dom_e):
    strategies = arena.get_strategy()
    arena_edges = create_edges_dict(arena)
    aut_edges = create_edges_dict(automaton)
@@ -108,34 +79,33 @@ def update_dominating_forward_traces(automaton: Automaton, arena: PGArena, s_map
                to_visit.append(new_parity_state_to_visit)


def update_dominating_backward_traces(automaton: Automaton, arena: PGArena, s_maps: StateBidict, e_maps: EdgeDict,
                                      sub_e: EdgeNum, dom_e: EdgeNum) -> None:
    strategies = arena.get_strategy()
    arena_edges = create_edges_dict(arena)
    aut_edges = create_edges_dict(automaton)
    visited = []
    sub_edge = aut_edges[sub_e]
    dom_edge = aut_edges[dom_e]
    to_visit = [s_maps[(sub_edge.src, dom_edge.src)]]
    if is_may_edge(automaton, dom_e):
        automaton.highlight_edge(dom_e, None)
    while len(to_visit) != 0:
        current_n = to_visit.pop()
        visited.append(current_n)
        for spo_edge_arena in arena.out(current_n):
            spo_dst_state_num_arena = spo_edge_arena.dst
            dup_edge_num_arena = strategies[spo_dst_state_num_arena]
            dup_edge_arena = arena_edges[dup_edge_num_arena]
            dup_edge_num_automaton = e_maps[dup_edge_num_arena]
            if dup_edge_num_automaton in aut_edges.keys():
                if is_may_edge(automaton, dup_edge_num_automaton):
                    make_must_edge(automaton, dup_edge_num_automaton)
            new_parity_state_to_visit = dup_edge_arena.dst
            if new_parity_state_to_visit not in visited:
                to_visit.append(new_parity_state_to_visit)


def choose_edge_pair(automaton: Automaton, edge_rel: EdgeRel) -> Tuple[EdgeNum, EdgeNum]:
def update_dominating_backward_traces(automaton, arena, s_maps, e_maps,
                                      sub_e, dom_e):
    pass
    # strategies = arena.get_strategy()
    # arena_edges = list(arena.edges())
    # aut_edges = list(automaton.edges())
    # visited = []
    # sub_edge = aut_edges[sub_e]
    # dom_edge = aut_edges[dom_e]
    # to_visit = [s_maps[(sub_edge.dst, dom_edge.dst)]]
    #
    # while len(to_visit) != 0:
    #     current_n = to_visit.pop()
    #     visited.append(current_n)
    #
    #     for spoiler_move in arena.out(current_n):
    #         strategy = strategies[spoiler_move.dst]
    #         duplicator_move = arena_edges[strategy]
    #         aut_en = e_maps[arena.edge_number(duplicator_move)]
    #         automaton.highlight_edge(aut_en, None)
    #         new = duplicator_move.dst
    #
    #         if new not in visited:
    #             to_visit.append(new)


def choose_edge_pair(automaton, edge_rel):
    for s in range(automaton.num_edges()):
        for d in range(automaton.num_edges()):
            if s != d and edge_rel[s][d]:
@@ -143,58 +113,7 @@ def choose_edge_pair(automaton: Automaton, edge_rel: EdgeRel) -> Tuple[EdgeNum,
    return -1, -1


def choose_edge_pair_second(automaton: Automaton, edge_rel: EdgeRel, sub_e: EdgeNum, dom_e: EdgeNum) -> Tuple[bool, EdgeNum, EdgeNum]:
    for s in range(automaton.num_edges()):
        for d in range(automaton.num_edges()):
            if s != d and edge_rel[s][d]:
                s = s + 1
                d = d + 1
                if not(sub_e == s and dom_e == d) and check_collision(automaton, sub_e, dom_e, s, d):
                    return True, s, d
    return False, -1, -1


# def choose_edge_pairs(automaton, edge_rel):
#     tups = set()
#     edges = create_edges_dict(automaton)
#     l = list(edges.keys())
#     # l.reverse()
#     for (s, d) in product(l, l):
#         if s != d and edge_rel[s - 1][d - 1]:
#             good = True
#             for a, b in tups:
#                 if not check_collision(automaton, s, d, a, b):
#                     good = False
#                     break
#             if good:
#                 tups.add((s, d))
#     return tups


def check_collision(automaton: Automaton, sub1: EdgeNum, dom1: EdgeNum, sub2: EdgeNum, dom2: EdgeNum) -> bool:
    edges = create_edges_dict(automaton)
    return not (is_reachable(automaton, edges[sub2].dst, edges[dom1].dst) or
            is_reachable(automaton, edges[sub1].dst, edges[dom2].dst))


def is_reachable(automaton: Automaton, fst: StateNum, snd: StateNum) -> bool:
    visited = set()
    to_visit = [fst]
    while len(to_visit) != 0:
        state = to_visit.pop()
        if state == snd:
            return True
        for e in automaton.out(state):
            dst = e.dst
            if dst == snd:
                return True
            if dst not in visited:
                to_visit.append(dst)
        visited.add(state)
    return False


def create_edge_rel(aut: Automaton, fsim: StateRel, bsim: StateRel) -> EdgeRel:
def create_edge_rel(aut, fsim, bsim):
    n = aut.num_edges()
    rel = np.zeros([n, n], dtype=bool)
    for se in aut.edges():
@@ -208,7 +127,7 @@ def create_edge_rel(aut: Automaton, fsim: StateRel, bsim: StateRel) -> EdgeRel:
    return rel


def create_edges_dict(automaton: Automaton) -> Dict[EdgeNum, Edge]:
def create_edges_dict(automaton):
    dictionary = dict()
    for edge in automaton.edges():
        edge_number = automaton.edge_number(edge)

subfiles/my_types.py

deleted100644 → 0
+0 −20
Original line number Diff line number Diff line
import argparse
import numpy
import bidict
import spot

from typing import Dict

spot.setup()


Automaton = spot.twa_graph
PGArena = spot.twa_graph
Edge = spot.impl.twa_graph_edge_storage
Namespace = argparse.Namespace
EdgeNum = int
StateNum = int
EdgeRel = numpy.ndarray[EdgeNum]
StateRel = numpy.ndarray[StateNum]
EdgeDict = Dict[EdgeNum, EdgeNum]
StateBidict = bidict.bidict[StateNum, StateNum]
 No newline at end of file
+8 −12
Original line number Diff line number Diff line
@@ -4,36 +4,32 @@ import buddy

from subfiles.sims import hnlw_hnhlw_fsim, hnlw_hnhlw_bsim
from subfiles.maximal_transitive_subrelation import mts
from subfiles.my_types import Automaton, Namespace, EdgeRel, StateRel, EdgeNum

spot.setup()


def pruning(automaton: Automaton, args: Namespace) -> Automaton:
    fsim: StateRel
    bsim: StateRel
def pruning(automaton, args):
    if args.direction == "fw":
        fsim = hnlw_hnhlw_fsim(automaton, args.n)
        bsim = np.identity(automaton.num_states(), dtype=bool)
    elif args.direction == "bw":
        bsim = hnlw_hnhlw_bsim(automaton, args.n)
        fsim = np.identity(automaton.num_states(), dtype=bool)
    edge_rel: EdgeRel = get_assym(create_edge_rel(automaton, fsim, bsim))
    edge_rel = get_assym(create_edge_rel(automaton, fsim, bsim))
    mts(edge_rel)
    pruned = pruned_automaton(automaton, edge_rel)
    pruned.purge_unreachable_states()
    return pruned


def pruned_automaton(aut: Automaton, edges_rel: EdgeRel) -> Automaton:
def pruned_automaton(aut, edges_rel):
    dic = spot.bdd_dict()
    pruned: Automaton = spot.make_twa_graph(dic)
    pruned = spot.make_twa_graph(dic)
    acc = spot.acc_code('Inf(0)')
    pruned.set_acceptance(1, acc)
    pruned.new_states(aut.num_states())
    pruned.set_init_state(aut.get_init_state_number())
    pruned.copy_ap_of(aut)

    for e in aut.edges():
        en = aut.edge_number(e)
        if is_maximal(en - 1, edges_rel):
@@ -41,7 +37,7 @@ def pruned_automaton(aut: Automaton, edges_rel: EdgeRel) -> Automaton:
    return pruned


def create_edge_rel(aut: Automaton, fsim: StateRel, bsim: StateRel) -> EdgeRel:
def create_edge_rel(aut, fsim, bsim):
    n = aut.num_edges()
    rel = np.zeros([n, n], dtype=bool)
    for se in aut.edges():
@@ -55,9 +51,9 @@ def create_edge_rel(aut: Automaton, fsim: StateRel, bsim: StateRel) -> EdgeRel:
    return rel


def get_assym(rel: EdgeRel) -> EdgeRel:
def get_assym(rel):
    n = len(rel)
    arel: EdgeRel = np.zeros([n, n], dtype=bool)
    arel = np.zeros([n, n], dtype=bool)
    for x in range(n):
        for y in range(n):
            if x == y:
@@ -67,5 +63,5 @@ def get_assym(rel: EdgeRel) -> EdgeRel:
    return arel


def is_maximal(elem: EdgeNum, rel: EdgeRel) -> bool:
def is_maximal(elem, rel):
    return True not in rel[elem]
Loading