Interesting... it's actually associativity that matters for this class of distributed query execution problems (in particular, for AVG). While a/b != b/a indeed violates commutativity, the reason AVG doesn't distribute is that AVG(a, b, c, d, e) != AVG(AVG(a, b), AVG(c, d, e)), i.e. (1 + 2 + 3 + 4 + 5)/5 != ((1 + 2)/2 + (3 + 4 + 5)/3)/2. Notice that we're not reversing the order of any operations, merely changing the way in which they are grouped. Sum "scales" because a + b + c + d + e = (a + b) + (c + d + e): you can imagine computing a + b on one node and c + d + e on another, and then combining their sums. GROUP_CONCAT (an aggregate that concatenates strings) is a good example of a non-commutative aggregate operation that is still associative. In fact, on a system whose data is distributed across non-overlapping ranges, you can straightforwardly merge a GROUP_CONCAT() operation, because GROUP_CONCAT(a,b,c,d,e) = GROUP_CONCAT(GROUP_CONCAT(a,b), GROUP_CONCAT(c,d,e)).
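To make that concrete, here's a minimal Python sketch (the shard variables and helper names are mine, not from the post) of both claims: averaging per-shard averages diverges from the true average, while per-shard GROUP_CONCAT results merge cleanly as long as shard order is preserved:

```python
# Two hypothetical "workers", each holding a non-overlapping range of rows.
shard_a = [1, 2]
shard_b = [3, 4, 5]

# AVG does not distribute: averaging the per-shard averages is wrong.
avg = lambda xs: sum(xs) / len(xs)
avg_of_avgs = avg([avg(shard_a), avg(shard_b)])   # (1.5 + 4.0) / 2 = 2.75
true_avg = avg(shard_a + shard_b)                 # 15 / 5 = 3.0
assert avg_of_avgs != true_avg

# GROUP_CONCAT is not commutative, but it is associative, so per-shard
# results merge correctly as long as the shard order is preserved.
group_concat = lambda xs: ",".join(str(x) for x in xs)
merged = ",".join([group_concat(shard_a), group_concat(shard_b)])
assert merged == group_concat(shard_a + shard_b)  # both are "1,2,3,4,5"
```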
In the blog post, I used the + operator and sum() aggregate function interchangeably to be brief. Actually, those two operations are related, but have different representations in distributed relational algebra. I updated the first footnote in the post to reflect that.
Your comment in fact raises two questions. First, does the ExtendedOp commute with the Collect operator? Second, if it doesn't, what properties do our transformations need to respect so that we can still pull up the Collect? (The equivalence property and associativity.)
It's hard to be comprehensive about distributed relational algebra in a blog post. For example, the given logical tree doesn't have enough operator primitives to express large table joins. If you'd like, I'd be happy to get together and chat more about the details.
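As a rough illustration of the first question (a toy Python sketch, not CitusDB's actual planner or operator names), a Filter does commute with a Collect over shards, which is why it can be pushed down:

```python
# Toy shards and a toy Collect; none of this reflects CitusDB internals.
shards = [[1, 2], [3, 4, 5]]

def collect(parts):
    # Gather rows from every shard onto a single node.
    return [row for part in parts for row in part]

is_even = lambda x: x % 2 == 0

# Filtering each shard and then collecting...
pushed_down = collect([[x for x in shard if is_even(x)] for shard in shards])
# ...gives the same rows as collecting first and filtering afterwards.
pulled_up = [x for x in collect(shards) if is_even(x)]
assert pushed_down == pulled_up == [2, 4]
```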
Perhaps I was missing the point, but my first thought with the AVG counterexample was to have each distributed query return (SUM, COUNT), both of which are nicely associative and commutative, and then only at the final step do ((SUM of SUMs) / (SUM of COUNTs)).
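A minimal sketch of that idea, with made-up worker data (nothing Citus-specific):

```python
# Each "worker" holds some rows and returns a (SUM, COUNT) pair;
# both components merge associatively and commutatively.
worker_rows = [[10.0, 20.0], [30.0], [40.0, 50.0]]
partials = [(sum(rows), len(rows)) for rows in worker_rows]

# Final step only: (SUM of SUMs) / (SUM of COUNTs).
total_sum = sum(s for s, _ in partials)      # 150.0
total_count = sum(c for _, c in partials)    # 5
assert total_sum / total_count == 30.0       # equals the average over all rows
```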
Let me ask you this - how would I use citusdb in such a way that it does not become a tax on my growth?
In other words, big data usually precedes big revenue, but most data products are priced per datum not per revenue.
So, to put it indelicately: who can afford this, and if they could, why would they? (After all, those who could afford it, e.g. Bloomberg, have strong reasons not to.)
Umur from Citus here. Our goal is to make CitusDB an enabler for your growth by making scaling out simple for you and your dev, ops and analyst teams. If we've made it a tax instead and haven't saved you significant time, effort and complexity in the process, we're not doing our job.
At a practical level, we offer several ways to accomplish this:
- We provide free, open-source extensions on standard PostgreSQL (pg_shard, cstore_fdw)
- We provide a free community edition of CitusDB for added functionality (e.g. massively parallel analytic queries, distributed joins)
- For enterprises, we provide a sitewide, unlimited license of CitusDB Enterprise. For smaller projects there, we provide support and a per-node license.
- For start-ups, we provide a flat rate for CitusDB Enterprise, irrespective of your data volume.
The right approach depends on the company and the use-case. Either way, given the quick time-to-deployment, any of these approaches should end up being a major cost saver.
One big difference seems to be that Oracle RAC uses a shared disk for each node. This means you need a fast disk. Oracle ships a lot of data between RAC nodes so you need a fast interconnect between your nodes (e.g. infiniband).
After a very cursory look, isn't the answer to download and use one of Citus' free, open-source extensions to free, open-source PostgreSQL? (Depending on your use-case, either pg_shard or cstore_fdw.)
Interesting, but missing a bit of rigor, to the point of being wrong.
If we can compute a sum as a sum of sub-sums,
or a count as a sum of sub-counts,
this is not because addition is commutative `a+b=b+a`
(i.e. the order of the operands doesn't matter),
but because addition is associative `(a+b)+c=a+(b+c)`
(i.e. the order of the operations doesn't matter).
So we can define `sum(a,b,..,z) = a+b+..+z`
whatever the order of the operations
(say `(((a+b)+c)+..+z)` or `(a+(b+(c+...+z)))` or ...).
And if we have to compute the sum of two lists xs and ys,
we can compute either `sum(append(xs,ys))` or `sum(sum(xs),sum(ys))`.
Likewise, if we can "pull up Collect nodes and push down Computation nodes" in some cases
this is not because the involved computation commutes,
but because the operation is in some sense compatible
with the collection structure and its collect operation.
What we need is an associative operation used to merge the results computed on parts of the collection, so:
For summation and counting, the merge operation is addition.
For filtering, the merge operation is simply the collection's own collect operation.
So, in each case, we have `compute(append(xs,ys)) = merge(compute(xs),compute(ys))`.
The abstract concept behind all this is monoid homomorphisms (1) and, if you are looking for further reading, I wrote a post on how this concept is related to map-reduce and parallelism (2).
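For what it's worth, here's a small Python sketch of that homomorphism view (all names are mine): a per-part compute function plus an associative merge with an identity element reproduces the result over the appended collection:

```python
from functools import reduce

# Parts of one logical collection, e.g. rows living on different nodes.
parts = [[1, 2], [3, 4, 5], [6]]

def distribute(compute, merge, identity):
    # Homomorphism law: merging per-part results equals computing
    # over the appended parts.
    return reduce(merge, (compute(p) for p in parts), identity)

# Summation and counting: the merge operation is addition.
assert distribute(sum, lambda a, b: a + b, 0) == sum([1, 2, 3, 4, 5, 6])
assert distribute(len, lambda a, b: a + b, 0) == 6

# Filtering: the merge operation is the collection's own collect (append).
evens = lambda xs: [x for x in xs if x % 2 == 0]
assert distribute(evens, lambda a, b: a + b, []) == [2, 4, 6]
```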
Can someone explain why one can't simply average the individual average results, as the author wrote below?
"No, we can't run averages on worker nodes, and then average those out. We need to have each worker node compute their sum(order_value) and count(order_value), and then sum(sum()) / sum(count()) on the coordinator node."
Hence, running the division on each node is not the same as finding the division across all orders. (Replace my use of division with "average" and it's the same concept.)
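To put made-up numbers on it (the per-node order values here are purely illustrative):

```python
# Two nodes, each holding some order values.
node_orders = {"node_a": [10, 20], "node_b": [30, 40, 50]}

avg = lambda xs: sum(xs) / len(xs)
avg_of_node_avgs = avg([avg(v) for v in node_orders.values()])   # (15 + 40) / 2 = 27.5
true_avg = (sum(sum(v) for v in node_orders.values())
            / sum(len(v) for v in node_orders.values()))         # 150 / 5 = 30.0
assert avg_of_node_avgs != true_avg
```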
AFAIK, Datomic doesn't have the same kind of distributed load management out of the box for you, where you can do analytical queries over the whole data set.
Datomic is designed such that the load from analytic queries is local to the querying machine (apart from storage retrieval, which can be cached and replicated), so it does not need to be run on a shadow server or some system kept at a distance from the live production system.
Because the data in Datomic is immutable, there is no need to worry about locks on tables or documents contending with future writes, since a query reads the data as of the time it started, from immutable files (joined with the database's transaction 'novelty' buffer/log accumulated since the last indexing operation).
It is also up to the querying machine to store the conclusions from such queries in whatever way it likes. (If that means writing them to another Datomic instance, or putting them on HDFS or GFS, by all means.)
From what I gather, underneath Datomic is an event sourcing database, which is a model that already scales "for free".
A further optimization is the fact that the query engine lives in the application, so if you have N application servers you have N CPUs available for querying, as opposed to overloading a master server or having to provision read slaves.
It can be used for event sourcing, but it does prune past data (for single-valued attributes, as opposed to set-like ones) from the current database indexes. Old data is still available in older indexes.