fja / eval · Commit 76284e76 (Verified)

CFL: Implement the random-sampling-based comparison

Authored Apr 05, 2020 by Vladimír Štill
Parent: df1a78ab
Changes: 1 file

cfl.py:
```diff
@@ -8,6 +8,7 @@ from collections import deque
 from common import Terminal, Nonterminal
 from reg_automata import IsEquivalentResult  # TODO: to common
 import random
+import math

 T = TypeVar("T")
 TA = TypeVar("TA")
```
```diff
@@ -490,175 +491,13 @@ class CFG:
                 if sym not in rewritable_to[src]:
                     rewritable_to[src].add(sym)
                     tracker.changed()
                     # print(f"A: {rewritable_to} ({src} -> {sym})")
                 for tgt in rewritable_to[sym]:
                     if tgt not in rewritable_to[src]:
                         rewritable_to[src].add(tgt)
                         tracker.changed()
                         # print(f"B: {rewritable_to} ({src} -> {sym} -> {tgt})")
         return {n for n in self.nonterminals if n in rewritable_to[n]}

-    def _nonterminal_min_length(self) -> Dict[Nonterminal, int]:
-        shortest_word: Dict[Nonterminal, int] = dict()
-        for tracker in ChangeTracker():
-            for src, prod in self.productions():
-                if CFG.all_terminal(prod):
-                    shortest: Optional[int] = len(prod)
-                else:
-                    shortest = 0
-                    for sym in prod:
-                        if isinstance(sym, Nonterminal):
-                            if sym in shortest_word:
-                                shortest += shortest_word[sym]
-                            else:
-                                shortest = None
-                                break
-                        else:
-                            shortest += 1
-                if shortest is not None and (src not in shortest_word
-                                             or shortest_word[src] > shortest):
-                    shortest_word[src] = shortest
-                    tracker.changed()
-        return shortest_word
```
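These loops iterate with `for tracker in ChangeTracker(): ... tracker.changed()`. `ChangeTracker` itself is not part of this diff; judging only from this usage, a minimal compatible sketch (hypothetical, not the repo's actual implementation) could be:

```python
class ChangeTracker:
    """Yields passes until one completes with no changed() call."""
    def __init__(self) -> None:
        self._changed = True

    def __iter__(self):
        while self._changed:
            self._changed = False   # assume this pass is stable...
            yield self              # ...and let the loop body run once

    def changed(self) -> None:
        self._changed = True        # body reported progress; iterate again
```

Each `yield` corresponds to one full pass over the productions; the iteration stops as soon as a pass makes no update, i.e. the computed maps have reached a fixpoint. The same pattern drives the two removed estimators that follow.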
```diff
-    def _nonterminal_max_length(self, recset: Optional[Set[Nonterminal]] = None) \
-            -> Dict[Nonterminal, Union[int, InfinityType]]:
-        if recset is None:
-            recset = self._recursive_nonterminals()
-        longest_word: Dict[Nonterminal, Union[int, InfinityType]] \
-            = {n: Infinity if n in recset else 0 for n in self.nonterminals}
-        for tracker in ChangeTracker():
-            for src, prod in self.productions():
-                if CFG.all_terminal(prod):
-                    longest: Union[int, InfinityType] = len(prod)
-                else:
-                    longest = 0
-                    for sym in prod:
-                        if isinstance(sym, Nonterminal):
-                            longest += longest_word[sym]
-                        else:
-                            longest += 1
-                if longest > longest_word[src]:
-                    longest_word[src] = longest
-                    tracker.changed()
-        return longest_word
-
-    def _nonterminal_lang_size(self, recset: Optional[Set[Nonterminal]] = None) \
-            -> Dict[Nonterminal, Union[int, InfinityType]]:
-        if recset is None:
-            recset = self._recursive_nonterminals()
-        lang_size: Dict[Nonterminal, Union[int, InfinityType]] \
-            = {n: Infinity for n in recset}
-        for tracker in ChangeTracker():
-            for src, prods in self.rules.items():
-                if src in lang_size:
-                    continue
-                src_wc: Union[None, InfinityType, int] = 0
-                for prod in prods:
-                    prod_wc: Union[int, InfinityType] = 1
-                    for sym in prod:
-                        if isinstance(sym, Nonterminal):
-                            sym_wc = lang_size.get(sym)
-                            if sym_wc is None:
-                                src_wc = None
-                                break
-                            if sym_wc is Infinity:
-                                src_wc = Infinity
-                                break
-                            prod_wc *= sym_wc
-                    if src_wc is None or src_wc is Infinity:
-                        break
-                    if src_wc is not None:
-                        src_wc += prod_wc
-                if src_wc is not None:
-                    lang_size[src] = src_wc
-                    tracker.changed()
-        return lang_size
```
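`_nonterminal_lang_size` estimates, for each nonterminal, how many words it can derive, pinning recursive nonterminals to `Infinity`. A simplified standalone illustration of the same fixpoint (a toy grammar encoded as plain dicts, with `float("inf")` standing in for the repo's `InfinityType`; not the repo's API):

```python
INF = float("inf")
# S -> a | b c,  R -> a R | a;  R is recursive, so its language is infinite.
grammar = {"S": [("a",), ("b", "c")], "R": [("a", "R"), ("a",)]}
lang_size = {"R": INF}  # recursive nonterminals start (and stay) infinite
changed = True
while changed:
    changed = False
    for src, prods in grammar.items():
        if src in lang_size:
            continue
        total = 0
        for prod in prods:
            words = 1
            for sym in prod:
                if sym in grammar:          # nonterminal
                    if sym not in lang_size:
                        words = None        # size not known yet
                        break
                    words *= lang_size[sym]
            if words is None:
                total = None
                break
            total += words
        if total is not None:
            lang_size[src] = total
            changed = True
print(lang_size)  # {'R': inf, 'S': 2}
```

The removed generator below appears to use these sizes only as sampling weights, so an estimate of this kind is sufficient for its purposes.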
```diff
-    def _generate_random(self, min_length: int, max_length: int,
-                         seed: Optional[int] = 0, rec_bias: int = 2,
-                         max_fin_size: int = 16) \
-            -> Iterable[CFG.Word]:
-        """
-        Yields a stream of random words of the grammar up to {max_length} in
-        length. The stream is infinite. Words can repeat.
-
-        Needs grammar in epsilon normal form, without simple rules, and
-        normalized (so proper grammar is OK). Otherwise it might fail to
-        generate some words or the generation might not terminate.
-        """
-        assert rec_bias >= 1
-        random.seed(seed)
-        recursive = self._recursive_nonterminals()
-        shortest_word = self._nonterminal_min_length()
-        longest_word = self._nonterminal_max_length(recursive)
-        lang_size = self._nonterminal_lang_size(recursive)
-
-        def prod_size(prod: CFG.Production) -> Union[int, InfinityType]:
-            out: Union[int, InfinityType] = 1
-            for x in prod:
-                if isinstance(x, Nonterminal):
-                    out *= lang_size[x]
-            return max(out, max_fin_size)
-
-        max_fin = max(x for x in (prod_size(p) for _, p in self.productions())
-                      if isinstance(x, int))
-
-        def prod_weight(prod: CFG.Production) -> int:
-            sz = prod_size(prod)
-            if isinstance(sz, int):
-                return sz
-            return max_fin * rec_bias
-
-        def sentence_length_bound(sentence: CFG.Sentence, length_map) -> int:
-            return sum(map(lambda s: length_map[s]
-                           if isinstance(s, Nonterminal) else 1,
-                           sentence))
-
-        def sentence_min_length(sentence: CFG.Sentence) -> int:
-            return sentence_length_bound(sentence, shortest_word)
-
-        def sentence_max_length(sentence: CFG.Sentence) -> int:
-            return sentence_length_bound(sentence, longest_word)
-
-        while True:
-            sentence: CFG.Sentence = (self.init,)
-            while not CFG.all_terminal(sentence):
-                current_min_len = sentence_min_length(sentence)
-                candidates: List[Tuple[int, CFG.Production]] = []
-                weights: List[int] = []
-                for i in range(len(sentence)):
-                    sym = sentence[i]
-                    if not isinstance(sym, Nonterminal) \
-                            or sym not in self.rules:
-                        continue
-                    base_min = current_min_len - shortest_word[sym]
-                    # cannot use the same trick for max due to infinity
-                    base_max = sentence_max_length(sentence[:i]
-                                                   + sentence[1 + i:])
-                    for prod in self.rules[sym]:
-                        minl = base_min + sentence_min_length(prod)
-                        maxl = base_max + sentence_max_length(prod)
-                        if minl <= max_length and maxl >= min_length:
-                            candidates.append((i, prod))
-                            weights.append(prod_weight(prod))
-                # print([(CFG._terminal_sequence_to_str(p), w) for (_,p), w in zip(candidates, weights)])
-                move = random.choices(candidates, weights=weights)[0]
-                i = move[0]
-                sentence = sentence[:i] + move[1] + sentence[1 + i:]
-            yield typing.cast(CFG.Word, sentence)
-
     @staticmethod
     def _terminal_sequence_to_str(seq: Optional[Iterable[Terminal]]) \
             -> Optional[str]:
```
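Both the removed `_generate_random` and the surviving `CFGRandom.rnd_word` pick the next rewriting step with `random.choices`, which draws proportionally to the given weights; productions that lead into larger sublanguages (and, in the removed code, recursive ones boosted by `rec_bias`) are therefore preferred. A minimal demonstration of that primitive:

```python
import random

random.seed(0)
# One element per draw; an option with weight 9 is picked about 9 times
# more often than one with weight 1.
picks = random.choices(["rare", "common"], weights=[1, 9], k=10_000)
print(picks.count("common") / len(picks))  # roughly 0.9
```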
```diff
@@ -668,16 +507,17 @@ class CFG:
     @staticmethod
     def is_equivalent_test(left_: CFG, right_: CFG,
-                           full_cmp_len=3, randomSample=100,
-                           randomDepth=-4) -> IsEquivalentResult:
+                           full_cmp_len: Optional[int] = None,
+                           max_cmp_len: Optional[int] = None,
+                           random_samples: int = 1000) -> IsEquivalentResult:
         left = left_.cnf()
         right = right_.cnf()
         left_ce: Optional[CFG.Word] = None
         right_ce: Optional[CFG.Word] = None

-        def mkres():
+        def mkres() -> IsEquivalentResult:
             return IsEquivalentResult(
                 CFG._terminal_sequence_to_str(left_ce),
                 CFG._terminal_sequence_to_str(right_ce))
```
```diff
@@ -695,6 +535,29 @@ class CFG:
             if right_ce is None:
                 right_ce = may_pop(right_words - left_words)

+        def try_word(maybe_ce: Optional[CFG.Word], rng: CFGRandom,
+                     other: CFG, length: int) -> Optional[CFG.Word]:
+            if maybe_ce is not None:
+                return maybe_ce
+            word = rng.rnd_word(length)
+            if word is None or other.generates(word):
+                return None
+            return word
+
+        if full_cmp_len is None:
+            alphabet_size = max(len(left.terminals), len(right.terminals))
+            max_full_compare = pow(2, 16)
+            full_cmp_len = math.floor(math.log(max_full_compare,
+                                               alphabet_size))
+            print(f"full_cmp_len = {full_cmp_len}")
+        if max_cmp_len is None:
+            max_cmp_len = min(max(pow(2, len(left.nonterminals) + 1),
+                                  pow(2, len(right.nonterminals) + 1)),
+                              100)
+            print(f"max_cmp_len = {max_cmp_len}")
+
         if full_cmp_len > 0:
             lenmap: Dict[int, Set[CFG.Word]] \
                 = {n: set() for n in range(full_cmp_len + 1)}
```
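When `full_cmp_len` is not given, it is derived so that the exhaustive phase inspects at most about 2^16 candidate words: the largest length L with alphabet_size^L <= 2^16. A quick check of the formula:

```python
import math

MAX_FULL_COMPARE = pow(2, 16)
for alphabet_size in (3, 10, 26):
    # largest L such that alphabet_size ** L <= 2 ** 16
    print(alphabet_size, math.floor(math.log(MAX_FULL_COMPARE, alphabet_size)))
# 3 10    (3**10 = 59049 <= 65536 < 3**11)
# 10 4    (10**4 = 10000 <= 65536 < 10**5)
# 26 3    (26**3 = 17576 <= 65536 < 26**4)
```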
```diff
@@ -717,11 +580,21 @@ class CFG:
             for sz in range(last_min_size, min_size):
                 fill_ces(left_words[sz], right_words[sz])
             last_min_size = min_size
+            print(f"Full comparison for {last_min_size} done…")

         if left_ce is not None and right_ce is not None:
             return mkres()

-        assert randomSample == 0, "UNIMPLEMENTED"
+        left_rnd = CFGRandom(left)
+        right_rnd = CFGRandom(right)
+        for length in range(full_cmp_len + 1, max_cmp_len + 1):
+            for _ in range(random_samples):
+                left_ce = try_word(left_ce, left_rnd, right, length)
+                right_ce = try_word(right_ce, right_rnd, left, length)
+                if left_ce is not None and right_ce is not None:
+                    return mkres()
+            print(f"Tested for length {length}…")
+        return mkres()
```
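Putting the pieces together, a hypothetical call of the new interface (grammar construction is outside this diff, so `left_grammar` and `right_grammar` are placeholders):

```python
# Exhaustively compare all words up to length 3, then for each length from
# 4 to 20 draw 500 random words from each grammar and check membership in
# the other; the result carries one counterexample word per side, if found.
result = CFG.is_equivalent_test(left_grammar, right_grammar,
                                full_cmp_len=3,
                                max_cmp_len=20,
                                random_samples=500)
```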
```diff
@@ -756,7 +629,6 @@ class CFGRandom:
         return self.counts[length][nterm]

     def _materialize(self, length: int) -> None:
-        # print(f"_materialize({length})")
         if len(self.counts) > length:
             return
         for l in range(len(self.counts), length):
@@ -771,8 +643,6 @@ class CFGRandom:
     def _materialize_prod(self, prod: CFG.Production, length: int,
                           prefix=""):
         """Assumes smaller lengths are already computed"""
-        # print(f"{prefix}_materialize_prod({CFG._terminal_sequence_to_str(prod)}, {length})")
         assert len(self.prod_counts) >= length, \
             "smaller production lengths must be already computed"
```
```diff
@@ -788,7 +658,6 @@ class CFGRandom:
         self.prod_counts.append(dict())
         if prod in self.prod_counts[length]:
-            # print(f"{prefix} -> {self.prod_counts[length][prod]} (c)")
             return self.prod_counts[length][prod]

         # for N -> γ to get number of words of length l
@@ -807,7 +676,6 @@ class CFGRandom:
                 count += cnt_alpha * cnt_beta
         self.prod_counts[length][prod] = count
-        # print(f"{prefix} -> {count}")
         return count

     def rnd_word(self, length: int) -> Optional[CFG.Word]:
```
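The `cnt_alpha * cnt_beta` accumulation is the classic word-counting recurrence: for a production that splits into parts α and β, the number of words of length l is the sum over all splits k of count(α, k) · count(β, l − k). A toy standalone version (illustrative names, not the repo's API):

```python
# counts[X][l] = number of words of length l derivable from nonterminal X
counts = {"A": {1: 1}, "B": {1: 1, 2: 1}}   # e.g. A -> a,  B -> b | bb

def prod_count(alpha: str, beta: str, length: int) -> int:
    """Words of the given length derivable from a production X -> alpha beta."""
    return sum(counts[alpha].get(k, 0) * counts[beta].get(length - k, 0)
               for k in range(length + 1))

print(prod_count("A", "B", 2))  # 1, namely a followed by b
print(prod_count("A", "B", 3))  # 1, namely a followed by bb
```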
```diff
@@ -836,6 +704,5 @@ class CFGRandom:
             candidates.append(cand)
             weights.append(w)
-        print([(CFG._terminal_sequence_to_str(candidates[i]), weights[i])
-               for i in range(len(candidates))])
         sentence = random.choices(candidates, weights=weights)[0]
         return typing.cast(CFG.Word, sentence)
```