Commit 16bc0e74 authored by Filip Kučerák's avatar Filip Kučerák
Browse files

new sequencing system

parent 35506db0
Loading
Loading
Loading
Loading
+45 −0
Original line number Diff line number Diff line
import sys
sys.path.insert(0, '../..')

from kdtesting import *

context_provider = ArbitraryContextProvider({
    "number": NumberA(1, 10)
})

operation_command = Command([
    VariableA("number", context_provider),
    ConstantA("+"),
    VariableA("number", context_provider)
])

print_command = Command([
    ConstantA("print"),
    VariableA("number", context_provider)
])

final_command = ContextRefresher(CommandSeq([
    operation_command,
    print_command
]), context_provider)

testing_group = CommandGroup([
    (1, final_command)
], 5, 10)

print(testing_group.example())
"""
OUTPUT - notice the fixed numbers
7 + 7
print 7
8 + 8
print 8
1 + 1
print 1
3 + 3
print 3
3 + 3
print 3
9 + 9
print 9
"""
+239 −109
Original line number Diff line number Diff line
from typing import List, Callable, Tuple, Optional, TypeVar, Generic, Any, \
    Generator, Dict, Sequence
    Generator, Dict, Sequence, Union
import random
import subprocess
import os
@@ -282,6 +282,19 @@ class Arbitrary(Monoid[T]):
        return self.to_str(self.get())


class ArbitraryWrapper(Arbitrary[T]):

    def __init__(self, arbitrary: Arbitrary[T]):
        super().__init__(arbitrary.get)
        self.arbitrary = arbitrary

    def shrink(self, value: T) -> Generator[T, None, None]:
        return self.arbitrary.shrink(value)

    def to_str(self, value: T) -> str:
        return self.arbitrary.to_str(value)


class ConstantA(Arbitrary[T]):
    """
    ConstantA returns the value provided and shrinks to nothing.
@@ -327,7 +340,7 @@ class TupleA(Arbitrary[Tuple[Generatable, ...]]):
    """

    def __init__(self, arbitraries: Sequence[Arbitrary[Generatable]]):
        super().__init__(tupleof_gen([a.generator for a in arbitraries]))
        super().__init__(tupleof_gen([a.get for a in arbitraries]))
        self.arbitraries = arbitraries
        self.str_start = '('
        self.str_end = ')'
@@ -392,7 +405,7 @@ class ListA(Arbitrary[List[T]]):

    def __init__(self, arbitrary: Arbitrary[T], min_size: int,
                 max_size: int) -> None:
        super().__init__(listof_gen(arbitrary.generator, min_size, max_size))
        super().__init__(listof_gen(arbitrary.get, min_size, max_size))
        self.arbitrary = arbitrary
        self.deep_shrink = True
        self.str_start = '['
