Commit f9e4dd11 authored by David Novak's avatar David Novak
Browse files

Merged version2 into master

parents 4cf244a3 8fa7f791
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@

    <groupId>messif</groupId>
    <artifactId>messif</artifactId>
    <version>2.3.7-DEVEL</version>
    <version>2.3.8-DEVEL</version>
    <packaging>jar</packaging>
    <url>http://disa.fi.muni.cz/trac/messif/</url>

+105 −16
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
@@ -47,12 +48,14 @@ import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import static messif.algorithms.NavigationProcessors.getNavigationProcessor;
import messif.buckets.index.Search;
import messif.executor.MethodClassExecutor;
import messif.executor.MethodExecutor;
import messif.objects.LocalAbstractObject;
import messif.operations.AbstractOperation;
import messif.operations.QueryOperation;
import messif.operations.RankingQueryOperation;
import messif.operations.data.DataManipulationOperation;
import messif.operations.query.GetObjectCountOperation;
import messif.statistics.FutureWithStatistics;
import messif.statistics.FutureWithStatisticsImpl;
@@ -83,7 +86,7 @@ import messif.utility.reflection.NoSuchInstantiatorException;
 * @author David Novak, Masaryk University, Brno, Czech Republic, david.novak@fi.muni.cz
 */
public abstract class Algorithm implements Serializable {
    /** class id for serialization */
    /** Class id for serialization. */
    private static final long serialVersionUID = 1L;

    //****************** Constants ******************//
@@ -91,30 +94,39 @@ public abstract class Algorithm implements Serializable {
    /** Logger */
    protected static final Logger log = Logger.getLogger("messif.algorithm");

    /** Maximal number of currently executed operations */
    /** Maximal number of currently executed operations. */
    protected static final int maximalConcurrentOperations = 1024;


    //****************** Attributes ******************//

    /** The name of this algorithm */
    /** The name of this algorithm. */
    private final String algorithmName;

    /** Verbosity of the logging of the last executed operation */
    /** Lock object for synchronization. */
    private final transient Object lock = new Object();
    
    /** Verbosity of the logging of the last executed operation. */
    private int executedOperationsLogVerbosity = 1;

    /** Number of actually running operations */
    /** Number of actually running operations. */
    private transient Semaphore runningOperationsSemaphore;

    /** List of currently running operations */
    /** Number of actually running update operations. */
    private transient Semaphore updateOperationsSemaphore;

    /** List of currently running operations. */
    private transient WeakHashMap<AbstractOperation, Thread> runningOperations;

    /** Executor for operations */
    /** Executor for operations. */
    private transient MethodClassExecutor operationExecutor;

    /** Thread pool service to process operations in threads. */
    private transient ExecutorService operationsThreadPool;
    
    /** Operation log for update operations. */
    private transient UpdateOperationLog updateOperationLog;

    
    //****************** Constructors ******************//

@@ -233,6 +245,56 @@ public abstract class Algorithm implements Serializable {
        this.executedOperationsLogVerbosity = executedOperationsLogVerbosity;
    }

    /**
     * Start using the update operation log {@link UpdateOperationLog} and use given file;
     *  if the file contains some operations, they are applied to this algorithm.
     * @param logFileName name of the file to store operation log (can exist)
     * @throws messif.algorithms.AlgorithmMethodException if the log is already set and used
     */
    public void useUpdateOperationLog(String logFileName) throws AlgorithmMethodException {
        if (maximalConcurrentOperations > 0) {
            updateOperationsSemaphore.acquireUninterruptibly(maximalConcurrentOperations);
        } else {
            throw new AlgorithmMethodException("cannot use the update log without the update semaphore");
        }
        try {
            if (updateOperationLog != null && updateOperationLog.isUsed()) {
                throw new AlgorithmMethodException("the update operation log is already set and not empty (call Algorithm.storeToFile() first to clear the log)");
            }
            if (logFileName == null) {
                this.updateOperationLog = null;
            } else {
                UpdateOperationLog createdLog = new UpdateOperationLog(logFileName);
                try {
                    Search<DataManipulationOperation> operations = createdLog.readOperations();
                    if (operations != null) {
                        log.log(Level.WARNING, "replaying the update operation log for: {0}", getName());
                        while (operations.next()) {
                            DataManipulationOperation operation = operations.getCurrentObject();
                            executeInternal(Statistics.isEnabledGlobally(), operation);
                            log.log(Level.INFO, "{0} processed: {1}", new Object[]{this.getName(), operation});
                        }
                    }
                    this.updateOperationLog = createdLog;
                } catch (AlgorithmMethodException | NoSuchMethodException | InterruptedException | CloneNotSupportedException | InvocationTargetException ex) {
                    log.log(Level.SEVERE, null, ex);
                }
            }
        } finally {
            updateOperationsSemaphore.release(maximalConcurrentOperations);
        }
    }
    
    /**
     * Checks whether the update operation log is used (it is set) but there is no update operation
     *  written there. If true, the extending algorithm can be sure that there was no update operation
     *  since last serialization of the algorithm.
     * @return true only if the {@link #updateOperationLog} is used (not null) and {@link UpdateOperationLog#isUsed() }.
     */
    protected boolean isUpdateLogEmpty() {
        return updateOperationLog != null && ! updateOperationLog.isUsed();
    }
            
    
    //****************** Serialization ******************//

@@ -246,6 +308,13 @@ public abstract class Algorithm implements Serializable {
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        initializeExecutor();
        try {
            Field lockField = Algorithm.class.getDeclaredField("lock");
            lockField.setAccessible(true);
            lockField.set(this, new Object());
        } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException ex) {
            throw new IOException(ex);
        }
    }

