Project Homepage
Advanced Database Systems (CSCE 6350)
Team Members
| Name | E-mail |
|---|---|
| Michael Mohler | mgm0038@unt.edu |
Project
Topic of Interest
I would like to implement the dynamic multidimensional histogram algorithms described in (Thaper et al., 2002). I propose to implement the base
O(N^3L*d*B^2) algorithm as well as the several enhancements mentioned [ O(N^2L*d*B) down to O(N^L*d*logN) ]. I intend to analyze the accuracy of these
algorithms (particularly concerning the optimum/approximation split) as well as the usefulness of histograms for a given data set. To this end, I
intend to find various types of datasets (high-dimensionality/single attributes, low-dimensionality/many attributes, randomized data, localized data,
etc.). Additionally, an analysis of the relative gains and losses associated with the individual enhancements may lead to better usage of histograms for a
given data set.
# P2. Describe the dataset, tools, related papers, and tutorials related to your project. Write a 1-page summary of the dataset, tools, and related work for your project.
Outline available here.
Proposal slides available here.
Final paper available here.
Final presentation available here.
Package available here.
Questions for Paper 9
Question 1: What is the difference between Query Load Shedding and Object Load Shedding?
Answer 1: Query Load Shedding reduces the number of objects stored by
reducing the size of the stored region (cache area + query area) during overload.
When the system indicates that a reduction in size should occur, it
analyzes all objects currently in the grid area defined by the query and
drops those outside of the reduced bounds. One drawback of this approach
is the cost of reanalyzing all of these objects. Another is
the lack of guaranteed memory savings.
Object Load Shedding lazily drops objects from memory by only storing
objects that are within the "significant" regions of at least k queries,
where k is tuned based upon memory requirements. Whenever an object is
updated, its count is tested against the current k and the object is either dropped or
retained. A good value for k can be determined easily by storing a vector
whose k-th entry (1-based) is the number of objects that are significant for at least k
queries, and then analyzing the vector. Of course, priority and
minimum accuracy can be taken into account by extending this approach
through "locking" some objects or queries.
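A minimal sketch of the k-selection step described above. It assumes each object's count of queries for which it is significant is already known; the dictionary `counts` and the functions `significance_vector`, `choose_k`, and `keep_object` are hypothetical names. The vector is built once, k is the smallest value whose retained-object count fits a memory budget, and `keep_object` is the lazy test applied on each update.

```python
from typing import Dict, List

# Hypothetical representation: counts maps object id -> number of queries for
# which the object currently lies in a "significant" region.


def significance_vector(counts: Dict[str, int], max_k: int) -> List[int]:
    """vec[k - 1] = number of objects significant for at least k queries."""
    vec = [0] * max_k
    for count in counts.values():
        # An object significant for `count` queries contributes to every k <= count.
        for k in range(1, min(count, max_k) + 1):
            vec[k - 1] += 1
    return vec


def choose_k(vec: List[int], memory_budget: int) -> int:
    """Smallest k (1-based) whose number of retained objects fits the budget."""
    for k, retained in enumerate(vec, start=1):
        if retained <= memory_budget:
            return k
    return len(vec) + 1  # no tracked k fits the budget


def keep_object(count: int, k: int) -> bool:
    """Lazy test run whenever an object sends an update."""
    return count >= k


counts = {"a": 3, "b": 1, "c": 2, "d": 0}
vec = significance_vector(counts, max_k=3)  # [3, 2, 1]
k = choose_k(vec, memory_budget=2)          # 2
```

With a budget of two objects, k = 2 keeps objects "a" and "c" and lazily drops "b" and "d" the next time they report.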
Question 2: What is the purpose of the Cache Area?
Answer 2: If only objects in the query area are stored in memory, it may
be possible to miss objects during a certain time frame. Assume that a
stationary object P is just outside the query area at time T1. If the
query area moves during that time (i.e. a movable query) to include P, the
system is unaware of this, because P is not being stored in memory and it
won't register as being inside the query area until it moves and sends an
update. During that time, it may even move out of the query area and be
missed altogether.
The cache area is an extension to the query area which covers the maximum
distance that the query area can cover in the next timestamp by taking
into account the maximum speed of the objects.
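The cache area can be sketched as the query rectangle grown on every side by the farthest distance that can be covered in one time step. This sketch assumes axis-aligned rectangular query areas and a margin of `max_speed * dt`; the `Rect` type and `cache_area` function are hypothetical, and the actual system may combine object and query speeds differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rect:
    """Hypothetical axis-aligned query/cache region."""
    x_min: float
    y_min: float
    x_max: float
    y_max: float

    def contains(self, x: float, y: float) -> bool:
        return self.x_min <= x <= self.x_max and self.y_min <= y <= self.y_max


def cache_area(query: Rect, max_speed: float, dt: float) -> Rect:
    """Grow the query area by the maximum distance travelled in one time step."""
    margin = max_speed * dt  # assumed margin; see the lead-in
    return Rect(query.x_min - margin, query.y_min - margin,
                query.x_max + margin, query.y_max + margin)


query = Rect(0.0, 0.0, 10.0, 10.0)
cache = cache_area(query, max_speed=2.0, dt=1.0)
# An object P at (11, 5) is outside the query area but is kept in the cache area.
```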
Questions for Paper 8
Out of the Country
Questions for Paper 7
Question 1: What are distributive, algebraic, and holistic aggregation functions?
Answer 1: Distributive functions have the property that given two sets of elements X1 and X2, and the aggregate
values f(X1) and f(X2) for those two sets it is possible to compute the aggregate value for the union of X1 and X2
(i.e. f(X1 U X2)) using only f(X1) and f(X2). For instance, if the minimum of the first 100 tuples is 10 and the
minimum of the next 100 tuples is 12, you can determine the minimum of the first 200 tuples by taking the minimum of 10
and 12. The following are distributive: SUM, COUNT, MIN, MAX.
Algebraic functions can be computed from combinations of other functions. Specifically, f(X) must be computable from a function g(X) that returns a
fixed-size result (i.e. requires constant memory) and is itself distributive, so that g(X1 U X2) can be computed from g(X1) and g(X2). The AVG function
falls in this category because it can be computed from the pair (SUM(X), COUNT(X)). Note that it is not possible to determine AVG(X1 U X2) from AVG(X1)
and AVG(X2) alone; the SUM or the COUNT of each set is also needed.
Holistic functions (such as QUANTILE) are neither distributive nor algebraic.
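To make the distinction concrete, here is a small sketch (function names are illustrative): MIN merges directly from its partial results, while AVG is carried as the fixed-size distributive pair (SUM, COUNT) and only divided at the end.

```python
from typing import Iterable, Tuple


def partial_min(xs: Iterable[float]) -> float:
    return min(xs)


def merge_min(a: float, b: float) -> float:
    # Distributive: MIN(X1 U X2) = MIN(MIN(X1), MIN(X2)).
    return min(a, b)


def partial_avg(xs: Iterable[float]) -> Tuple[float, int]:
    # Algebraic: keep the constant-size pair g(X) = (SUM(X), COUNT(X)).
    values = list(xs)
    return (sum(values), len(values))


def merge_avg(a: Tuple[float, int], b: Tuple[float, int]) -> Tuple[float, int]:
    # SUM and COUNT are both distributive, so the pair merges component-wise.
    return (a[0] + b[0], a[1] + b[1])


def finish_avg(state: Tuple[float, int]) -> float:
    total, count = state
    return total / count


x1, x2 = [10, 20, 30], [12]
print(merge_min(partial_min(x1), partial_min(x2)))              # 10
print(finish_avg(merge_avg(partial_avg(x1), partial_avg(x2))))  # 18.0
```

Averaging the two partial averages (20 and 12) would give 16, while merging (60, 3) and (12, 1) gives the correct 72 / 4 = 18.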
Question 2: How does the B-Int algorithm work?
Answer 2: B-Int works on distributive and algebraic functions by storing the values of O(Nmax) subproblems and
combining them as described above. At update time, at most log (Nmax) subproblems are generated. Lookup is done in
O(log W) time by recursively splitting the search space along base intervals until the full interval can be
combined.
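The base-interval idea can be sketched with dyadic intervals of the form [i*2^l, (i+1)*2^l): the aggregate of each one is precomputed, and a window is answered by greedily combining the largest aligned intervals that fit. This is a simplified, static sketch that assumes an associative `combine` function and a fixed array of values; it is not the paper's incremental data structure, and all names are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Interval = Tuple[int, int]  # half-open range [start, end) of stream positions
Combine = Callable[[float, float], float]


def build_base_intervals(values: List[float], combine: Combine) -> Dict[Interval, float]:
    """Precompute the aggregate of every dyadic interval [i*2^l, (i+1)*2^l)."""
    table: Dict[Interval, float] = {(i, i + 1): v for i, v in enumerate(values)}
    size = 1
    while size * 2 <= len(values):
        for start in range(0, len(values) - 2 * size + 1, 2 * size):
            left = table[(start, start + size)]
            right = table[(start + size, start + 2 * size)]
            table[(start, start + 2 * size)] = combine(left, right)
        size *= 2
    return table


def window_aggregate(table: Dict[Interval, float], lo: int, hi: int,
                     combine: Combine) -> Optional[float]:
    """Aggregate over [lo, hi) (hi <= number of values) using O(log W) base intervals."""
    result: Optional[float] = None
    pos = lo
    while pos < hi:
        size = 1
        # Grow to the largest stored interval that starts at pos and ends by hi.
        while (pos % (2 * size) == 0 and pos + 2 * size <= hi
               and (pos, pos + 2 * size) in table):
            size *= 2
        piece = table[(pos, pos + size)]
        result = piece if result is None else combine(result, piece)
        pos += size
    return result


def add(a: float, b: float) -> float:
    return a + b


values = [3, 1, 4, 1, 5, 9, 2, 6]
sums = build_base_intervals(values, add)
```

For SUM over these values, the window [1, 7) is answered from the four stored intervals [1, 2), [2, 4), [4, 6), and [6, 7).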
Questions for Paper 6
Question 1: How can an approximate average be maintained while all values within a bound are filtered out?
Answer 1: Assume a bound [Li, Hi]. All incoming values outside of this bound are averaged in the usual way
(AVGo). If, during the same period, any values arrive within the bound, the average of these values MUST lie within the bound. Given
Ni and No, the number of in-bounds values and out-of-bounds values respectively, the overall average is bounded by
$$\mathrm{AVG}_{total} \in \left[ \frac{N_i L_i + N_o \mathrm{AVG}_o}{N_i + N_o},\ \frac{N_i H_i + N_o \mathrm{AVG}_o}{N_i + N_o} \right]$$
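The interval above can be computed directly. A minimal sketch, assuming the filter reports every out-of-bounds value and only a count of the suppressed in-bounds values (`average_bounds` is a hypothetical name):

```python
from typing import List, Tuple


def average_bounds(outside: List[float], n_inside: int,
                   low: float, high: float) -> Tuple[float, float]:
    """Interval containing the true average when n_inside values were suppressed
    because they fell inside [low, high]; `outside` holds the reported values."""
    n_out = len(outside)
    total = n_inside + n_out
    if total == 0:
        raise ValueError("no values observed")
    sum_out = sum(outside)  # equals N_o * AVG_o
    return ((n_inside * low + sum_out) / total,
            (n_inside * high + sum_out) / total)


print(average_bounds([2.0, 14.0], n_inside=2, low=5.0, high=7.0))  # (6.5, 7.5)
```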
Question 2: How is bound width determined?
Answer 2: Two values are determined: the burden score and the burden target. The burden score is the relative transmission cost associated with an
individual data object, based upon the number of out-of-bounds updates, the cost of such an update, and the width of the bounds. The burden target is
effectively a weighted average of the costs of the objects associated with a query; in practice it is computed using a linear equation solver. Given
these values, the objects are sorted in decreasing order of their deviation (i.e. burden target - burden score) and the bound width is greedily
increased.
Questions for Paper 5
Question 1: How are the Min-Cost and Min-Latency scheduling algorithms different?
Answer 1: The Min-Cost (MC) algorithm seeks to minimize the number of box calls per output tuple by working up from the leaves of the operator
tree rooted at the output. In this way each box will be called only once per output tuple.
The Min-Latency (ML) algorithm determines the output cost of each box (i.e. the cost of immediately pushing its tuple all the way to the output) and
sorts the boxes in increasing order of output cost. Each box then performs, in that order, a box-to-output operation: it processes its input tuple and
passes execution downstream until the tuple is output. This seeks to minimize latency by producing output tuples as early as possible.
The MC algorithm will always require less CPU time than ML, since it performs an optimal number of box executions, but it will produce no output tuples
until the last step of scheduling. The ML algorithm has more overhead (due to box switching) but produces output throughout execution. For high per-box
costs, ML has lower average latency than MC.
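The two orderings can be compared on a toy operator tree. In this sketch the tree, its box costs, and the function names are hypothetical, and the output cost of a box is simplified to the sum of box costs along its path to the output (selectivities are ignored).

```python
from typing import Dict, List, Optional

# Hypothetical query tree: each box names the box it feeds (None = query output).
DOWNSTREAM: Dict[str, Optional[str]] = {"a": "c", "b": "c", "c": "d", "d": None}
COST: Dict[str, float] = {"a": 1.0, "b": 2.0, "c": 1.5, "d": 0.5}


def output_cost(box: str, downstream: Dict[str, Optional[str]],
                cost: Dict[str, float]) -> float:
    """Simplified cost of pushing one tuple from `box` all the way to the output."""
    total = 0.0
    current: Optional[str] = box
    while current is not None:
        total += cost[current]
        current = downstream[current]
    return total


def min_latency_order(downstream: Dict[str, Optional[str]],
                      cost: Dict[str, float]) -> List[str]:
    """ML: boxes in increasing order of output cost."""
    return sorted(cost, key=lambda b: output_cost(b, downstream, cost))


def min_cost_order(downstream: Dict[str, Optional[str]],
                   cost: Dict[str, float]) -> List[str]:
    """MC: leaves first, then each box once all of its inputs have run (post-order)."""
    children: Dict[str, List[str]] = {b: [] for b in cost}
    roots: List[str] = []
    for box, parent in downstream.items():
        if parent is None:
            roots.append(box)
        else:
            children[parent].append(box)
    order: List[str] = []

    def visit(box: str) -> None:
        for child in children[box]:
            visit(child)
        order.append(box)

    for root in roots:
        visit(root)
    return order
```

For this tree, MC runs a, b, c, d (each box once, leaves first), while ML runs d, c, a, b, starting with the boxes closest to producing output.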
Question 2: Describe the tuple train scheduling process.
Answer 2: Tuple train scheduling is an extension to other algorithms insofar as it involves operating on multiple tuples per box operation. It
significantly reduces overhead during bursty conditions by reducing the number of box switches and executing more tuples per box switch. Since it
prevents tuples from waiting at the inputs for earlier tuples to be completed, average latency is also improved during bursty conditions.
Query Approximation Presentation
Slides
Summaries of three papers from the list below
Paper 1 - Staying FIT: Efficient Load Shedding Techniques for Distributed Stream Processing (Tatbul, Cetintemel, and Zdonik, 2007)
The authors proposed a method of load shedding called FIT (Feasible Input Tables) which can be used in a distributed, dynamic environment. This
technique is compared to a standard linear programming (LP) method executed statically on a central server (SOLVER). In each case, shedding is
performed by tuning drop operators (located at the inputs and after splits) which seek to maximize total throughput.
If the resource requirements and selectivity of each operator are known, it is possible to compute an optimal drop rate for each drop point using LP.
However, this is computationally expensive and cannot be done at runtime. Instead, the authors define an N-dimensional space (where N is the
number of inputs) and precompute the optimal rate at specified points. In the SOLVER method, the load is analyzed at runtime and the input is scaled to
one of the precomputed points. In the FIT method, the points represent feasible input requirements. At runtime, the nearest feasible point to the
current load is targeted and the load requirement is passed to the parent or to the drop operator. The authors expect the distributed FIT implementation to
perform better in large-scale systems than a centralized FIT algorithm or the standard SOLVER method.
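The runtime step of FIT can be sketched as a lookup over precomputed feasible points followed by computing drop fractions. This is a hypothetical illustration: it assumes Euclidean distance as the notion of "nearest" and one drop fraction per input, which may differ from the paper's actual table lookup; `nearest_feasible` and `drop_fractions` are made-up names.

```python
import math
from typing import List, Sequence, Tuple

Point = Tuple[float, ...]  # one coordinate (input rate) per input stream


def nearest_feasible(load: Sequence[float], feasible: List[Point]) -> Point:
    """Hypothetical lookup: the precomputed feasible point closest to the load."""
    return min(feasible, key=lambda p: math.dist(p, load))


def drop_fractions(load: Sequence[float], target: Point) -> List[float]:
    """Fraction of each input to drop so its rate falls to the target rate."""
    return [max(0.0, 1.0 - t / rate) if rate > 0 else 0.0
            for rate, t in zip(load, target)]


feasible = [(10.0, 10.0), (20.0, 5.0), (5.0, 20.0)]
current_load = (22.0, 6.0)
target = nearest_feasible(current_load, feasible)  # (20.0, 5.0)
fractions = drop_fractions(current_load, target)   # roughly 9% and 17%
```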
Paper 2 - Load Shedding for Aggregation Queries over Data Streams (Babcock, Datar, and Motwani, 2004)
The authors focus on performing load shedding for a specific category of continuous monitoring queries: sliding window aggregate queries (time-based
or tuple-based windows). Joins between multiple streams are ignored. The paper focuses on the aggregation functions SUM and COUNT, but points out that
AVG and MEDIAN can also be handled using similar methods. These aggregation operators maintain mean and standard deviation statistics to predict their
error rates. The goal of the load shedding operator is to minimize the maximum relative error between actual values and reported values across all
queries. The algorithm consists of two steps: determine the locations where load shedding should occur, and determine the sampling rates necessary to
distribute error evenly.
If no queries share operators, load shedding should clearly be performed at the beginning of the query path (because the savings will be
spread to all downstream operators). Likewise, if the queries do share operators, shedding should occur at the beginning of the shared segment (i.e.
at splits). The amount of load shedding is selected as necessary to reduce the waiting time for its leaf child to 0, given the selectivities and arrival
rates. Experiments show that adding drop operators at branches reduced the error from around 15% to around 5% compared to drops only at the
inputs. Proposed extensions to the algorithm include accounting for different query priorities.
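Random drop operators of this kind discard each tuple with some probability, and the aggregate over the surviving tuples is scaled up by the inverse of the keep probability so that it remains unbiased. A minimal sketch for SUM (COUNT is the case where every value is 1); the function name is hypothetical, and choosing `keep_prob` for each drop point, which is the paper's actual contribution, is not shown.

```python
import random
from typing import Iterable, Optional


def shed_and_sum(values: Iterable[float], keep_prob: float,
                 rng: Optional[random.Random] = None) -> float:
    """Keep each tuple independently with probability keep_prob, then scale the
    SUM of the survivors by 1 / keep_prob so the estimate stays unbiased."""
    if not 0.0 < keep_prob <= 1.0:
        raise ValueError("keep_prob must be in (0, 1]")
    if rng is None:
        rng = random.Random()
    kept = sum(v for v in values if rng.random() < keep_prob)
    return kept / keep_prob


# COUNT is the special case where every value is 1.
estimate = shed_and_sum([1.0] * 10_000, keep_prob=0.25, rng=random.Random(42))
```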
Paper 3 - Query Processing, Resource Management, and Approximation in a Data Stream Management System (Motwani et al., 2003)
In this paper, the authors outline preliminary work on their STREAM system, including an overview of the initial CQL query language and an introduction to
the concept of streams as time-sensitive, append-only sets of tuples, in contrast to relations, which are unordered (and whose tuples may be updated,
deleted, or inserted). The authors also present the query processing architecture of the STREAM system. The system is based upon operators (which
read a stream of tuples and operate on them), queues (which pass tuples from operator to operator), and synopses (which store the data in
hash tables, sliding windows, histograms, etc.). Where possible, subplans are shared in the query plan.
The authors propose a method to reduce synopsis size by utilizing input-stream statistics, such as arrival order across streams and a k-tuple distance
expectation that exploits clustering. For a scheduling strategy, the authors suggest a hybrid greedy/FIFO approach in which operators within a chain are
handled FIFO and scheduling among chains is greedy.
When resources are insufficient, the system performs approximations either statically (through a window or sampling rate reduction) or dynamically
(compressing the synopsis using histograms, wavelets, and Bloom filters or performing load shedding).
Questions for Paper 4
Question 1:What is the difference between single-stream and multi-stream queries?
Answer 1: Multi-stream queries contain at least one join between two streams and are typically used to determine data correlation across two or
more streams, for example to detect duplicates (e.g. packets passing through both routers of a two-router network). Single-stream queries involve a single
stream where operators either pass through (e.g. selection), split, or aggregate. Single-stream queries may also join with static stored
relations.
One difference in scheduling lies in the fact that joins in multi-stream queries interfere with scheduling freedom as the tuples from one stream must
be joined to tuples from the other stream within a certain time frame (from an execution standpoint).
Question 2: What is a progress chart?
Answer 2: A progress chart plots the expected size of a tuple against the processor time spent on it, based on selectivity expectations. (Here a
"tuple" stands for a large number of individual components, so a fractional tuple size indicates a reduction in the number of components.) More formally, for n operators, a
progress chart will contain n points (ti, si), where ti is the amount of processor time devoted to the tuple and si is its expected size.
The progress chart formalism is used to describe the progress of the tuple through the query system. The area under the curve
indicates the total time*memory requirement for that tuple, and by altering the query structure (through pushing selections up) and
scheduling, it may be possible to minimize this area and thus the resource requirement.
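Since the chart is piecewise linear, its area can be computed segment by segment with the trapezoid rule. A minimal sketch, assuming the points include a starting point (0, 1) and that each t_i is the cumulative processor time after operator i (the function name is hypothetical):

```python
from typing import List, Tuple


def progress_chart_area(points: List[Tuple[float, float]]) -> float:
    """Area under a piecewise-linear progress chart.

    points: (t_i, s_i) pairs sorted by cumulative processor time t_i, where s_i is
    the expected tuple size after operator i; the chart is assumed to start at (0, 1).
    """
    area = 0.0
    for (t0, s0), (t1, s1) in zip(points, points[1:]):
        area += (t1 - t0) * (s0 + s1) / 2.0  # trapezoid for one segment
    return area


# Two unit-cost operators with selectivities 0.5 and 0.25, in either order.
less_selective_first = [(0.0, 1.0), (1.0, 0.5), (2.0, 0.125)]
more_selective_first = [(0.0, 1.0), (1.0, 0.25), (2.0, 0.125)]
```

Running the more selective operator first reduces the area from 1.0625 to 0.8125, i.e. the same work is done while holding less data in memory on average.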
Questions for Paper 3
Question 1: How is stream/relation data stored in a query execution?
Answer 1: Relations and streams are stored in the same internal format: a tuple of data according to a schema, a timestamp, and whether the
tuple is being inserted or removed. The tuples themselves are stored in synopses to maximize data sharing. Queues between operators contain references
to the data in the synopses.
Question 2: What is meant by a monotonic relation, and what does it have to do with anything?
Answer 2: A monotonic relation is one which only increases in size (i.e. no tuples are ever removed from the relation). For example, S [Range
Unbounded] can only increase in size (and R(t) is always a subset of R(t+j) for all j >= 0). Likewise, joining two monotonic relations results in a
monotonic relation.
If relations are monotonic they are indistinguishable (in the underlying data representation) from a stream (which is why monotonicity is the test used
to determine default behavior for relation-to-stream operators).
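Monotonicity of a relation observed as a sequence of snapshots can be checked by testing that each snapshot is a sub-bag of the next (which, by transitivity, covers all j >= 0). A small sketch using `collections.Counter` as a bag; the function name and the list-of-tuples snapshot representation are assumptions.

```python
from collections import Counter
from typing import List, Sequence, Tuple


def is_monotonic(snapshots: Sequence[List[Tuple]]) -> bool:
    """True if every snapshot R(t) is a sub-bag of R(t+1), i.e. no tuple is removed."""
    for before, after in zip(snapshots, snapshots[1:]):
        # Counter subtraction keeps only positive counts: anything left was deleted.
        if Counter(before) - Counter(after):
            return False
    return True


growing = [[("a",)], [("a",), ("b",)], [("a",), ("b",), ("b",)]]
shrinking = [[("a",), ("b",)], [("b",)]]
```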
Critique of Paper 3
Slides can be found here.
Questions can be found here.
"The CQL Continuous Query Language: Semantic Foundations and Query Execution" (Arasu, Babu, and Widom, 2004)
Relational databases have been around for a while and are well understood, but these systems operate on static, non-time-sensitive structures. As data
flows at ever-increasing rates, systems need some method to process this data in order to make decisions in real time. The authors sought to design and
implement a query language for continuous data streams and produced a query language for their STREAM system called Continuous Query Language (CQL).
During the design process, they kept a few goals in mind:
1) They hoped to exploit well-understood relational semantics for reliability and for added usability. This rests on the assumption that people will be
more willing to use a new tool like CQL if it is similar to a tool in standard usage (like SQL).
2) They wanted simple tasks (like filtering) to be compact and simple to write. Again this reduces the effect of the learning curve and allows the user
to write useful queries quickly.
3) Query plans should be modular and pluggable. That is, for the purpose of operation individual components (queues, synopses, operators) need not be
aware of the context of their use.
4) The execution model should efficiently handle both relations and streams. This is probably the principal contribution of CQL. Previous work had been
designed around stream-stream operation, which must ignore a lot of existing relational data and paradigms. Efficiency is, of course, crucial for any
real-time system.
5) The architecture should be easily adaptable for experimentation. The authors were aware that the system was incomplete and were prepared to
experiment with algorithms for operator scheduling, load shedding, etc.
As mentioned above, CQL operates on two primitive data types: relations and streams. A stream is a bag of elements (s, t), where s is a tuple in the
schema of S and t is the timestamp associated with the element. A relation is a mapping from the time domain T to a bag of tuples, such that at time t
the bag holds all the tuples valid in the relation. This differs from standard SQL definitions in that time is explicitly accounted for, and what is of interest is the state of
the relation at a given slice of time. The data itself consists of three parts: the tuple, the timestamp, and a flag designating whether the tuple is
being inserted (into the relation or stream) or deleted (from the relation only).
As mentioned above, the STREAM system consists of three types of components: queues, operators, and synopses. Queues bring input to the operators and
accept their output. They contain pointers to the actual data stored elsewhere (in the synopses). Synopses store the actual data and are "owned" by
individual operators. If two operators are able to share data, the downstream synopsis points back to the upstream synopsis and deals with a subset of
that data. Operators operate on their input data and pass output to one or more queues based upon the query plan.
There are three types of operators, distinguished by the type of data they operate on and the type of data they produce. Stream-Relation operators (or
windows) return the most recent tuples (either a set number of tuples or all tuples within a timeframe). They can also partition the tuples based upon
the content of the fields and return the most recent tuples for each partition. Relation-Relation operators are standard SQL operators like select,
binary join, aggregate, etc. Finally, Relation-Stream operators can be one of three types. Istream operators output any tuples that entered the
relation at the most recent timestamp. Dstream operators output any tuples that were removed from the relation at the most recent timestamp. Rstream operators
output the entire contents of the relation at every timestamp.
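Treating relation snapshots as bags, the three Relation-Stream operators reduce to bag differences between consecutive snapshots R(tau - 1) and R(tau). A minimal sketch using `collections.Counter` (function names are hypothetical; results are sorted only to make the output deterministic):

```python
from collections import Counter
from typing import List, Tuple

Row = Tuple  # hypothetical: a relation tuple is a plain Python tuple


def istream(prev: List[Row], curr: List[Row]) -> List[Row]:
    """Tuples in R(tau) but not in R(tau - 1), as a bag difference."""
    return sorted((Counter(curr) - Counter(prev)).elements())


def dstream(prev: List[Row], curr: List[Row]) -> List[Row]:
    """Tuples in R(tau - 1) but no longer in R(tau)."""
    return sorted((Counter(prev) - Counter(curr)).elements())


def rstream(curr: List[Row]) -> List[Row]:
    """The whole relation R(tau), emitted at every timestamp."""
    return sorted(curr)


prev = [("a", 1), ("b", 2)]
curr = [("b", 2), ("c", 3)]
print(istream(prev, curr))  # [('c', 3)]
print(dstream(prev, curr))  # [('a', 1)]
print(rstream(curr))        # [('b', 2), ('c', 3)]
```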
Note: Stream-Stream operators do not exist in CQL but can be emulated by a Stream-Relation operator, a Relation-Relation operator, and a
Relation-Stream operator in sequence. This is how filters work. Filters are a relatively simple task and (according to the project goals) simple tasks should be
simple to write, but three operators for each filter seems complicated. Fortunately, this complexity is hidden from the user using default properties.
If a stream is used on an operator that expects a relation, an implicit [Range Unbounded] window operator converts the stream into a relation containing all tuples in
the stream. Likewise, if the output relation is guaranteed to be monotonic, an implicit Istream operator is added. Thus a filter can be written using
only a Relation-Relation operator.
For the sake of efficiency, some operator pairs are converted into other pairs. For instance, an [Unbounded] window operator followed by an Istream operator
is functionally equivalent to a [Now] window operator followed by an Rstream operator. But the [Now] window requires far less storage, since the entire
contents of its synopsis can be discarded after each timestamp. Likewise, Select operators can be moved below time-based window operators, because
filtering tuples before the window reduces the number of tuples the window must store.
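The first rewrite can be checked on a toy stream. This sketch assumes integer timestamps, represents stream elements as hypothetical (value, timestamp) pairs, and applies the same filter in both plans: the [Range Unbounded] plan must keep every past tuple to compute its Istream difference, while the [Now] plan only looks at tuples stamped with the current time.

```python
from collections import Counter
from typing import Callable, List, Tuple

Element = Tuple[int, int]  # hypothetical (value, timestamp) stream element
Pred = Callable[[int], bool]


def istream_unbounded(stream: List[Element], pred: Pred, tau: int) -> List[int]:
    """Istream over a filtered S [Range Unbounded]: needs every tuple up to tau."""
    curr = Counter(v for v, t in stream if t <= tau and pred(v))
    prev = Counter(v for v, t in stream if t <= tau - 1 and pred(v))
    return sorted((curr - prev).elements())


def rstream_now(stream: List[Element], pred: Pred, tau: int) -> List[int]:
    """Rstream over a filtered S [Now]: only tuples stamped tau are needed."""
    return sorted(v for v, t in stream if t == tau and pred(v))


def keep(v: int) -> bool:
    return v > 6


stream = [(5, 1), (12, 1), (7, 2), (30, 3), (5, 3)]
for tau in (1, 2, 3):
    assert istream_unbounded(stream, keep, tau) == rstream_now(stream, keep, tau)
```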
The authors have also outlined a few real-world considerations that must be taken into account. For instance, how does time "increment" within the
system? If a time-based window operator hasn't yet received all the tuples with a given timestamp (due to high latency or a failing sensor), how does
the window operator know whether or not it can execute? The authors propose heartbeats, after which the system knows that a given global time has passed
and all subsequent tuples will be AFTER that time. This can be done in several ways: using the system clock itself, using sentinels on the input
streams and taking a local minimum at each operator, or determining an upper bound on delay and updating time within a window.
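The sentinel approach can be sketched as a per-operator clock that takes the minimum of the latest heartbeat seen on each input. This assumes a heartbeat h on an input promises that all later tuples on that input have timestamps strictly greater than h; the class and method names are hypothetical.

```python
from typing import Dict, Iterable


class LocalClock:
    """Hypothetical per-operator clock built from input heartbeats (sentinels)."""

    def __init__(self, inputs: Iterable[str]) -> None:
        # No heartbeat seen yet on any input: nothing is known to be complete.
        self.heartbeat: Dict[str, float] = {name: float("-inf") for name in inputs}

    def on_heartbeat(self, source: str, h: float) -> None:
        # Heartbeats on a single input never move backwards.
        self.heartbeat[source] = max(self.heartbeat[source], h)

    def complete_up_to(self) -> float:
        """Timestamps <= this value are closed on every input, so a time-based
        window may safely execute for them."""
        return min(self.heartbeat.values())


clock = LocalClock(["s1", "s2"])
clock.on_heartbeat("s1", 10)  # s2 is still silent, so nothing is complete yet
clock.on_heartbeat("s2", 7)   # now everything up to time 7 is complete
```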
System operators manage several behind-the-scenes concerns, such as how best to handle tuples that arrive out of order and converting data into the
internal format. Likewise, STREAM contains an operator that manages load shedding through sampling.
Questions for Paper 2
Question 1: What is load shedding and why is it used?
Answer 1: Load shedding is used in a stream processing system when data arrives faster than the CPU can process it. In that case, the system
selectively drops incoming data so that it can keep up with the input rate.
Question 2: What is the disadvantage associated with greedy operator scheduling?
Answer 2: Always selecting the operator that removes the most data (i.e. frees the most memory) may prevent other operators from processing any input data,
resulting in starvation.
Eight Papers over Stream Approximation/Load Shedding
- Tatbul, N. and Cetintemel, U. and Zdonik, S. Staying FIT: efficient load shedding techniques for distributed stream processing. In Proceedings of
the 33rd International Conference on Very Large Data Bases, 2007.
- Babcock, B. and Datar, M. and Motwani, R. Load shedding for aggregation queries over data streams. In Proceedings of the 20th International
Conference on Data Engineering, 2004.
- Bloom, B. H. Space/time trade-offs in hash coding with allowable errors. In Communications of the ACM, 1970.
- Chakrabarti, K. and Garofalakis, M. and Rastogi, R. and Shim, K. Approximate query processing using wavelets. In The VLDB Journal, Vol. 10,
No. 2, 2001.
- Motwani, R. and Widom, J. and Arasu, A. and Babcock, B. and Babu, S. and Datar, M. and Manku, G. and Olston, C. and Rosenstein, J. and Varma, R.
Query processing, resource management, and approximation in a data stream management system. In Proceedings of the First Biennial Conference on
Innovative Data Systems Research (CIDR), 2003.
- Das, A. and Gehrke, J. and Riedewald, M. Semantic approximation of data stream joins. In IEEE Transactions on Knowledge and Data Engineering, 2005.
- Jain, N. and Yalagandula, P. and Dahlin, M. and Zhang, Y. Self-tuning, bandwidth-aware monitoring for dynamic data streams. In Proceedings of the
25th IEEE International Conference on Data Engineering (ICDE), 2009.
- Thaper, N. and Guha, S. and Indyk, P. and Koudas, N.