The data silo problem is like arthritis for internet companies: almost everyone gets it as they grow older. Businesses interact with customers through websites, mobile apps, H5 pages, and end devices. For one reason or another, it is difficult to integrate the data from all these sources. The data stays where it is and cannot be linked together for further analysis. That's how data silos form. The bigger your business grows, the more diverse your sources of customer data become, and the more likely you are to be trapped in data silos.
This is exactly what happened to the insurance company I will talk about in this post. By 2023, they had served over 500 million clients and signed 57 billion insurance contracts. When they started building a customer data platform (CDP) to accommodate data at this scale, they used multiple components.
Data silos in CDP 1.0
Like most data platforms, their CDP 1.0 had a batch processing pipeline and a real-time streaming pipeline. Offline data was loaded, via Spark jobs, into Impala, where it was tagged and divided into groups. Meanwhile, Spark also sent it to NebulaGraph for OneID computation (elaborated on later in this post). On the other hand, real-time data was tagged by Flink and then stored in HBase, ready for queries.
That left the CDP with a component-heavy computation layer: Impala, Spark, NebulaGraph, and HBase.
As a result, offline tags, real-time tags, and graph data were scattered across multiple components. Integrating them for further data services was costly due to redundant storage and bulky data transfer. What's more, due to differences in storage, they had to expand both the CDH cluster and the NebulaGraph cluster, adding to resource and maintenance costs.
CDP based on Apache Doris
For CDP 2.0, they decided to introduce a unified solution to clean up the clutter. At the computation layer of CDP 2.0, Apache Doris takes over both real-time and offline data storage and computation.
To ingest offline data, they use the Stream Load method. Their 30-thread ingestion test shows that it can perform over 300,000 upserts per second. To load real-time data, they use a combination of Flink-Doris-Connector and Stream Load. Additionally, in real-time reporting where they need to fetch data from multiple external data sources, they leverage the Multi-Catalog feature for federated queries.
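As a hedged sketch of how that can look (the catalog name, metastore address, and table names below are hypothetical, not the company's actual setup), an external Hive source can be mounted as a catalog and then joined with internal Doris tables in one query:

-- Mount an external Hive metastore as a catalog:
CREATE CATALOG hive_crm PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://127.0.0.1:9083"
);

-- Federated query joining the external table with an internal Doris table:
SELECT d.one_id, h.premium
FROM internal.cdp.user_profile AS d
JOIN hive_crm.crm.policies AS h ON d.one_id = h.one_id;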
The user analysis workflow on this CDP goes like this: they first sort out customer information, then attach tags to each customer. Based on the tags, they divide customers into groups for more targeted analysis and operations.
Next, I'll dive into these workloads and show you how Apache Doris accelerates them.
OneID
Has this ever happened to you: you have different user registration systems for your products and services, so you collect UserID A's email from one product website, and later UserID B's social security number from another. Then you find out that UserID A and UserID B actually belong to the same person because they share the same phone number.
That's where the idea of OneID comes in: pool the user registration information of all lines of business into one large table in Apache Doris, sort it out, and make sure that each user has a unique OneID.
This is how they detect which registration data belongs to the same user using functions in Apache Doris.
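The post doesn't include their exact SQL, but the core idea can be sketched roughly like this, assuming a hypothetical user_registrations table that unions the registration records of all lines of business:

-- Group records that share a phone number; the smallest user_id
-- in each group serves as the OneID for that person:
SELECT
    phone,
    MIN(user_id) AS one_id,
    GROUP_CONCAT(CAST(user_id AS VARCHAR)) AS merged_user_ids
FROM user_registrations
GROUP BY phone;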
Tagging services
This CDP holds information on 500 million customers, which comes from over 500 source tables and is attached to over 2,000 tags in total.
By timeliness, tags can be divided into real-time tags and offline tags. Real-time tags are computed by Apache Flink and written into a flat table in Apache Doris, while offline tags are computed by Apache Doris, since they are derived from the user attribute table, business table, and user behavior table in Doris. Here is the company's best practice in data tagging:
1. Offline tags
During peak times of data writing, a full update can easily cause an OOM error given their huge data volume. To avoid this, they use the INSERT INTO SELECT statement of Apache Doris and enable partial column update, as shown below. This greatly reduces memory consumption and keeps the system stable during data loading.
set enable_unique_key_partial_update=true;
insert into tb_label_result(one_id, labelxx)
select one_id, label_value as labelxx
from .....
2. Real-time tags
Partial column update is also available for real-time tags, since even real-time tags are updated at different paces. All that is needed is to set partial_columns to true.
curl --location-trusted -u root: -H "partial_columns:true" -H "column_separator:," -H "columns:id,balance,last_access_time" -T /tmp/test.csv http://127.0.0.1:48037/api/db1/user_profile/_stream_load
3. High-concurrency queries
At its current business size, the company receives tag queries at a concurrency of over 5,000 QPS. They use a combination of strategies to guarantee high performance. Firstly, they adopt Prepared Statement to pre-compile and pre-execute SQL. Secondly, they fine-tune the parameters of the Doris Backend and the tables to optimize storage and execution. Lastly, they enable row cache as a complement to the column-oriented Apache Doris.
# Doris Backend (BE) configuration:
disable_storage_row_cache = false
storage_page_cache_limit=40%

# Table properties (set at table creation):
enable_unique_key_merge_on_write = true
store_row_column = true
light_schema_change = true
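A minimal sketch of a table created with these properties (the schema itself is hypothetical):

CREATE TABLE user_profile (
    id BIGINT,
    balance DECIMAL(18, 2),
    last_access_time DATETIME
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 16
PROPERTIES (
    "enable_unique_key_merge_on_write" = "true",
    "store_row_column" = "true",
    "light_schema_change" = "true"
);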
4. Tag computation (joins)
In practice, many tagging services are implemented by joining multiple tables, often more than 10 of them. For optimal computation performance, they adopt the Colocation Group strategy in Doris, as sketched below.
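A hedged sketch of how colocation works (table names and bucket count are hypothetical): tables that are frequently joined on one_id declare the same colocate_with group, the same distribution key, and the same bucket count, so the join can run locally on each node without shuffling data across the network.

CREATE TABLE user_tags (
    one_id BIGINT,
    tag_code VARCHAR(64),
    tag_value VARCHAR(256)
)
DISTRIBUTED BY HASH(one_id) BUCKETS 32
PROPERTIES ("colocate_with" = "cdp_group");

CREATE TABLE user_behavior (
    one_id BIGINT,
    event_type VARCHAR(64),
    event_time DATETIME
)
DISTRIBUTED BY HASH(one_id) BUCKETS 32
PROPERTIES ("colocate_with" = "cdp_group");

-- This join is executed locally, with no network shuffle:
SELECT t.tag_value, b.event_type
FROM user_tags t
JOIN user_behavior b ON t.one_id = b.one_id;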
Customer grouping
The customer grouping pipeline in CDP 2.0 goes like this: Apache Doris receives SQL from the customer service, executes the computation, and sends the result set to S3 object storage via SELECT INTO OUTFILE. The company has divided its customers into 1 million groups. A customer grouping task that used to take 50 seconds in Impala is now finished within 10 seconds in Doris.
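A minimal sketch of that export step (the bucket, path, and credentials are placeholders, and the exact S3 property names depend on the Doris version):

SELECT one_id, group_id
FROM customer_groups
INTO OUTFILE "s3://my-bucket/cdp/group_result_"
FORMAT AS CSV
PROPERTIES (
    "s3.endpoint" = "s3.us-east-1.amazonaws.com",
    "s3.region" = "us-east-1",
    "s3.access_key" = "<access_key>",
    "s3.secret_key" = "<secret_key>"
);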
In addition to grouping customers for more precise analysis, they sometimes do the analysis in the reverse direction. That is, to target a specific customer and find out which groups he/she belongs to. This helps analysts understand customer characteristics as well as how different customer groups overlap.
In Apache Doris, this is implemented with BITMAP functions: BITMAP_CONTAINS is a quick way to check whether a customer is part of a certain group, and BITMAP_OR, BITMAP_INTERSECT, and BITMAP_XOR are the choices for cross-analysis.
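A rough sketch of both directions, assuming a hypothetical customer_group_bitmaps table where each row stores a group name and a BITMAP column of its member IDs (the pairwise BITMAP_AND is used here for the two-group overlap):

-- Reverse lookup: which groups does customer 123456 belong to?
SELECT group_name
FROM customer_group_bitmaps
WHERE BITMAP_CONTAINS(members, 123456);

-- Cross-analysis: how many customers are in both group A and group B?
SELECT BITMAP_COUNT(BITMAP_AND(a.members, b.members)) AS overlap
FROM customer_group_bitmaps a, customer_group_bitmaps b
WHERE a.group_name = 'A' AND b.group_name = 'B';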
Conclusion
From CDP 1.0 to CDP 2.0, the insurance company adopted Apache Doris, a unified data warehouse, to replace Spark, Impala, HBase, and NebulaGraph. That improved their data processing efficiency by breaking down data silos and streamlining the data processing pipeline. In CDP 3.0, they hope to group their customers by combining real-time tags and offline tags for more diverse and flexible analysis. The Apache Doris community and the VeloDB team will continue to be their supporting partner during this upgrade.