Python操作ES

2021/6/27 14:17:08

本文主要是介绍Python操作ES,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

 

环境

使用pip安装elasticsearch包:

pip install elasticsearch==7.7.0
 

使用

引包

这里分别介绍使用elasticsearch包和request包查询ES的方式: 

使用request包可以补充elasticsearch包里不方便或者还没有实现的功能,作为对elasticsearch包的一个补充,建议组合使用。

  1.   from elasticsearch import Elasticsearch
  2.   import requests

ElasticSearch包

 获取ES对象

  1.    
  2.   #获取es连接
  3.   def get_es_engine(host,port,user=None,pwd=None):
  4.   if user and pwd:
  5.   es = Elasticsearch(host+':'+str(port), http_auth=(user, pwd), maxsize=15) # 有XPACK安全认证的ES集群
  6.   else:
  7.   es = Elasticsearch(host+':'+str(port), maxsize=15)#无安全认证的集群
  8.   return es

状态

  1.   es.ping()
  2.   es.info()

 

查询

  1.   #查单个记录,指定index,type,id
  2.   es.get(index='test2',id=1,doc_type='_doc')
  3.    
  4.   #根据条件查询,body里是DSL
  5.   bd={
  6.   "query":{
  7.   "bool":{
  8.   "should": [
  9.   {
  10.   "match_phrase_prefix":{
  11.   "email":"yikai"
  12.   }
  13.   }
  14.   ]
  15.   }
  16.   }
  17.   }
  18.   es.search(body=bd,index='test2')

 

  1.   #查看索引数据是否存在
  2.   es.exists(index='test2',id=1,doc_type='_doc')
  3.   Out[132]: True
  4.   es.exists(index='test2',id=2,doc_type='_doc')
  5.   Out[133]: False

更新

指定ID单条更新:

  1.   #指定ID进行更新单条记录
  2.   data={
  3.   "doc":{
  4.   "age":77
  5.   }
  6.   }
  7.   es.update(index='test2',id=3,doc_type='_doc',body=data)

根据DSL条件批量更新:

  1.   data_all={
  2.   "query": {
  3.   "match_all": {}
  4.   },
  5.   "script": {
  6.   "source": "ctx._source.age = params.age;",
  7.   "lang": "painless",
  8.   "params" : {
  9.   "age": "88"
  10.   }
  11.   }
  12.   }
  13.   es.update_by_query(index='test2',body=data_all)
  14.    
  15.   #语法参考
  16.   #https://www.elastic.co/guide/en/elasticsearch/painless/current/index.html

新增

插入一条记录:

  1.   data_ins={
  2.   "name" : "Rick.Wang",
  3.   "company" : "CSDN",
  4.   "age" : "10",
  5.   "email" : "wangyikai@csdn.com"
  6.   }
  7.    
  8.   es.index(index='test2',body=data_ins,doc_type='_doc',id=8)

删除

指定ID删除记录:

es.delete(index='test2',doc_type='_doc',id=8)
 

根据DSL条件批量删除:

  1.   bd= {'query': {'bool': {'should': [{'match_phrase_prefix': {'email': 'yikai'}}]}}}
  2.    
  3.   es.delete_by_query(index='test2',body=bd)

 

清空

清空索引不删除索引,等同于关系型数据库里的truncate table:

  1.   trunc={
  2.   "query": {"match_all": {}}
  3.   }
  4.   es.delete_by_query(index='test2',body=trunc)

使用BULK命令批量操作

批量插入

  1.   #JSON数据不能有回车换行
  2.   batch_data= [
  3.   {"index": {}},
  4.   {"name": "王义凯", "age": 11, "email":"wangyikai1@csdn.com", "company":"CSDN1"},
  5.   {"index": {}},
  6.   {"name": "wang,yi-kai", "age": 22, "email":"wangyikai2@csdn.com", "company":"CSDN2"},
  7.   {"index": {}},
  8.   {"name": "Rick.Wang", "age": 33, "email":"wangyikai3@csdn.com", "company":"CSDN3"},
  9.   {"index": {}},
  10.   {"name": "义凯王", "age": 44, "email":"wangyikai4@csdn.com", "company":"CSDN4"},
  11.   ]
  12.   es.bulk(index='test2',doc_type='_doc',body=batch_data)

 

