开发及调试

本文为您介绍开发及调试有关的常见问题。

DDLDML同在一个文本中提交运行时,DDL需要怎么声明?

DDLDML同在一个文本中提交运行时,DDL需要声明为CREATE TEMPORARY TABLE,而不是声明为CREATE TABLE。否则单击深度检查后,出现报错详情如下。

image

多个INSERT INTO语句需要怎么写?

将多个INSERT INTO语句写在BEGIN STATEMENT SET;END;之间组成一个逻辑单元。详情请参见INSERT INTO语句。否则单击深度检查后,验证报错详情如下。

image

使用Entry point Main Arguments传参数时,需要传特殊字符,应该如何处理?

  • 问题原因

    使用Entry Point Main Arguments传参时,需要传特殊字符时,例如#$,使用反斜线(\)转义也无法识别,特殊字符出现会被丢弃。

  • 解决方案

    作业运维页面单击目标作业名称,在运行参数配置区域其他配置中添加参数env.java.opts: -Dconfig.disable-inline-comment=true,具体操作请参见如何配置自定义的作业运行参数?

为什么相同UDF JAR包在经过多次修改后上传失败?

  • 问题原因

    因为UDF里面限制了JAR包之间的类名不能重复。

  • 解决方案

    • 删除后重新上传。

    • 在附加依赖文件中上传JAR包,并在代码中使用临时函数。临时函数使用方式详情请参见注册UDF。示例如下。

      CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';

      image

为什么使用POJO类作为UDTF返回类型时字段会出现“错位”?

  • 问题详情

    当使用POJO类作为UDTF返回类型,并在SQL中显式声明了UDTF返回列的别名列表(Alias Name)时,可能会出现字段错位(即使类型一致,但实际使用的字段可能与预期不符)问题。

    例如,如果使用如下POJO类作为UDTF的返回类型,并根据自定义函数开发的要求进行打包并完成函数注册(这里使用作业级自定义函数注册方式)后,SQL校验会失败。

    package com.aliyun.example;
    
    public class TestPojoWithoutConstructor {
    	public int c;
    	public String d;
    	public boolean a;
    	public String b;
    }
    package com.aliyun.example;
    
    import org.apache.flink.table.functions.TableFunction;
    
    public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    	private static final long serialVersionUID = 1L;
    
    	public void eval(String str1, Integer i2) {
    		TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
    		p.d = str1 + "_d";
    		p.c = i2 + 2;
    		p.b = str1 + "_b";
    		collect(p);
    	}
    }
    CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';
    
    CREATE TEMPORARY TABLE src ( 
      id STRING,
      cnt INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE sink ( 
      f1 INT,
      f2 STRING,
      f3 BOOLEAN,
      f4 STRING
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink
    SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);

    SQL校验报错信息如下:

    org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match.
    Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL.
    Hint: You will need to rewrite or cast the expression.
    
    Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
    Sink schema:  [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    	at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)

    看起来从UDTF返回的字段和POJO类中的字段可能错位了,SQL中字段c最终是BOOLEAN,而字段aINT类型,和POJO类的定义恰好相反。

  • 问题原因

    根据POJO类的类型规则:

    • 如果POJO类实现了有参构造函数,推导的返回类型会按构造函数的参数列表顺序。

    • 如果POJO类缺少有参构造函数,就会按字段名的字典序重新排列。

    在上述示例中,由于UDTF返回类型缺少有参构造函数,因此对应的返回类型为BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)。虽然这一步并没有产生错误,但因为SQL中对返回字段加了重命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b),这导致对推导出的类型显式进行了重命名(基于字段位置进行映射),进而引发与POJO类中的字段错位问题,出现校验异常或非预期的数据错位问题。

  • 解决方案

    • POJO类缺少有参构造函数时,去掉对UDTF返回字段的显式重命名,如将上述SQLINSERT语句改为:

      -- POJO类无有参构造函数时,推荐显式选择需要的字段名,使用 T.* 时需要明确知晓实际返回的字段顺序。
      SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
    • POJO类实现有参构造函数,以确定返回类型的字段顺序。这种情况下UDTF返回类型的字段顺序就是有参构造函数的参数顺序。

      package com.aliyun.example;
      
      public class TestPojoWithConstructor {
      	public int c;
      	public String d;
      	public boolean a;
      	public String b;
      
      	// Using specific fields order instead of alphabetical order
      	public TestPojoWithConstructor(int c, String d, boolean a, String b) {
      		this.c = c;
      		this.d = d;
      		this.a = a;
      		this.b = b;
      	}
      }