@@ -446,7 +459,7 @@ class ChoiceA(_ChoiceA):
    """

    def __init__(self, choices: Sequence[Tuple[int, Arbitrary[Generatable]]]):
        super().__init__(choices_gen([(v, a.generator) for v, a in choices]),
        super().__init__(choices_gen([(v, a.get) for v, a in choices]),
                         [a for _, a in choices])


@@ -457,7 +470,7 @@ class ChoiceEQA(_ChoiceA):
    """

    def __init__(self, arbitraries: Sequence[Arbitrary[T]]):
        super().__init__(eqchoices_gen([a.generator for a in arbitraries]),
        super().__init__(eqchoices_gen([a.get for a in arbitraries]),
                         arbitraries)


@@ -653,122 +666,70 @@ class Tester:


###############################################################################
# REFERENCE IMPLEMENTATION TESTING
# COMMANDS
###############################################################################

def str_id(x: str) -> str:
    return x


ProcessOutput = Any
ProcessOutputTest = Callable[[ProcessOutput, ProcessOutput], bool]

class ContextProvider:
    def __init__(self):
        pass

def str_to_bytes(v: str) -> bytes:
    return bytes(v, 'utf-8')
    def get(self, symbol: str) -> Generatable:
        pass

    def shrink(self, symbol: str, value: Generatable) -> \
            Generator[Generatable, None, None]:
        pass

def bytes_to_str(v: bytes) -> str:
    return v.decode("utf-8")
    def to_str(self, symbol: str, value: Generatable) -> str:
        pass


def stdout_processed_equality(test_preprocessor: Callable[[str], str],
                              exp_preprocessor: Callable[[str], str]) \
        -> ProcessOutputTest:
    """
    Returns:
        ProcessOutputTest: test that first converts the bytes to string, then
            runs the prerocessors, the compares for equality.
    """
    def res_tester(test_out: ProcessOutput, exp_out: ProcessOutput) -> bool:
        processed_test_out = test_preprocessor(bytes_to_str(test_out.stdout))
        processed_exp_out = exp_preprocessor(bytes_to_str(exp_out.stdout))
        return processed_test_out == processed_exp_out
    return res_tester
class ArbitraryContextProvider(ContextProvider):

    def __init__(self, arbitraries: Dict[str, Arbitrary]):
        super().__init__()
        self.arbitraries = arbitraries
        self.values: Dict[str, Generatable] = {}
        self.refresh_values()

def stdout_equality() -> ProcessOutputTest:
    """
    Returns:
        ProcessOutputTest: test that compares the raw outputs
    """
    def res_tester(test_out: ProcessOutput, exp_out: ProcessOutput) -> bool:
        return test_out.stdout == exp_out.stdout
    return res_tester
    def refresh_values(self) -> None:
        for key, arb in self.arbitraries.items():
            self.values[key] = arb.get()

    def get(self, symbol: str) -> Generatable:
        return self.values[symbol]

class RefImplTester(Tester):
    """
    RefImplTester is instance of the Tester class, which uses reference
    implementation to do the testing. It converts each test to string, then
    supplies the string to the tested program and reference implementation,
    then runs the process output tests.
    def shrink(self, symbol: str, value: Generatable) -> Generator[Generatable, None, None]:
        yield self.arbitraries[symbol].shrink(value)

    Use:
        initialize this class using its constructor, then add your process
        output tests using 'add_output_test'. If your program needs some flags
        add them using set_flags.
    """
    def to_str(self, symbol: str, value: Generatable) -> str:
        return self.arbitraries[symbol].to_str(value)

    def __init__(self, program_tested: str, program_referenced: str,
                 verbosity: int = 5) -> None:
class ContextRefresher(ArbitraryWrapper[T]):

        super().__init__(verbosity)
        self.program_tested = program_tested
        self.program_referenced = program_referenced
        self.args: List[str] = []
        self.test_args: List[str] = []
        self.ref_args: List[str] = []
        self.output_tests: List[ProcessOutputTest] = []
    def __init__(self, arbitrary: Arbitrary[T],
                 context_provider: ArbitraryContextProvider):
        super().__init__(arbitrary)
        self.context_provider = context_provider

    def set_args(self, args: List[str]) -> None:
        """
        Appends the arguments to both executables BEFORE specific arguments (
        see test_args and ref_args
        """
        self.args = args
    def get(self) -> T:
        self.context_provider.refresh_values()
        return super().get()

    def set_test_args(self, test_args: List[str]) -> None:
        """
        Appends the arguments to the tested executable AFTER shared
        arguments
        """
        self.test_args = test_args

    def set_ref_args(self, ref_args: List[str]) -> None:
        """
        Appends the arguments to the reference executable AFTER shared
        arguments
        """
        self.ref_args = ref_args
class VariableA(Arbitrary[Generatable]):

    def run_test(self, test: Test[Generatable]) -> bool:
        test_string = test.to_str()

        test_program = subprocess.run(
            [self.program_tested] + self.args + self.test_args,
            capture_output=True,
            input=str_to_bytes(test_string))
    def __init__(self, symbol: str, context_provider: ContextProvider):
        super().__init__(lambda: context_provider.get(symbol))
        self.context_provider = context_provider
        self.symbol = symbol

        exp_program = subprocess.run(
            [self.program_referenced] + self.args + self.ref_args,
            capture_output=True,
            input=str_to_bytes(test_string))
    def shrink(self, value: Generatable) -> Generatable:
        self.context_provider.shrink(self.symbol, value)

        res = True

        if self.verbosity >= 8:
            self.log(8, "TESTED STDOUT\n" + bytes_to_str(test_program.stdout))
            self.log(8, "REFERENCE STDOUT\n" +
                     bytes_to_str(exp_program.stdout))

        for process_test in self.output_tests:
            res = res and process_test(test_program, exp_program)

        return res

    def add_output_test(self, test: ProcessOutputTest) -> None:
        self.output_tests.append(test)
    def to_str(self, value: Generatable) -> str:
        return self.context_provider.to_str(self.symbol, value)


class Command(TupleA):
@@ -787,14 +748,9 @@ class Command(TupleA):
                 symbols: Optional[Dict[str, List[int]]] = None) -> None:
        super().__init__(arbitraries)
        self.symbols = symbols if symbols is not None else {}

    def to_str(self, value: Tuple[Generatable, ...]) -> str:
        res = ""
        for i, (v, arb) in enumerate(zip(value, self.arbitraries)):
            res += arb.to_str(v)
            if i < len(value) - 1:
                res += " "
        return res
        self.str_start = ''
        self.str_del = ' '
        self.str_end = ''

    def get_symbol(self, symbol: str, value: Tuple[Generatable, ...]) \
            -> List[Generatable]:
