Make Apache Spark better with CarbonData

Brijoobopanna
9 min readJun 6, 2021

--

Spark is no doubt a powerful processing engine and a distributed cluster computing framework for faster processing. Unfortunately there are few areas where spark has drawbacks. If we combine Apache Spark with Apache CarbonData, it can overcome those drawbacks. Few of the drawbacks with Apache Spark are as below:

  1. No Support for ACID transaction
  2. No data quality enforcement
  3. Small files problem
  4. Inefficient data skipping

What is ACID?

Spark and ACID

  1. ATOMICITY:

The A in ACID stands for Atomicity. Basically, it means, either all or nothing. So, when you are using spark data frame writer API, it should either write full data or nothing. Let’s quickly look at the Spark documentation. As per Spark Documentation: “It is important to realize that these save mode (overwrite) do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.”

Though the entire situation looks a bit scary, in reality it is not that bad. Spark dataframe API internally performs job level commit, which helps to achieve some degree of atomicity and this works with “append” mode using Hadoop’s FileOutputCommitter. However, the default implementation comes with a performance overhead, especially when using cloud storage [S3/OBS] instead of HDFS.

We can now run the below code to demonstrate that the Spark overwrite is not Atomic and it can cause data corruption or data loss. The first part of the code imitates job 1 which creates 100 records and saves it into the ACIDpath directory. The second part of the code imitates job 2, which tries to overwrite the existing data but raises an exception in the middle of the operation. The outcome of these two jobs was a data loss. In the end, we lost the data that was created by the first job.

As a result of the exception, the job level commit does not happen, so the new file is not saved. Since Spark deleted the old file, we lost the existing data. Spark data frame writer APIs are not Atomic, but it behaves like an Atomic operation for append operation

2. CONSISTENCY:

Distributed systems are often built on top of machines that have lower availability. Consistency is a key concern in highly available systems. A system is consistent if all nodes see and return the same data, at the same time. There are several consistency models, the most common one used in distributed systems being strong consistency, weak consistency and eventual consistency. We understood that the overwrite mode of Spark writer API will delete the old file first and then will place the new one. So, in between these two states there will be a time when no data will be available. If our job fails then we will lose the data. Which means there is no smooth transaction between these two operations. This is a typical Atomicity problem with the Spark overwrite operation. And this problem also breaks the data consistency. Spark API lacks consistency. Hence, Spark write mode does not support consistency.

3. Isolation and Durability in Spark

Isolation means separation. Separation from any other concurrent operation. Suppose we are writing to a dataset which is not yet committed and there is another concurrent process which is reading/writing on the same dataset. As per isolation property, in this situation, one should not impact others. A typical database would offer different isolation levels, such as read committed and serializable. Though spark has task level commit and job level commit, due to lack of atomicity in write operation Spark cannot offer proper isolation.

Finally, Durability is a committed state/data saved by the system such that, even in the event of a failure and system restart, the data is available in its correct state. Durability is offered by the storage layer and in case of spark application it is the role of HDFS and S3/OBS. However, when Spark does not provide proper commit due to lack of atomicity, and without proper commit we cannot expect durability.

If we observe carefully all these ACID properties are interconnected. Due to lack of Atomicity we lose Consistency and Isolation, and due to lack of Isolation we lose Durability.

Lack of Schema Enforcement:

We know Spark implies Schema on read. Hence when we write any data it will not throw an exception, if there is any schema mismatch. Let us try to understand this with an example. Let us have an input array with below records. The below program will read the csv and transform to a DF

The program reads from a CSV, write back in parquet format and display the data. The output is as below

Let’s read another input CSV file with the ‘Cost’ column having decimal values instead of integers (as shown below) and do an append operation to the above file.

In this case our program will read the CSV, write into Parquet format without any exception. The moment we want to show/display the data-frame our program will throw an error

It’s happening because Spark never validates schema during write operation. The schema of the “Cost” column is inferred as Integer during first load and during second write it appends double type data without any issue. The moment we read the appended data and call an action it throws an error due to incompatibility of the schema.

How to overcome the above drawbacks of Spark

The above problems can be managed if we plugin CarbonData as an additional layer of Storage solution with Apache Spark.

