AKS 上的 HDInsight 上的 Apache Flink® 群集中的 Hive 方言

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 通过此公告了解详细信息。

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款包括适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 的预览版信息 。 有关问题或功能建议,请在 AskHDInsight 上提交一个请求,其中包含详细信息,并关注我们以获取 有关 Azure HDInsight 社区的详细信息。

本文介绍如何在 AKS 上的 HDInsight 上的 Apache Flink 群集中使用 Hive 方言。

介绍

用户无法将默认 flink 方言更改为 hive 方言,以便在 AKS 群集上的 HDInsight 上使用。 所有 SQL 操作在切换为 Hive 方言后均失败,并出现以下错误。


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

此问题是由于一个未解决的Hive Jira引起的。 目前,Hive 假定系统类加载程序是 URLClassLoader 的实例。 在 Java 11中,这种假设不是这样。

  • 在 webssh 中执行以下步骤:

    1. 在 lib 位置删除现有的 flink-sql-connector-hive*jar
      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. webssh pod 中下载以下 jar,并将其添加到 /opt/flink-webssh/lib wget https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader/1.17.0下。 (上述 hive jar 包含修复 https://issues.apache.org/jira/browse/HIVE-27508
    mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
    mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
    
    1. 在“core-site.xml”部分下的 flink 配置管理中添加以下密钥:
      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • 下面是 hive-dialect 查询的概述

    • 在不分区的情况下在 Flink 中执行 Hive 方言
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    数据将写入配置在 hive/warehouse 目录中的同一容器中。

    屏幕截图显示了容器表 1。

    • 在 Flink 中执行带有分区的 Hive 方言
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

屏幕截图显示了容器表 2。

屏幕截图显示了容器表 3。

参考