Spark UDF Implementation Demo
1 Introduction
When developing with Spark, the operators available in the current libraries often cannot satisfy a business requirement. This is where UDFs (user-defined functions) become very useful. With the DataFrame (or Dataset) API, Java (or Python, Scala) code can easily define and register a UDF, but making that function available in SQL (Spark SQL, Hive), or sharing it across engines, is harder. The solution is to first implement the function against a fixed contract, then register it with Spark (or Hive) as a permanent function, so that the same UDF is shared by Spark and Hive.
2 Implementation
According to the official documentation [1], implementing a UDF requires extending at least one of UDF, UDAF, AbstractGenericUDAFResolver, GenericUDF, GenericUDTF, or UserDefinedAggregateFunction. The following takes extending UDF as the example.
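The plain UDF base class declares no abstract method to override; at query time Hive locates a public method named evaluate by reflection and matches it against the argument types of the call (which is also why one class may overload evaluate with several signatures). A dependency-free sketch of that lookup, using a hypothetical MyUdf stand-in class introduced here for illustration:

```java
import java.lang.reflect.Method;

public class EvaluateLookup {
    // Hypothetical UDF class; in a real build it would extend
    // org.apache.hadoop.hive.ql.exec.UDF (hive-exec dependency).
    public static class MyUdf {
        public int evaluate(String str) {
            return str == null ? 0 : str.length();
        }
    }

    public static void main(String[] args) throws Exception {
        MyUdf udf = new MyUdf();
        // Roughly what Hive's method resolver does: find a method named
        // "evaluate" whose signature matches the call's argument types.
        Method m = udf.getClass().getMethod("evaluate", String.class);
        System.out.println(m.invoke(udf, "liu"));         // prints 3
        System.out.println(m.invoke(udf, (Object) null)); // prints 0
    }
}
```

This is only an approximation of Hive's resolution logic, but it explains the rule stated below: the method must be named evaluate, and its parameter list defines what the SQL call accepts.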
The overall implementation has two parts:
- develop the UDF by extending the parent class
- register the UDF
2.1 Developing the UDF by extending the parent class
2.1.1 Java-based implementation [2]
pom.xml of the Maven project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sogo</groupId>
<artifactId>sparkudf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spark.version>2.3.1</spark.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.4</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

A class that implements an evaluate method defines one UDF.
The main method in the class is only for testing; comment it out before packaging.
StringLengthUdf.java:
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
/**
 * Created with IntelliJ IDEA.
 * @author liuzhixuan@sogou-inc.com
 * Date: 2020/7/26 23:54
 */
public class StringLengthUdf extends UDF {
// the method named "evaluate" is invoked by default
public int evaluate(String str) {
if (null == str) {
return 0;
} else {
return str.length();
}
}
// public static void main(String[] args) {
// StringLengthUdf stringLengthUdf = new StringLengthUdf();
// String str = "test";
// System.out.println("out:" + stringLengthUdf.evaluate(str));
// }
}

A UDF can take multiple input parameters, as in StringContainUdf.java:
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.io.Serializable;
public class StringContainUdf extends UDF implements Serializable {
// vary evaluate's parameter list to support different UDF input arities and types
public Boolean evaluate(String s1, String s2) {
if (null == s1 || null == s2) {
return false;
} else return s1.contains(s2);
}
}

2.2 Registering the UDF
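Before registering, the evaluate logic itself can be unit-checked with plain Java, since nothing in the method bodies above depends on Hive. UdfLogicCheck is a name introduced here for illustration; its two methods mirror the evaluate implementations of StringLengthUdf and StringContainUdf:

```java
public class UdfLogicCheck {
    // Same logic as StringLengthUdf.evaluate: null maps to length 0.
    public static int strlen(String s) {
        return s == null ? 0 : s.length();
    }

    // Same logic as StringContainUdf.evaluate: null on either side maps to false.
    public static boolean contains(String s1, String s2) {
        return s1 != null && s2 != null && s1.contains(s2);
    }

    public static void main(String[] args) {
        System.out.println(strlen("liu"));           // prints 3
        System.out.println(strlen(null));            // prints 0
        System.out.println(contains("spark", "ar")); // prints true
        System.out.println(contains(null, "ar"));    // prints false
    }
}
```

Catching a null-handling bug here is much cheaper than rebuilding the jar and re-registering the function in the cluster.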
2.2.1 Syntax
General syntax:
CREATE [ OR REPLACE ] [ TEMPORARY ] FUNCTION [ IF NOT EXISTS ] function_name AS class_name [ resource_locations ]

Jar-based syntax:
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf' USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';
Experiments show that a UDF registered from Spark SQL becomes visible to a Hive client only after the client is restarted, whereas a UDF registered from Hive is visible in Spark SQL immediately.
Sometimes a UDF has clearly been registered and the client reconnected, yet the function still cannot be found. The likely cause is being in a different database from the one the UDF was registered in; this point deserves attention.
2.2.2 Viewing registered functions
# list registered functions (Hive, Spark SQL)
show functions;
## list registered UDFs only (Spark SQL)
show user functions;

2.2.3 Registering in Hive
# enter the hive CLI (if no database is specified, the UDF belongs to the current database)
> hive
# add the jar
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
# register as a temporary UDF
CREATE TEMPORARY FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# register as a permanent UDF
CREATE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# replace an existing permanent UDF (CREATE OR REPLACE is not available in Hive)
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# create only if it does not already exist (no replacement)
CREATE FUNCTION IF NOT EXISTS strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';

Typing these statements at every session is tedious; the registration statements can instead be written into a script that runs at startup. We do exactly that below when configuring Spark SQL.
2.2.4 Registering in Spark SQL
In Spark SQL, the Hive-style registration above also works. Below, the registration is instead placed in an initialization SQL script.
.bashrc configuration:
alias spark_sql="/opt/spark/bin/spark-sql \
--master yarn \
--deploy-mode client \
--driver-memory 4G \
--executor-memory 10G \
--num-executors 80 \
--executor-cores 4 \
--name 'pyspark_cluster_lzx' \
--queue adx_online \
--database bigdata_lzx \
--conf spark.dynamicAllocation.minExecutors=40 \
--conf spark.dynamicAllocation.maxExecutors=80 \
--conf spark.default.parallelism=1200 \
--conf spark.sql.shuffle.partitions=1200 \
--conf spark.eventLog.enabled=true \
--conf spark.sql.autoBroadcastJoinThreshold=104857600 \
--hiveconf spark.hadoop.hive.cli.print.current.db=true \
--hiveconf spark.hadoop.hive.cli.print.header=true \
--hiveconf spark.hadoop.hive.resultset.use.unique.column.names=false \
--jars file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar \
-i /search/work/bigdata/liuzhixuan/sparkudf/spark_udf.sql"

Note: the --jars option ships the jar containing the UDF implementation to the cluster; the -i option names a SQL file executed before the interactive session starts.
spark_udf.sql:
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf' USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';

Run:
> spark_sql
> show user functions;

Result:
spark-sql (default)> show user functions;
function
bigdata_lzx.strlen_udf_int
Time taken: 0.549 seconds, Fetched 1 row(s)
spark-sql (default)> select strlen_udf_int("liu");
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Added [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar]
bigdata_lzx.strlen_udf_int(liu)
3 References
1. CREATE FUNCTION, https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-create-function.html
2. Hive UDF development and usage example, https://sjq597.github.io/2015/11/25/Hive-UDF函数开发使用样例/


