TOC
ClickHouse UDF 使用指南
背景
公司内部需要使用ClickHouse进行IP到运营商信息的查询,但ClickHouse本身不支持此功能。通过调研发现可以使用UDF(用户自定义函数)来实现。
原理
ClickHouse通过执行外部程序或脚本来处理数据。UDF在服务端执行,依赖于服务端环境,需要预先安装所需依赖。数据通过标准输入传递给UDF,UDF通过标准输出返回数据。用户通过SQL语句调用UDF,UDF可以返回基础数据、数组、元组、Map等数据类型。
常见的可以通过UDF实现的功能包括:IP转运营商、IP转地理位置、字符串处理、数据加密、数据解密等。
使用步骤
1. 定义UDF函数
首先,需要在XML配置文件中定义UDF函数,包括函数名、脚本名、参数、返回值等,并通过user_defined_executable_functions_config
配置告知ClickHouse该配置文件的位置。
例如,下面是一个官方文档提供的简单UDF配置文件,它定义了一个名为test_function_python
的UDF函数,该函数接受一个UInt64类型的参数,并返回一个String类型的值,执行的脚本为test_function.py
。
<!-- 放置在 `/etc/clickhouse-server/*_function.xml` -->
<functions>
<function>
<type>executable</type>
<name>test_function_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
<name>value</name>
</argument>
<format>TabSeparated</format>
<command>test_function.py</command>
</function>
</functions>
2. 编写UDF脚本
接下来,编写符合ClickHouse要求的UDF脚本,并将脚本文件放到user_scripts_path
配置指定的目录下。确保脚本文件有可执行权限。
示例脚本:
# 放置在 `/etc/clickhouse-server/user_scripts/test_function.py`
#!/usr/bin/python3
import sys
if __name__ == '__main__':
for line in sys.stdin:
print("Value " + line, end='')
sys.stdout.flush()
3. 测试UDF函数
完成上述步骤后,可以直接测试UDF函数,无需重启。
示例SQL查询:
SELECT test_function_python(toUInt64(2));
结果:
┌─test_function_python(2)─┐
│ Value 2 │
└─────────────────────────┘
注意事项
在使用UDF时,需要注意以下几点:
- 确认Python3环境安装正确,否则ClickHouse会报错
DB::ErrnoException: Cannot write into pipe, errno: 32, strerror: Broken pipe: While executing TabSeparatedRowOutputFormat
。 - 确认Python脚本有可执行权限,并且可以在命令行中正常执行。
- 确认配置文件和脚本文件的路径正确。
示例
示例1:带参数的UDF
在这个示例中,我们定义了一个带参数的UDF函数。
<functions>
<function>
<type>executable</type>
<execute_direct>true</execute_direct>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
<name>arg1</name>
</argument>
<format>TabSeparated</format>
<command>test_function_parameter_python.py {param1:String}</command>
</function>
</functions>
对应的Python脚本如下:
#!/usr/bin/python3
import sys
if __name__ == "__main__":
param1 = sys.argv[1]
for line in sys.stdin:
print("Parameter: " + str(param1) + ", value: " + str(line), end="")
sys.stdout.flush()
调用示例:
SELECT test_function_parameter_python('p1')(toUInt64(2));
结果:
┌─test_function_parameter_python('p1')(toUInt64(2))─┐
│ Parameter: 'p1', value: 2 │
└───────────────────────────────────────────────────┘
示例2:返回Map的UDF
在这个示例中,我们定义了一个返回Map类型的UDF函数。
<functions>
<function>
<type>executable</type>
<name>test_function_map_json</name>
<return_type>Map(String, Integer)</return_type>
<return_name>default</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>test_function_map_json.py</command>
</function>
</functions>
对应的Python脚本如下:
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'arg1': first_arg, 'arg2': second_arg, 'sum': first_arg + second_arg}
print(json.dumps({'default': result}), end='\n')
sys.stdout.flush()
调用示例:
SELECT test_function_map_json(3, 5);
结果:
┌─test_function_map_json(3, 5)─┐
│ {'arg1':3,'arg2':5,'sum':8} │
└──────────────────────────────┘
编写UDF的建议
- 使用Map作为返回值,可以返回更多信息。
- 复杂的数据处理尽量不要抛出异常,应处理合适的数据行,并将异常信息作为返回值返回。
- 性能优化建议:
- ClickHouse以Block为单位调用UDF,可以在UDF中通过并发提高性能,但需要确保stdout输出顺序与stdin输入顺序一致。
type
默认为executable,会为每个Block启动一个新进程。如果UDF初始化开销较大,可考虑使用executable_pool
类型,减少进程启动和销毁开销。