UDTF介绍
UDTF（User Defined Table Function，用户自定义表函数）将一行输入扩展为零行或多行输出。
接口
public abstract class UserDefinedFunction implements Serializable {

    /**
     * Initialization hook for the user defined function.
     *
     * <p>The default implementation is a no-op; subclasses may override it
     * to perform their own setup.
     *
     * @param context the function context handed to this function
     */
    public void open(FunctionContext context) {
        // No setup is required by default.
    }

    /**
     * Cleanup hook for the user defined function.
     *
     * <p>The default implementation is a no-op; subclasses may override it
     * to release whatever they set up in {@link #open(FunctionContext)}.
     */
    public void close() {
        // No cleanup is required by default.
    }
}
public abstract class UDTF extends UserDefinedFunction {

    /** Rows emitted through {@link #collect(Object[])}, in the order they were collected. */
    protected List<Object[]> collector;

    /**
     * Creates the function with an empty row collector.
     */
    public UDTF() {
        this.collector = Lists.newArrayList();
    }

    /**
     * Collects one output row.
     *
     * <p>The row is appended to {@link #collector}; without this the rows
     * produced by an {@code eval} method would be silently dropped.
     *
     * @param output the field values of the output row
     */
    protected void collect(Object[] output) {
        this.collector.add(output);
    }

    /**
     * Returns the output types of the function.
     *
     * @param paramTypes The parameter types of the function.
     * @param outFieldNames The output fields of the function in the sql.
     * @return the types of the output fields
     */
    public abstract List<Class<?>> getReturnType(List<Class<?>> paramTypes,
                                                 List<String> outFieldNames);
}
每个UDTF都应该至少定义一个public的eval方法；可以通过重载提供多个eval方法，以支持不同的参数列表。
示例
/**
 * Table function that splits a string into one output row per token.
 */
public class Split extends UDTF {

    /** Separator used when the caller does not supply a non-empty one. */
    private static final String DEFAULT_SEPARATOR = ",";

    /**
     * Splits {@code text} on the default separator {@code ","}.
     *
     * @param text the string to split; {@code null} produces no rows
     */
    public void eval(String text) {
        evalInternal(text, DEFAULT_SEPARATOR);
    }

    /**
     * Splits {@code text} on the given separator.
     *
     * @param text the string to split; {@code null} produces no rows
     * @param separator the separator to split on; when {@code null} or empty
     *                  the default separator {@code ","} is used
     */
    public void eval(String text, String separator) {
        // The separator is resolved per call and never stored on the instance,
        // so a custom separator cannot leak into later invocations.
        String effectiveSeparator =
            StringUtils.isNotEmpty(separator) ? separator : DEFAULT_SEPARATOR;
        evalInternal(text, effectiveSeparator);
    }

    /**
     * Splits the text and collects each token as a single-column row.
     */
    private void evalInternal(String text, String separator) {
        String[] tokens = StringUtils.split(text, separator);
        if (tokens == null) {
            // StringUtils.split returns null for null input: nothing to emit.
            return;
        }
        for (String token : tokens) {
            collect(new Object[]{token});
        }
    }

    /**
     * Declares a single output column of type {@link String}.
     */
    @Override
    public List<Class<?>> getReturnType(List<Class<?>> paramTypes,
                                        List<String> outputFields) {
        return Collections.singletonList(String.class);
    }
}
-- Register the Split class as a SQL function named my_split.
CREATE Function my_split AS 'com.antgroup.geaflow.dsl.udf.Split';
-- Apply my_split to u.ids for each row of users; the function's output
-- column is exposed as t.id and selected together with u.name.
SELECT t.id, u.name FROM users u, LATERAL table(my_split(u.ids)) as t(id);