This document covers points to consider when optimizing the performance of U-SQL tables and the queries that run against them.
The real power of a distributed system comes from processing data across multiple computers while minimizing data movement among them. Although the U-SQL engine is designed to process data where it resides (moving the compute to the data rather than the data to the compute), it also gives the designer some control over the data distribution strategy for a U-SQL table. U-SQL provides four distribution schemes (HASH, RANGE, ROUND ROBIN, and DIRECT HASH), which can be broadly grouped into two categories: predictable distribution and unpredictable distribution.
The HASH, DIRECT HASH, and RANGE distribution schemes provide predictable behavior by placing a row into a specific distribution bucket based on the key(s) provided. The distribution key is selected by considering factors such as minimizing data skew, minimizing data shuffling during query execution, and the types of queries being executed on the system.
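To make the syntax concrete, here is a minimal sketch of the three predictable schemes in U-SQL CREATE TABLE statements (the table names, columns, and bucket counts are illustrative assumptions, not part of the example that follows):

// Illustrative tables only; names, columns, and INTO counts are assumptions.
// HASH: rows with the same key value always hash to the same bucket.
CREATE TABLE MyDB.dbo.EventsByHash
(
    EventId int,
    UserKey string,
    INDEX IDX_EventsByHash CLUSTERED(UserKey ASC) DISTRIBUTED BY HASH(UserKey) INTO 10
);

// RANGE: rows are placed into buckets by key ranges, preserving key order.
CREATE TABLE MyDB.dbo.EventsByRange
(
    EventId int,
    UserKey string,
    INDEX IDX_EventsByRange CLUSTERED(UserKey ASC) DISTRIBUTED BY RANGE(UserKey) INTO 10
);

// DIRECT HASH: the integer column's value is used directly to assign the bucket,
// so BucketId is assumed to hold a precomputed distribution number.
CREATE TABLE MyDB.dbo.EventsByDirectHash
(
    BucketId int,
    UserKey string,
    INDEX IDX_EventsByDirectHash CLUSTERED(BucketId ASC) DISTRIBUTED BY DIRECT HASH(BucketId) INTO 10
);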
When you use the ROUND ROBIN distribution scheme, the data is spread randomly and evenly (or as evenly as possible) across all the distribution buckets. Round-robin tables do not exhibit data skew because the data is stored evenly across the distribution buckets. However, query execution might involve a lot of data shuffling among distribution buckets, depending on the size of the data and the query being executed.
Now let's dive deeper into the performance impact of selecting the various distribution schemes. For example, consider the following dataset stored in two tables, one that uses ROUND ROBIN as the distribution scheme and the other hashed on the Domain column:
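Here is a minimal sketch of how these two tables might be defined and loaded (the table names, the bucket count, and the source file path are assumptions; only the Domain and Clicks columns are implied by the queries later in this section):

// Assumed schema, names, and source path; adjust to your own data.
CREATE TABLE MyDB.dbo.ClickDataRoundRobin
(
    Domain string,
    Clicks int,
    INDEX IDX_ClickDataRoundRobin CLUSTERED(Domain ASC) DISTRIBUTED BY ROUND ROBIN INTO 4
);

CREATE TABLE MyDB.dbo.ClickDataHashedOnDomain
(
    Domain string,
    Clicks int,
    INDEX IDX_ClickDataHashedOnDomain CLUSTERED(Domain ASC) DISTRIBUTED BY HASH(Domain) INTO 4
);

@clicks =
    EXTRACT Domain string,
            Clicks int
    FROM "/Data/ClickData.csv"   // hypothetical source file
    USING Extractors.Csv(skipFirstNRows : 1);

INSERT MyDB.dbo.ClickDataRoundRobin
SELECT * FROM @clicks;

INSERT MyDB.dbo.ClickDataHashedOnDomain
SELECT * FROM @clicks;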
When you store the data in the two tables above, the data distribution for both tables should be similar to that shown below (for simplicity, I have not shown all the data and distributions):
Now, execute the query below to determine the number of clicks for the CNN domain only, and then examine the execution plan:
@ClickData =
    SELECT * FROM MyDB.dbo.<<TableName>>;

@rows =
    SELECT *
    FROM @ClickData
    WHERE Domain == "cnn.com";

OUTPUT @rows
TO "/output.tsv"
USING Outputters.Tsv();
As expected, the query on the ROUND ROBIN table scans all distributions (as "cnn.com" could be in any of them), whereas for the query on the other table (hashed on Domain), the U-SQL engine knows which specific distributions can contain the "cnn.com" value, so it scans those and eliminates the others during processing. That's how your table design impacts performance, minimizes data movement and processing time, and exploits the true power of a distributed system. It's equally important to choose the right distribution hash key based on your query patterns (joins, filters, and groupings) and data distribution (to avoid data skew).
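For instance, when two tables are hashed on the same key, a join on that key can be answered within each distribution without reshuffling rows. The following is a sketch under that assumption; the DomainInfo lookup table and its Category column are hypothetical:

// Hypothetical lookup table, also distributed by HASH(Domain).
// Because both tables hash on Domain, matching rows are co-located
// and the join can run within each distribution without a shuffle.
@joined =
    SELECT c.Domain,
           c.Clicks,
           d.Category
    FROM MyDB.dbo.ClickDataHashedOnDomain AS c
         INNER JOIN MyDB.dbo.DomainInfo AS d
         ON c.Domain == d.Domain;

OUTPUT @joined
TO "/joined.tsv"
USING Outputters.Tsv();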
Now, execute the query below to get the number of clicks for each domain, and examine the execution plan:
@ClickData =
    SELECT * FROM MyDB.dbo.<<TableName>>;

@rows =
    SELECT Domain,
           SUM(Clicks) AS TotalClicks
    FROM @ClickData
    GROUP BY Domain;

OUTPUT @rows
TO "/output.tsv"
USING Outputters.Tsv();
As shown in the figure below, distribution elimination does not apply to this query for either table, because every domain must be read. With the ROUND ROBIN table, however, rows for a given domain can be in any distribution, so the aggregation incurs the overhead of shuffling data across distributions. In the case of the hashed table, all rows for a given domain live in the same distribution, so the U-SQL engine performs the full aggregation locally within each distribution.
As previously discussed, the hash distribution scheme leverages the power of a distributed system by processing data with minimal data scanning and data shuffling. A summary of the benefits and considerations when choosing the best distribution hash key follows:
Partitioning helps when loading incremental daily or hourly data: you delete the existing partition (containing the old data) and add a new partition (with the new data), as sketched below. Even more importantly, partitioning plays a role in performance optimization through two techniques: partition elimination and parallel processing. Let's demonstrate how these techniques work in U-SQL query execution and how the U-SQL engine optimizes performance.
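Here is a minimal sketch of that swap pattern, assuming a hypothetical DailySales table partitioned on a SaleDateKey string column (the table, columns, file path, and partition key value are all illustrative):

// Drop the stale partition, re-add it, and load the fresh extract into it.
// Assumes every extracted row has SaleDateKey == "20130115".
ALTER TABLE MyDB.dbo.DailySales DROP PARTITION ("20130115");
ALTER TABLE MyDB.dbo.DailySales ADD PARTITION ("20130115");

@newData =
    EXTRACT SaleDateKey string,
            CustomerKey int,
            SalesAmount decimal
    FROM "/Data/DailySales_20130115.csv"
    USING Extractors.Csv(skipFirstNRows : 1);

INSERT MyDB.dbo.DailySales
SELECT * FROM @newData;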
First, let's create two distributed tables (one partitioned and the other non-partitioned) and load 12 months' worth of data into them.
DROP TABLE IF EXISTS FactInternetSales_NonPartitioned;

CREATE TABLE FactInternetSales_NonPartitioned
(
    ProductKey int,
    OrderDateKey int,
    DueDateKey int,
    ShipDateKey int,
    CustomerKey int,
    PromotionKey int,
    CurrencyKey int,
    SalesTerritoryKey int,
    SalesOrderNumber string,
    SalesOrderLineNumber int,
    RevisionNumber int,
    OrderQuantity int,
    UnitPrice decimal,
    ExtendedAmount decimal,
    UnitPriceDiscountPct decimal,
    DiscountAmount decimal,
    ProductStandardCost decimal,
    TotalProductCost decimal,
    SalesAmount decimal,
    TaxAmt decimal,
    Freight decimal,
    CarrierTrackingNumber string,
    CustomerPONumber string,
    OrderDate DateTime,
    DueDate DateTime,
    ShipDate DateTime,
    CalendarYearMonth string,
    INDEX IDX_FactInternetSales_NonPartitioned_CustomerKey CLUSTERED(CustomerKey ASC)
        DISTRIBUTED BY HASH(CustomerKey) INTO 25
);

@HistoricalData =
    EXTRACT ProductKey int,
            OrderDateKey int,
            DueDateKey int,
            ShipDateKey int,
            CustomerKey int,
            PromotionKey int,
            CurrencyKey int,
            SalesTerritoryKey int,
            SalesOrderNumber string,
            SalesOrderLineNumber int,
            RevisionNumber int,
            OrderQuantity int,
            UnitPrice decimal,
            ExtendedAmount decimal,
            UnitPriceDiscountPct decimal,
            DiscountAmount decimal,
            ProductStandardCost decimal,
            TotalProductCost decimal,
            SalesAmount decimal,
            TaxAmt decimal,
            Freight decimal,
            CarrierTrackingNumber string,
            CustomerPONumber string,
            OrderDate DateTime,
            DueDate DateTime,
            ShipDate DateTime,
            CalendarYearMonth string
    FROM "/Data/Fact/FactInternetSales_2013.csv"
    USING Extractors.Csv(skipFirstNRows : 1);

INSERT FactInternetSales_NonPartitioned
SELECT * FROM @HistoricalData;
DROP TABLE IF EXISTS FactInternetSales_Partitioned;

CREATE TABLE FactInternetSales_Partitioned
(
    ProductKey int,
    OrderDateKey int,
    DueDateKey int,
    ShipDateKey int,
    CustomerKey int,
    PromotionKey int,
    CurrencyKey int,
    SalesTerritoryKey int,
    SalesOrderNumber string,
    SalesOrderLineNumber int,
    RevisionNumber int,
    OrderQuantity int,
    UnitPrice decimal,
    ExtendedAmount decimal,
    UnitPriceDiscountPct decimal,
    DiscountAmount decimal,
    ProductStandardCost decimal,
    TotalProductCost decimal,
    SalesAmount decimal,
    TaxAmt decimal,
    Freight decimal,
    CarrierTrackingNumber string,
    CustomerPONumber string,
    OrderDate DateTime,
    DueDate DateTime,
    ShipDate DateTime,
    CalendarYearMonth string,
    INDEX IDX_FactInternetSales_Partitioned_CustomerKey CLUSTERED(CustomerKey ASC)
        PARTITIONED BY (CalendarYearMonth)
        DISTRIBUTED BY HASH(CustomerKey) INTO 25
);

ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201301"); //Jan, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201302"); //Feb, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201303"); //Mar, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201304"); //Apr, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201305"); //May, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201306"); //Jun, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201307"); //Jul, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201308"); //Aug, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201309"); //Sep, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201310"); //Oct, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201311"); //Nov, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("201312"); //Dec, 2013
ALTER TABLE FactInternetSales_Partitioned ADD PARTITION ("209912"); //Default Partition

@HistoricalData =
    EXTRACT ProductKey int,
            OrderDateKey int,
            DueDateKey int,
            ShipDateKey int,
            CustomerKey int,
            PromotionKey int,
            CurrencyKey int,
            SalesTerritoryKey int,
            SalesOrderNumber string,
            SalesOrderLineNumber int,
            RevisionNumber int,
            OrderQuantity int,
            UnitPrice decimal,
            ExtendedAmount decimal,
            UnitPriceDiscountPct decimal,
            DiscountAmount decimal,
            ProductStandardCost decimal,
            TotalProductCost decimal,
            SalesAmount decimal,
            TaxAmt decimal,
            Freight decimal,
            CarrierTrackingNumber string,
            CustomerPONumber string,
            OrderDate DateTime,
            DueDate DateTime,
            ShipDate DateTime,
            CalendarYearMonth string
    FROM "/Data/Fact/FactInternetSales_2013.csv"
    USING Extractors.Csv(skipFirstNRows : 1);

INSERT FactInternetSales_Partitioned
ON INTEGRITY VIOLATION MOVE TO PARTITION ("209912")
SELECT * FROM @HistoricalData;
Now let's run the following query to summarize the sales amount for each month and analyze the execution plan for the non-partitioned table:
//Querying data for all the months
@AggregationQueryNonPartitionedAllMonths =
    SELECT CalendarYearMonth,
           SUM(SalesAmount) AS TotalSalesAmount
    FROM FactInternetSales_NonPartitioned
    GROUP BY CalendarYearMonth;

OUTPUT @AggregationQueryNonPartitionedAllMonths
TO "/Data/Fact/Output/AggregationQueryNonPartitionedAllMonthsOutput.csv"
USING Outputters.Csv();
As shown in the figure below, the query reads all the data from the table (~12 MB) and then summarizes it by grouping on month.
Now let's execute the following script to summarize the sales amount for the last 3 months only. Again, the query reads all the data from the table (~12 MB), applies a filter to keep only the last three months of data, and then summarizes it by grouping on those months.
//Querying data for selected months only
@AggregationQueryNonPartitionedSelectedMonths =
    SELECT CalendarYearMonth,
           SUM(SalesAmount) AS TotalSalesAmount
    FROM FactInternetSales_NonPartitioned
    WHERE CalendarYearMonth IN ("201312", "201311", "201310")
    GROUP BY CalendarYearMonth;

OUTPUT @AggregationQueryNonPartitionedSelectedMonths
TO "/Data/Fact/Output/AggregationQueryNonPartitionedSelectedMonthsOutput.csv"
USING Outputters.Csv();
You can verify this by expanding the vertex, which shows that the query reads 52,801 rows (the total number of rows in the table) and applies a filter to limit processing to the last 3 months of data (16,044 rows) only.
Now let's run through the same exercise using the partitioned table:
//Querying data for all the months
@AggregationQueryPartitionedAllMonths =
    SELECT CalendarYearMonth,
           SUM(SalesAmount) AS TotalSalesAmount
    FROM FactInternetSales_Partitioned
    GROUP BY CalendarYearMonth;

OUTPUT @AggregationQueryPartitionedAllMonths
TO "/Data/Fact/Output/AggregationQueryPartitionedAllMonthsOutput.csv"
USING Outputters.Csv();
Notice that the query again reads all the data from the table, because we want the sales amount summarized by month for all available months and haven't specified any filter condition. This time, however, there are 13 vertices (one for each of the 12 monthly partitions, plus one for the default catch-all partition) working in parallel. This is a very small dataset, so you will not notice much difference in execution time. However, consider processing a much larger dataset and the amount of time you can save by leveraging this parallel execution (after all, U-SQL is designed and optimized for big data processing).
Now let's execute the following script to summarize the sales amount for the last 3 months only, this time against the partitioned table. In this case, notice that the query reads data only from the partitions for the last 3 months.
//Querying data for selected months only
@AggregationQueryPartitionedSelectedMonths =
    SELECT CalendarYearMonth,
           SUM(SalesAmount) AS TotalSalesAmount
    FROM FactInternetSales_Partitioned
    WHERE CalendarYearMonth IN ("201312", "201311", "201310")
    GROUP BY CalendarYearMonth;

OUTPUT @AggregationQueryPartitionedSelectedMonths
TO "/Data/Fact/Output/AggregationQueryPartitionedSelectedMonthsOutput.csv"
USING Outputters.Csv();
Behind the scenes, the U-SQL engine eliminates all the data and partitions that are not needed for the query, considers only the last 3 months' partitions (~4 MB, or 16,044 rows), and processes those partitions in parallel.