本文为您介绍开发及调试有关的常见问题。
在DDL和DML同在一个文本中提交运行时,DDL需要怎么声明?
在DDL和DML同在一个文本中提交运行时,DDL需要声明为CREATE TEMPORARY TABLE
,而不是声明为CREATE TABLE
。否则单击深度检查后,出现报错详情如下。
多个INSERT INTO语句需要怎么写?
将多个INSERT INTO语句写在BEGIN STATEMENT SET;
和END;
之间组成一个逻辑单元。详情请参见INSERT INTO语句。否则单击深度检查后,验证报错详情如下。
使用Entry point Main Arguments传参数时,需要传特殊字符,应该如何处理?
问题原因
使用Entry Point Main Arguments传参时,需要传特殊字符时,例如#$,使用反斜线(\)转义也无法识别,特殊字符出现会被丢弃。
解决方案
在作业运维页面单击目标作业名称,在运行参数配置区域其他配置中添加参数
env.java.opts: -Dconfig.disable-inline-comment=true
,具体操作请参见如何配置自定义的作业运行参数?。
为什么相同UDF JAR包在经过多次修改后上传失败?
问题原因
因为UDF里面限制了JAR包之间的类名不能重复。
解决方案
删除后重新上传。
在附加依赖文件中上传JAR包,并在代码中使用临时函数。临时函数使用方式详情请参见注册UDF。示例如下。
CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';
为什么使用POJO类作为UDTF返回类型时字段会出现“错位”?
问题详情
当使用POJO类作为UDTF返回类型,并在SQL中显式声明了UDTF返回列的别名列表(Alias Name)时,可能会出现字段错位(即使类型一致,但实际使用的字段可能与预期不符)问题。
例如,如果使用如下POJO类作为UDTF的返回类型,并根据自定义函数开发的要求进行打包并完成函数注册(这里使用作业级自定义函数注册方式)后,SQL校验会失败。
package com.aliyun.example; public class TestPojoWithoutConstructor { public int c; public String d; public boolean a; public String b; }
package com.aliyun.example; import org.apache.flink.table.functions.TableFunction; public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> { private static final long serialVersionUID = 1L; public void eval(String str1, Integer i2) { TestPojoWithoutConstructor p = new TestPojoWithoutConstructor(); p.d = str1 + "_d"; p.c = i2 + 2; p.b = str1 + "_b"; collect(p); } }
CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor'; CREATE TEMPORARY TABLE src ( id STRING, cnt INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE sink ( f1 INT, f2 STRING, f3 BOOLEAN, f4 STRING ) WITH ( 'connector' = 'print' ); INSERT INTO sink SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);
SQL校验报错信息如下:
org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match. Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL. Hint: You will need to rewrite or cast the expression. Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING] Sink schema: [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING] at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
看起来从UDTF返回的字段和POJO类中的字段可能错位了,SQL中字段c最终是BOOLEAN,而字段a是INT类型,和POJO类的定义恰好相反。
问题原因
根据POJO类的类型规则:
如果POJO类实现了有参构造函数,推导的返回类型会按构造函数的参数列表顺序。
如果POJO类缺少有参构造函数,就会按字段名的字典序重新排列。
在上述示例中,由于UDTF返回类型缺少有参构造函数,因此对应的返回类型为
BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)
。虽然这一步并没有产生错误,但因为SQL中对返回字段加了重命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b)
,这导致对推导出的类型显式进行了重命名(基于字段位置进行映射),进而引发与POJO类中的字段错位问题,出现校验异常或非预期的数据错位问题。解决方案
POJO类缺少有参构造函数时,去掉对UDTF返回字段的显式重命名,如将上述SQL的INSERT语句改为:
-- POJO类无有参构造函数时,推荐显式选择需要的字段名,使用 T.* 时需要明确知晓实际返回的字段顺序。 SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
POJO类实现有参构造函数,以确定返回类型的字段顺序。这种情况下UDTF返回类型的字段顺序就是有参构造函数的参数顺序。
package com.aliyun.example; public class TestPojoWithConstructor { public int c; public String d; public boolean a; public String b; // Using specific fields order instead of alphabetical order public TestPojoWithConstructor(int c, String d, boolean a, String b) { this.c = c; this.d = d; this.a = a; this.b = b; } }
如何解决Flink依赖冲突问题?
问题现象
有明显报错,且引发错误的为Flink或Hadoop相关类。
java.lang.AbstractMethodError java.lang.ClassNotFoundException java.lang.IllegalAccessError java.lang.IllegalAccessException java.lang.InstantiationError java.lang.InstantiationException java.lang.InvocationTargetException java.lang.NoClassDefFoundError java.lang.NoSuchFieldError java.lang.NoSuchFieldException java.lang.NoSuchMethodError java.lang.NoSuchMethodException
无明显报错,但会引起一些不符合预期的现象,例如:
日志不输出或log4j配置不生效。
该类问题通常是由于依赖中携带了log4j相关配置导致的。需要排查作业JAR包中是否引入了log4j配置的依赖,可以通过在dependency中配置exclusions的方式去掉log4j配置。
说明如果必须要使用不同版本的log4j,需要使用maven-shade-plugin将log4j相关的类进行relocation。
RPC调用异常。
Flink的Akka RPC调用出现依赖冲突可能导致的异常,默认不会显示在日志中,需要开启Debug日志进行确认。
例如,Debug日志中出现
Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}
,但是JM日志在Registering TaskManager with ResourceID xxx
后,没有下文,直到资源请求超时报错NoResourceAvailableException
。此外TM持续报错Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}
。原因:开启Debug日志后,发现RPC调用报错
InvocationTargetException
,该报错导致TM Slot分配到一半失败出现状态不一致,RM持续尝试分配Slot失败无法恢复。
问题原因
作业JAR包中包含了不必要的依赖(例如基本配置、Flink、Hadoop和log4j依赖),造成依赖冲突从而引发各种问题。
作业需要的Connector对应的依赖未被打入JAR包中。
排查方法
查看作业pom.xml文件,判断是否存在不必要的依赖。
通过
jar tf foo.jar
命令查看作业JAR包内容,判断是否存在引发依赖冲突的内容。通过
mvn dependency:tree
命令查看作业的依赖关系,判断是否存在冲突的依赖。
解决方案
基本配置建议将scope全部设置为provided,即不打入作业JAR包。
DataStream Java
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
DataStream Scala
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
DataSet Java
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
DataSet Scala
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
添加作业需要的Connector对应的依赖,并将scope全部设置为compile(默认的scope是compile),即打入作业JAR包中,以Kafka Connector为例,代码如下。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency>
其他Flink、Hadoop和log4j依赖不建议添加。但是:
如果作业本身存在基本配置或Connector相关的直接依赖,建议将scope设置为provided,示例如下。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency>
如果作业存在基本配置或Connector相关的间接依赖,建议通过exclusion将依赖去掉,示例如下。
<dependency> <groupId>foo</groupId> <artifactId>bar</artifactId> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> </exclusions> </dependency>
报错:Could not parse type at position 50: expected but was . Input type string: ROW
报错详情
在SQL编辑器中编写SQL时,使用UDTF出现语法检查错误(红色波浪线)。
Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>
代码如下:
@FunctionHint( //input = @DataTypeHint("BYTES"), output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>")) public class PointChangeMetaQPaser1 extends TableFunction<Row> { Logger logger = LoggerFactory.getLogger(this.getClass().getName()); public void eval(byte[] bytes) { try { String messageBody = new String(bytes, "UTF-8"); Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class); logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO)); collect(Row.of( getString(resultDO.get("resultId")), getString(resultDO.get("pointRange")), getString(resultDO.get("from")), getString(resultDO.get("to")), getString(resultDO.get("type")), getString(resultDO.get("pointScope")), getString(resultDO.get("userId")), getString(resultDO.get("point")), getString(resultDO.getOrDefault("triggerSource", "NULL")), getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))), getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID()))) )); } catch (Exception e) { logger.error("PointChangeMetaQPaser1 error", e); } } private String getString(Object o) { if (o == null) { return null; } return String.valueOf(o); } }
报错原因
当使用DataTypeHint定义函数的数据类型时,系统保留的关键字被直接作为了字段名称。
解决方案
将变量名换成非关键字的名称,例如to换成fto,from换成ffrom等。
将已经用关键字取名的变量名加上反撇号(``)。