Project Icon

influxdb-client-python

InfluxDB 2.x和Flux的Python客户端库

influxdb-client-python是InfluxDB 2.x和Flux的官方Python客户端库。该库支持数据写入和查询,具备同步、异步和批量写入功能,并能使用Flux语言进行灵活查询。它还包含InfluxDB 2.0 API客户端,可管理组织、用户和存储桶等。适用于Python 3.7+,提供详细文档和示例代码方便开发者使用。

influxdb-client-python

CircleCI codecov CI status PyPI package Anaconda.org package Supported Python versions Documentation status Slack Status

This repository contains the Python client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.

For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.

The API of the influxdb-client-python is not the backwards-compatible with the old one - influxdb-python.

Documentation

This section contains links to the client library documentation.

InfluxDB 2.0 client features

Installation

InfluxDB python library uses RxPY - The Reactive Extensions for Python (RxPY).

Python 3.7 or later is required.

:warning:

It is recommended to use ciso8601 with client for parsing dates. ciso8601 is much faster than built-in Python datetime. Since it's written as a C module the best way is build it from sources:

Windows:

You have to install Visual C++ Build Tools 2015 to build ciso8601 by pip.

conda:

Install from sources: conda install -c conda-forge/label/cf202003 ciso8601.

pip install

The python package is hosted on PyPI, you can install latest version directly:

pip install 'influxdb-client[ciso]'

Then import the package:

import influxdb_client

If your application uses async/await in Python you can install with the async extra:

$ pip install influxdb-client[async]

For more info see How to use Asyncio.

Setuptools

Install via Setuptools.

python setup.py install --user

(or sudo python setup.py install to install the package for all users)

Getting Started

Please follow the Installation and then run the following:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

write_api.write(bucket=bucket, record=p)

## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for row in table.records:
        print (row.values)


## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1

Client configuration

Via File

A client can be configured via *.ini file in segment influx2.

The following options are supported:

  • url - the url to connect to InfluxDB
  • org - default destination organization for writes and queries
  • token - the token to use for the authorization
  • timeout - socket timeout in ms (default value is 10000)
  • verify_ssl - set this to false to skip verifying SSL certificate when calling API from https server
  • ssl_ca_cert - set this to customize the certificate file to verify the peer
  • cert_file - path to the certificate that will be used for mTLS authentication
  • cert_key_file - path to the file contains private key for mTLS certificate
  • cert_key_password - string or function which returns password for decrypting the mTLS private key
  • connection_pool_maxsize - set the number of connections to save that can be reused by urllib3
  • auth_basic - enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)
  • profilers - set the list of enabled Flux profilers
self.client = InfluxDBClient.from_config_file("config.ini")
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False

Via Environment Properties

A client can be configured via environment properties.

Supported properties are:

  • INFLUXDB_V2_URL - the url to connect to InfluxDB
  • INFLUXDB_V2_ORG - default destination organization for writes and queries
  • INFLUXDB_V2_TOKEN - the token to use for the authorization
  • INFLUXDB_V2_TIMEOUT - socket timeout in ms (default value is 10000)
  • INFLUXDB_V2_VERIFY_SSL - set this to false to skip verifying SSL certificate when calling API from https server
  • INFLUXDB_V2_SSL_CA_CERT - set this to customize the certificate file to verify the peer
  • INFLUXDB_V2_CERT_FILE - path to the certificate that will be used for mTLS authentication
  • INFLUXDB_V2_CERT_KEY_FILE - path to the file contains private key for mTLS certificate
  • INFLUXDB_V2_CERT_KEY_PASSWORD - string or function which returns password for decrypting the mTLS private key
  • INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - set the number of connections to save that can be reused by urllib3
  • INFLUXDB_V2_AUTH_BASIC - enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)
  • INFLUXDB_V2_PROFILERS - set the list of enabled Flux profilers
self.client = InfluxDBClient.from_env_properties()

Profile query

The Flux Profiler package provides performance profiling tools for Flux queries and operations.

You can enable printing profiler information of the Flux query in client library by:

  • set QueryOptions.profilers in QueryApi,
  • set INFLUXDB_V2_PROFILERS environment variable,
  • set profilers option in configuration file.

When the profiler is enabled, the result of flux query contains additional tables "profiler/". In order to have consistent behaviour with enabled/disabled profiler, FluxCSVParser excludes "profiler/" measurements from result.

Example how to enable profilers using API:

q = '''
    from(bucket: stringParam)
      |> range(start: -5m, stop: now())
      |> filter(fn: (r) => r._measurement == "mem")
      |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
      |> aggregateWindow(every: 1m, fn: mean)
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
    "stringParam": "my-bucket",
}

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
csv_result = query_api.query(query=q, params=p)

Example of a profiler output:

===============
Profiler: query
===============

from(bucket: stringParam)
  |> range(start: -5m, stop: now())
  |> filter(fn: (r) => r._measurement == "mem")
  |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
  |> aggregateWindow(every: 1m, fn: mean)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

========================
Profiler: profiler/query
========================
result              : _profiler
table               : 0
_measurement        : profiler/query
TotalDuration       : 8924700
CompileDuration     : 350900
QueueDuration       : 33800
PlanDuration        : 0
RequeueDuration     : 0
ExecuteDuration     : 8486500
Concurrency         : 0
MaxAllocated        : 2072
TotalAllocated      : 0
flux/query-plan     :

digraph {
  ReadWindowAggregateByTime11
  // every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
  pivot8
  generated_yield

  ReadWindowAggregateByTime11 -> pivot8
  pivot8 -> generated_yield
}


influxdb/scanned-bytes: 0
influxdb/scanned-values: 0

===========================
Profiler: profiler/operator
===========================
result              : _profiler
table               : 1
_measurement        : profiler/operator
Type                : *universe.pivotTransformation
Label               : pivot8
Count               : 3
MinDuration         : 32600
MaxDuration         : 126200
DurationSum         : 193400
MeanDuration        : 64466.666666666664

===========================
Profiler: profiler/operator
===========================
result              : _profiler
table               : 1
_measurement        : profiler/operator
Type                : *influxdb.readWindowAggregateSource
Label               : ReadWindowAggregateByTime11
Count               : 1
MinDuration         : 940500
MaxDuration         : 940500
DurationSum         : 940500
MeanDuration        : 940500.0

You can also use callback function to get profilers output. Return value of this callback is type of FluxRecord.

Example how to use profilers with callback:

class ProfilersCallback(object):
   def __init__(self):
       self.records = []

   def __call__(self, flux_record):
       self.records.append(flux_record.values)

callback = ProfilersCallback()

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
   print(f'Custom processing of profiler result: {profiler}')

Example output of this callback:

Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n  ReadRange2\r\n  generated_yield\r\n\r\n  ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}

How to use

Writes

The WriteApi supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as a InfluxDB Line Protocol, Data Point or Observable stream.

:warning:

The WriteApi in batching mode (default mode) is supposed to run as a singleton. To flush all your data you should wrap the execution using with client.write_api(...) as write_api: statement or call write_api.close() at the end of your script.

The default instance of WriteApi use batching.

The data could be written as

  1. string or bytes that is formatted as a InfluxDB's line protocol
  2. Data Point structure
  3. Dictionary style mapping with keys: measurement, tags, fields and time or custom structure
  4. NamedTuple
  5. [Data
项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号