Commit 0fe09452 authored by David Novak's avatar David Novak
Browse files

* the adaptive refinement with fixed steps to check was added and tested

 * improved statistics from the approx search 
 * PPPCodeObjects now enable a "lazy" reading - IDs are actually read only when really needed
 * PPP-Tree leaf node now enables refinement of the query-PPP distance by reading the stored PP suffixes
 * IDObjectRAStorage now uses the asynchronous access to disk storage 
parent d41fe5d5
Loading
Loading
Loading
Loading
+15 −24
Original line number Diff line number Diff line
@@ -19,13 +19,13 @@ package messif.buckets.index.impl;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;
import messif.buckets.BucketStorageException;
import messif.buckets.index.IndexComparator;
import messif.buckets.index.impl.LongStorageMemoryIndex;
import messif.buckets.storage.impl.DiskStorage;
import messif.objects.LocalAbstractObject;
import mindex.MetricIndexes;
@@ -195,34 +195,25 @@ public class DiskStorageMemoryIntIndex implements Serializable {
    
    // ******************     Index access methods     ****************** //

    /**
     * Given a storage position, this method simply returns object on given position in the storage.
     * @param position storage position
     * @return object on given position in the storage
     */
    private LocalAbstractObject getObjectByStoragePosition(long position) {
        try {
            return storage.read(position);
        } catch (BucketStorageException ex) {
            throw new IllegalStateException("Cannot read object from storage", ex);
        }
    }
    
    /**
     * Given an integer key, this methods returns corresponding object from the storage or null (if not found).
     * @param key object integer key
     * @return corresponding object from the storage or null (if not found)
     */
    public LocalAbstractObject getObject(int key) {
    public Iterator<LocalAbstractObject> getObjects(Collection<Integer> keys) {
        //List<Long> positions = new ArrayList<>(keys.length);
        long positions [] = new long [keys.size()];
        int index = 0;
        for (int key : keys) {
            // try to find the key in the static index
            int keyIndex = binarySearch(true, key, 0, staticIndex.length - 1, true);
            if (keyIndex >= 0) {
            return getObjectByStoragePosition(staticPositions[keyIndex]);
                positions[index ++] = staticPositions[keyIndex];
            } else if (! dynamicIndex.isEmpty() && ((keyIndex = binarySearch(false, key, 0, dynamicIndex.size() - 1, true)) > 0)) {
                positions[index ++] = dynamicIndex.get(keyIndex).position;
            }
        if (! dynamicIndex.isEmpty() && ((keyIndex = binarySearch(false, key, 0, dynamicIndex.size() - 1, true)) > 0)) {
            return getObjectByStoragePosition(dynamicIndex.get(keyIndex).position);
        }
        return null;
        return storage.read((positions.length == index) ? positions : Arrays.copyOf(positions, index));
    }

    /**
+0 −130
Original line number Diff line number Diff line

package pppcodes;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author xnovak8
 */
public class TestIOSpeed {

    public static final String clearCacheRun = "/home/xnovak8/work/1303-pppcode/scripts/clear-cache";
    public static final int NUMBER_OF_READS = 10000;
    public static final int BUFFER_SIZE = 16*1024;
    
    protected static final Random random = new Random(System.currentTimeMillis());
    
    protected static long testSequentialRead(FileChannel fileChannel, long channelSize, int nReads) throws IOException {
        long startTime = System.currentTimeMillis();
        
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int sumRead = 0;
        for (int i = 0; i < nReads; i++) {
            long nextPosition = Math.abs(random.nextLong()) % channelSize;
            
            buffer.clear();
            sumRead += fileChannel.read(buffer, nextPosition);
        }
        //System.out.println("avg number of read bytes: " + (sumRead / nReads));
                
        return ((System.currentTimeMillis() - startTime));
    }

    protected static long testParallelRead(AsynchronousFileChannel asyncFileChannel, long channelSize, int nReads, int batchSize) {
        long startTime = System.currentTimeMillis();

        int readCounter = 0;
        while (readCounter < nReads) {
            Future [] futures = new Future[batchSize];
            for (int i = 0; i < batchSize && readCounter + i < nReads; i++) {
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);        
                long nextPosition = Math.abs(random.nextLong()) % channelSize;

                buffer.clear();
                futures[i] = asyncFileChannel.read(buffer, nextPosition);
            }
            try {
                int sumRead = 0;
                for (Future future : futures) {
                    sumRead += (Integer) future.get();
                }
                //System.out.println("avg number of read bytes: " + (sumRead / futures.length));
            } catch (InterruptedException | ExecutionException ex) {
                Logger.getLogger(TestIOSpeed.class.getName()).log(Level.SEVERE, null, ex);
            }
            readCounter += batchSize;
        }
        
        return (System.currentTimeMillis() - startTime);
        
    }
    
    protected static void clearCaches() throws IOException {
        Process process = new ProcessBuilder(clearCacheRun).start();
        
        BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
        System.out.println("Cleaning caches: ");
        String line;        
        while ((line = br.readLine()) != null) {
            System.out.println("\t" + line);
        }
    }
    
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: TestIOSpeed <file_to_read>");
            return;
        }

        File file = new File(args[0]);
        if (! file.exists()) {
            System.err.println("file not exists or cannot be read: " + file);
            return;
        }
        try {
            long size;
            try (FileChannel fileChannel = new RandomAccessFile(file, "r").getChannel()) {
                size = fileChannel.size();
                clearCaches();
                System.out.println("file size: " + size);
                System.out.println("Time of sequential reads of " + NUMBER_OF_READS +" blocks: " 
                        + testSequentialRead(fileChannel, size, NUMBER_OF_READS));
            }
            
            int [] batchSizes = new int [] {10, 20, 50, 100, 200, 500, 1000};

            try (AsynchronousFileChannel asyncFileChannel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) {
                for (int batchSize : batchSizes) {
                    clearCaches();
                    System.out.println("Time of async reads of " + NUMBER_OF_READS + " blocks (batch size = " + batchSize + ") : "
                            + testParallelRead(asyncFileChannel, size, NUMBER_OF_READS, batchSize));
                }
            }
            
        } catch (FileNotFoundException ex) {
            Logger.getLogger(TestIOSpeed.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(TestIOSpeed.class.getName()).log(Level.SEVERE, null, ex);
        }
        
    }
}
+8 −5
Original line number Diff line number Diff line
@@ -42,7 +42,6 @@ import messif.operations.data.BulkInsertOperation;
import messif.operations.data.InsertOperation;
import messif.operations.query.ApproxKNNQueryOperation;
import messif.utility.ExtendedProperties;
import mindex.MetricIndex;
import mindex.MetricIndexes;
import pppcodes.PPPCodeIndex;
import pppcodes.ids.IDObjectRAStorage;
@@ -50,6 +49,7 @@ import pppcodes.ids.LocatorIntegerTranslator;
import pppcodes.ids.PPPDiskIndex;
import pppcodes.ids.ReversibleLocatorIntTranslator;
import pppcodes.ids.SimpleLocatorIntegerTranslator;
import pppcodes.processors.ApproxNavProcessorAdaptiveRefinement;

/**
 * This is an algorithm for PPP Codes that internally uses several standard M-Index algorithms.
@@ -189,15 +189,18 @@ public class PPPCodeAlgorithm extends MultipleOverlaysAlgorithm {
        }
        // Approximate operations is the only query operation to be evaluated on PPPCode algorithm
        if (operation instanceof ApproxKNNQueryOperation) {
            ((RankingQueryOperation) operation).setAnswerThresholdComputation(false, true, 0);
            // if the number of used overlays is limited
            int maxLambda = operation.getParameter("MAX_LAMBDA", Integer.class, algorithms.length);
            List<PPPCodeSingleAlgorithm> indexesToRunOn = new ArrayList<>(Arrays.asList(Arrays.copyOf(algorithms, maxLambda)));
            List<PPPCodeSingleAlgorithm> indexesToRunOn = new ArrayList<>(Arrays.asList(Arrays.copyOf(algorithms, operation.getParameter("MAX_LAMBDA", Integer.class, algorithms.length))));
            
            if (idObjectStorage == null || operation.getParameter("NOREFINE", Boolean.class, false)) {
                ((RankingQueryOperation) operation).setAnswerThresholdComputation(false, true, 0);
                return new ApproxNavProcessor((ApproxKNNQueryOperation) operation, indexesToRunOn, intLocatorTranslator);
            } else {
                if (0 < operation.getParameter(ApproxNavProcessorRefinement.PARAM_NR_OF_ACCESSED, Integer.class, 1)) {
                    return new ApproxNavProcessorRefinement(idObjectStorage, (ApproxKNNQueryOperation) operation, indexesToRunOn);
                } else {
                    return new ApproxNavProcessorAdaptiveRefinement(idObjectStorage, (ApproxKNNQueryOperation) operation, indexesToRunOn);
                }
            }
        }
        return null;
+11 −0
Original line number Diff line number Diff line

package pppcodes.ids;

import java.util.Collection;
import java.util.Iterator;
import messif.buckets.BucketStorageException;
import messif.objects.LocalAbstractObject;

@@ -17,6 +19,15 @@ public interface IDObjectRAStorage<T extends LocalAbstractObject> {
     */
    public T readObject(int id);

    /**
     * Given a list of integer IDs, this method returns the corresponding stored objects. The
     *  returned objects does not have to be in the same order! The objects that were not found
     *  are simply not returned by the iterator.
     * @param ids number identifiers of the objects to be returned
     * @return iterator over the object identified by the identifier passed
     */
    public Iterator<T> readObjects(Collection<Integer> ids);
    
    /**
     * Stores given object to this storage and returns true, if it works.
     * @param object object to be stored
+29 −38
Original line number Diff line number Diff line
@@ -10,13 +10,14 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import messif.buckets.BucketStorageException;
import messif.buckets.index.IndexComparator;
import messif.buckets.index.impl.LongStorageMemoryIndex;
import messif.buckets.storage.StorageSearch;
import messif.buckets.storage.impl.DiskStorage;
import messif.objects.LocalAbstractObject;
import messif.utility.Convert;
@@ -30,7 +31,6 @@ import mindex.MetricIndexes;
public class PPPDiskIndex extends SimpleLocatorIntegerTranslator implements IDObjectRAStorage<LocalAbstractObject>, ReversibleLocatorIntTranslator {
        
    /** The object storage */
    //private final LongStorageMemoryIndex<Integer, LocalAbstractObject> intObjectIndex;
    private final DiskStorageMemoryIntIndex intObjectIndex;
    
    /** Directory into which this index should be serialized */
@@ -39,7 +39,6 @@ public class PPPDiskIndex extends SimpleLocatorIntegerTranslator implements IDOb
    /** Creates the PPP disk index given a disk storage. */
    protected PPPDiskIndex(DiskStorage<LocalAbstractObject> storage, File serializationDir) {
        super();
        //this.intObjectIndex = new LongStorageMemoryIndex<>(storage, new IntegerKeyComparator());
        this.intObjectIndex = new DiskStorageMemoryIntIndex(storage);
        this.serializationDir = serializationDir;
    }
@@ -66,14 +65,6 @@ public class PPPDiskIndex extends SimpleLocatorIntegerTranslator implements IDOb
    public static PPPDiskIndex restoreFromDir(File dir) throws IOException {
        File file = new File(dir, SERIALIZATION_FILE);
        try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
//            LongStorageMemoryIndex<Integer, LocalAbstractObject> index = (LongStorageMemoryIndex) in.readObject();            
//            // find the maximal key stored in the passed index
//            AtomicInteger nextID = null;
//            StorageSearch<LocalAbstractObject> search = index.search(Integer.MAX_VALUE, false);
//            if (search.previous()) {
//                nextID = new AtomicInteger(index.comparator().extractKey(search.getCurrentObject()));
//                nextID.incrementAndGet();
//            }
            DiskStorageMemoryIntIndex index = (DiskStorageMemoryIntIndex) in.readObject();
            AtomicInteger nextID = new AtomicInteger(index.getMaxKey() + 1);
            return new PPPDiskIndex(index, dir, nextID);
@@ -116,12 +107,16 @@ public class PPPDiskIndex extends SimpleLocatorIntegerTranslator implements IDOb
    
    @Override
    public LocalAbstractObject readObject(int id) {
        return intObjectIndex.getObject(id);
//        StorageSearch<LocalAbstractObject> result = intObjectIndex.search(id, true);
//        if (result.next()) {
//            return result.getCurrentObject();
//        }
//        return null;
        Iterator<LocalAbstractObject> objects = intObjectIndex.getObjects(Collections.singletonList(id));
        if (objects.hasNext()) {
            return objects.next();
        }
        return null;
    }

    @Override
    public Iterator<LocalAbstractObject> readObjects(Collection<Integer> ids) {
        return intObjectIndex.getObjects(ids);
    }    
    
    @Override
@@ -135,25 +130,21 @@ public class PPPDiskIndex extends SimpleLocatorIntegerTranslator implements IDOb
        return (object == null) ? null : object.getLocatorURI();
    }

    /**
     * A natural comparator between an integer key and an object stored in 
     */
    protected static class IntegerKeyComparator implements IndexComparator<Integer, LocalAbstractObject> {

    @Override
        public final int indexCompare(Integer k, LocalAbstractObject o) {
            return k.compareTo(extractKey(o));
    public String[] getStringLocators(List<Integer> ids) {
        Iterator<LocalAbstractObject> readObjects = readObjects(ids);
        String [] retVal = new String [ids.size()];
        while (readObjects.hasNext()) {
            LocalAbstractObject next = readObjects.next();
            int integerID = MetricIndexes.getIntegerID(next);
            for (int i = 0; i < ids.size(); i++) {
                if (ids.get(i) == integerID) {
                    retVal[i] = next.getLocatorURI();
                    break;
                }

        @Override
        public final Integer extractKey(LocalAbstractObject object) {
            return MetricIndexes.getIntegerID(object);
            }

        @Override
        public final int compare(Integer o1, Integer o2) {
            return o1.compareTo(o2);
        }
        return retVal;
    }
    
}
Loading