# Back under a SQL Umbrella

## Unifying serving and analytical data; using a database for distributed machine learning

I had the great privilege of being able to attend VLDB 2019 in person this year. The conference was packed with interesting people, posters, and presentations and I was made to feel very welcome by everyone I met. If you're a practitioner wondering whether or not an academic conference is for you, I can highly recommend it. I've chosen two papers from the conference for this edition, out of many strong candidates.

Procella is the latest in a long line of data processing systems at Google. What's unique about it is that it's a single store handling reporting, embedded statistics, time series, and ad-hoc analysis workloads under one roof. It's SQL on top, cloud-native underneath, and it's serving billions of queries per day over tens of petabytes of data.

There's one big data use case that Procella isn't handling today though, and that's machine learning. But in 'Declarative recursive computation on an RDBMS... or, why you should use a database for distributed machine learning,' Jankov et al. make the case for the database being the ideal place to handle the most demanding of distributed machine learning workloads.

Could everything be coming back together again under a SQL umbrella?

### Procella: unifying serving and analytical data at YouTube

Academic papers aren't usually set to music, but if they were, the chorus of Queen's "I Want It All (and I want it now…)" seems appropriate here. Anchored in the primary use case of supporting Google's YouTube business, what we're looking at here could well be the future of data processing at Google. Well, I say the future, but "Procella has now been in production for multiple years. Today, it is deployed in over a dozen data centers and serves hundreds of billions of queries per day over tens of petabytes of data…" So maybe what we're looking at is the future of data processing for the rest of us!

Google already has Dremel, Mesa, Photon, F1, PowerDrill, and Spanner, so why did they need yet another data processing system? Because they had too many data processing systems! ;)

> Large organizations… are dealing with exploding data volume and increasing demand for data driven applications. Broadly, these can be categorized as: reporting and dashboarding, embedded statistics in pages, time-series monitoring, and ad-hoc analysis. Typically, organizations build specialized infrastructure for each of these use cases. This, however, creates silos of data and processing, and results in a complex, expensive, and harder to maintain infrastructure.

When each of those use cases is powered by a dedicated back-end, investments in better performance, improved scalability and efficiency etc. are divided. And given the growing scale of YouTube, some of those backend systems were starting to creak. Moreover, moving data between the different systems to support different use cases can lead to tricky ETL pipelines.

The big hairy audacious goal of Procella was to “implement a superset of capabilities required to address all of the four use cases… with high scale and performance, in a single product” (aka HTAP: Hybrid Transaction/Analytic Processing). That's hard for many reasons, including the differing trade-offs between throughput and latency that need to be made across the use cases.

### Use cases

Reporting and dashboarding use cases (e.g. to understand YouTube video performance) drive tens of thousands of canned (known in advance) queries per second, that need to be served with latency in the tens of milliseconds. Each data source being queried over can add hundreds of billions of new rows every day. Oh, and in addition to low latency, "we require access to fresh data."

For just the YouTube Analytics application alone, the reported 99%-ile query latency is 412ms.

Embedded statistics use cases include the various counters such as views, likes, and subscriptions that are included in pages. Here the query volumes run to hundreds of billions of queries per day, with millisecond latency requirements (Procella achieves 99%-ile latency of 3.3ms here).

Time-series monitoring workloads have similar properties to dashboarding (relatively simple canned queries, but a need for fresh data). Additional query features such as approximation and dedicated time-series functions are needed here too.

Ad-hoc analysis use cases support internal teams performing complex ad-hoc analyses to understand usage trends and determine how to improve products. These are comparatively lower volume queries with moderate latency requirements, but they can be complex and the query patterns are highly unpredictable.

### Procella system overview

The paper covers a lot of territory. In this write-up we'll look at some of the high-level architectural principles, and then I'm going to cherry-pick details relating to how Procella achieves its performance, latency, and data freshness goals.

To its clients, Procella is a SQL query engine (SQL-all-the-things). Under the covers, it's a sophisticated distributed system built on the tenets of cloud-native systems:

• Disaggregated (remote) storage, with read or write operations performed via RPCs, and immutable data (append-only files).
• A shared compute tier designed to scale horizontally with many small tasks rather than a small number of large tasks, with little or no local state.
• Fast recovery from the loss of any single machine.
• Sophisticated strategies for handling stragglers, badly behaving tasks, and periodic unavailability.

The paper includes a diagram showing how these pieces fit together in the big picture.

Taken together, these principles enable Procella to scale, but achieving the desired levels of performance while supporting “an almost complete implementation of standard SQL, including complex multi-stage joins, analytic functions and set operations, with several useful extensions such as approximate aggregations, handling complex nested and repeated schemas, user-defined functions, and more” is a whole other challenge. Let's now take a look at some of the things that help to make Procella fast.

### Making Procella fast

#### Cache all the things

Procella achieves high scalability and efficiency by segregating storage (in Colossus) from compute (on Borg). However, this imposes significant overheads for reading or even opening files, since multiple RPCs are involved for each. Procella employs multiple caches to mitigate this networking penalty.

The nice thing though about files that are immutable once closed is that you don't have to worry about cache invalidation.

Procella aggressively caches metadata, columnar file headers, and columnar data (using a newly developed data format, Artus, that gives data the same representation on disk and in memory). Given sufficient memory, Procella can essentially become an in-memory database. For their reporting instance, only about 2% of all data can fit in memory, but the system still achieves a 99%+ file handle cache hit rate, and a 90% data cache hit rate.

One of the secrets to those high hit rates is affinity scheduling. Requests to data servers and metadata servers are routed such that operations on the same data/metadata go to the same server with high probability. Another feature of the storage layer, the fact that all data is available from anywhere, means that affinity scheduling is an optimisation: requests can still be served if they do end up being routed elsewhere for some reason.
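To illustrate the idea, here's a minimal Python sketch (not how Procella actually implements it) of hash-based affinity routing; the `route` function and server names are hypothetical:

```python
import hashlib

def route(filename: str, servers: list) -> str:
    """Pick a data server for a file deterministically, so repeated
    requests for the same file land on the same server's cache."""
    h = int(hashlib.sha256(filename.encode()).hexdigest(), 16)
    return servers[h % len(servers)]

servers = ["ds-0", "ds-1", "ds-2"]
# The same file always routes to the same server's warm cache...
assert route("tablet-00042", servers) == route("tablet-00042", servers)
# ...but because storage is disaggregated, any server could still
# serve the request if the preferred one is unavailable.
```

A production system would use something like consistent hashing rather than simple modulo, so that adding or removing a server doesn't reshuffle (and cold-start) every cache.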

#### Heavily optimise and pre-compute data formats

> Since Procella aims to cover [large scans typical in ad-hoc analysis workloads] and several other use cases requiring fast lookups and range scans, we built a new columnar file format called Artus, which is designed for high performance on both lookups and scans.

We could really do with a dedicated paper just on Artus itself, but here are some of the highlights.

• Heavy use of custom encodings, instead of generic compression algorithms such as LZW.
• Multi-pass encoding, with a first fast pass to understand the shape of the data (e.g. number of distinct values, range, sort order etc.), followed by selection of an optimal encoding strategy, and then a second pass that does the full encoding using the selected strategy.

> Artus uses a variety of methods to encode data: dictionary and indexer types, run-length, delta, etc. to achieve compression within 2x of strong general string compression (e.g. ZSTD) while still being able to directly operate on the data. **Each encoding has estimation methods for how small and fast it will be on the data supplied.**

(Emphasis mine.)

• For sorted columns, Artus' encodings allow fast lookups in $O(\log N)$ time, with $O(1)$ seeks to a given row number.
• For nested and repeated data types Artus uses a novel tree representation that notes whether a given field occurs, and eliminates any subtree below a missing parent field.
• Artus exposes encoding information such as dictionary indices and run-length encoding information to the evaluation engine, and can support some filtering operations directly within its API. "This allows us to aggressively push such computations down to the data format, resulting in large performance gains in many common cases."
• Keeps rich metadata in file and column headers (sort order, min and max values, detailed encoding information, bloom filters, and so on), making many pruning operations possible without the need to read the actual data in the column.
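As a toy illustration of the multi-pass idea, here's a hedged Python sketch: a first pass gathers shape statistics, then an encoding strategy is chosen for the second, full pass. The strategy names and thresholds are invented for illustration and are not Artus' actual encodings:

```python
def choose_encoding(column):
    """First pass: gather shape statistics; then pick a strategy.
    (Illustrative only -- real systems also estimate the size and
    speed of each candidate encoding on the supplied data.)"""
    n = len(column)
    distinct = len(set(column))
    runs = 1 + sum(1 for a, b in zip(column, column[1:]) if a != b)
    if runs <= n // 4:           # long runs of repeated values
        return "run-length"
    if distinct <= n // 4:       # small vocabulary
        return "dictionary"
    if list(column) == sorted(column):
        return "delta"           # sorted: deltas, fast range scans
    return "plain"

assert choose_encoding([7] * 100) == "run-length"
assert choose_encoding(list(range(100))) == "delta"
```

The second pass would then encode the column with the chosen strategy, confident that the strategy fits the data because the data has already been seen once.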

Evaluated over four typical YouTube Analytics query patterns, here are the performance and memory consumption figures for Artus vs Capacitor, Google BigQuery's columnar storage format.

High performance evaluation is critical for low latency queries. Many modern analytical systems achieve this by using LLVM to compile the execution plan to native code at query time. However, Procella needs to serve both analytical and high-QPS serving use cases, and for the latter, compilation time can often become the bottleneck. The Procella evaluation engine, Superluminal, takes a different approach…

Superluminal makes extensive use of C++ template metaprogramming and operates on the underlying data encodings natively. No intermediate representations are materialized.

The query optimiser then combines both static and dynamic (adaptive) query optimisation techniques. A rule-based optimiser applies standard logical rewrites. Then when the query is running Procella gathers statistics based on a sample of the actual data used in the query, and uses these to determine what to do next.

> Adaptive techniques have enabled powerful optimizations hard to achieve with traditional cost-based optimizers, while greatly simplifying our system, as we do not have to collect and maintain statistics on the data (especially hard when ingesting data at a very high rate) and we do not have to build complex estimation models that will likely be useful only for a limited subset of queries.

Aggregation, join, and sort strategies are all adapted at runtime based on ongoing learnings during query execution. For queries with strict low latency targets it's possible to fully define the execution strategy up front and turn off the adaptive optimiser.
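A minimal sketch of what such an adaptive decision might look like, with invented names and thresholds (the paper doesn't specify Procella's exact heuristics):

```python
def pick_join_strategy(sample_left, sample_right, scale_factor):
    """Hypothetical adaptive choice: estimate each input's size from a
    sample of rows observed during execution, then pick a strategy.
    No pre-collected table statistics are needed."""
    est_left = len(sample_left) * scale_factor
    est_right = len(sample_right) * scale_factor
    BROADCAST_LIMIT = 10_000   # invented threshold
    if min(est_left, est_right) < BROADCAST_LIMIT:
        return "broadcast"     # ship the small side to every worker
    return "shuffle"           # repartition both sides on the join key

# A small estimated input favours a broadcast join:
assert pick_join_strategy([1] * 10, [1] * 500, 100) == "broadcast"
```

The point of the adaptive style is that the same decision is made from data actually flowing through the query, rather than from statistics that may be stale or missing for freshly ingested data.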

#### Do the least work possible, and do the work in the best place possible

As well as pushing computation down to the leaves as far as possible, Procella also has a variety of join strategies to make distributed joins as efficient as possible. I'm out of space to describe them all here, but you'll find a good short summary in §3.5 of the paper.

For heavy aggregations, Procella adds intermediate aggregation operators at the input of the final aggregation to prevent it becoming a bottleneck.
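The idea can be sketched as two-level aggregation; this Python illustration (word counting, with `Counter` standing in for an aggregation operator) is mine, not Procella's implementation:

```python
from collections import Counter

def aggregate(partitions):
    """Hypothetical two-level aggregation: each intermediate operator
    pre-aggregates its own partition, so the final aggregator merges
    a few partial results instead of every raw row."""
    partials = [Counter(p) for p in partitions]   # intermediate level
    final = Counter()
    for partial in partials:                      # final level
        final.update(partial)
    return final

counts = aggregate([["a", "b", "a"], ["b", "b"]])
assert counts["b"] == 3
```

Because each partial result is tiny compared to its input, the final operator merges a handful of small dictionaries rather than scanning every row, which is what keeps it from becoming the bottleneck.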

### And there's more!

There's a lot of additional information in the full paper that I didn't get to cover. If this topic interests you, it's well worth a read…

### Declarative recursive computation on an RDBMS… or, why you should use a database for distributed machine learning

Jankov et al., VLDB’19

Think about a system like Procella, combining transactional and analytic workloads on top of a cloud-native architecture. Add extensions to SQL for streaming, dataflow-based materialized views (see e.g. Naiad, Noria, Multiverses, and also check out what Materialize are doing here), and the ability to use SQL interfaces to query over semi-structured and unstructured data. A picture begins to emerge of a unifying large-scale data platform with a SQL query engine on top: a one-stop shop addressing all of the data needs of an organisation. Except there's one glaring omission from that list: handling all of the machine learning use cases.

Machine learning inside a relational database has been done before, most notably in the form of MADlib, which was integrated into Greenplum during my time at Pivotal. The Apache MADlib project is still going strong, and the recent (July 2019) 1.16 release even includes some support for deep learning.

To make that vision of a one-stop shop for all of an organisation's data needs come true, we need to be able to handle the most demanding large scale machine learning tasks – those requiring distributed learning. Today's paper choice, subtitled “Why you should use a database for distributed machine learning” seeks to fill that gap. It would be a bold departure from the current imperative high-level DSL based approach to deep learning taken by TensorFlow et al., but at the same time it also offers some compelling looking advantages. Even in deep learning, do all roads eventually lead back to SQL??

> We consider how to make a very small set of changes to a modern relational database management system (RDBMS) to make it suitable for distributed learning computation… We also show that there are key advantages to using an RDBMS as a machine learning platform.

### Why would anyone want to do that???

Why would anyone want to run deep learning computations within an RDBMS??

On the one hand, because current deep learning approaches to distribution can only go so far with data-parallelism-based approaches (one model, split up the data), and are instead pushing into model parallelism (split up the model itself across multiple nodes) to get to the next level.

> The distributed key-value stores (known as parameter servers) favored by most existing Big Data ML systems (such as TensorFlow and Petuum) make it difficult to build model parallel computations, even "by hand".

On the other hand, by switching to a relational model of computation, model parallelism looks pretty similar to data parallelism, and we can take advantage of heavily optimised distributed database engines. SQL provides a declarative programming interface, below which the system itself can figure out the most effective execution plans based on data size and statistics, layout, compute hardware etc..

### Be careful what you ask for (materialize)

If we assume an RDBMS 'lightly augmented' to handle matrix and vector data types, such as SimSQL, then we're actually not far from being able to express machine learning computations in SQL today.

Given a weights table W storing chunks of the weights matrix, an activations table A storing activations, and an AEW table storing the values needed to compute weights for the next iteration, we can express the backward pass of iteration i of a feed-forward machine learning model with eight hidden layers in SQL.

```sql
W   (ITER, LAYER, ROW, COL, MAT)
A   (ITER, LAYER, COL, ACT)
AEW (LAYER, ROW, COL, ACT, ERR, MAT)
```

Aside from the fact that this doesn't look very pretty (cf. an MLP in MADlib), it's also quite imperative for SQL (note the for loop). The problem that causes is the AEW table used to materialize the state passed between iterations: that state can grow very large very quickly. We'd like to get rid of the imperative for loop so that we can implement pipelining behind a declarative query expression.
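The distinction matters because materializing per-iteration state keeps every intermediate alive at once, while pipelining streams values through. A toy Python sketch of the difference (mine, not the paper's):

```python
def forward_materialized(x, weights):
    """Keep every layer's output (like the AEW table): all
    intermediate state is alive at once and grows with depth."""
    states = [x]
    for w in weights:
        states.append([w * v for v in states[-1]])
    return states, states[-1]

def forward_pipelined(x, weights):
    """Stream each layer's output into the next: only the current
    state is alive, regardless of depth."""
    for w in weights:
        x = [w * v for v in x]
    return x

states, out = forward_materialized([1.0, 2.0], [2.0, 3.0])
assert out == forward_pipelined([1.0, 2.0], [2.0, 3.0])
```

Both compute the same result, but the materialized version holds state proportional to the number of iterations, which is exactly what the recursive SQL extensions below are designed to avoid.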

### Recursive SQL extensions

Classic SQL already has recursion support through something called “Common Table Expressions.” Here we want something a little different to that, essentially the ability to express lazy materialization over arbitrarily large sequences of tables.

The authors introduce the notion of multiple versions of a table accessed via array-style indices, using Pascal's triangle as an example.

Given a base table, e.g.

```sql
CREATE TABLE pascalsTri[0][0] (val) AS
  SELECT val FROM VALUES (1);
```

Then we can define additional versions recursively, e.g. for the diagonals of Pascal's triangle (j = i):

```sql
CREATE TABLE pascalsTri[i:1...][i] (val) AS
  SELECT * FROM pascalsTri[i-1][i-1];
```

For the cases where j = 0:

```sql
CREATE TABLE pascalsTri[i:1...][0] (val) AS
  SELECT * FROM pascalsTri[i-1][0];
```

And for everything else:

```sql
CREATE TABLE pascalsTri[i:2...][j:1...i-1] (val) AS
  SELECT pt1.val + pt2.val AS val
  FROM pascalsTri[i-1][j-1] AS pt1,
       pascalsTri[i-1][j] AS pt2;
```

So that later on we can issue a query such as `SELECT * FROM pascalsTri[56][23]`, and the system will unwind the recursion to express the required computation as a single relational algebra statement.
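Unwinding the recursion by hand corresponds to evaluating the familiar recurrence: the two edge definitions are the base cases, and the interior rule sums the two entries above. A small memoized Python sketch (names are mine) shows what the single unrolled computation amounts to:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def pascals_tri(i, j):
    """Mirror of the recursive table definitions: the [i][0] and
    [i][i] rules bottom out at 1, and the interior rule adds the
    two entries from the previous row."""
    if j == 0 or j == i:
        return 1
    return pascals_tri(i - 1, j - 1) + pascals_tri(i - 1, j)

assert pascals_tri(4, 2) == 6   # 1, 4, 6, 4, 1
```

The memoization plays the role of the shared sub-plans in the unrolled relational algebra: each `pascalsTri[i][j]` is computed once, however many later versions reference it.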

The EXECUTE keyword allows queries over multiple versions of a table at the same time. For example:

```sql
EXECUTE (
  FOR j in 0...50:
    SELECT * FROM pascalsTri[50][j]);
```

We can also request that a table be explicitly materialized using the MATERIALIZE keyword (for dynamic programming), and we can merge multiple recursively defined tables using UNION.

### Recursive learning in SQL

Armed with these new extensions we can define a forward pass computation (computing the level of activation of the neurons at each layer) like this:

```sql
-- First layer of activations
CREATE TABLE A[i:0...][j:0] (COL, ACT) AS
  SELECT DI.COL, DI.VAL
  FROM DATA_INPUT AS DI;

-- Propagating activations
CREATE TABLE WI[i:0...][j:1...9] (COL, VAL) AS
  SELECT W.COL, SUM(matmul(A.ACT, W.MAT))
  FROM W[i][j] AS W, A[i][j-1] AS A
  WHERE W.ROW = A.COL
  GROUP BY W.COL;

-- Subsequent activations
CREATE TABLE A[i:0...][j:1...8] (COL, ACT) AS
  SELECT WI.COL, relu(WI.VAL + B.VEC)
  FROM WI[i][j] AS WI, B[i][j] AS B
  WHERE WI.COL = B.COL;

-- Prediction
CREATE TABLE A[i:0...][j:9] (COL, ACT) AS
  SELECT WI.COL, softmax(WI.VAL + B.VEC)
  FROM WI[i][j] AS WI, B[i][j] AS B
  WHERE WI.COL = B.COL;
```

The backward pass can be expressed similarly; see the paper for the full listing.

### Execution planning for efficiency

The paper goes into quite some detail on how the large and complex computations implied by SQL specifications of ML workloads can be efficiently compiled and executed by an RDBMS. Here I'll just touch on the highlights.

From the query, we can unroll the recursion to create a single, un-optimised relational algebra plan (DAG). The next step is to chop that plan up into pieces, with each sub-plan being called a frame. Clearly the chosen decomposition can have a big impact on the resulting query performance. If we go too granular we might lose the opportunity to find good optimisations within a frame (e.g., optimal join orderings). So frames have a minimum size. The next major consideration is the amount of inter-frame communication required by a given decomposition. A good approximation for this is the number of pipeline breakers introduced.

> A pipeline breaker occurs when the output of one operator must be materialized to disk or transferred over the network, as opposed to being directly communicated from operator to operator via CPU cache, or, in the worst case, via RAM. An induced pipeline breaker is one that would not have been present in an optimal physical plan but was forced by the cut.

Even figuring out whether a given cut will introduce a pipeline breaker is not straightforward, but it can be estimated probabilistically. This cost feeds into an overall optimisation to find the best plan decomposition, as an instance of the well-known generalized quadratic assignment problem (GQAP). One of the things that's well known about GQAP is that it's a very hard problem! You can probably guess what's coming next… a greedy approximation.

Starting from one source operator, we greedily add operators to the frame, selecting the one that yields the smallest increase in frame cost, until the frame size exceeds a minimum threshold. The results depend on the initial source node you happened to pick, but this can be remedied by running the greedy algorithm once for every possible starting operator.
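Here's a hedged Python sketch of that greedy loop; the `dag` representation, `cost` function, and stopping rule are simplifying assumptions of mine, not the paper's exact algorithm:

```python
def decompose(dag, cost, min_size):
    """Greedy frame decomposition sketch. `dag` maps each operator to
    its neighbours; `cost(frame, op)` estimates the extra inter-frame
    communication incurred by pulling op into the frame."""
    frames = []
    unassigned = set(dag)
    while unassigned:
        src = min(unassigned)        # stand-in for a source operator
        frame = {src}
        unassigned.discard(src)
        while len(frame) < min_size:
            # Operators adjacent to the frame that aren't placed yet.
            fringe = {n for op in frame for n in dag[op]} & unassigned
            if not fringe:
                break
            # Absorb the neighbour with the smallest cost increase.
            best = min(fringe, key=lambda op: cost(frame, op))
            frame.add(best)
            unassigned.discard(best)
        frames.append(frame)
    return frames
```

A full implementation would, as the text notes, rerun this from every possible starting operator and keep the cheapest decomposition found.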

### Experimental results

The evaluation compares distributed implementations of a multi-layer feed-forward neural network (FFNN), the Word2Vec algorithm, and a distributed collapsed Gibbs sampler for LDA. The implementations require both SQL and some UDFs written in C++. The table below shows representative line counts as compared to TensorFlow and Spark. Once a library of UDFs has been built up, of course, they could be reused across computations.

For the feed-forward neural network, TensorFlow using GPUs is still significantly faster with smaller hidden layers, but it cannot scale; beyond that point, the RDBMS wins.

For the Word2Vec and LDA computations the speed-ups as the number of dimensions/topics grows are very significant (e.g. 8.5 minutes vs almost 5 hours for LDA with 50,000 topics).

> We have shown that model parallel, RDBMS-based ML computations scale well compared to TensorFlow, and that for Word2Vec and LDA, the RDBMS-based computations can be faster than TensorFlow. The RDBMS was slower than TensorFlow for GPU-based implementations of neural networks, however. Though some of this discrepancy was due to the fact that we implemented our ideas on top of a research prototype, high-latency Java/Hadoop system, reducing that gap is an attractive target for future work.

Adrian Colyer is a venture partner with Accel in London, where it's his job to help find and build great technology companies across Europe and Israel. (If you're working on an interesting technology-related business he would love to hear from you: you can reach him at [email protected].) Prior to joining Accel, he spent more than 20 years in technical roles, including CTO at Pivotal, VMware, and SpringSource.

Reprinted with permission from https://blog.acolyer.org

Originally published in Queue vol. 17, no. 5

Related:

Pat Helland - Identity by Any Other Name
New emerging systems and protocols both tighten and loosen our notions of identity, and that’s good! They make it easier to get stuff done. REST, IoT, big data, and machine learning all revolve around notions of identity that are deliberately kept flexible and sometimes ambiguous. Notions of identity underlie our basic mechanisms of distributed systems, including interchangeability, idempotence, and immutability.

Raymond Blum, Betsy Beyer - Achieving Digital Permanence
Today’s Information Age is creating new uses for and new ways to steward the data that the world depends on. The world is moving away from familiar, physical artifacts to new means of representation that are closer to information in its essence. We need processes to ensure both the integrity and accessibility of knowledge in order to guarantee that history will be known and true.

Graham Cormode - Data Sketching
Do you ever feel overwhelmed by an unending stream of information? It can seem like a barrage of new email and text messages demands constant attention, and there are also phone calls to pick up, articles to read, and knocks on the door to answer. Putting these pieces together to keep track of what’s important can be a real challenge. In response to this challenge, the model of streaming data processing has grown in popularity. The aim is no longer to capture, store, and index every minute event, but rather to process each observation quickly in order to create a summary of the current state.

Heinrich Hartmann - Statistics for Engineers
Modern IT systems collect an increasing wealth of data from network gear, operating systems, applications, and other components. This data needs to be analyzed to derive vital information about the user experience and business performance. For instance, faults need to be detected, service quality needs to be measured and resource usage of the next days and month needs to be forecast.