批量插入更新删除

 使用bulk命令可批量对不同的索引进行插入更新删除等操作:

  1.   #批量对不同的索引进行增删改查操作,每个json一行
  2.   batch_action=[
  3.   {"index": {"_index": "test2", "_type": "_doc", "_id": "999"}},
  4.   {"name": "rick99", "age": 99, "email":"wangyikai9@csdn.com", "company":"CSDN9" },
  5.   {"index": {"_index": "test2", "_type": "_doc", "_id": "888"}},
  6.   {"name": "rick88", "age": 88, "email":"wangyikai8@csdn.com", "company":"CSDN8" },
  7.   {"delete": {"_index": "test2", "_type": "_doc", "_id": "999"}},
  8.   {"create": {"_index" : "test2", "_type" : "_doc", "_id": "000"}},
  9.   {"name": "rick00", "age": 100, "email":"wangyikai0@csdn.com", "company":"CSDN0" },
  10.   {"update": {"_index": "test2", "_type": "_doc", "_id": "888"}},
  11.   {"doc": {"age": "888"}}
  12.   ]
  13.   es.bulk(index='test2',doc_type='_doc',body=batch_action)

使用bulk批量操作的时候,对于不同的操作类型,一定要在前面加上与之对应的操作头信息({“index”: {}}, {‘delete’: {…}}, …),否则会报TransportError(400, u’illegal_argument_exception’)的错误。  

Request包

前面介绍过ES支持Restful接口,我们可以使用curl命令对其进行操作,同样我们也可以使用python里的request包访问操作ES库。

GET查询

使用get函数查询ES数据:

  1.   import requests
  2.   es_http = 'http://localhost:9200'
  3.   index='test2'
  4.   type='_doc'
  5.   id='888'
  6.   auth=('elastic','r12345635x') #tuple格式的账号密码,如果没有开启xpack安全认证可忽略此参数
  7.    
  8.   #查询指定的id数据
  9.   res=requests.get(es_http+'/'+index+'/'+type+'/'+id,auth=auth) #如果没有安全认证则不需要auth参数
  10.   res.text
  11.    
  12.   #查询该索引下所有数据
  13.   res=requests.get(es_http+'/'+index+'/_search',auth=auth)
  14.   res.text
  15.    
  16.   #使用DSL查询数据
  17.   bd={
  18.   "query":{
  19.   "bool":{
  20.   "should": [
  21.   {
  22.   "match_phrase_prefix":{
  23.   "name":"rick.wang"
  24.   }
  25.   }
  26.   ]
  27.   }
  28.   }
  29.   }
  30.   res=requests.get(es_http+'/'+index+'/_search/?pretty',auth=auth,json=bd)#pretty是为了格式化json样式,看起来更好看,可以忽略
  31.   print(res.text)

POST

使用POST方法可以与在Kibana中进行一样的操作,比如插入一条记录,比如根据DSL批量更新: 

  1.   #使用POST方法往ES中插入数据
  2.   data={"name": "rick999", "age": 999, "email":"wangyikai999@csdn.com", "company":"CSDN999" }
  3.   res = requests.post(es_http+'/'+index+'/_doc/999',auth=auth,json=data)
  4.   res.text
  5.    
  6.   res = requests.get(es_http+'/'+index+'/_doc/999',auth=auth)
  7.   res.text
  8.    
  9.    
  10.   #使用POST方法根据DSL对ES进行操作
  11.   bd={
  12.   "query": {
  13.   "match_all": {}
  14.   },
  15.   "script": {
  16.   "source": "ctx._source.age = params.age;",
  17.   "lang": "painless",
  18.   "params" : {
  19.   "age": "99"
  20.   }
  21.   }
  22.   }
  23.   res = requests.post(es_http+'/'+index+'/_update_by_query',auth=auth,json=bd)
  24.   res.text

PUT

使用PUT可以创建索引

  1.   res = requests.put(es_http+'/'+'new_index',auth=auth)
  2.   res.text

DELETE

使用DELETE方法可以删除索引 删除数据等

  1.   #往新创建的索引里插入一条记录
  2.   data={"name": "rick999", "age": 999, "email":"wangyikai999@csdn.com", "company":"CSDN999" }
  3.   requests.post(es_http+'/'+'new_index'+'/_doc/999',auth=auth,json=data)
  4.   #判断ID为999的是否存在
  5.   es.exists(index='new_index',id=999,doc_type='_doc')
  6.    
  7.   #使用DELETE方法删除ID为999的记录
  8.   requests.delete(es_http+'/'+'new_index'+'/_doc/999',auth=auth)
  9.   #判断ID为999的是否存在
  10.   es.exists(index='new_index',id=999,doc_type='_doc')
  11.    
  12.   #使用DELETE方法删除new_index的索引
  13.   res=requests.delete(es_http+'/'+'new_index',auth=auth) #删除索引
  14.   res.text

 

 

 



这篇关于Python操作ES的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程