    /**
@@ -299,7 +368,7 @@ public abstract class Algorithm implements Serializable {
    public void storeToFile(String filepath) throws IOException {
        // Acquire all locks, thus waiting for all currently running operations and disable additional
        if (maximalConcurrentOperations > 0)
            runningOperationsSemaphore.acquireUninterruptibly(maximalConcurrentOperations);
            updateOperationsSemaphore.acquireUninterruptibly(maximalConcurrentOperations);
        
        try {
            // Check if the file is a regular file
@@ -328,7 +397,7 @@ public abstract class Algorithm implements Serializable {
        } finally {
            // Unlock operations
            if (maximalConcurrentOperations > 0)
                runningOperationsSemaphore.release(maximalConcurrentOperations);
                updateOperationsSemaphore.release(maximalConcurrentOperations);
        }
    }

@@ -347,6 +416,9 @@ public abstract class Algorithm implements Serializable {
     * @param successful true, if the write to file was successful, false otherwise
     */
    protected void afterStoreToFile(String filepath, boolean successful) { 
        if (updateOperationLog != null) {
            updateOperationLog.clearLog();
        }
    }
    

@@ -387,7 +459,7 @@ public abstract class Algorithm implements Serializable {
     * @return a collection of all operations
     */
    public Collection<AbstractOperation> getAllRunningOperations() {
        synchronized (algorithmName) { // We are synchronizing the access to the list using algorithmName so that the runningOperations can be set when deserializing
        synchronized (lock) { // We are synchronizing the access to the list using algorithmName so that the runningOperations can be set when deserializing
            // The list of operations must be copied to a serializable list
            Collection<AbstractOperation> ret = new ArrayList<AbstractOperation>(runningOperations.size());
            for (Entry<AbstractOperation, Thread> entry : runningOperations.entrySet()) { // Note that the entryset never returns a key weak-ref that is garbage collected
@@ -480,6 +552,7 @@ public abstract class Algorithm implements Serializable {
     */
    private void initializeExecutor() throws IllegalArgumentException {
        runningOperationsSemaphore = new Semaphore(maximalConcurrentOperations, true);
        updateOperationsSemaphore = new Semaphore(maximalConcurrentOperations, true);
        runningOperations = new WeakHashMap<>(maximalConcurrentOperations);
        operationExecutor = new MethodClassExecutor(this, 0, null, Modifier.PUBLIC|Modifier.PROTECTED, Algorithm.class, getExecutorParamClasses());
    }
@@ -516,6 +589,14 @@ public abstract class Algorithm implements Serializable {
        return true;
    }
    
    /**
     * Tries to execute the operation using {@link NavigationDirectory} and if it does not work, it
     *  uses the executor (Java reflection).
     */
    private void executeInternal(final boolean statisticsOn, Object... params) throws InterruptedException, AlgorithmMethodException, CloneNotSupportedException, NoSuchMethodException, InvocationTargetException {
        if (!executeUsingNavDir(this, params[0], statisticsOn))
            operationExecutor.execute(params);        
    }
    
    /**
     * Execute operation with additional parameters.
@@ -528,8 +609,15 @@ public abstract class Algorithm implements Serializable {
    protected final void execute(final boolean statisticsOn, Object... params) throws AlgorithmMethodException, NoSuchMethodException {
        if (maximalConcurrentOperations > 0)
            runningOperationsSemaphore.acquireUninterruptibly();
        synchronized (algorithmName) { // We are synchronizing the access to the list using algorithmName so that the runningOperations can be set when deserializing
            runningOperations.put(getExecutorOperationParam(params), Thread.currentThread());
        AbstractOperation operation = getExecutorOperationParam(params);
        if (operation instanceof DataManipulationOperation) {
            if (maximalConcurrentOperations > 0)
               updateOperationsSemaphore.acquireUninterruptibly();
            if (updateOperationLog != null)
                updateOperationLog.log((DataManipulationOperation) operation);
        }
        synchronized (lock) { // We are synchronizing the access to the list using algorithmName so that the runningOperations can be set when deserializing
            runningOperations.put(operation, Thread.currentThread());
        }
        // log the operation processing information 
        long startTimeStamp = System.currentTimeMillis();
@@ -540,8 +628,7 @@ public abstract class Algorithm implements Serializable {
                operationTime = OperationStatistics.getOpStatistics("OperationTime", StatisticTimer.class);
                operationTime.start();
            }
            if (!executeUsingNavDir(this, params[0], statisticsOn))
                operationExecutor.execute(params);
            executeInternal(statisticsOn, params);
            if (statisticsOn) {
                operationTime.stop();
            }
@@ -563,6 +650,8 @@ public abstract class Algorithm implements Serializable {
            }
            if (maximalConcurrentOperations > 0)
                runningOperationsSemaphore.release();
            if ((operation instanceof DataManipulationOperation) && (maximalConcurrentOperations > 0))
                updateOperationsSemaphore.release();
        }
    }

@@ -721,7 +810,7 @@ public abstract class Algorithm implements Serializable {
     *          if there was no operation or the thread executing it has already finished, <tt>false</tt> is returned
     */
    public boolean terminateOperation(UUID operationId) {
        synchronized (algorithmName) { // We are synchronizing the access to the list using algorithmName so that the runningOperations can be set when deserializing
        synchronized (lock) { // We are synchronizing the access to the list using algorithmName so that the runningOperations can be set when deserializing
            for (Entry<AbstractOperation, Thread> entry : runningOperations.entrySet()) {
                if (entry.getKey().getOperationID().equals(operationId)) {
                    entry.getValue().interrupt();
+125 −0
Original line number Diff line number Diff line
/*
 *  This file is part of MESSIF library.
 *
 *  MESSIF library is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  MESSIF library is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with MESSIF library.  If not, see <http://www.gnu.org/licenses/>.
 */
package messif.algorithms;

import java.io.File;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import messif.buckets.BucketStorageException;
import messif.buckets.index.Search;
import messif.buckets.storage.impl.DiskStorage;
import messif.objects.nio.MultiClassSerializator;
import messif.operations.data.DataManipulationOperation;

/**
 * Operation log to keep track of all update operations that are processed on given algorithm.
 * This class is not serializable and is expected to be created always with a provided file name.
 * 
 * @author david
 */
class UpdateOperationLog {
    
    /** The name of the operation log file. */
    private final File logFile;
    
    /** Opened storage with the log. */
    private DiskStorage<DataManipulationOperation> log;

    /** 
     * Constructor given a file name to store the operation log to.
     * @param logFileName name of the file to store the log to (the file can exist)
     */
    public UpdateOperationLog(String logFileName) {
        this.logFile = new File(logFileName);
    }
    
    /**
     * Test if the log is opened and open it if not.
     * @throws IOException 
     */
    private void initLog() throws IOException {
        if (log != null) {
            return;
        }
        synchronized (this) {
            if (log == null) {
                log = new DiskStorage(DataManipulationOperation.class, logFile, false, 16*1024, false, 1, 
                        0L, Long.MAX_VALUE, new MultiClassSerializator(DataManipulationOperation.class));
            }
        }
    }
    
    /**
     * Stores given operation to the disk operation log.
     * @param operation operation to store
     * @return true if the operation was actually stored, false otherwise
     */
    public boolean log(DataManipulationOperation operation) {
        try {
            initLog();
            log.store(operation);
            log.flush(false);
            return true;
        } catch (BucketStorageException | IOException ex) {
            Logger.getLogger(UpdateOperationLog.class.getName()).log(Level.SEVERE, null, ex);
            return false;
        }
    }
    
    /**
     * Read all operations from the log and returns an "iterator" over these operations.
     * @return MESSIF search object with the update operations or null, if the log is empty
     * @throws AlgorithmMethodException
     * @throws NoSuchMethodException 
     */
    public Search<DataManipulationOperation> readOperations() throws AlgorithmMethodException, NoSuchMethodException {
        try {
            initLog();
            if (log.size() > 0) {
                return log.search();
            }
            return null;
        } catch (IOException ex) {
            Logger.getLogger(UpdateOperationLog.class.getName()).log(Level.SEVERE, null, ex);
            return null;
        }
    }
    
    /**
     * Clear existing operation log, remove the file and set the {@link #log} to null.
     */
    public void clearLog() {
        if (log != null) {
            try {
                log.destroy();
            } catch (Throwable ex) {
                Logger.getLogger(UpdateOperationLog.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        log = null;
    }
    
    /**
     * Return true if the log is open and there is some operation stored in the log.
     * @return true/false if the log is actually used
     */
    public boolean isUsed() {
        return log != null && log.size() > 0;
    }
    
}
+0 −7
Original line number Diff line number Diff line
@@ -51,13 +51,6 @@ public abstract class OneStepNavigationProcessor<O extends AbstractOperation> im
        return operation;
    }

    @Override
    public void close() {
        if (! operation.isFinished()) {
            operation.endOperation();
        }
    }

    @Override
    public boolean isFinished() {
        return started;
+5 −1
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ package messif.objects.impl;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -35,7 +36,10 @@ import messif.utility.ArrayMap;
 * This is an implementation of the {@link MetaObject} which stores data in fixed {@link ArrayMap}.
 * @author David Novak, Masaryk University, Brno, Czech Republic, david.novak@fi.muni.cz
 */
public class MetaObjectFixedMap extends MetaObject {
public class MetaObjectFixedMap extends MetaObject implements Serializable {

    /** Class id for serialization. */
    private static final long serialVersionUID = 1L;    
    
    /** Map to store the data in. */
    protected final ArrayMap<String, LocalAbstractObject> objects;
Loading