Project Icon

phpClickHouse

PHP客户端库实现ClickHouse数据库高效操作

phpClickHouse是一个用于操作ClickHouse数据库的PHP客户端库。该项目支持异步查询、批量插入和HTTP压缩,无需额外依赖。通过简洁的API,开发者可高效地与ClickHouse交互,进行数据分析和处理。phpClickHouse适用于各种规模的项目,提供稳定可靠的性能。

PHP ClickHouse 封装

下载量 Packagist 许可证

特性

  • 无依赖,仅需 Curl (支持 php >=7.1)
  • 支持并行查询 (异步)
  • 从 CSV 文件异步批量插入
  • 支持 Http 压缩 (Gzip),用于批量插入
  • 查找活跃主机,检查集群
  • 支持 SELECT WHERE IN ( 本地 csv 文件 )
  • SQL 条件和模板
  • 获取表大小和数据库大小
  • 列出分区
  • 在集群中截断表
  • 插入数组作为列
  • 获取集群中的主节点副本
  • 获取所有节点的表大小
  • 异步获取 ClickHouse 进度函数
  • 流式读/写和闭包函数

俄文文章 habr.com 1 habr.com 2

使用 composer 安装

composer require smi2/phpclickhouse

在 PHP 中使用


// 引入自动加载
$db = new ClickHouseDB\Client(['配置数组']);

if (!$db->ping()) echo '连接错误';

最新稳定版本支持

  • php 5.6 <= 1.1.2
  • php 7.2 <= 1.3.10
  • php 7.3 >= 1.4.x

Packagist

开始使用

连接并选择数据库:

$config = [
    'host' => '192.168.1.1',
    'port' => '8123',
    'username' => 'default',
    'password' => '',
    'https' => true
];
$db = new ClickHouseDB\Client($config);
$db->database('default');
$db->setTimeout(1.5);      // 1 秒,仅支持整数值
$db->setTimeout(10);       // 10 秒
$db->setConnectTimeOut(5); // 5 秒
$db->ping(true); // 如果无法连接则抛出异常

显示表:

print_r($db->showTables());

创建表:

