Commit 9017ec17 authored by Adam Kučera's avatar Adam Kučera

"rough" implementation of the data access API completed

The REST endpoint is not yet correctly specified, and the provider is not tested.
parent 58980706
......@@ -52,7 +52,7 @@ public class DataPointsEndpoint {
logger.debug(json);
AddressRequest readSpecs = m.readValue(json, AddressRequest.class);
logger.debug(readSpecs);
SnapshotResponse result = dc.getDataPointValues(readSpecs, allowCache(cache));
SnapshotResponse result = new SnapshotResponse(dc.getDataPointValues(readSpecs, allowCache(cache)));
logger.debug(result);
return result;
} catch (IOException e) {
......@@ -78,7 +78,7 @@ public class DataPointsEndpoint {
logger.debug(json);
AddressRequest readSpecs = m.readValue(json, AddressRequest.class);
logger.debug(readSpecs);
return dc.getDataPointsAggregation(readSpecs, agg, allowCache(cache));
return new AggregateResponse(dc.getDataPointsAggregation(readSpecs, agg, allowCache(cache)));
} catch (IOException e) {
return AggregateResponse.getErrorResponse("Unable to parse query:" + e);
}
......@@ -99,7 +99,7 @@ public class DataPointsEndpoint {
logger.debug(json);
GroupedAddressRequest readSpecs = m.readValue(json,GroupedAddressRequest.class);
logger.debug(readSpecs);
return dc.getDataPointGroupedValues(readSpecs, allowCache(cache));
return new SnapshotResponse(dc.getDataPointGroupedValues(readSpecs, allowCache(cache)));
} catch (IOException e) {
return SnapshotResponse.getErrorResponse("Unable to parse query:" + e);
}
......@@ -120,7 +120,7 @@ public class DataPointsEndpoint {
logger.debug(json);
GroupedAddressRequest readSpecs = m.readValue(json, GroupedAddressRequest.class);
logger.debug(readSpecs);
AggregateResponse result = dc.getDataPointGroupAggregations(readSpecs, agg, allowCache(cache));
AggregateResponse result = new AggregateResponse(dc.getDataPointGroupAggregations(readSpecs, agg, allowCache(cache)));
logger.debug(result);
return result;
} catch (IOException e) {
......
......@@ -4,43 +4,59 @@ import java.util.Arrays;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
import cz.muni.fi.lasaris.sbms.data.entities.AggregationFunction;
import cz.muni.fi.lasaris.sbms.data.entities.AggregationWindow;
import cz.muni.fi.lasaris.sbms.data.entities.SeriesSpecs;
public class TrendsRequest {
public class TrendRequest {
private AddressRequest ar;
private GroupedAddressRequest gar;
private SeriesSpecs s;
private AggregationFunction af;
public Address getAddress() {
// TODO Auto-generated method stub
return null;
return ar.getAddresses().get(0);
}
public void setSeriesSpecs(SeriesSpecs s) {
// TODO Auto-generated method stub
this.s = s;
}
public void setAggregation(AggregationFunction aggregation, AggregationWindow window) {
// TODO Auto-generated method stub
public void setAggregation(AggregationFunction aggregation) {
this.af = aggregation;
}
public void setAddressSpecs(Address a) {
AddressRequest ar = new AddressRequest();
ar.setAddresses(Arrays.asList(new Address[] { a }));
setAddressSpecs(ar);
}
public void setAddressSpecs(AddressRequest ar) {
// TODO Auto-generated method stub
this.ar = ar;
}
public void setAddressSpecs(GroupedAddressRequest readValue) {
// TODO Auto-generated method stub
public void setAddressSpecs(GroupedAddressRequest gar) {
this.gar = gar;
}
public AddressRequest getAddressSpecs() {
return this.ar;
}
public GroupedAddressRequest getGroupedAddressSpecs() {
return this.gar;
}
public SeriesSpecs getSeriesSpecs() {
return this.s;
}
public AggregationFunction getAggregation() {
return this.af;
}
public boolean isGropuing() {
return this.gar != null;
}
}
......@@ -2,7 +2,6 @@ package cz.muni.fi.lasaris.sbms.data.api.response;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
import cz.muni.fi.lasaris.sbms.data.entities.containers.Aggregation;
......@@ -27,7 +26,11 @@ public class SetOfAggregatesResponse {
private SetOfAggregatesResponse() {
this.results = null;
}
public SetOfAggregatesResponse(Map<String, Map<Address, AggregatedValue>> data) {
this.results = data;
}
/*
public SetOfAggregatesResponse(Map<String, Aggregation<Address>> data) {
this.results = new LinkedHashMap<String, Map<Address, AggregatedValue>>();
for(Entry<String, Aggregation<Address>> e : data.entrySet()) {
......@@ -36,7 +39,7 @@ public class SetOfAggregatesResponse {
this.results.put(e.getKey(), a);
}
}
*/
public SetOfAggregatesResponse(Address a, AggregatedValue v) {
this.results = new LinkedHashMap<String, Map<Address, AggregatedValue>>();
......
......@@ -2,9 +2,11 @@ package cz.muni.fi.lasaris.sbms.data.api.response;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
import cz.muni.fi.lasaris.sbms.data.entities.containers.Trend;
import cz.muni.fi.lasaris.sbms.data.entities.containers.TrendGroup;
public class SetOfTrendsResponse {
private static final String DEFAULT_GROUPING = "noGrouping";
......@@ -26,18 +28,29 @@ public class SetOfTrendsResponse {
this.results = null;
}
/*
public SetOfTrendsResponse(Map<String, Map<Address, Trend>> data, boolean groups) {
this.results = data;
}
*/
public SetOfTrendsResponse(Map<String, TrendGroup> data, boolean groups) {
this.results = new LinkedHashMap<String, Map<Address, Trend>>();
for(Entry<String, TrendGroup> e : data.entrySet()) {
Map<Address, Trend> a = new LinkedHashMap<Address, Trend>();
e.getValue().forEach((k,v) -> a.put(k, v));
this.results.put(e.getKey(), a);
}
}
public SetOfTrendsResponse(Map<Address, Trend> data) {
this.results = new LinkedHashMap<String, Map<Address, Trend>>();
this.results.put(DEFAULT_GROUPING, data);
}
public SetOfTrendsResponse(Address a, Trend v) {
this(new LinkedHashMap<String, Map<Address, Trend>>(), true);
this.results = new LinkedHashMap<String, Map<Address, Trend>>();
this.results.put(DEFAULT_GROUPING, new LinkedHashMap<Address, Trend>());
this.results.get(DEFAULT_GROUPING).put(a, v);
}
......
package cz.muni.fi.lasaris.sbms.data.logic;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import cz.muni.fi.lasaris.sbms.data.api.request.GroupedAddressRequest;
import cz.muni.fi.lasaris.sbms.data.api.request.AddressRequest;
import cz.muni.fi.lasaris.sbms.data.api.response.AggregateResponse;
import cz.muni.fi.lasaris.sbms.data.api.response.SnapshotResponse;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
import cz.muni.fi.lasaris.sbms.data.entities.AggregationFunction;
import cz.muni.fi.lasaris.sbms.data.entities.containers.Snapshot;
import cz.muni.fi.lasaris.sbms.data.entities.values.AggregatedValue;
import cz.muni.fi.lasaris.sbms.data.entities.values.RawValue;
import cz.muni.fi.lasaris.sbms.data.util.Aggregator;
import cz.muni.fi.lasaris.sbms.data.providers.AbstractSimpleDataPointsProvider;
import cz.muni.fi.lasaris.sbms.data.providers.DataPointsProvider;
public class DPDataCollector {
final static Logger logger = Logger.getLogger(DPDataCollector.class);
public SnapshotResponse getDataPointValues(AddressRequest readSpecs, boolean allowCache) {
return new SnapshotResponse(getDataPointValuesSnapshot(readSpecs, allowCache));
public Snapshot getDataPointValues(AddressRequest readSpecs, boolean allowCache) {
DataPointsProvider p = new MultiProtocolProvider(new ProtocolParser(readSpecs));
Snapshot result = p.getDataPointValues(readSpecs.getAddresses(), allowCache);
return result;
}
private Snapshot getDataPointValuesSnapshot(AddressRequest readSpecs, boolean allowCache) {
NavigableMap<String, AddressRequest> protocols = readSpecs.getProtocols();
if(protocols.size() == 1) {
return ProviderManager.getDataPointsProvider(protocols.firstKey())
.getDataPointValues(protocols.get(protocols.firstKey()).getAddresses(), allowCache);
}
AtomicInteger counter = new AtomicInteger(0);
int count = protocols.keySet().size();
List<Snapshot> results = Collections.synchronizedList(new LinkedList<Snapshot>());
for(String protocol : protocols.keySet()) {
new Thread(new Runnable() {
@Override
public void run() {
Snapshot rs = (ProviderManager.getDataPointsProvider(protocol)
.getDataPointValues(protocols.get(protocol).getAddresses(), allowCache));
synchronized(results) {
results.add(rs);
counter.incrementAndGet();
results.notifyAll();
}
}}).start();
}
synchronized(results) {
while(counter.get() < count) {
try {
results.wait();
} catch (InterruptedException e) {
logger.error("Exception while waiting for completion.", e);
}
}
}
Snapshot result = new Snapshot();
synchronized(results) {
for(Snapshot r : results) {
result.addAll(r.getData());
}
}
public AggregatedValue getDataPointsAggregation(AddressRequest readSpecs, AggregationFunction aggregation,
boolean allowCache) {
DataPointsProvider p = new MultiProtocolProvider(new ProtocolParser(readSpecs));
AggregatedValue result = p.getDataPointsAggregation(readSpecs.getAddresses(), aggregation, allowCache);
return result;
}
public AggregateResponse getDataPointsAggregation(AddressRequest readSpecs, AggregationFunction aggregation,
public Map<String, Snapshot> getDataPointGroupedValues(GroupedAddressRequest readSpecs,
boolean allowCache) {
NavigableMap<String, AddressRequest> protocols = readSpecs.getProtocols();
if(protocols.size() == 1) {
return new AggregateResponse(ProviderManager.getDataPointsProvider(protocols.firstKey())
.getDataPointsAggregation(protocols.get(protocols.firstKey()).getAddresses(), aggregation, allowCache));
}
Snapshot data = getDataPointValuesSnapshot(readSpecs, allowCache);
return new AggregateResponse(computeAggregation(data.getData(), aggregation));
DataPointsProvider p = new MultiProtocolProvider(new ProtocolParser(readSpecs));
Map<String, Snapshot> result = p.getDataPointGroupedValues(readSpecs.getGroups(), allowCache);
return result;
}
public SnapshotResponse getDataPointGroupedValues(GroupedAddressRequest readSpecs,
boolean allowCache) {
return new SnapshotResponse(getDataPointGroupedValuesMap(readSpecs, allowCache));
public Map<String,AggregatedValue> getDataPointGroupAggregations(GroupedAddressRequest readSpecs,
AggregationFunction aggregation, boolean allowCache) {
DataPointsProvider p = new MultiProtocolProvider(new ProtocolParser(readSpecs));
Map<String,AggregatedValue> result = p.getDataPointGroupAggregations(readSpecs.getGroups(), aggregation, allowCache);
return result;
}
private Map<String, Snapshot> getDataPointGroupedValuesMap(GroupedAddressRequest readSpecs,
boolean allowCache) {
NavigableMap<String, GroupedAddressRequest> protocols = readSpecs.getProtocols();
if(protocols.size() == 1) {
return ProviderManager.getDataPointsProvider(protocols.firstKey())
.getDataPointGroupedValues(protocols.get(protocols.firstKey()).getGroups(), allowCache);
}
AtomicInteger counter = new AtomicInteger(0);
int count = protocols.keySet().size();
private class MultiProtocolProvider extends AbstractSimpleDataPointsProvider {
List<SnapshotResponse> results = Collections.synchronizedList(new LinkedList<SnapshotResponse>());
ProtocolParser p;
for(String protocol : protocols.keySet()) {
new Thread(new Runnable() {
@Override
public void run() {
SnapshotResponse rs = new SnapshotResponse(ProviderManager.getDataPointsProvider(protocol).getDataPointGroupedValues(protocols.get(protocol).getGroups(), allowCache));
synchronized(results) {
results.add(rs);
counter.incrementAndGet();
results.notifyAll();
}
}}).start();
public MultiProtocolProvider(ProtocolParser parser) {
p = parser;
}
synchronized(results) {
while(counter.get() < count) {
try {
results.wait();
} catch (InterruptedException e) {
logger.error("Exception while waiting for completion.", e);
@Override
protected Map<Address, RawValue> getValues(List<Address> dataPoints, boolean allowCache) {
NavigableSet<String> protocols = p.getProtocolsSet();
if(protocols.size() == 1) {
return ProviderManager.getDataPointsProvider(protocols.first())
.getDataPointValues(p.getAddresses(protocols.first()), allowCache).getData();
}
AtomicInteger counter = new AtomicInteger(0);
int count = protocols.size();
List<Snapshot> results = Collections.synchronizedList(new LinkedList<Snapshot>());
for(String protocol : protocols) {
new Thread(new Runnable() {
@Override
public void run() {
Snapshot rs = (ProviderManager.getDataPointsProvider(protocol)
.getDataPointValues(p.getAddresses(protocol), allowCache));
synchronized(results) {
results.add(rs);
counter.incrementAndGet();
results.notifyAll();
}
}}).start();
}
synchronized(results) {
while(counter.get() < count) {
try {
results.wait();
} catch (InterruptedException e) {
logger.error("Exception while waiting for completion.", e);
}
}
}
}
Map<String, Snapshot> result = new LinkedHashMap<String, Snapshot>();
for(String group : readSpecs.getGroups().keySet()) {
result.put(group, new Snapshot());
}
synchronized(results) {
for(SnapshotResponse r : results) {
for(String group : r.getResults().keySet()) {
result.get(group).addAll(r.getResults().get(group).getData());
Snapshot result = new Snapshot();
synchronized(results) {
for(Snapshot r : results) {
result.addAll(r.getData());
}
}
return result.getData();
}
return result;
}
public AggregateResponse getDataPointGroupAggregations(GroupedAddressRequest readSpecs,
AggregationFunction aggregation, boolean allowCache) {
NavigableMap<String, GroupedAddressRequest> protocols = readSpecs.getProtocols();
if(protocols.size() == 1) {
return new AggregateResponse(ProviderManager.getDataPointsProvider(protocols.firstKey())
.getDataPointGroupAggregations(protocols.get(protocols.firstKey()).getGroups(), aggregation, allowCache));
@Override
public void init(Properties props, String name, String propertiesPrefix) {
}
Map<String, Snapshot> data = getDataPointGroupedValuesMap(readSpecs, allowCache);
return new AggregateResponse(computeGroupAggregations(data, aggregation));
}
private Map<String, AggregatedValue> computeGroupAggregations(Map<String, Snapshot> data, AggregationFunction aggregation) {
Map<String, AggregatedValue> result = new LinkedHashMap<String, AggregatedValue>();
for(String group : data.keySet()) {
AggregatedValue value = Aggregator.computeAggregation(data.get(group).getData(), aggregation);
result.put(group, value);
@Override
public void close() {
}
return result;
}
private AggregatedValue computeAggregation(Map<Address, RawValue> data, AggregationFunction aggregation) {
return Aggregator.computeAggregation(data, aggregation);
}
}
package cz.muni.fi.lasaris.sbms.data.logic;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import cz.muni.fi.lasaris.sbms.data.api.request.AddressRequest;
import cz.muni.fi.lasaris.sbms.data.api.request.GroupedAddressRequest;
import cz.muni.fi.lasaris.sbms.data.api.request.TrendRequest;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
/**
 * Extracts per-protocol address information from the various request types so
 * that callers can dispatch to the correct provider by protocol name.
 */
public class ProtocolParser {

	private AddressRequest readSpecs;
	private GroupedAddressRequest greadSpecs;

	public ProtocolParser(AddressRequest readSpecs) {
		this.readSpecs = readSpecs;
	}

	public ProtocolParser(GroupedAddressRequest readSpecs) {
		this.greadSpecs = readSpecs;
	}

	public ProtocolParser(TrendRequest readSpecs) {
		// A trend request carries either a grouped or a flat address spec.
		if (readSpecs.isGropuing()) {
			this.greadSpecs = readSpecs.getGroupedAddressSpecs();
		} else {
			this.readSpecs = readSpecs.getAddressSpecs();
		}
	}

	/**
	 * Returns the set of protocol names referenced by the wrapped request
	 * (the grouped spec takes precedence when both are present), or
	 * {@code null} when neither spec is set.
	 */
	public NavigableSet<String> getProtocolsSet() {
		if (greadSpecs != null) {
			return greadSpecs.getProtocols().navigableKeySet();
		}
		if (readSpecs != null) {
			return readSpecs.getProtocols().navigableKeySet();
		}
		return null;
	}

	/**
	 * Returns every address belonging to protocol {@code p}; for grouped
	 * requests all groups are flattened into a single list. Returns
	 * {@code null} when neither spec is set.
	 */
	public List<Address> getAddresses(String p) {
		if (greadSpecs != null) {
			List<Address> flattened = new LinkedList<Address>();
			for (List<Address> group : greadSpecs.getProtocols().get(p).getGroups().values()) {
				flattened.addAll(group);
			}
			return flattened;
		}
		if (readSpecs != null) {
			return readSpecs.getProtocols().get(p).getAddresses();
		}
		return null;
	}
}
......@@ -22,12 +22,11 @@ import cz.muni.fi.lasaris.sbms.data.util.GroupHandler;
import cz.muni.fi.lasaris.sbms.data.util.GroupHandler.ContainerHandler;
import cz.muni.fi.lasaris.sbms.data.util.GroupHandler.ScalarHandler;
import cz.muni.fi.lasaris.sbms.data.util.Sampler;
import cz.muni.fi.lasaris.sbms.data.util.Slicer;
import cz.muni.fi.lasaris.sbms.data.util.WindowMaker;
public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
protected abstract Map<Address, Trend> getRecords(List<Address> trends, ZonedDateTime from, ZonedDateTime to);
protected abstract Map<Address, Trend> getClosestOlderRecord(List<Address> trends,ZonedDateTime timestamp);
protected abstract Map<Address, Trend> getClosestNewerRecord(List<Address> trends,ZonedDateTime timestamp);
......@@ -42,11 +41,16 @@ public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
return new TrendGroup(getProcessedTrends(trends, s));
}
@Override
public TrendGroup getTrends(List<Address> trends, SeriesSpecs s, boolean withOlder) {
return new TrendGroup(getProcessedTrends(trends, s, withOlder));
}
private Map<Address,Trend> getProcessedTrends(List<Address> trends, SeriesSpecs s) {
return getProcessedTrends(trends, s, false);
}
private Map<Address,Trend> getProcessedTrends(List<Address> trends, SeriesSpecs s, boolean olderRequired) {
protected Map<Address,Trend> getProcessedTrends(List<Address> trends, SeriesSpecs s, boolean olderRequired) {
if(s.isWithWindows()) {
return WindowMaker.computeWindows(getRawTrends(trends, s, olderRequired), s);
} else {
......@@ -55,17 +59,22 @@ public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
}
private Map<Address,Trend> getRawTrends(List<Address> trends, SeriesSpecs s, boolean olderRequired) {
/*
if(s.getSampling() != Sampling.NONE && s.getInterpolation() == Interpolation.NONE
|| s.getSampling() == Sampling.NONE && s.getInterpolation() != Interpolation.NONE)
{
throw new IllegalArgumentException("Both sampling and interpolation must be set");
}
*/
Map<Address, Trend> results = getRecords(trends, s.getStart(), s.getEnd());
if(s.getInterpolation() == Interpolation.NONE && s.getSampling() == Sampling.NONE) {
if(!s.isWithSampling() && !s.isWithWindows()) {
// do nothing - we already have all the necessary data
return results;
}
if (s.getInterpolation() == Interpolation.LAST_VALUE || s.getInterpolation() == Interpolation.LINEAR || olderRequired) {
if (s.getInterpolation() == Interpolation.LAST_VALUE
|| s.getInterpolation() == Interpolation.LINEAR
|| (s.isWithWindows() && isOlderRequired(s.getWindowAggregation()))
|| olderRequired) {
Map<Address, Trend> older = getClosestOlderRecord(trends, s.getStart());
mergeSeriesMaps(results, older);
}
......@@ -175,13 +184,13 @@ public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
if(seriesSpecs.getSampling() == Sampling.NONE) {
throw new IllegalArgumentException("Sampling must be set when creating slices.");
}
return Sampler.createSlices(getProcessedTrends(trends, seriesSpecs));
return Slicer.createSlices(getProcessedTrends(trends, seriesSpecs));
}
@Override
public Series<AggregatedValue> getSliceAggregations(List<Address> trends, SeriesSpecs seriesSpecs,
public Trend getSliceAggregationSeries(List<Address> trends, SeriesSpecs seriesSpecs,
AggregationFunction aggregation) {
Series<AggregatedValue> results = new Series<AggregatedValue>();
Trend results = new Trend();
Series<Slice> slices = getSlices(trends, seriesSpecs);
for(ZonedDateTime d : slices.getKeys()) {
AggregatedValue v = Aggregator.computeAggregation(slices.get(d).getData(), aggregation);
......@@ -191,24 +200,55 @@ public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
}
@Override
public Series<Map<String, AggregatedValue>> getGroupSliceAggregations(Map<String, List<Address>> trends,
public Map<String, Trend> getGroupedSliceAggregationsSeries(Map<String, List<Address>> trends,
SeriesSpecs seriesSpecs, AggregationFunction aggregation) {
Series<Map<String, AggregatedValue>> results = new Series<Map<String, AggregatedValue>>();
Map<String, TrendGroup> rawResults = getTrendsByGroup(trends, seriesSpecs);
for(String group : rawResults.keySet()) {
TrendGroup tg = rawResults.get(group);
Series<Slice> slices = Sampler.createSlices(tg.getData());
if(results.getKeys().size() == 0) {
// empty result - we create the "slices"
for(ZonedDateTime dt :slices.getKeys()) {
results.add(dt, new LinkedHashMap<String, AggregatedValue>());
}
}
for(ZonedDateTime dt : slices.getKeys()) {
AggregatedValue v = Aggregator.computeAggregation(slices.get(dt).getData(), aggregation);
results.get(dt).put(group, v);
}
Map<String, Trend> results