ClickHouse UDF介绍及实践

Tuesday, December 17, 2024

TOC

ClickHouse UDF 使用指南

背景

公司内部需要使用ClickHouse进行IP到运营商信息的查询,但ClickHouse本身不支持此功能。通过调研发现可以使用UDF(用户自定义函数)来实现。

原理

ClickHouse通过执行外部程序或脚本来处理数据。UDF在服务端执行,依赖于服务端环境,需要预先安装所需依赖。数据通过标准输入传递给UDF,UDF通过标准输出返回数据。用户通过SQL语句调用UDF,UDF可以返回基础数据、数组、元组、Map等数据类型。

常见的可以通过UDF实现的功能包括:IP转运营商、IP转地理位置、字符串处理、数据加密、数据解密等。

使用步骤

1. 定义UDF函数

首先,需要在XML配置文件中定义UDF函数,包括函数名、脚本名、参数、返回值等,并通过user_defined_executable_functions_config配置告知ClickHouse该配置文件的位置。

例如,下面是一个官方文档提供的简单UDF配置文件,它定义了一个名为test_function_python的UDF函数,该函数接受一个UInt64类型的参数,并返回一个String类型的值,执行的脚本为test_function.py

<!-- 放置在 `/etc/clickhouse-server/*_function.xml` -->
<functions>
    <function>
        <type>executable</type>
        <name>test_function_python</name>
        <return_type>String</return_type>
        <argument>
            <type>UInt64</type>
            <name>value</name>
        </argument>
        <format>TabSeparated</format>
        <command>test_function.py</command>
    </function>
</functions>

2. 编写UDF脚本

接下来,编写符合ClickHouse要求的UDF脚本,并将脚本文件放到user_scripts_path配置指定的目录下。确保脚本文件有可执行权限。

示例脚本:

# 放置在 `/etc/clickhouse-server/user_scripts/test_function.py`

#!/usr/bin/python3

import sys

if __name__ == '__main__':
    for line in sys.stdin:
        print("Value " + line, end='')
        sys.stdout.flush()

3. 测试UDF函数

完成上述步骤后,可以直接测试UDF函数,无需重启。

示例SQL查询:

SELECT test_function_python(toUInt64(2));

结果:

┌─test_function_python(2)─┐
│ Value 2                 │
└─────────────────────────┘

注意事项

在使用UDF时,需要注意以下几点:

  • 确认Python3环境安装正确,否则ClickHouse会报错DB::ErrnoException: Cannot write into pipe, errno: 32, strerror: Broken pipe: While executing TabSeparatedRowOutputFormat
  • 确认Python脚本有可执行权限,并且可以在命令行中正常执行。
  • 确认配置文件和脚本文件的路径正确。

示例

示例1:带参数的UDF

在这个示例中,我们定义了一个带参数的UDF函数。

<functions>
    <function>
        <type>executable</type>
        <execute_direct>true</execute_direct>
        <name>test_function_parameter_python</name>
        <return_type>String</return_type>
        <argument>
            <type>UInt64</type>
            <name>arg1</name>
        </argument>
        <format>TabSeparated</format>
        <command>test_function_parameter_python.py {param1:String}</command>
    </function>
</functions>

对应的Python脚本如下:

#!/usr/bin/python3

import sys

if __name__ == "__main__":
    param1 = sys.argv[1]
    for line in sys.stdin:
        print("Parameter: " + str(param1) + ", value: " + str(line), end="")
        sys.stdout.flush()

调用示例:

SELECT test_function_parameter_python('p1')(toUInt64(2));

结果:

┌─test_function_parameter_python('p1')(toUInt64(2))─┐
│ Parameter: 'p1', value: 2                         │
└───────────────────────────────────────────────────┘

示例2:返回Map的UDF

在这个示例中,我们定义了一个返回Map类型的UDF函数。

<functions>
    <function>
        <type>executable</type>
        <name>test_function_map_json</name>
        <return_type>Map(String, Integer)</return_type>
        <return_name>default</return_name>
        <argument>
            <type>UInt64</type>
            <name>argument_1</name>
        </argument>
        <argument>
            <type>UInt64</type>
            <name>argument_2</name>
        </argument>
        <format>JSONEachRow</format>
        <command>test_function_map_json.py</command>
    </function>
</functions>

对应的Python脚本如下:

#!/usr/bin/python3

import sys
import json

if __name__ == '__main__':
    for line in sys.stdin:
        value = json.loads(line)
        first_arg = int(value['argument_1'])
        second_arg = int(value['argument_2'])
        result = {'arg1': first_arg, 'arg2': second_arg, 'sum': first_arg + second_arg}
        print(json.dumps({'default': result}), end='\n')
        sys.stdout.flush()

调用示例:

SELECT test_function_map_json(3, 5);

结果:

┌─test_function_map_json(3, 5)─┐
│ {'arg1':3,'arg2':5,'sum':8}  │
└──────────────────────────────┘

编写UDF的建议

  • 使用Map作为返回值,可以返回更多信息。
  • 复杂的数据处理尽量不要抛出异常,应处理合适的数据行,并将异常信息作为返回值返回。
  • 性能优化建议:
    1. ClickHouse以Block为单位调用UDF,可以在UDF中通过并发提高性能,但需要确保stdout输出顺序与stdin输入顺序一致。
    2. type默认为executable,会为每个Block启动一个新进程。如果UDF初始化开销较大,可考虑使用executable_pool类型,减少进程启动和销毁开销。