如何解决Flink依赖冲突问题?

  • 问题现象

    • 有明显报错,且引发错误的为FlinkHadoop相关类。

      java.lang.AbstractMethodError
      java.lang.ClassNotFoundException
      java.lang.IllegalAccessError
      java.lang.IllegalAccessException
      java.lang.InstantiationError
      java.lang.InstantiationException
      java.lang.InvocationTargetException
      java.lang.NoClassDefFoundError
      java.lang.NoSuchFieldError
      java.lang.NoSuchFieldException
      java.lang.NoSuchMethodError
      java.lang.NoSuchMethodException
    • 无明显报错,但会引起一些不符合预期的现象,例如:

      • 日志不输出或log4j配置不生效。

        该类问题通常是由于依赖中携带了log4j相关配置导致的。需要排查作业JAR包中是否引入了log4j配置的依赖,可以通过在dependency中配置exclusions的方式去掉log4j配置。

        说明

        如果必须要使用不同版本的log4j,需要使用maven-shade-pluginlog4j相关的类进行relocation。

      • RPC调用异常。

        FlinkAkka RPC调用出现依赖冲突可能导致的异常,默认不会显示在日志中,需要开启Debug日志进行确认。

        例如,Debug日志中出现Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx},但是JM日志在Registering TaskManager with ResourceID xxx后,没有下文,直到资源请求超时报错NoResourceAvailableException。此外TM持续报错Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}

        原因:开启Debug日志后,发现RPC调用报错InvocationTargetException,该报错导致TM Slot分配到一半失败出现状态不一致,RM持续尝试分配Slot失败无法恢复。

  • 问题原因

    • 作业JAR包中包含了不必要的依赖(例如基本配置、Flink、Hadooplog4j依赖),造成依赖冲突从而引发各种问题。

    • 作业需要的Connector对应的依赖未被打入JAR包中。

  • 排查方法

    • 查看作业pom.xml文件,判断是否存在不必要的依赖。

    • 通过jar tf foo.jar命令查看作业JAR包内容,判断是否存在引发依赖冲突的内容。

    • 通过mvn dependency:tree命令查看作业的依赖关系,判断是否存在冲突的依赖。

  • 解决方案

    • 基本配置建议将scope全部设置为provided,即不打入作业JAR包。

      • DataStream Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataStream Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    • 添加作业需要的Connector对应的依赖,并将scope全部设置为compile(默认的scopecompile),即打入作业JAR包中,以Kafka Connector为例,代码如下。

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
    • 其他Flink、Hadooplog4j依赖不建议添加。但是:

      • 如果作业本身存在基本配置或Connector相关的直接依赖,建议将scope设置为provided,示例如下。

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <scope>provided</scope>
        </dependency>
      • 如果作业存在基本配置或Connector相关的间接依赖,建议通过exclusion将依赖去掉,示例如下。

        <dependency>
            <groupId>foo</groupId>
              <artifactId>bar</artifactId>
              <exclusions>
                <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
               </exclusion>
            </exclusions>
        </dependency>

报错:Could not parse type at position 50: expected but was . Input type string: ROW

  • 报错详情

    SQL编辑器中编写SQL时,使用UDTF出现语法检查错误(红色波浪线)。

    Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>

    代码如下:

    @FunctionHint(
        //input = @DataTypeHint("BYTES"),
        output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>"))
    public class PointChangeMetaQPaser1 extends TableFunction<Row> {
    
        Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    
        public void eval(byte[] bytes) {
            try {
                String messageBody = new String(bytes, "UTF-8");
                Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class);
                logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO));
    
                collect(Row.of(
                        getString(resultDO.get("resultId")),
                        getString(resultDO.get("pointRange")),
                        getString(resultDO.get("from")),
                        getString(resultDO.get("to")),
                        getString(resultDO.get("type")),
                        getString(resultDO.get("pointScope")),
                        getString(resultDO.get("userId")),
                        getString(resultDO.get("point")),
                        getString(resultDO.getOrDefault("triggerSource", "NULL")),
                        getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))),
                        getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID())))
                ));
            } catch (Exception e) {
                logger.error("PointChangeMetaQPaser1 error", e);
            }
        }
    
        private String getString(Object o) {
            if (o == null) {
                return null;
            }
            return String.valueOf(o);
        }
    }
  • 报错原因

    当使用DataTypeHint定义函数的数据类型时,系统保留的关键字被直接作为了字段名称。

  • 解决方案

    • 将变量名换成非关键字的名称,例如to换成fto,from换成ffrom等。

    • 将已经用关键字取名的变量名加上反撇号(``)。