I have blogged about this topic before. It is one of the most common yet most overlooked aspects of building better-performing reports with SQL: reports perform better when filtering, aggregation, and sorting are pushed down to the data sources as much as possible.



This is the second article in the series on Cassandra data modeling best practices for efficient Spark SQL joins. You can find the previous posts here and here.

Dimensional data modeling principles are still applicable when joining Cassandra tables using Spark SQL. However, we have to apply certain additional techniques because of the way the Cassandra database works.

Since there is an abundance of information on typical dimensional modeling approaches, I will focus on the Cassandra and Spark SQL aspects of the modeling process. If you are new to dimensional modeling, visit the Kimball Group site or do a quick search on dimensional modeling.

Denormalization

Principle #1: Minimize the number of tables in the star schema

Avoid snowflake schemas through denormalization and keep the number of tables as small as possible. As you have seen in the Spark SQL article referenced above, every additional table in a join operation adds more shuffling and increases the overall query execution time.

Principle #2: Avoid outer join relationships in the star schema

Use denormalization to remove outer join relationships. Spark SQL does not push filters down to the outer-joined table (at least in version 1.4), so the outer-joined table always ends up as a full table scan, as the sketch below illustrates.
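A minimal sketch of the problem, using hypothetical table and column names that are not from the original post:

SELECT f.order_id, f.amount, d.region_name
FROM   fact_orders f
LEFT OUTER JOIN dim_region d ON f.region_id = d.region_id
WHERE  f.order_date = '2016-07-01';

The predicate on fact_orders can be pushed down to Cassandra, but dim_region on the outer side of the join is read in full before the join runs. Folding region_name into fact_orders removes both the outer join and the full scan.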

Cassandra-Specific Data Modeling Aspects – Partition Keys and Cluster Keys

Principle #3: Pick the right partition keys to minimize the number of partitions in Cassandra tables and to apply filters in Cassandra as much as possible

Selecting the right partition keys is the most important aspect of the data modeling process. In general, good Cassandra partition keys meet the following criteria (see the sketch after this list).

Low but not too low cardinality: these keys let you store more rows in a single partition. Most often these columns appear in the GROUP BY clauses of your queries.

Used in report filters: these keys are the most common mandatory filters in your queries. Spark SQL queries are much more efficient when partition predicates are pushed down to the Cassandra database, which minimizes the amount of data flowing between Cassandra and Spark and therefore makes better use of compute resources.

Non-updatable: these columns are never updated.
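As a hedged sketch of these criteria, here is what such a table might look like for a hypothetical sales model (the table and column names are illustrative, not from the original post):

CREATE TABLE sales_by_region_month (
    region      text,      -- low (but not too low) cardinality; a common mandatory report filter
    sale_month  text,      -- e.g. '2016-07'; never updated once written
    sale_id     timeuuid,  -- clustering key that uniquely identifies rows within a partition
    product_id  text,
    amount      decimal,
    PRIMARY KEY ((region, sale_month), sale_id)
);

Here (region, sale_month) is the composite partition key: it is easy to filter on from reports, it is never updated, and it groups one month of sales for one region into a single partition.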

Principle #4: Limit your max partition size to 1 GB

Cassandra read performance will suffer once your partition sizes start to grow beyond 1 GB.

Principle #5: Make sure partition growth is contained. Add computed columns to shard partitions if necessary

Depending on your business process and data model, you need to take preventive measures to ensure the data in a partition cannot grow beyond these limits. This may require adding extra columns to your partition keys to contain the growth.

For example, adding a month column to the partition key of a transaction table ensures that new partitions are created every month.
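A hedged sketch of this sharding technique, again with illustrative names only: the application computes a month bucket at write time and makes it part of the partition key, so a busy account rolls over to a fresh partition every month instead of growing a single partition forever.

CREATE TABLE transactions_by_account (
    account_id  text,
    txn_month   text,      -- computed from the transaction timestamp, e.g. '2016-07'
    txn_id      timeuuid,
    amount      decimal,
    PRIMARY KEY ((account_id, txn_month), txn_id)
);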

Principle #6: Optimize number of partitions in your table

Continue to tweak your partition key list until your maximum partition size approaches the 1 GB limit. This ensures that your table has an optimal number of partitions, and having fewer partitions leads to fewer Spark tasks during SQL execution.

It is rarely feasible for every partition in a table to be exactly 1 GB; in 99% of data modeling situations that will not happen, and your model is fine as long as the largest partition stays around the 1 GB limit. In some situations even the maximum partition cannot reach 1 GB, and that is OK too. The main point is to fit as many rows into each partition as you reasonably can.

Clustering Keys

Principle #7: Choose additional primary key columns that uniquely identify rows within a partition

Principle #8: Avoid picking updatable columns as cluster keys

Just like in relational tables, Cassandra primary keys are not updatable. Under the covers, a Cassandra update is the same as an insert, so writing a row with a changed key value creates a new row instead of modifying the old one. Design appropriate application logic to implement updates if necessary.
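A minimal sketch of that behavior with a hypothetical table (names are illustrative only). Because status_code is part of the primary key, writing the same order with a different status produces a second row rather than modifying the first one:

CREATE TABLE order_status (
    order_id    text,
    status_code text,       -- clustering key; avoid columns like this if their values change
    updated_at  timestamp,
    PRIMARY KEY ((order_id), status_code)
);

INSERT INTO order_status (order_id, status_code, updated_at)
VALUES ('o-1', 'OPEN', '2016-07-09 10:00:00');

-- "Updating" the status actually adds a second row to the same partition:
INSERT INTO order_status (order_id, status_code, updated_at)
VALUES ('o-1', 'SHIPPED', '2016-07-10 08:30:00');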

Principle #9: Leverage predicate pushdown capability on cluster key columns

Keep the most frequently filtered columns at the top of the cluster key list. Sometimes, if your table has non-updatable columns that are frequently used to filter rows, you can include them in the cluster keys even when they are not required to define row uniqueness.
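A hedged sketch of this idea with hypothetical names; the point is only that a frequently filtered, non-updatable column sits first in the cluster key list, ahead of the column that completes uniqueness:

CREATE TABLE events_by_device (
    device_id   text,
    event_type  text,       -- filtered in most reports and never updated, so it goes first
    event_time  timestamp,  -- completes row uniqueness
    payload     text,
    PRIMARY KEY ((device_id), event_type, event_time)
);

-- In Spark SQL, an equality predicate on the leading cluster key column can be pushed to Cassandra:
SELECT count(*) FROM events_by_device
WHERE  device_id = 'd-42' AND event_type = 'ERROR';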

Like traditional SQL performance, Spark SQL performance depends on several factors: hardware resources such as the size of your compute cluster and network bandwidth, as well as your data model, application design, query construction, and so on. The data model is the most critical of the non-hardware factors. Initially I wanted to blog about the data modeling aspects of optimization, but I have decided to write first about Spark SQL's inner workings, since they are the core foundation behind efficient data modeling with Spark SQL.

I used DataStax Enterprise 4.8 for writing this article; DSE 4.8 comes with Spark 1.4.x and Cassandra 2.1.x. Although I am explaining Spark SQL from a Cassandra data source perspective, similar concepts apply to other data sources supported by Spark SQL.

To optimize Spark SQL for high performance, we first need to understand how a Spark SQL statement is planned and executed by Spark's Catalyst optimizer.

Let's use the example Spark SQL statement below.

 

(Screenshot: sample Spark SQL statement)
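The original screenshot is not reproduced here. A hypothetical statement of the same shape (the table and column names are mine, not the original query) might look like this:

SELECT   t1.order_date, t2.product_name, sum(t3.amount) AS total_amount
FROM     t1
JOIN     t2 ON t1.product_id = t2.product_id
JOIN     t3 ON t1.order_id   = t3.order_id
WHERE    t1.order_date = '2016-07-01'
GROUP BY t1.order_date, t2.product_name;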

Spark creates a plan similar to the one below.

(Screenshot: Spark query plan)

The Spark plan can be divided into three phases:

Phase 1: Read data from the individual tables.

Phase 2: Join the tables.

Phase 3: Aggregate the results.

Below is the hypothetical execution timeline for each phase.

(Screenshot: hypothetical execution timeline for each phase)

Optimization Rule #1: Include predicates for all tables in the Spark SQL query.

Spark SQL tries to optimize tasks for parallel execution. First, it breaks the join statement into individual per-table SQL statements, and it reads all of the tables in parallel.

Total time spent in the read phase = the maximum read time taken by any single table.

Here is why you need to include predicates against all tables. In a normal SQL query against a relational database we provide predicates against only one table, usually the smaller one; the RDBMS knows how to leverage the filtered data from the smaller table to read the other tables efficiently. If you write a similar query in Spark SQL, Spark will create individual per-table SELECT statements like the ones below during the read phase.

(Screenshot: per-table SELECT statements generated by Spark)
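That screenshot is also not reproduced here. Continuing the hypothetical query above, the per-table statements Spark generates would look roughly like this; only T1 carries a predicate:

SELECT order_id, order_date, product_id FROM t1 WHERE order_date = '2016-07-01';
SELECT product_id, product_name FROM t2;
SELECT order_id, amount FROM t3;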

 

This means that only table T1 gets filtered at its data source (Cassandra in this case) before being loaded into Spark memory. All of the other tables become full table scans, and full table scans are bad for Spark performance.

To achieve high read performance, we have to write the Spark SQL query like the one below.

(Screenshot: query with predicates on all tables)
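A hedged reconstruction for the hypothetical tables above: the filter (or an equivalent one) is repeated against every table so each read can be filtered at its own data source.

SELECT   t1.order_date, t2.product_name, sum(t3.amount) AS total_amount
FROM     t1
JOIN     t2 ON t1.product_id = t2.product_id
JOIN     t3 ON t1.order_id   = t3.order_id
WHERE    t1.order_date = '2016-07-01'
AND      t3.order_date = '2016-07-01'        -- same filter repeated for t3
AND      t2.category   = 'ELECTRONICS'       -- an illustrative predicate that t2's source can apply
GROUP BY t1.order_date, t2.product_name;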

Spark has much better read performance when the data is filtered at the data source.

However, just adding any predicate against a Cassandra table column does not guarantee that the data will be filtered at the Cassandra data source. Certain predicate pushdown rules apply in Spark SQL, and those rules depend on the data source behind each table. I will cover the Cassandra predicate pushdown rules in a separate post.

Optimization Rule #2: Minimize the number of Spark tasks in the scan/read phase

The Spark plan creates multiple stages in the read phase, one stage per table. The number of tasks in each stage depends on the number of data partitions Spark has to read into memory. By using an efficient partitioning strategy on your tables and applying proper predicates against the partitions, you can minimize the number of tasks in the read stages.

For example, a single-partition scan using the = operator creates fewer tasks than a multi-partition scan using the IN operator, as in the sketch below.
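A hedged illustration against the hypothetical events_by_device table sketched earlier:

-- Single-partition scan: one Cassandra partition, few Spark tasks
SELECT count(*) FROM events_by_device WHERE device_id = 'd-42';

-- Multi-partition scan: several partitions, more Spark tasks
SELECT count(*) FROM events_by_device WHERE device_id IN ('d-42', 'd-43', 'd-44');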

Optimization Rule #3: The order of tables in the FROM clause matters. Keep the largest table at the top.

A Spark table is based on a DataFrame, which is based on an RDD. In simple terms, an RDD is a distributed collection, and a Spark SQL JOIN operation is very similar to a fold-left operation over a collection.

This post is not about Scala or functional programming concepts, but it helps to know how a fold-left operation works on a collection. See the following link for an explanation: http://alvinalexander.com/scala/how-to-walk-scala-collections-reduceleft-foldright-cookbook

As you can see in the Spark plan picture, a JOIN operation requires shuffling data from the second table's executors to the first table's executors. Shuffling is a very expensive operation in terms of I/O and CPU, and it is repeated until every table to the right has been merged into the result on the left.

By keeping the table with the largest data size at the top of the join, you avoid shuffling the largest data set.

Optimization Rule #4: Keep a consistent partitioning strategy across the joining tables

Having common partition keys makes it easier to write queries with filters that can be applied across many of the joining tables, as in the sketch below.
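As a hedged sketch, assume two hypothetical tables, sales_fact and returns_fact, that both use (region, sale_month) as their partition key. One set of partition filters then pushes down to both sides of the join:

SELECT s.sale_id, r.return_id
FROM   sales_fact s
JOIN   returns_fact r
  ON   s.region = r.region AND s.sale_month = r.sale_month
WHERE  s.region = 'WEST' AND s.sale_month = '2016-07'
AND    r.region = 'WEST' AND r.sale_month = '2016-07';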

Optimization Rule #5: Minimize the number of tables in the JOIN.

As you can see in the timeline of the plan picture, the reads are parallel operations, but the JOINs are sequential steps. Every join step therefore adds to the total execution time of the query.

Use proper denormalization techniques to reduce the number of tables in your data model.

 

Apache Cassandra and Apache Spark integration is one of the emerging trends in the big data world today, and together the two products offer several advantages. Much has already been said about Cassandra and Spark integration, and there are several enterprise-grade products in the marketplace today.

This article aims to provide a few modeling suggestions for when you need to join two or more Cassandra tables using Spark. The ability to join Cassandra tables with Spark gives you several data modeling advantages: for ETL/ELT processes, for balancing data redundancy against query flexibility, and for data analysis using the Spark DataFrame API and Spark SQL.

Apache Spark is a distributed processing framework whose SQL engine can join several data sources, such as Hadoop files, Hive, Cassandra, JDBC/ODBC data sources, and others. This list is continuously growing.

Cassandra is a popular NoSQL database widely used in OLTP applications. Cassandra has the CQL language interface, which looks similar to SQL but is not quite the same.

While traditional relational data sources store their data in row format, Cassandra stores its data in row partitions using column families. The data arrangement inside a Cassandra partition is very similar to a pivoted spreadsheet.

Until now, the denormalized data model has been heavily emphasized because of Cassandra's inability to join tables. This is still the case for pure Cassandra-based OLTP applications. However, new options are opening up for enterprises that plan to integrate Apache Spark and Cassandra.

 

Since this is a heavy topic, I want to release it in multiple parts. First, here is the explanation of how Spark SQL works:

https://intelligentinsight.wordpress.com/2016/07/05/optimizing-spark-sql-join-statements-for-high-performance/

Here is the link to the Cassandra modeling best practices for Spark SQL joins:

https://intelligentinsight.wordpress.com/2016/07/09/cassandra-data-modeling-principles-for-spark-sql-joins/

 

 

Modern business intelligence tools let developers and end users abstract table relationships by offering metadata capabilities within the tool. Tool administrators are responsible for setting up connections, table definitions, and (in certain advanced tools) relationships.

Users then go into the tool and pick the necessary tables and fields from a catalog or library to create their queries. They do not have to know the physical location or connection details for accessing the data.

While this is a great feature, I have often come across shops where inefficient metadata design and poorly constructed queries cause a great deal of performance trouble. My analysis applies to several tools available in the marketplace today, so I am writing this post at a conceptual level that readers can translate into their respective tools and environments.

 

(Diagram: BI tool metadata layer)

 

No matter how many promises BI vendors make, you will achieve optimal performance only when the majority of the work is done by the database engines. Below are the best and worst case scenarios when it comes to performance; keep these in mind when designing your metadata layer, logical models, and queries.

 

(Diagram: work done on the database server vs. the BI server)

 

 

Let us examine the examples below to understand how performance is affected by where the work gets done.

 

Scenario 1: Single table query

Select col1, sum(col2) as amount from d1_table where col1 = 'x' group by col1;

 

The BI server recognizes that the query belongs to a single connection. The query gets pushed to the database, and the BI server receives only the final results.

Performance: As good as DB performance

Scenario 2: Query with more than one table from the same database connection

 

Select t2.col1, sum(t1.col2) as amount
from d1_table1 t1,
     d1_table2 t2
where t1.col1 = 'x'
  and t1.col1 = t2.col1
group by t2.col1;

The BI server recognizes that the query belongs to a single connection. The query gets pushed to the database, and the BI server receives only the final results.

Performance: As good as DB performance

Scenario 3: Query with more than one table from different database connections

 

Select t2.col1, sum(t1.col2) as amount
from d1_table1 t1,
     d2_table2 t2
where t1.col1 = 'x'
  and t1.col1 = t2.col1
group by t2.col1;

The BI server recognizes that the query uses tables from different databases, so it fetches each table's data over its own connection and performs the join and aggregation on the BI server.

This is very bad for performance, especially if the tables involved are large:

It increases stress on the network.

It increases stress on the BI server.

 

Performance: Bad

Scenario 4: Query between a table and a file

This is similar in concept to Scenario 3. The BI server recognizes that the join cannot be pushed to a database, so it brings all the necessary data onto the BI server to perform the required join, aggregation, and sort operations.

 

Recommendations:

I have discussed a common problem across different tools. The exact solution is different for each tool, but conceptually they are the same: make sure your joins and aggregations are performed by the databases as much as possible. This is the secret sauce of a high-performing BI architecture.

Over the years I have noticed this practice at several companies: in order to help developers avoid errors, or to avoid time spent on proper training and knowledge transfer, complex queries are converted into views and handed off to development teams.

These views are convenient, and they do help cut down on developer errors and knowledge transfer. However, there is a huge hidden cost to using views for developing reports and analytics.

For example, let's review the view syntax below.

 

Create or replace view myview as
Select t1.col1, t2.col2, t3.col3, t4.col4, t5.col5, t6.col6
From table1 t1,
     table2 t2,
     table3 t3,
     table4 t4,
     table5 t5,
     table6 t6
Where t1.col1 = t2.col2
…etc

 

The view above joins six tables. When the view is used for reporting or querying, the database performs the JOIN across all six tables whether or not the query actually references columns from each individual table.

 

Users will see a significant slowdown caused by this join across all the tables. The effect is negligible while the tables are small and results come back quickly, but performance worsens as the table sizes grow.
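As a hedged illustration, a report that only needs col1 still drags every table into the join when it goes through the view, whereas the same query against the base table touches only table1:

-- Through the view: the database still joins table1 through table6
Select col1 from myview where col1 = 'x';

-- Directly against the base table: only table1 is read
Select col1 from table1 where col1 = 'x';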

 

Below are a few alternative options. My advice may sound counterintuitive, but it is worth considering if you want to keep your users happy.

Option 1: For shops with BI tools

If you have a good business intelligence tool (Business Objects, OBIEE, WebFOCUS, or others) in your shop, model the table relationships in the tool.

The proper tables are then brought into the join based on the columns the user selects.

Some of these tools also let you pull in implicit columns if you must enforce certain joins whether or not the user selects those columns.

Option 2: Writing queries directly against the database without a BI tool

Avoid using complex views just for development convenience. Train your developers to build proper queries based on the business need.

Limit views to those situations where the data must be restricted, so that all of the join conditions are genuinely required by the business logic.

This is a common problem that new OBIEE developers can run into.

The analysis below shows Brand, Product, Revenue, and the revenue share of each product within its Brand.

(Screenshot: analysis results with the incorrect grand total)

 

Below is the formula used to calculate the revenue percentage of each product within its brand.

(Screenshot: percentage column formula)
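The formula screenshot is not reproduced here; based on the root-cause discussion below, the column formula was of this general form (the exact logical column names are assumed):

REVENUE / SUM(REVENUE BY BRAND) * 100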

 

Problem: The percentages within each Brand total to 100%, which is correct.

However, the grand total shows 274.5%, whereas I expect to see 100% in the grand total row.

 

Root Cause:

(Screenshot: physical SQL sent to the database)

 

OBIEE receives the SUM by Product from the database. All report-context-specific calculations, such as SUM(REVENUE BY BRAND), are handled on the BI server, which is why OBIEE can show 100% at the Brand level. However, the BI server does not calculate the grand total row correctly; I do not understand the exact reason, but it fails to perform the proper calculation.

 

 

Solution:

 

Change the percentage calculated column's aggregation rule to “Server Complex Aggregate”.

(Screenshot: aggregation rule set to Server Complex Aggregate)

 

Result:

(Screenshot: corrected results with a 100% grand total)

 

(Screenshot: new physical SQL sent to the database)

 

OBIEE now sends the sum(revenue by Brand) calculation to the database; see sum(d1.c1) over (partition by d1.c2) in the screenshot above.

You can also notice an additional sum(revenue) SQL statement sent to the database for the grand total row calculation.
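The query screenshot is not shown here, but a hedged sketch of the kind of physical SQL generated in this mode, with assumed table and alias names rather than the actual screenshot contents, would be:

select d1.c2 as brand,
       d1.c3 as product,
       d1.c1 as revenue,
       100.0 * d1.c1 / sum(d1.c1) over (partition by d1.c2) as pct_within_brand
from ( select sum(revenue) as c1,
              brand        as c2,
              product      as c3
       from   sales_table
       group  by brand, product ) d1;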