@@ -811,6 +767,57 @@ class Command(TupleA):

CommandAppend = Callable[['CommandSequence', List[Tuple[int, Generatable]]],
                         str]
CommandElement = Union['Command', 'CommandSequence']


class CommandGroup(ListA[Tuple[int, Generatable]]):

    def __init__(self,
                 commands: List[Tuple[int,
                                      Command]],
                 min_size: int,
                 max_size: int
                 ) -> None:
        super().__init__(ChoiceA(commands), min_size, max_size)
        self.commands = commands
        self.str_start = ''
        self.str_del = '\n'
        self.str_end = ''

    def get_symbol(self, symbol: str, values: List[Tuple[int, Generatable]]) \
            -> List[Generatable]:
        """
        Returns:
            List[Generatable]: list of the occurences of the symbol in the
            whole command sequence.
        """
        res = []
        for i, val in values:
            res += self.commands[i][1].get_symbol(symbol, val)
        return res


class CommandSeq(TupleA):

    def __init__(self,
                 commands: Sequence[Command],
                 ) -> None:
        super().__init__(commands)
        self.commands = commands
        self.str_start = ''
        self.str_del = '\n'
        self.str_end = ''

    def get_symbol(self, symbol: str, values: Tuple[Generatable]) -> List[Generatable]:
        res = []
        for i, val in enumerate(values):
            res += self.commands[i].get_symbol(symbol, val)
        return res


###############################################################################
# LEGACY
###############################################################################