What is CarbonData

Because Hadoop Distributed File System (HDFS) and object stores are like file systems, they are not designed for providing transactional support. Implementing transactions in distributed processing environments is a challenging problem. For example, implementation typically has to consider locking access to the storage system, which comes at the cost of overall throughput performance. Storage solutions such as Apache CarbonData solve these ACID requirements of data-lakes, efficiently by pushing these transactional semantics and rules into the file formats themselves or with metadata and file formats combination. CarbonData plays an intermediary service between Apache Spark and the storage system. Now the responsibility of complying with ACID is taken care of by CarbonData. The underlying storage system could be anything like HDFS, Huawei OBS, Amazon S3 or Azure Blob Storage. Few important features CarbonData offers to Spark are:

  1. ACID transactions.
  2. Schema enforcement/Schema validation.
  3. Enables Updates, Deletes and Merge.
  4. Automatic data indexing.

CarbonData in Apache Spark: ACID

In the above code snippet, the first part of the code imitates job-1, creates 100 records and saves it into the ACIDpath directory. The second part of the code imitates job-2, which tries to overwrite the existing data but throws an exception in the middle of the operation.

The outcome of these two jobs is a data loss. In the end, we lost the data that was created by the first job. Now let’s change the code as shown below to work with CarbonData.

Execute the first job and count the number of rows. As expected, you will get 100 rows.

If you check the data directory, you will see a snappy compressed CarbonData file. That data file holds 100 rows in a columnar encoded format. You will also see a metadata directory that contains a tablestatus file. Now execute the second job. What is your expectation from the second job? As earlier, this job should try to do the following things.

  1. Delete the previous file.
  2. Create a new file and start writing records.
  3. Throw a runtime exception in the middle of the job.

As a result of an exception, the job level commit does not happen, we lost the existing data as observed above without having the CarbonData solution integrated.

But now if you execute the second job, you will still get an exception. Then, count the rows. You get the output as 100 and you don’t lose older records. Looks like CarbonData has made the Overwrite atomic. Let’s look at the data directory, you will find two CarbonData files.

One file was created by the first job and the other file is created by the job 2. Instead of deleting the older file, the job2 directly created a new file and started writing data into the new one. This approach leaves the old data state unchanged. And that is why we didn’t lose the older data because the older files remain untouched. The new incomplete file is also there but the data in the new incomplete file is not read. The logic is hidden in the metadata directory and managed using the tablestatus file. The second job could not create a successful entry in the tablestatus file because it failed in the middle. The read API will not read files for which the entry in tablestatus file is Marked for Delete.

This time, let us write code logic without any exception and overwrite the old 100 records with just 50 records.

Now the record count shows 50. As expected. So, you have overwritten the older data set of 100 rows with a new data set of 50 rows.

CarbonData brings metadata management onto Apache Spark and makes Spark data writer APIs atomic, solving the data consistency problem. Once that consistency problem is solved, CarbonData would be able to offer updates and delete functionality.

Spark With CarbonData: Schema Enforcement

Let us consider a simple user scenario where data arrives in multiple batches for transformation. Here for simplicity let us assume just 2 batches of data and the second batch of data carry some column data with a different type compared to the 1st.

To start the experiment let us read the data as shown from table1 and write the data with and without CarbonData. We are able to write data with and without CarbonData using ‘Overwrite’ Mode.

Let us now read the second table having double type of data for cost column type and then write the dataframe into Parquet and CarbonTables (Note:_c2 is integer type and we are trying to append double type data). There is no issue to append schema mismatched data using parquet, But when the program tries to append the same into CarbonData table it will throw an error:

So, based of above experiment's we can see CarbonData validates the schema before writing into the underlying storage, which means CarbonData uses schema validation on write. If the type is not compatible then CarbonData will cancel the transaction. This will help to track the issue at the beginning rather than mixing up with good data and then try to find out the root cause.

In our next blog let us continue to discuss more on:

  • Delete and update records using CarbonData.
  • Efficient Upserts into Data Lakes with Merge statement.

--

--

Brijoobopanna
Brijoobopanna

No responses yet