In the fast-paced world of data analytics, moving from legacy systems like Hive to modern platforms such as Trino and Spark is crucial for organizations looking to enhance performance and analytical capabilities. This blog delves into the complexities of migrating from Hive to Trino and Spark, highlighting the challenges faced, solutions implemented, and valuable insights gained during this transformative journey.
Background: Why migrate from Hive?
For years, Hive was the cornerstone of our data processing architecture, effectively managing large-scale data workloads. However, as the demand for faster and more flexible data processing grew, it became clear that relying solely on Hive had its limitations.
Many users and engineers familiar with SQL were seeking a more efficient system. Trino emerged as an attractive alternative due to its robust SQL analytics and ability to handle interactive queries, offering a familiar SQL interface for both users and engineers.
However, for resource-intensive tasks requiring substantial memory, Trino alone was insufficient. In these scenarios, Apache Spark proved invaluable, providing the necessary power and flexibility to handle demanding workloads. This combination of Trino and Spark allows us to leverage the strengths of both platforms to meet our diverse data processing needs.
Key challenges
Ensuring behavioral consistency
Maintaining consistency in SQL query results during the transition from Hive to newer platforms was a challenge. Despite some historical inaccuracies in Hive logic, it was crucial to preserve these results to avoid unintended business impacts. For instance, Trino's automatic lowercasing of non-ASCII alphabets differs from Hive's behavior and could potentially disrupt established business logic. Ensuring that such discrepancies do not adversely affect outcomes was a complex but necessary task.
Service level management
Managing high-resource tasks while ensuring timely completion was critical. These tasks needed to reliably meet service level agreements (SLAs), as delays or inconsistencies could directly affect numerous business operations.
Tackling high query costs and inefficiencies
High operational costs associated with frequent table updates were a primary challenge with Hive, especially for high-volume jobs. Handling data volumes of 600GB per partition and processing 15 partitions daily led to significant resource consumption and inefficiencies.
The transition to Trino: A deep dive
Business logic and environment
We gather behavioral and transaction data from Yahoo! JAPAN's web and mobile applications, focusing on Yahoo! JAPAN Shopping, to enhance user experience. This data is processed through robust pipelines orchestrated with batch workflow tools such as Apache Airflow, combining SQL processing, Hadoop, object storage, and various SQL databases.
With over 300 data processing pipelines, we undertook the ambitious task of migrating more than 200 statistical workflows from Hive, where data was predominantly stored on Hadoop.
Why we chose Trino
Trino excels at fast, flexible querying on data stored in HDFS with the Hive metastore. Its in-memory processing, pipelined execution, and optimizations for columnar data formats align perfectly with our needs. The enterprise data warehouse (EDW) team's familiarity with SQL made Trino's wide range of standard functionalities a significant advantage, allowing analysts to focus on business logic without complex syntax. Additionally, Trino's performance surpasses Hive's, offering faster query execution and more efficient resource use.
Implementation strategy
To transition to Trino, we divided migration tasks at the pipeline level, focusing on each Airflow DAG. This involved modifying SQL scripts to be Trino-compatible and changing the execution endpoint from Hive to Trino. Each change underwent rigorous testing, code review, and a structured release process. Our goal was to migrate all Hive jobs to Trino while ensuring consistent output and minimal business impact. This approach allowed us to leverage Trino's strengths while maintaining data processing reliability and effectiveness.
Lessons learned and best practices from Hive to Trino
Behavioral consistency issues
Metadata synchronization
A significant challenge during the migration was ensuring proper metadata synchronization between Trino and the Hive metastore. We observed that Trino insert operations on partitioned Hive tables did not always update or synchronize with the Hive metastore correctly, leading to delays and inconsistencies in data visibility and partitioning.
- Issue: Trino insert queries on partitioned Hive tables sometimes failed to sync metadata properly, resulting in data not being immediately visible or correctly partitioned.
- Solution: We addressed this issue by adding a CALL system.sync_partition_metadata step to every task in the workflow engine that writes to a partitioned table. This ensures a full synchronization of partition metadata between Trino and the Hive metastore, so all changes are reflected accurately.
CALL system.sync_partition_metadata('schema', 'table', 'FULL');
Metadata and I/O issues with partition deletions or overwrites
Task failures or retries often occurred after deleting or overwriting partitions.
- Issue: Depending on partition size, the number of partitions, and the system status, deleting partitions could take a long time, leading to task retries or failures. This was especially problematic with concurrent access on nested partitioned tables, such as daily or monthly partitions.
- Solution: We mitigated this by dividing tables based on time, such as creating separate daily and monthly tables, as sketched below.
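For illustration only (the table and column names here are hypothetical, not our production schema), splitting the nested layout into separate tables with a single partition level looks roughly like this in Trino's Hive connector:

-- Daily table: one partition per day
CREATE TABLE example_stats_daily (
    val bigint,
    dt varchar
)
WITH (format = 'ORC', partitioned_by = ARRAY['dt']);

-- Monthly table: one partition per month, replacing the nested month/day layout
CREATE TABLE example_stats_monthly (
    val bigint,
    month varchar
)
WITH (format = 'ORC', partitioned_by = ARRAY['month']);

With only one partition level per table, a delete or overwrite touches far fewer directories and metastore entries at a time.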
Tinyint data type
Incompatibilities arose when inserting tinyint data through Trino.
- Issue: Trino’s insertion of tinyint data does not generate or update statistical metadata, making it unreadable by Hive.
- Solution: We set the following properties on the Hive side:
set hive.stats.fetch.partition.stats=false;
set hive.stats.fetch.column.stats=false;
DDL issue: Setting tab separator in Trino
Unable to use "\t" as the textfile_field_separator.
- Issue: In Trino, using "\t" directly as the textfile_field_separator is not supported, which poses a challenge when creating tables with tab-separated fields.
- Solution: Instead of "\t", use the Unicode escape sequence U&'\0009' to specify the tab character as the field separator, as in the snippet and DDL sketch below.
format = 'TEXTFILE',
textfile_field_separator = U&'\0009'
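Put together, a minimal DDL sketch (the table and column names are hypothetical) looks like this:

CREATE TABLE example_tsv (
    id bigint,
    val varchar
)
WITH (
    format = 'TEXTFILE',
    textfile_field_separator = U&'\0009'
);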
DDL issue: Defining empty string as NULL in Trino
No option to define NULL as an empty string.
- Issue: In Trino, there is no direct DDL option to define NULL as an empty string.
- Solution: Use the COALESCE function in your queries. COALESCE replaces NULL values with a specified default, such as an empty string, at query execution time.
SELECT COALESCE(val, '')
FROM example_table;
Decimal CAST
Calculation results for decimal type data can differ between Hive and Trino.
- Issue: When performing calculations involving decimal data types with precision exceeding 15 digits, discrepancies in results can occur between Hive and Trino.
- Solution: To ensure consistent results across platforms, it's crucial to explicitly set the decimal precision in your queries. By defining the precision, you can control the accuracy of the calculations and minimize discrepancies.
-- Example: Hive doubles use 16 digits of decimal precision
SELECT CAST(val AS decimal(17, 16)) FROM example_table;
Data ordering with NULL values
Trino and Hive handle NULL ordering differently in queries.
- Issue: Hive places NULL values at the beginning, while Trino places them at the end.
- Solution: Use explicit NULL ordering.
Hive:
SELECT val FROM example_table ORDER BY val;
Trino:
SELECT val FROM example_table ORDER BY val NULLS LAST;
Handling escape characters
Differences in escape character handling affected query results.
- Issue: Hive uses double backslashes for escape characters, while Trino uses a single backslash.
- Solution: Adjust escape characters accordingly.
Hive:
SELECT regexp_replace(url, '\\', '');
Trino:
SELECT regexp_replace(url, '\', '');
Querying complex data types
Errors occurred when querying complex data types like maps or structures in Trino.
- Issue: Trino threw errors when encountering NULL or inconsistent data in complex types.
- Solution: Use element_at in Trino.
Hive:
SELECT col_name['key'];
Trino:
SELECT element_at(col_name, 'key');
Numeric type casting and calculations
Trino does not automatically cast numeric types or compare values of different types.
- Issue: Comparisons or calculations involving different numeric types led to errors, and integer division results differed from Hive.
- Solution: Ensure proper type casting with CAST, and use TRY_CAST to handle NULL or inconsistent data types, as in the sketch below.
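For example (the column names here are illustrative), explicit casts keep division consistent with Hive, and TRY_CAST returns NULL instead of failing on values that cannot be converted:

-- Hive returns a double for integer division; Trino truncates unless you cast
SELECT
    CAST(clicks AS double) / CAST(impressions AS double) AS ctr,
    -- TRY_CAST yields NULL for malformed values instead of raising an error
    TRY_CAST(raw_price AS decimal(18, 2)) AS price
FROM example_table;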
Challenges unresolved by Trino
While Trino addressed many of our data processing needs, certain challenges remained beyond its capabilities.
- Hive UDFs: Handling custom Hive User-Defined Functions (UDFs) presented compatibility issues that Trino could not fully resolve within our time constraints.
- Resource-Intensive Tasks: Tasks requiring massive memory usage, beyond what Trino's architecture could efficiently support, necessitated an alternative approach.
Addressing remaining challenges with Spark
To tackle the challenges that Trino couldn't address, we turned to Apache Spark, which offered the necessary power and flexibility to handle demanding workloads effectively. Spark's robust architecture enabled us to execute complex tasks and manage high memory demands, making it an ideal complement to Trino's strengths in interactive querying.
Compatibility issues
Fortunately, compatibility issues, such as those involving Hive UDFs, were largely manageable. We could either use Hive UDFs directly in Spark or rewrite queries to accommodate differences. Here are some notable points:
Setting variables in queries
Setting variables within a query is not supported by Trino or Spark.
- Issue: Hive allowed setting variables within queries for time calculations, a feature not supported by Trino or Spark.
- Solution: We utilized Python functions in Airflow to perform time calculations and passed the results to both Trino and Spark as execution variables, as illustrated below.
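As a rough illustration (the macro and column names here are assumptions for the sketch, not our actual pipeline code), the date that Hive used to compute inside the query via a variable is instead rendered into the SQL by Airflow before the query is sent to Trino or Spark:

-- {{ ds }} is Airflow's templated logical date (YYYY-MM-DD); more involved
-- time calculations are done in Python and passed in the same way
SELECT val
FROM example_table
WHERE dt = '{{ ds }}';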
Resource-intensive task challenges
Limited Spark memory resulting in slower jobs
Spark tasks were too slow compared to Hive.
- Issue: Some Hive tasks used over 100TB of memory at peak, but Spark tasks were limited to about 10% of the peak memory, resulting in slower jobs.
- Solution: We worked on increasing the memory allocation for Spark tasks to better accommodate these demands.
Memory usage discrepancies
Spark used more memory than Hive.
- Issue: Spark required a higher memory footprint than Hive but delivered superior CPU efficiency, improving performance by approximately 2.4 times. However, achieving speeds equivalent to Hive sometimes required substantial memory resources, necessitating careful planning and resource allocation.
- Solution: Disabling Spark's adaptive options such as skewJoin and coalescePartitions could save memory, though this was not recommended due to potential slowdowns in execution.
# Setting the options below to false can save memory, but it may slow down execution. It may still be necessary if a task reaches the physical memory limit.
spark.sql.adaptive.skewJoin.enabled=false
spark.sql.adaptive.coalescePartitions.enabled=false
Small file generation
Spark generated many small files.
- Issue: The use of many executors and numerous shuffle partitions led to the generation of a large number of small files.
- Solution: As our operations were SQL-based, our primary approach was to use the DISTRIBUTE BY clause in SQL to better manage file output and reduce the proliferation of small files, as in the sketch below.
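A simplified Spark SQL sketch of this pattern (the table names are hypothetical):

-- DISTRIBUTE BY routes all rows for a given dt to the same task,
-- so each output partition is written as a small number of files
INSERT OVERWRITE TABLE example_output PARTITION (dt)
SELECT val, dt
FROM example_input
DISTRIBUTE BY dt;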
By addressing these challenges through strategic use of Apache Spark, we effectively complemented Trino's capabilities, ensuring that our data processing infrastructure remained robust, efficient, and scalable. This approach allowed us to tackle complex tasks while maintaining high performance and resource efficiency.
Still remaining challenges in progress
While we've made significant strides in addressing our data processing challenges, some tasks remain in progress. For instance, we continue to tackle a task that involves processing and updating a total of 9 TB of data, with each partition comprising 600GB, on a daily basis. Our current strategies under consideration include:
Data restructuring
By revisiting the structural design of our datasets, we aim to streamline operations and reduce unnecessary data updates.
Bucketing
Implementing user-id-based bucketing was considered to manage dataset size more effectively.
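As a rough sketch of the idea (names and bucket count are illustrative, not a settled design), a user-id-bucketed table in Spark SQL could look like:

-- Bucketing by user_id co-locates a user's rows, reducing shuffle for
-- joins and aggregations keyed on user_id
CREATE TABLE example_user_stats (
    user_id bigint,
    val bigint,
    dt string
)
USING PARQUET
PARTITIONED BY (dt)
CLUSTERED BY (user_id) INTO 256 BUCKETS;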
Exploring new technologies like Apache Iceberg
Although Iceberg promises advanced data management capabilities like schema evolution and partition pruning, its implementation feasibility remains challenging due to existing infrastructure constraints.
Conclusion
The migration from Hive to Trino and Spark has been transformative, unlocking new analytical possibilities and enhancing our data processing framework. Despite the challenges, the insights gained highlight the power of these platforms. As we refine our strategies and explore innovations like Apache Iceberg, our dedication to advanced solutions remains unwavering, driving efficiency and innovation in our data practices.
We hope our migration experiences provide valuable insights and inspire others embarking on similar journeys. Thank you for engaging with our story; we invite discussions to further explore this exciting topic.