class CommandSequence(ListA[Tuple[int, Generatable]]):
@@ -828,7 +835,9 @@ class CommandSequence(ListA[Tuple[int, Generatable]]):
                 commands: List[Tuple[int,
                                      Command]],
                 min_size: int,
                 max_size: int) -> None:
                 max_size: int,
                 context_provider: Optional[ArbitraryContextProvider] = None
                 ) -> None:
        """
        Args:
            commands (List[Tuple[int, Command]]): list of commands, and their
@@ -840,6 +849,11 @@ class CommandSequence(ListA[Tuple[int, Generatable]]):
        self.commands = commands
        self.start_str: CommandAppend = lambda s, x: ""
        self.end_str: CommandAppend = lambda s, x: ""
        self.context_provider = context_provider

    def get(self) -> List[Tuple[int, Generatable]]:
        self.context_provider.refresh_values()
        return super().get()

    def to_str(self, values: List[Tuple[int, Generatable]]) -> str:
        body = ""
@@ -865,6 +879,48 @@ class CommandSequence(ListA[Tuple[int, Generatable]]):
# PROCESS OUTPUT TESTING
###############################################################################


def str_id(x: str) -> str:
    return x


ProcessOutput = Any
ProcessOutputTest = Callable[[ProcessOutput, ProcessOutput], bool]


def str_to_bytes(v: str) -> bytes:
    return bytes(v, 'utf-8')


def bytes_to_str(v: bytes) -> str:
    return v.decode("utf-8")


def stdout_processed_equality(test_preprocessor: Callable[[str], str],
                              exp_preprocessor: Callable[[str], str]) \
        -> ProcessOutputTest:
    """
    Returns:
        ProcessOutputTest: test that first converts the bytes to string, then
            runs the prerocessors, the compares for equality.
    """
    def res_tester(test_out: ProcessOutput, exp_out: ProcessOutput) -> bool:
        processed_test_out = test_preprocessor(bytes_to_str(test_out.stdout))
        processed_exp_out = exp_preprocessor(bytes_to_str(exp_out.stdout))
        return processed_test_out == processed_exp_out
    return res_tester


def stdout_equality() -> ProcessOutputTest:
    """
    Returns:
        ProcessOutputTest: test that compares the raw outputs
    """
    def res_tester(test_out: ProcessOutput, exp_out: ProcessOutput) -> bool:
        return test_out.stdout == exp_out.stdout
    return res_tester


OutputTest = Callable[[Any], bool]


@@ -909,6 +965,80 @@ class OutputTester(Tester):
        return res


class RefImplTester(Tester):
    """
    RefImplTester is instance of the Tester class, which uses reference
    implementation to do the testing. It converts each test to string, then
    supplies the string to the tested program and reference implementation,
    then runs the process output tests.

    Use:
        initialize this class using its constructor, then add your process
        output tests using 'add_output_test'. If your program needs some flags
        add them using set_flags.
    """

    def __init__(self, program_tested: str, program_referenced: str,
                 verbosity: int = 5) -> None:

        super().__init__(verbosity)
        self.program_tested = program_tested
        self.program_referenced = program_referenced
        self.args: List[str] = []
        self.test_args: List[str] = []
        self.ref_args: List[str] = []
        self.output_tests: List[ProcessOutputTest] = []

    def set_args(self, args: List[str]) -> None:
        """
        Appends the arguments to both executables BEFORE specific arguments (
        see test_args and ref_args
        """
        self.args = args

    def set_test_args(self, test_args: List[str]) -> None:
        """
        Appends the arguments to the tested executable AFTER shared
        arguments
        """
        self.test_args = test_args

    def set_ref_args(self, ref_args: List[str]) -> None:
        """
        Appends the arguments to the reference executable AFTER shared
        arguments
        """
        self.ref_args = ref_args

    def run_test(self, test: Test[Generatable]) -> bool:
        test_string = test.to_str()

        test_program = subprocess.run(
            [self.program_tested] + self.args + self.test_args,
            capture_output=True,
            input=str_to_bytes(test_string))

        exp_program = subprocess.run(
            [self.program_referenced] + self.args + self.ref_args,
            capture_output=True,
            input=str_to_bytes(test_string))

        res = True

        if self.verbosity >= 8:
            self.log(8, "TESTED STDOUT\n" + bytes_to_str(test_program.stdout))
            self.log(8, "REFERENCE STDOUT\n" +
                     bytes_to_str(exp_program.stdout))

        for process_test in self.output_tests:
            res = res and process_test(test_program, exp_program)

        return res

    def add_output_test(self, test: ProcessOutputTest) -> None:
        self.output_tests.append(test)


_VALGRIND_PROBLEM = 11