Commit 58980706 authored by Adam Kučera's avatar Adam Kučera
Browse files

Added WindowMaker

parent 8d26cdec
......@@ -154,10 +154,12 @@ public class TrendsEndpoint {
@QueryParam("start") String start,
@QueryParam("end") String end,
@QueryParam("aggregation.function") AggregationFunction aggregation,
@QueryParam("aggregation.window") AggregationWindow window
@QueryParam("aggregation.window") AggregationWindow window,
@QueryParam("aggregation.cron") String windowCron,
@QueryParam("aggregation.duration") Long windowDur
) {
try {
TrendsRequest r = getReadSpecsSingle(id, start, end, null, null, null, aggregation, window);
TrendsRequest r = getReadSpecsSingle(id, start, end, windowCron, windowDur, null, aggregation, window);
Trend v = dc.getTrend(r);
return new TrendResponse(v);
} catch (Exception e) {
......@@ -385,10 +387,12 @@ public class TrendsEndpoint {
@QueryParam("start") String start,
@QueryParam("end") String end,
@QueryParam("aggregation.function") AggregationFunction aggregation,
@QueryParam("aggregation.window") AggregationWindow window
@QueryParam("aggregation.window") AggregationWindow window,
@QueryParam("aggregation.cron") String windowCron,
@QueryParam("aggregation.duration") Long windowDur
) {
try {
TrendsRequest r = getReadSpecsMulti(ids, grouping, start, end, null, null, null, aggregation, window);
TrendsRequest r = getReadSpecsMulti(ids, grouping, start, end, windowCron, windowDur, null, aggregation, window);
if(grouping) {
Map<String, Map<Address, Trend>> v = dc.getGroupedTrends(r);
return new SetOfTrendsResponse(v, true);
......@@ -400,7 +404,7 @@ public class TrendsEndpoint {
return SetOfTrendsResponse.getErrorResponse(e.getMessage());
}
}
// Method 18
@RolesAllowed({"user","admin"})
@POST
......@@ -412,10 +416,12 @@ public class TrendsEndpoint {
@QueryParam("start") String start,
@QueryParam("end") String end,
@QueryParam("aggregation.function") AggregationFunction aggregation,
@QueryParam("aggregation.window") AggregationWindow window
@QueryParam("aggregation.window") AggregationWindow window,
@QueryParam("aggregation.cron") String windowCron,
@QueryParam("aggregation.duration") Long windowDur
) {
try {
TrendsRequest r = getReadSpecsMulti(ids, grouping, start, end, null, null, null, aggregation, window);
TrendsRequest r = getReadSpecsMulti(ids, grouping, start, end, windowCron, windowDur, null, aggregation, window);
if(grouping) {
Map<String, Trend> v = dc.getGroupedWindowedTrendsAggregate(r);
return new TrendResponse(v);
......@@ -444,6 +450,8 @@ public class TrendsEndpoint {
@QueryParam("sampling.interpolation") Interpolation interpolation,
@QueryParam("aggregation.function") AggregationFunction aggregation,
@QueryParam("aggregation.window") AggregationWindow window,
@QueryParam("aggregation.cron") String windowCron,
@QueryParam("aggregation.duration") Long windowDur
) {
Address a = new Address(id);
Trend v = ProviderManager.getTrendsProvider(a.getProtocol()).getTrend(a, getSeriesSpecs(start, end, samplingCron, samplingDur, interpolation));
......@@ -475,19 +483,27 @@ public class TrendsEndpoint {
private TrendsRequest getReadSpecsBase(String start, String end, String samplingCron,
Long samplingDur, Interpolation interpolation, AggregationFunction aggregation, AggregationWindow window) {
TrendsRequest readSpecs = new TrendsRequest();
SeriesSpecs s = getSeriesSpecs(start, end, samplingCron, samplingDur, interpolation);
SeriesSpecs s = getSeriesSpecs(start, end, samplingCron, samplingDur, interpolation, aggregation, window);
readSpecs.setSeriesSpecs(s);
readSpecs.setAggregation(aggregation, window);
return readSpecs;
}
private SeriesSpecs getSeriesSpecs(String start, String end, String samplingCron,
Long samplingDur, Interpolation interpolation) {
private SeriesSpecs getSeriesSpecs(String start, String end, String cron,
Long duration, Interpolation interpolation, AggregationFunction aggregation, AggregationWindow window) {
SeriesSpecs s = null;
if(samplingCron != null) {
s = new SeriesSpecs(getDateTime(start), getDateTime(end), samplingCron, interpolation);
} else if (samplingDur != null) {
s = new SeriesSpecs(getDateTime(start), getDateTime(end), Duration.of(samplingDur, ChronoUnit.SECONDS), interpolation);
if(interpolation != null && !interpolation.equals(Interpolation.NONE)) {
if(cron != null) {
s = new SeriesSpecs(getDateTime(start), getDateTime(end), cron, interpolation);
} else if (duration != null) {
s = new SeriesSpecs(getDateTime(start), getDateTime(end), Duration.of(duration, ChronoUnit.SECONDS), interpolation);
}
} else if (window != null && !window.equals(AggregationWindow.NONE)) {
if(cron != null) {
s = new SeriesSpecs(getDateTime(start), getDateTime(end), cron, aggregation);
} else if (duration != null) {
s = new SeriesSpecs(getDateTime(start), getDateTime(end), Duration.of(duration, ChronoUnit.SECONDS), aggregation);
}
} else {
s = new SeriesSpecs(getDateTime(start), getDateTime(end));
}
......
......@@ -9,25 +9,25 @@ import cz.muni.fi.lasaris.sbms.data.entities.containers.Aggregation;
import cz.muni.fi.lasaris.sbms.data.entities.values.AggregatedValue;
public class SetOfAggregatesResponse {
private static final String DEFAULT_GROUPING = "noGrouping";
private static final String DEFAULT_GROUPING = "noGrouping";
private String error;
private Map<String, Map<Address, AggregatedValue>> results;
public Map<String, Map<Address, AggregatedValue>> getResults() {
return this.results;
}
public static SetOfAggregatesResponse getErrorResponse(String error) {
SetOfAggregatesResponse r = new SetOfAggregatesResponse();
r.setError(error);
return r;
}
private SetOfAggregatesResponse() {
this.results = null;
}
public SetOfAggregatesResponse(Map<String, Aggregation<Address>> data) {
this.results = new LinkedHashMap<String, Map<Address, AggregatedValue>>();
for(Entry<String, Aggregation<Address>> e : data.entrySet()) {
......@@ -36,14 +36,14 @@ private static final String DEFAULT_GROUPING = "noGrouping";
this.results.put(e.getKey(), a);
}
}
public SetOfAggregatesResponse(Address a, AggregatedValue v) {
this.results = new LinkedHashMap<String, Map<Address, AggregatedValue>>();
this.results.put(DEFAULT_GROUPING, new LinkedHashMap<Address, AggregatedValue>());
this.results.get(DEFAULT_GROUPING).put(a, v);
}
public SetOfAggregatesResponse(Aggregation<Address> a) {
this.results = new LinkedHashMap<String, Map<Address, AggregatedValue>>();
this.results.put(DEFAULT_GROUPING, new LinkedHashMap<Address, AggregatedValue>());
......
......@@ -2,34 +2,21 @@ package cz.muni.fi.lasaris.sbms.data.entities;
public enum AggregationWindow {
NONE,
DAY,
HOUR,
WEEK,
MONTH,
YEAR;
DURATION,
CRON;
public static AggregationWindow fromString(String s) {
s = s.toUpperCase().trim();
if(s.equals("DAY")) {
return AggregationWindow.DAY;
if(s.toUpperCase().trim().equals("DURATION")) {
return AggregationWindow.DURATION;
}
if(s.equals("HOUR")) {
return AggregationWindow.HOUR;
if(s.toUpperCase().trim().equals("CRON")) {
return AggregationWindow.CRON;
}
if(s.equals("WEEK")) {
return AggregationWindow.WEEK;
}
if(s.equals("MONTH")) {
return AggregationWindow.MONTH;
}
if(s.equals("YEAR")) {
return AggregationWindow.YEAR;
}
if(s.equals("NONE")) {
if(s.toUpperCase().trim().equals("NONE")) {
return AggregationWindow.NONE;
}
throw new IllegalArgumentException("Unknown window type.");
throw new IllegalArgumentException("Unknown sampling type.");
}
}
......@@ -8,8 +8,10 @@ public class SeriesSpecs {
private ZonedDateTime end;
private Sampling sampling;
private Interpolation interpolation;
private Duration samplingDuration;
private String samplingCron;
private Duration duration;
private String cron;
private AggregationWindow window;
private AggregationFunction windowAggregation;
public ZonedDateTime getStart() {
return start;
......@@ -27,40 +29,73 @@ public class SeriesSpecs {
return interpolation;
}
public Duration getSamplingDuration() {
return samplingDuration;
public Duration getDuration() {
return duration;
}
public String getSamplingCron() {
return samplingCron;
public String getCron() {
return cron;
}
public AggregationFunction getWindowAggregation() {
return this.windowAggregation;
}
public AggregationWindow getWindow() {
return this.window;
}
public SeriesSpecs(ZonedDateTime start, ZonedDateTime end) {
this.start = start;
this.end = end;
this.sampling = Sampling.NONE;
this.window = AggregationWindow.NONE;
this.windowAggregation = AggregationFunction.NONE;
}
public SeriesSpecs(ZonedDateTime start, ZonedDateTime end, Duration samplingDuration, Interpolation interpolation) {
this.start = start;
this.end = end;
this.samplingDuration = samplingDuration;
this.duration = samplingDuration;
this.sampling = Sampling.DURATION;
this.interpolation = interpolation;
this.window = AggregationWindow.NONE;
this.windowAggregation = AggregationFunction.NONE;
}
public SeriesSpecs(ZonedDateTime start, ZonedDateTime end, String samplingCron, Interpolation interpolation) {
this.start = start;
this.end = end;
this.samplingCron = samplingCron;
this.cron = samplingCron;
this.sampling = Sampling.CRON;
this.interpolation = interpolation;
this.window = AggregationWindow.NONE;
this.windowAggregation = AggregationFunction.NONE;
}
public SeriesSpecs(ZonedDateTime start, ZonedDateTime end, Duration windowDuration, AggregationFunction windowAggregation) {
this.start = start;
this.end = end;
this.sampling = Sampling.NONE;
this.window = AggregationWindow.DURATION;
this.duration = windowDuration;
this.windowAggregation = windowAggregation;
}
public SeriesSpecs(ZonedDateTime start, ZonedDateTime end, String windowCron, AggregationFunction windowAggregation) {
this.start = start;
this.end = end;
this.sampling = Sampling.NONE;
this.window = AggregationWindow.CRON;
this.cron = windowCron;
this.windowAggregation = windowAggregation;
}
public boolean isWithWindows() {
return !(this.window.equals(AggregationWindow.NONE));
}
public boolean isWithSampling() {
return !(this.sampling.equals(Sampling.NONE));
}
}
......@@ -22,6 +22,7 @@ import cz.muni.fi.lasaris.sbms.data.util.GroupHandler;
import cz.muni.fi.lasaris.sbms.data.util.GroupHandler.ContainerHandler;
import cz.muni.fi.lasaris.sbms.data.util.GroupHandler.ScalarHandler;
import cz.muni.fi.lasaris.sbms.data.util.Sampler;
import cz.muni.fi.lasaris.sbms.data.util.WindowMaker;
public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
......@@ -46,7 +47,11 @@ public abstract class AbstractSimpleTrendsProvider implements TrendsProvider {
}
private Map<Address,Trend> getProcessedTrends(List<Address> trends, SeriesSpecs s, boolean olderRequired) {
return Sampler.computeSampling(getRawTrends(trends, s, olderRequired), s);
if(s.isWithWindows()) {
return WindowMaker.computeWindows(getRawTrends(trends, s, olderRequired), s);
} else {
return Sampler.computeSampling(getRawTrends(trends, s, olderRequired), s);
}
}
private Map<Address,Trend> getRawTrends(List<Address> trends, SeriesSpecs s, boolean olderRequired) {
......
......@@ -4,18 +4,11 @@ import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
import cz.muni.fi.lasaris.sbms.data.entities.DataType;
import cz.muni.fi.lasaris.sbms.data.entities.Interpolation;
......@@ -29,7 +22,7 @@ import cz.muni.fi.lasaris.sbms.data.entities.values.Value;
public class Sampler {
public static Map<Address, Trend> computeSampling(Map<Address, Trend> data, SeriesSpecs s) {
List<ZonedDateTime> timestamps = generateTimestamps(s);
List<ZonedDateTime> timestamps = TimestampGenerator.generateTimestamps(s);
Map<Address, Trend> r = new LinkedHashMap<Address, Trend>(data.size());
for(Address a : data.keySet()) {
......@@ -80,55 +73,6 @@ public class Sampler {
return r;
}
private static List<ZonedDateTime> generateTimestamps(SeriesSpecs s) {
switch(s.getSampling()) {
case DURATION:
return generateDurationTimestamps(s);
case CRON:
return generateCRONTimestamps(s);
case NONE:
default:
throw new IllegalArgumentException("Sampling must be set");
}
}
private static List<ZonedDateTime> generateDurationTimestamps(SeriesSpecs s) {
List<ZonedDateTime> r = new LinkedList<ZonedDateTime>();
ZonedDateTime t = s.getStart();
while (t.isBefore(s.getEnd()) || t.isEqual(s.getEnd())) {
r.add(t);
t = t.plus(s.getSamplingDuration());
}
return r;
}
private static List<ZonedDateTime> generateCRONTimestamps(SeriesSpecs s) {
List<ZonedDateTime> r = new LinkedList<ZonedDateTime>();
CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));
Cron expr = null;
try {
expr = parser.parse(s.getSamplingCron());
expr.validate();
} catch(Exception e) {
r.add(s.getStart());
r.add(s.getEnd());
return r;
}
ZonedDateTime t = s.getStart();
ExecutionTime et = ExecutionTime.forCron(expr);
t = et.nextExecution(t);
while (t.isBefore(s.getEnd()) || t.isEqual(s.getEnd())) {
r.add(t);
t = et.nextExecution(t);
}
return r;
}
/**
* Creates Slices from sampled trends. The trends must be previously sampled by the computeSampling method! If not called,
......
package cz.muni.fi.lasaris.sbms.data.util;
import java.time.ZonedDateTime;
import java.util.LinkedList;
import java.util.List;
import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import cz.muni.fi.lasaris.sbms.data.entities.SeriesSpecs;
public class TimestampGenerator {
public static List<ZonedDateTime> generateTimestamps(SeriesSpecs s) {
switch(s.getSampling()) {
case DURATION:
return generateDurationTimestamps(s);
case CRON:
return generateCRONTimestamps(s);
case NONE:
default:
throw new IllegalArgumentException("Sampling must be set");
}
}
private static List<ZonedDateTime> generateDurationTimestamps(SeriesSpecs s) {
List<ZonedDateTime> r = new LinkedList<ZonedDateTime>();
ZonedDateTime t = s.getStart();
while (t.isBefore(s.getEnd()) || t.isEqual(s.getEnd())) {
r.add(t);
t = t.plus(s.getDuration());
}
return r;
}
private static List<ZonedDateTime> generateCRONTimestamps(SeriesSpecs s) {
List<ZonedDateTime> r = new LinkedList<ZonedDateTime>();
CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));
Cron expr = null;
try {
expr = parser.parse(s.getCron());
expr.validate();
} catch(Exception e) {
r.add(s.getStart());
r.add(s.getEnd());
return r;
}
ZonedDateTime t = s.getStart();
ExecutionTime et = ExecutionTime.forCron(expr);
t = et.nextExecution(t);
while (t.isBefore(s.getEnd()) || t.isEqual(s.getEnd())) {
r.add(t);
t = et.nextExecution(t);
}
return r;
}
}
package cz.muni.fi.lasaris.sbms.data.util;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import cz.muni.fi.lasaris.sbms.data.entities.Address;
import cz.muni.fi.lasaris.sbms.data.entities.AggregationFunction;
import cz.muni.fi.lasaris.sbms.data.entities.SeriesSpecs;
import cz.muni.fi.lasaris.sbms.data.entities.containers.Trend;
import cz.muni.fi.lasaris.sbms.data.entities.values.AggregatedValue;
import cz.muni.fi.lasaris.sbms.data.entities.values.Value;
public class WindowMaker {
public static Map<Address, Trend> computeWindows(Map<Address, Trend> data, SeriesSpecs s) {
List<ZonedDateTime> timestamps = TimestampGenerator.generateTimestamps(s);
Map<Address, Trend> r = new LinkedHashMap<Address, Trend>(data.size());
for(Address a : data.keySet()) {
Trend ot = data.get(a);
Trend rt = new Trend(getWindows(ot.getNavigableData(), timestamps, s));
r.put(a, rt);
}
return data;
}
private static NavigableMap<ZonedDateTime, Value> getWindows(NavigableMap<ZonedDateTime, Value> d,
List<ZonedDateTime> t, SeriesSpecs s) {
if(d == null || d.isEmpty()) {
return errorSamples(t, s.getWindowAggregation());
}
NavigableMap<ZonedDateTime, Value> r = new TreeMap<ZonedDateTime, Value>();
Iterator<Map.Entry<ZonedDateTime, Value>> it = d.entrySet().iterator();
NavigableMap<ZonedDateTime, Value> window;
AggregatedValue v;
int i = 0;
while(i < t.size() - 1) {
ZonedDateTime windowStart = t.get(i);
ZonedDateTime windowEnd = t.get(i+1);
window = getWindow(it, windowStart, windowEnd);
v = Aggregator.computeTemporalAggregation(window, s.getWindowAggregation(), windowStart, windowEnd);
r.put(windowStart, v);
i++;
}
// the last window
window = getWindow(it, t.get(i), s.getEnd());
v = Aggregator.computeTemporalAggregation(window, s.getWindowAggregation(), t.get(i), s.getEnd());
r.put(t.get(i), v);
return r;
}
// Custom implementation of NavigableMap.getSubmas, so the search for the boundaries does not have
// to be performed over and over again
private static NavigableMap<ZonedDateTime, Value> getWindow(Iterator<Entry<ZonedDateTime, Value>> i,
ZonedDateTime windowStart, ZonedDateTime windowEnd) {
NavigableMap<ZonedDateTime, Value> w = new TreeMap<ZonedDateTime, Value>();
if(!i.hasNext()) {
return w;
}
Map.Entry<ZonedDateTime, Value> e = i.next();
// moving to the start of a window - should not be called
while(i.hasNext() && e.getKey().compareTo(windowStart) <= 0) {
e = i.next();
}
// adding values to the window
while(i.hasNext() && e.getKey().compareTo(windowEnd) < 0) {
w.put(e.getKey(), e.getValue());
e = i.next();
}
return w;
}
private static NavigableMap<ZonedDateTime, Value> errorSamples(List<ZonedDateTime> t, AggregationFunction a) {
NavigableMap<ZonedDateTime, Value> r = new TreeMap<ZonedDateTime, Value>();
for(ZonedDateTime dt : t) {
r.put(dt, AggregatedValue.getAggregationError(a, "No data to use for window aggregation"));
}
return r;
}
}
Supports Markdown
0% or .