Archive for the ‘Apache Spark’ Category

This is the second article in the series on Cassandra data modeling best practices for efficient Spark SQL joins. You can find the previous posts here and here.

Dimensional data modeling principles still apply when joining Cassandra tables using Spark SQL. However, we have to apply some additional techniques because of how the Cassandra database works.

Since there is an abundance of information on typical dimensional modeling approaches, I will focus on the Cassandra and Spark SQL aspects of the modeling process. If you are new to dimensional modeling, you can visit the Kimball site or do a quick search on dimensional modeling.

Denormalization

Principle #1: Minimize the number of tables in the star schema

Avoid snowflakes through denormalization. Keep the number of tables to a minimum. As you have seen in the Spark SQL article referenced above, each additional table in a join adds more shuffling and increases the overall query execution time.

Principle #2: Avoid outer join relationships in the star schema

Use denormalization to remove outer join relationships. Spark SQL doesn’t push filters down to the outer-joined table (at least in version 1.4), so the outer-joined table will always become a full table scan.

Cassandra Specific Data Modeling Aspects – Partition Keys and Cluster Keys

Principle #3: Pick the right partition keys to minimize the number of partitions in Cassandra tables, and apply filters in Cassandra as much as possible

Selecting the right partition keys is the most important aspect of the data modeling process. In general, good Cassandra partition keys meet the following criteria.

Low, but not too low, cardinality: These keys let you store more rows in a single partition. Most often these columns appear in the GROUP BY clauses of your queries.

Used in report filters: These keys are the most common mandatory filters in your queries. Spark SQL queries are much more efficient when partition predicates are pushed down to Cassandra. This minimizes the amount of data flowing between Cassandra and Spark, and therefore uses compute resources more efficiently.

Non updatable: These columns are not updated.

Principle #4: Limit your max partition size to 1 GB

Cassandra read performance will suffer once your partition sizes start to grow beyond 1 GB.

Principle #5: Make sure partition growth is contained. Add computed columns to shard partitions if necessary

Depending on your business process and data model, you need to take preventive measures to ensure data in partitions cannot grow beyond limits. This may require adding extra columns to your partition keys to contain growth.

Ex: Adding a month column to the partition key of a transaction table ensures that new partitions are created every month.
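The month example can be sketched in a few lines of Python. This is only an illustration of how a computed bucket column contains partition growth: the account_id column, the "YYYY-MM" bucket format and the function name are my assumptions, not part of any Cassandra API.

```python
from datetime import date
from typing import Tuple


def partition_key(account_id: str, txn_date: date) -> Tuple[str, str]:
    """Build a hypothetical composite partition key (account_id, month_bucket).

    The month bucket is a computed column: every new month starts a new
    partition for the same account, so no single partition grows forever.
    """
    month_bucket = txn_date.strftime("%Y-%m")
    return (account_id, month_bucket)


# Three transactions for one account land in two partitions (June and July).
keys = {
    partition_key("acct-1", d)
    for d in (date(2016, 6, 30), date(2016, 7, 1), date(2016, 7, 15))
}
```

If a month still produces partitions that are too large, the same idea extends to a finer bucket such as a week or a day.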

Principle #6: Optimize number of partitions in your table

Continue to tweak your partition key list until your max partition size approaches the 1 GB limit. This gives your table a near-optimal number of partitions. Having fewer partitions leads to fewer Spark tasks during SQL execution.

It is usually not feasible for every partition in a table to be 1 GB in size; this will not happen in 99% of data modeling situations. Your model will be fine as long as the largest partition in the table is around the 1 GB limit.

In some situations it may not be feasible to reach even a 1 GB max partition size. This is OK. The main point is to fit as many rows as possible in each partition without exceeding the limit.
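A rough back-of-the-envelope check for Principles #4 to #6 could look like the sketch below. It assumes you already know an average row size in bytes, and it ignores Cassandra storage overhead and compression, so treat the numbers as estimates only. All function names are hypothetical.

```python
ONE_GB = 1024 ** 3  # the 1 GB max partition size used in this article


def estimated_partition_bytes(rows_per_partition: int, avg_row_bytes: int) -> int:
    """Naive size estimate: rows times average row size."""
    return rows_per_partition * avg_row_bytes


def max_rows_within_limit(avg_row_bytes: int, limit_bytes: int = ONE_GB) -> int:
    """How many rows of the given average size fit under the limit."""
    return limit_bytes // avg_row_bytes


def buckets_needed(rows_per_partition: int, avg_row_bytes: int,
                   limit_bytes: int = ONE_GB) -> int:
    """How many shards a partition must be split into to stay under the limit."""
    size = estimated_partition_bytes(rows_per_partition, avg_row_bytes)
    return max(1, -(-size // limit_bytes))  # ceiling division


# With 200-byte rows, 10 million rows is about 2 GB, so two buckets are needed.
too_big = estimated_partition_bytes(10_000_000, 200) > ONE_GB
```

The result of buckets_needed tells you whether an extra computed column (Principle #5) is required, and roughly how many distinct values it should have.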

Clustering Keys

Principle #7: Choose additional columns that make up primary keys to uniquely identify rows in a partition

Principle #8: Avoid picking updatable columns as cluster keys

Just like in relational tables, Cassandra primary keys are not updatable. Under the covers, a Cassandra update is the same as an insert, so writing a row with a changed key creates a new row instead of modifying the old one. Design appropriate application logic to implement updates if necessary.

Principle #9: Leverage predicate pushdown capability on cluster key columns

Keep the most frequently filtered columns at the top of the cluster key list. If your table has non-updatable columns that are used to filter rows most often, you can include them in the cluster keys even if they are not required to define the uniqueness of rows.
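To see why the order of cluster keys matters, here is a simplified model of the CQL restriction rule: a cluster key column can be restricted only if every column before it is restricted by equality, and a range predicate must be the last one. This is my own toy approximation, not the connector's actual pushdown logic, and the column names are made up.

```python
from typing import Dict, List


def pushable_clustering_columns(clustering_order: List[str],
                                predicates: Dict[str, str]) -> List[str]:
    """Return the cluster key columns whose predicates could be pushed down.

    clustering_order: cluster key columns in their declared order.
    predicates: maps a column name to its operator, e.g. "=" or ">".
    """
    pushable = []
    for column in clustering_order:
        op = predicates.get(column)
        if op is None:
            break  # a gap: columns after this one cannot be restricted
        pushable.append(column)
        if op != "=":
            break  # a range predicate has to be the last restricted column
    return pushable


order = ["txn_date", "store_id", "txn_id"]  # hypothetical cluster keys
```

In this model a filter on store_id alone is not pushed down at all, which is why the most frequently filtered column belongs at the top of the list.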


As with SQL on relational databases, Spark SQL performance depends on several factors: hardware resources such as the size of your compute resources and network bandwidth, as well as your data model, application design, query construction, etc. The data model is the most critical of the non-hardware factors. Initially, I wanted to blog about the data modeling aspects of optimization. However, I’ve decided to write about the inner workings of Spark SQL first, as they are the foundation of efficient data modeling when using Spark SQL.

I used DataStax Enterprise (DSE) 4.8 for writing this article. DSE 4.8 comes with Spark 1.4.x and Cassandra 2.1.x. Although I’m explaining Spark SQL from the perspective of a Cassandra data source, similar concepts apply to other data sources supported by Spark SQL.

In order to optimize Spark SQL for high performance, we first need to understand how the Spark Catalyst optimizer executes a Spark SQL statement.

Let’s use the example Spark SQL statement below.

 

[Image: Sample SQL]

Spark creates a plan similar to the one below.

[Image: Plan]

The Spark plan can be divided into three phases.

Phase 1: Read data from individual tables. 

Phase 2:  Join tables

Phase 3: Aggregation.

Below is the hypothetical execution timeline for each phase.

[Image: Time]

Optimization Rule #1:  Include predicates for all tables in Spark SQL query.

Spark SQL tries to optimize tasks for parallel execution. First, it breaks the join statement into individual per-table SQL statements. It then reads all tables in parallel.

Total time spent in read phase = Max read time taken by a single table.

Here is why you need to include predicates against all tables. In normal SQL against a relational database, we provide predicates against only one table, usually the smaller one. The RDBMS knows how to use the filtered data from the smaller table to read the other tables efficiently. If you write a similar query in Spark SQL, Spark will create individual per-table SELECT statements like the ones below during the read phase.

[Image: SQL1]

 

This means only table T1 will get filtered data from its data source (Cassandra in this case) into Spark memory. All other tables will become full table scans. Full table scans are bad for performance in Spark.

In order to achieve high read performance, we have to write Spark SQL like below.

[Image: SQL2]

Spark will have better read performance when data is filtered at the data source.
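As an illustration of Rule #1, the small Python helper below builds a join query that repeats the same filter on every table, so each per-table read carries its own predicate. The table names, the store_id column and the helper itself are hypothetical, and the string formatting is for demonstration only (it is not safe against SQL injection).

```python
from typing import List


def build_join_query(tables: List[str], key: str, value: str) -> str:
    """Build a join on a shared key with the filter repeated for every table."""
    aliases = [f"t{i}" for i in range(1, len(tables) + 1)]
    from_clause = ", ".join(f"{t} {a}" for t, a in zip(tables, aliases))
    # Join every table to the first one on the shared key.
    join_preds = [f"{aliases[0]}.{key} = {a}.{key}" for a in aliases[1:]]
    # Repeat the filter for each table instead of relying on the join alone.
    filter_preds = [f"{a}.{key} = '{value}'" for a in aliases]
    where = " AND ".join(join_preds + filter_preds)
    return f"SELECT * FROM {from_clause} WHERE {where}"


query = build_join_query(["sales", "returns"], "store_id", "S100")
```

Here query contains a predicate on both t1.store_id and t2.store_id, which is the shape of statement this rule asks for.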

However, just adding any predicate against Cassandra table columns will not guarantee that data is filtered at the Cassandra data source. Certain predicate pushdown rules apply to Spark SQL, and these rules depend on the data source behind each table. I will be covering Cassandra predicate pushdown rules in a separate post.

Optimization Rule #2:  Minimize number of spark tasks in scan/read phase

The Spark plan creates multiple stages in the read phase, one separate stage per table. The number of tasks in each stage depends on the number of data partitions Spark has to read into memory. With an efficient partitioning strategy on your tables and proper predicates against partitions, you can minimize the number of tasks in the read stage.

For example, single-partition scans using the = operator will create fewer tasks than multi-partition scans using the “IN” operator.

Optimization Rule #3:  Order of tables in FROM clause matters. Keep the table with the largest size at the top.

A Spark table is based on a DataFrame, which is based on an RDD. In simple terms, an RDD is a distributed collection. A Spark SQL JOIN operation is very similar to a fold left operation on a collection.

This post is not about Scala or functional programming concepts. However, it helps to know how the fold left operation works on a collection. Refer to the link below for an explanation of fold left. http://alvinalexander.com/scala/how-to-walk-scala-collections-reduceleft-foldright-cookbook

As you can see in the picture of the Spark plan, the JOIN operation requires shuffling data from the second table’s executors to the first table’s executors. Shuffle is a very expensive operation in terms of IO and CPU. This operation is repeated until all tables to the right are merged with the result on the left.

By keeping the table with the largest data size at the top of the join, you avoid shuffling the largest data set.
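The fold left analogy can be sketched with functools.reduce in Python. This is a toy model of the description above, where each table on the right is shuffled over to the running result on the left. It is not how the Spark planner is implemented, and the table names and row counts are invented.

```python
from functools import reduce
from typing import List, Tuple

Table = Tuple[str, int]  # (table name, number of rows)


def fold_left_join(tables: List[Table]) -> Tuple[str, int]:
    """Fold the tables left to right, counting rows shuffled from the right."""
    first_name, _ = tables[0]

    def step(acc: Tuple[str, int], right: Table) -> Tuple[str, int]:
        joined, shuffled = acc
        name, rows = right
        # The right-hand table moves to the executors holding the left result.
        return (f"({joined} JOIN {name})", shuffled + rows)

    return reduce(step, tables[1:], (first_name, 0))


largest_first = fold_left_join([("fact", 1_000_000), ("dim_a", 10_000), ("dim_b", 500)])
largest_last = fold_left_join([("dim_b", 500), ("dim_a", 10_000), ("fact", 1_000_000)])
```

In this model, putting the largest table first shuffles 10,500 rows, while putting it last shuffles 1,010,000 rows.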

Optimization Rule #4: Keep a consistent partitioning strategy across joined tables

Having common partition keys makes it easier to write queries with filters that can be applied across many of the joining tables.

Optimization Rule #5:  Minimize the number of tables in JOIN.

As you can see in the timeline picture of the plan, reads are parallel operations, but JOINs are sequential steps. That means every join step adds to the total execution time of the query.

Use proper denormalization techniques to reduce the number of tables in your data model.
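The timeline reasoning can be written as a small formula in Python: reads run in parallel, so only the slowest read counts, while every join step adds its own time. The numbers below are hypothetical and only illustrate the shape of the calculation.

```python
from typing import List


def total_query_time(read_times: List[float], join_times: List[float],
                     aggregation_time: float) -> float:
    """Parallel reads cost the max; sequential joins cost the sum."""
    return max(read_times) + sum(join_times) + aggregation_time


# Three tables joined in two steps, versus two tables joined in one step.
three_tables = total_query_time([4.0, 9.0, 2.0], [3.0, 5.0], 1.0)
two_tables = total_query_time([4.0, 9.0], [3.0], 1.0)
```

Dropping the third table does not change the read phase here (the slowest read is still 9.0), but it removes a whole join step from the timeline.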

 


Apache Cassandra and Apache Spark integration is one of the emerging trends in the big data world today. Together, these two products offer several advantages. Much has already been said about Cassandra and Spark integration, and there are several enterprise-grade offerings in the marketplace today.

This article offers a few modeling suggestions for when you need to join two or more Cassandra tables using Spark. The ability to join Cassandra tables using Spark gives you several data modeling advantages: support for ETL/ELT processes, the ability to balance data redundancy against query flexibility, and data analysis using the Spark DataFrame API and Spark SQL.

Apache Spark is a distributed computing framework whose SQL engine can join data from several sources such as Hadoop files, Hive, Cassandra, JDBC/ODBC data sources and others. This list is continuously growing.

Cassandra is a popular NoSQL database widely used in OLTP applications. Cassandra has the CQL language interface, which looks similar to SQL but is not quite the same.

While traditional relational data sources store their data in row format, Cassandra stores its data in row partitions using column families. The arrangement of data inside a partition is very similar to a pivoted spreadsheet.

Until now, the denormalized data model has been heavily emphasized due to Cassandra’s inability to join tables. This is still the case for pure Cassandra-based OLTP applications. However, new options are opening up for enterprises that are planning to integrate Apache Spark and Cassandra.

 

Since this is a heavy topic, I want to release it in multiple installments. First, here is the explanation of how Spark SQL works.

https://intelligentinsight.wordpress.com/2016/07/05/optimizing-spark-sql-join-statements-for-high-performance/

Here is the link for Cassandra modeling best practices for Spark SQL joins.

https://intelligentinsight.wordpress.com/2016/07/09/cassandra-data-modeling-principles-for-spark-sql-joins/

 

 
