网站开发对招聘人员要求,wordpress手动更新插件,建设宣传家乡的网站,影视公司联系方式目录 ES Python客户端介绍封装代码测试代码参考 ES Python客户端介绍
官方提供了两个客户端elasticsearch、elasticsearch-dsl
pip install elasticsearchpip install elasticsearch-dsl第二个是对第一个的封装#xff0c;类似ORM操作数据库#xff0c;可以.filter、.group… 目录 ES Python客户端介绍封装代码测试代码参考 ES Python客户端介绍
官方提供了两个客户端elasticsearch、elasticsearch-dsl
pip install elasticsearchpip install elasticsearch-dsl第二个是对第一个的封装类似ORM操作数据库可以.filter、.groupby个人感觉很鸡肋star数也不多。平时使用的时候一般会在kibana上测试然后直接把query拷贝过来获取更多数据所以这里做下第一个的封装。
封装代码
封装后依然暴露了es方便有特殊情况下使用index一般很少改动就直接放到对象中了可以使用set_index修改常用的应该是get_doc和get_doc_scroll来获取少量和全量数据
代码测试时使用的是7.17.12版本大于此版本可能由于官方改动出异常
pip install elasticsearch7.17.12es.py
import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dictclass ESClient:def __init__(self, host127.0.0.1,index, http_auth None):self.index indexif http_auth is None:self.es Elasticsearch(hostshost)else:self.es Elasticsearch(hostshost, http_authhttp_auth)print(success to connect host)def close(self):self.es.close()# 设置索引def set_index(self,index:str):self.index index# 创建索引def create_index(self, index_name: str, mappingsNone):res self.es.indices.create(indexindex_name, mappingsmappings)return res# 删除索引def delete_index(self, index_name: str):res self.es.indices.delete(indexindex_name)return res# 获取索引def get_index(self, index_name: str):res self.es.indices.get(indexindex_name)return res# 创建文档单个def create_doc(self,body, _id.join(random.sample(string.ascii_lettersstring.ascii_uppercasestring.digits,20))):res self.es.create(indexself.index, bodybody, id_id)return res# 创建文档(批量)def create_doc_bulk(self, docs: List[Dict]):actions []for doc in docs:action {_index: self.index,_op_type: create,_id: .join(random.sample(string.ascii_lettersstring.ascii_uppercasestring.digits,20))}for k,v in doc.items():action[k] vactions.append(action)res bulk(clientself.es, actionsactions)return res# 删除文档def delete_doc(self, doc_id):res self.es.delete(indexself.index, iddoc_id)return res# 更新文档def update_doc(self, doc_id, doc:Dict):body {doc : doc}res self.es.update(indexself.index, iddoc_id, bodybody)return res# 分页获取超过100000的文档def get_doc_scroll(self,query:Dict):res self.es.search(indexself.index,size10000,bodyquery,search_typequery_then_fetch,scroll5m)data_list []hits res.get(hits)scroll_id res.get(_scroll_id)total_value 0# total 可能为Dict或intif isinstance(hits.get(total),Dict):total_value hits.get(total).get(value)else:total_value hits.get(total)if total_value0:for data in hits.get(hits):data_list.append(data.get(_source))return scroll_id,data_list# 通过scroll_id分页获取后序文档def get_doc_by_scroll_id(self,scroll_id):page self.es.scroll(scroll_idscroll_id,scroll5m)data_list []scroll_id page.get(_scroll_id)for data in page.get(hits).get(hits):data_list.append(data)return scroll_id,data_list# 清空scroll_id防止服务端不够用def clear_scroll(self,scroll_id):self.es.clear_scroll(scroll_id)# 获取索引的hits内容一般用于获取文档id、总数def get_doc_all(self):res self.es.search(indexself.index)return res[hits]# 获取一个文档def get_doc_by_id(self, id_):res self.es.get(indexself.index, idid_)return res[_source]# 获取所有文档的_source内容(小于100000)def get_doc(self,query:Dict,size:int100000):query[size] sizeres self.es.search(indexself.index,bodyquery)data_list []hits res.get(hits)total_value 0# total 可能为Dict或intif isinstance(hits.get(total), Dict):total_value hits.get(total).get(value)else:total_value hits.get(total)if total_value 0:for data in hits.get(hits):data_list.append(data.get(_source))return data_list# 聚合查询分组条件名为group_by,返回bucketsdef get_doc_agg(self, query):res self.es.search(indexself.index, bodyquery)return res[aggregations][group_by].get(buckets)# 统计查询统计条件为stats_by返回最值、平均值等def get_doc_stats(self,query):res self.es.search(indexself.index,bodyquery)return res[aggregations][stats_by]测试代码
import unittest
from es import ESClientcli ESClient(hosthttp://10.28.144.3:9200,http_auth[elastic,changeme])
def test_create_index():res cli.create_index(index_nametest)print(res)def test_delete_index():res cli.delete_index(index_nametest)print(res)def test_get_index():res cli.get_index(index_nametest)print(res)def test_set_index():cli.set_index(indextest)def test_create_doc():body {name: lady_killer9,age: 19}res cli.create_doc(bodybody)print(res)def test_create_doc_bulk():from copy import deepcopybody {name: lady_killer9}users []for i in range(100001):tmp deepcopy(body)tmp[age] iusers.append(tmp)res cli.create_doc_bulk(docsusers)print(res)def test_get_doc_all():res cli.get_doc_all()print(res)def test_get_doc_by_id():res cli.get_doc_by_id(jHALXDQaENQZPM4C9EUt)print(res)def test_get_doc():query {query: {match_all: {}}}res cli.get_doc(queryquery,size20)print(res)def test_update_doc():body{name: lady_killer_after_update}res cli.update_doc(doc_idjHALXDQaENQZPM4C9EUt,docbody)print(res)def test_delete_doc():res cli.delete_doc(doc_idjHALXDQaENQZPM4C9EUt)print(res)def test_get_doc_agg():query {aggs: {group_by: {terms: {field: age}}}}res cli.get_doc_agg(queryquery)print(res)def test_get_doc_stats():query {aggs: {stats_by: {stats: {field: age}}}}res cli.get_doc_stats(queryquery)print(res)def test_get_doc_scroll():query {query: {match_all: {}}}scroll_id,data_list cli.get_doc_scroll(queryquery)res []while data_list:res.extend(data_list)scroll_id,data_list cli.get_doc_by_scroll_id(scroll_idscroll_id)print(len(res))if __name__ __main__:# test_delete_index()test_create_index()test_get_index()# test_set_index()# test_create_doc()# test_create_doc_bulk()# test_get_doc_all()# test_update_doc()# test_get_doc_by_id()# test_get_doc()# test_delete_doc()# test_get_doc_agg()# test_get_doc_stats()# test_get_doc_scroll()cli.close()测试截图 更多python相关内容【python总结】python学习框架梳理
本人b站账号一路狂飚的蜗牛
有问题请下方评论转载请注明出处并附有原文链接谢谢如有侵权请及时联系。如果您感觉有所收获自愿打赏可选择支付宝18833895206小于您的支持是我不断更新的动力。
参考
github-elasticsearch github-elasticsearch-dsl