$db->write('
    CREATE TABLE IF NOT EXISTS summing_url_views (
        event_date Date DEFAULT toDate(event_time),
        event_time DateTime,
        site_id Int32,
        site_key String,
        views Int32,
        v_00 Int32,
        v_55 Int32
    )
    ENGINE = SummingMergeTree(event_date, (site_id, site_key, event_time, event_date), 8192)
');

显示创建表语句:

echo $db->showCreateTable('summing_url_views');

插入数据:

$stat = $db->insert('summing_url_views',
    [
        [time(), 'HASH1', 2345, 22, 20, 2],
        [time(), 'HASH2', 2345, 12, 9,  3],
        [time(), 'HASH3', 5345, 33, 33, 0],
        [time(), 'HASH3', 5345, 55, 0, 55],
    ],
    ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55']
);

如果需要插入 UInt64 值,可以使用 ClickHouseDB\Type\UInt64 DTO 包装该值。

$statement = $db->insert('table_name',
    [
        [time(), UInt64::fromString('18446744073709551615')],
    ],
    ['event_time', 'uint64_type_column']
);
UInt64::fromString('18446744073709551615')

查询:

$statement = $db->select('SELECT * FROM summing_url_views LIMIT 2');

使用 Statement:

// 获取选择的行数
$statement->count();

// 获取总行数
$statement->countAll();

// 获取一行
$statement->fetchOne();

// 获取极值最小值
print_r($statement->extremesMin());

// 获取总计行
print_r($statement->totals());

// 获取所有结果
print_r($statement->rows());

// 获取总请求时间
print_r($statement->totalTimeRequest());

// 获取原始 JsonDecode 数组,以节省内存
print_r($statement->rawData());

// 获取原始 curl_info 响应
print_r($statement->responseInfo());

// 获取人类可读的大小信息
print_r($statement->info());

// 如果 clickhouse-server 版本 >= 54011
$db->settings()->set('output_format_write_statistics',true);
print_r($statement->statistics());


// Statement 迭代器
$state=$this->client->select('SELECT (number+1) as nnums FROM system.numbers LIMIT 5');
foreach ($state as $key=>$value) {
    echo $value['nnums'];
}

将查询结果作为树形结构:

$statement = $db->select('
    SELECT event_date, site_key, sum(views), avg(views)
    FROM summing_url_views
    WHERE site_id < 3333
    GROUP BY event_date, url_hash
    WITH TOTALS
');

print_r($statement->rowsAsTree('event_date.site_key'));

/*
(
    [2016-07-18] => Array
        (
            [HASH2] => Array
                (
                    [event_date] => 2016-07-18
                    [url_hash] => HASH2
                    [sum(views)] => 12
                    [avg(views)] => 12
                )
            [HASH1] => Array
                (
                    [event_date] => 2016-07-18
                    [url_hash] => HASH1
                    [sum(views)] => 22
                    [avg(views)] => 22
                )
        )
)
*/

删除表:

$db->write('DROP TABLE IF EXISTS summing_url_views');

特性

并行查询 (异步)

$state1 = $db->selectAsync('SELECT 1 as ping');
$state2 = $db->selectAsync('SELECT 2 as ping');

// 执行
$db->executeAsync();

// 结果
print_r($state1->rows());
print_r($state2->fetchOne('ping'));

从 CSV 文件并行批量插入

$file_data_names = [
    '/tmp/clickHouseDB_test.1.data',
    '/tmp/clickHouseDB_test.2.data',
    '/tmp/clickHouseDB_test.3.data',
    '/tmp/clickHouseDB_test.4.data',
    '/tmp/clickHouseDB_test.5.data',
];

// 插入所有文件
$stat = $db->insertBatchFiles(
    'summing_url_views',
    $file_data_names,
    ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55']
);

并行错误处理

不使用 executeAsync 的 selectAsync

$select = $db->selectAsync('SELECT * FROM summing_url_views LIMIT 1');
$insert = $db->insertBatchFiles('summing_url_views', ['/tmp/clickHouseDB_test.1.data'], ['event_time']);
// 抛出 'Exception',错误信息为 'Queue must be empty, before insertBatch, need executeAsync'

参见 example/exam5_error_async.php

Gzip 和 enable_http_compression

即时读取 CSV 文件并使用 zlib.deflate 压缩。

$db->settings()->max_execution_time(200);
$db->enableHttpCompression(true);

$result_insert = $db->insertBatchFiles('summing_url_views', $file_data_names, [...]);


foreach ($result_insert as $fileName => $state) {
    echo $fileName . ' => ' . json_encode($state->info_upload()) . PHP_EOL;
}

参见速度测试 example/exam08_http_gzip_batch_insert.php

最大执行时间

$db->settings()->max_execution_time(200); // 秒

不带端口的连接

$config['host']='blabla.com';
$config['port']=0;
// getUri() === 'http://blabla.com'


$config['host']='blabla.com/urls';
$config['port']=8765;
// getUri() === 'http://blabla.com/urls'

$config['host']='blabla.com:2224';
$config['port']=1234;
// getUri() === 'http://blabla.com:2224'





表大小和数据库大小

结果以人类可读的格式显示

print_r($db->databaseSize());
print_r($db->tablesSize());
print_r($db->tableSize('summing_partions_views'));

分区

$count_result = 2;
print_r($db->partitions('summing_partions_views', $count_result));

使用本地 CSV 文件进行 SELECT WHERE IN

$file_name_data1 = '/tmp/temp_csv.txt'; // 两列文件 [int,string]
$whereIn = new \ClickHouseDB\Query\WhereInFile();
$whereIn->attachFile($file_name_data1, 'namex', ['site_id' => 'Int32', 'site_hash' => 'String'], \ClickHouseDB\Query\WhereInFile::FORMAT_CSV);
$result = $db->select($sql, [], $whereIn);

// 参见 example/exam7_where_in.php

绑定

绑定:

$date1 = new DateTime("now"); // DateTimeInterface

$Bindings = [
  'select_date' => ['2000-10-10', '2000-10-11', '2000-10-12'],
  'datetime'=>$date,
  'limit' => 5,
  'from_table' => 'table'
];

$statement = $db->selectAsync("SELECT FROM {from_table} WHERE datetime=:datetime limit {limit}", $Bindings);

// 在 {KEY} 中双重绑定
$keys=[
            'A'=>'{B}',
            'B'=>':C',
            'C'=>123,
            'Z'=>[':C',':B',':C']
        ];
$this->client->selectAsync('{A} :Z', $keys)->sql() // ==   "123 ':C',':B',':C' FORMAT JSON",


简单的 SQL 条件和模板

条件已被弃用,如需使用: $db->enableQueryConditions();

使用 QueryConditions 的示例:


$db->enableQueryConditions();

$input_params = [
  'select_date' => ['2000-10-10', '2000-10-11', '2000-10-12'],
  'limit' => 5,
  'from_table' => 'table'
];

$select = '
    SELECT * FROM {from_table}
    WHERE
    {if select_date}
        event_date IN (:select_date)
    {else}
        event_date=today()
    {/if}
    {if limit}
    LIMIT {limit}
    {/if}
';

$statement = $db->selectAsync($select, $input_params);
echo $statement->sql();

/*
SELECT * FROM table
WHERE
event_date IN ('2000-10-10','2000-10-11','2000-10-12')
LIMIT 5
FORMAT JSON
*/

$input_params['select_date'] = false;
$statement = $db->selectAsync($select, $input_params);
echo $statement->sql();

/*
SELECT * FROM table
WHERE
event_date=today()
LIMIT 5
FORMAT JSON
*/

$state1 = $db->selectAsync(
    'SELECT 1 as {key} WHERE {key} = :value',
    ['key' => 'ping', 'value' => 1]
);

// SELECT 1 as ping WHERE ping = "1"

自定义查询退化示例见 exam16_custom_degeneration.php

SELECT {ifint VAR} result_if_intval_NON_ZERO{/if}
SELECT {ifint VAR} result_if_intval_NON_ZERO {else} BLA BLA{/if}

设置

设置任何参数的三种方式

// 在配置数组中
$config = [
    'host' => 'x',
    'port' => '8123',
    'username' => 'x',
    'password' => 'x',
    'settings' => ['max_execution_time' => 100]
];
$db = new ClickHouseDB\Client($config);

// 通过构造函数设置
$config = [
    'host' => 'x',
    'port' => '8123',
    'username' => 'x',
    'password' => 'x'
];
$db = new ClickHouseDB\Client($config, ['max_execution_time' => 100]);

// 使用 set 方法
$config = [
    'host' => 'x',
    'port' => '8123',
    'username' => 'x',
    'password' => 'x'
];
$db = new ClickHouseDB\Client($config);
$db->settings()->set('max_execution_time', 100);

// 使用 apply 数组方法
$db->settings()->apply([
    'max_execution_time' => 100,
    'max_block_size' => 12345
]);

// 检查
if ($db->settings()->getSetting('max_execution_time') !== 100) {
    throw new Exception('设置工作异常');
}

// 参

streamRead 类似于 WriteToFile

$stream = fopen('php://memory','r+');
$streamRead=new ClickHouseDB\Transport\StreamRead($stream);

$r=$client->streamRead($streamRead,'SELECT sin(number) as sin,cos(number) as cos FROM {table_name} LIMIT 4 FORMAT JSONEachRow', ['table_name'=>'system.numbers']);
rewind($stream);
while (($buffer = fgets($stream, 4096)) !== false) {
    echo ">>> ".$buffer;
}
fclose($stream); // 需要关闭流

// 发送到闭包

$stream = fopen('php://memory','r+');
$streamRead=new ClickHouseDB\Transport\StreamRead($stream);
$callable = function ($ch, $string) use ($stream) {
    // 对 _BLOCK_ 数据进行一些魔法处理
    fwrite($stream, str_ireplace('"sin"','"max"',$string));
    return strlen($string);
};

$streamRead->closure($callable);

$r=$client->streamRead($streamRead,'SELECT sin(number) as sin,cos(number) as cos FROM {table_name} LIMIT 44 FORMAT JSONEachRow', ['table_name'=>'system.numbers']);

插入关联批量数据

 $oneRow = [
            'one' => 1,
            'two' => 2,
            'thr' => 3,
            ];
            $failRow = [
                'two' => 2,
                'one' => 1,
                'thr' => 3,
            ];

$db->insertAssocBulk([$oneRow, $oneRow, $failRow])

认证方法

   AUTH_METHOD_HEADER       = 1;
   AUTH_METHOD_QUERY_STRING = 2;
   AUTH_METHOD_BASIC_AUTH   = 3;

在配置中设置 auth_method

$config=[
    'host'=>'host.com',
    //...
    'auth_method'=>1,
];

进度函数

// 应用函数

$db->progressFunction(function ($data) {
    echo "调用函数:".json_encode($data)."\n";
});
$st=$db->select('SELECT number,sleep(0.2) FROM system.numbers limit 5');


// 打印
// ...
// 调用函数:{"read_rows":"2","read_bytes":"16","total_rows":"0"}
// 调用函数:{"read_rows":"3","read_bytes":"24","total_rows":"0"}
// ...

ssl CA

$config = [
    'host' => 'cluster.clickhouse.dns.com', // 集群中任意节点名称
    'port' => '8123',
    'sslCA' => '...', 
];

集群


$config = [
    'host' => 'cluster.clickhouse.dns.com', // 集群中任意节点名称
    'port' => '8123',
    'username' => 'default', // 所有节点使用相同的登录名和密码
    'password' => ''
];


// 客户端首先通过DNS连接第一个节点,读取IP列表,然后连接所有节点以检查是否正常

$cl = new ClickHouseDB\Cluster($config);
$cl->setScanTimeOut(2.5); // 2500毫秒,每个节点的最大连接时间

// 检查复制状态是否正常
if (!$cl->isReplicasIsOk())
{
    throw new Exception('复制状态异常,错误='.$cl->getError());
}

// 获取节点数组和集群
print_r($cl->getNodes());
print_r($cl->getClusterList());


// 通过集群获取节点
$name='some_cluster_name';
print_r($cl->getClusterNodes($name));

// 获取数量
echo "> 分片数   = ".$cl->getClusterCountShard($name)."\n";
echo "> 副本数 = ".$cl->getClusterCountReplica($name)."\n";

// 通过表获取节点并打印每个节点的大小
$nodes=$cl->getNodesByTable('shara.adpreview_body_views_sharded');
foreach ($nodes as $node)
{
    echo "$node > \n";
    // 选择一个节点
    print_r($cl->client($node)->tableSize('adpreview_body_views_sharded'));
    print_r($cl->client($node)->showCreateTable('shara.adpreview_body_views'));
}

// 操作单个节点

// 通过IP选择,如 "*.248*" = `123.123.123.248`,分隔符 ';',如果未找到则选择第一个节点
$cli=$cl->clientLike($name,'.298;.964'); // 首先查找 .298 然后 .964,结果是 ClickHouseDB\Client

$cli->ping();

// 在集群上截断表
$result=$cl->truncateTable('dbNane.TableName_sharded');

// 获取一个活跃节点(随机)
$cl->activeClient()->setTimeout(500);
$cl->activeClient()->write("DROP TABLE IF EXISTS default.asdasdasd ON CLUSTER cluster2");

// 查找 `is_leader` 节点
$cl->getMasterNodeForTable('dbNane.TableName_sharded');

// 错误
var_dump($cl->getError());

返回极值

$db->enableExtremes(true);

启用查询日志

您可以在ClickHouse中记录所有查询

$db->enableLogQueries();
$db->select('SELECT 1 as p');
print_r($db->select('SELECT * FROM system.query_log')->rows());

检查是否存在

$db->isExists($database,$table);

调试和详细输出

$db->verbose();

详细输出到文件|流:

    // 初始化客户端
    $cli = new Client($config);
    $cli->verbose();
    // 临时流
    $stream = fopen('php://memory', 'r+');
    // 设置流到curl
    $cli->transport()->setStdErrOut($stream);
    // 执行curl
    $st=$cli->select('SElect 1 as ppp');
    $st->rows();
    // 倒回 
    fseek($stream,0,SEEK_SET);
    
    // 输出
    echo stream_get_contents($stream);

开发和PHPUnit测试

  • 不要忘记运行composer install。它应该设置PSR-4自动加载。
  • 然后您可以简单地运行vendor/bin/phpunit,它应该输出以下内容
cp phpunit.xml.dist phpunit.xml
mcedit phpunit.xml

在phpunit.xml中编辑常量:

<php>
    <env name="CLICKHOUSE_HOST" value="127.0.0.1" />
    <env name="CLICKHOUSE_PORT" value="8123" />
    <env name="CLICKHOUSE_USER" value="default" />
    <env name="CLICKHOUSE_DATABASE" value="phpChTestDefault" />
    <env name="CLICKHOUSE_PASSWORD" value="" />
    <env name="CLICKHOUSE_TMPPATH" value="/tmp" />
</php>

运行docker ClickHouse服务器

cd ./tests
docker-compose up

运行测试


./vendor/bin/phpunit

./vendor/bin/phpunit --group ClientTest

./vendor/bin/phpunit --group ClientTest --filter testInsertNestedArray

./vendor/bin/phpunit --group ConditionsTest

运行PHPStan

# 主要
./vendor/bin/phpstan analyse src tests --level 7
# 仅src
./vendor/bin/phpstan analyse src --level 7

# 示例
./vendor/bin/phpstan analyse example -a ./example/Helper.php

许可证

MIT

更新日志

请参阅 changeLog.md

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

吐司

探索Tensor.Art平台的独特AI模型,免费访问各种图像生成与AI训练工具,从Stable Diffusion等基础模型开始,轻松实现创新图像生成。体验前沿的AI技术,推动个人和企业的创新发展。

Project Cover

SubCat字幕猫

SubCat字幕猫APP是一款创新的视频播放器,它将改变您观看视频的方式!SubCat结合了先进的人工智能技术,为您提供即时视频字幕翻译,无论是本地视频还是网络流媒体,让您轻松享受各种语言的内容。

Project Cover

美间AI

美间AI创意设计平台,利用前沿AI技术,为设计师和营销人员提供一站式设计解决方案。从智能海报到3D效果图,再到文案生成,美间让创意设计更简单、更高效。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号