大家好,我是BaCde。转眼间距离上次更新已经是一年前了。计划了一些输出的内容因为各种原因一直没能完成。会尽量抽出时间来写写最近积累的一些内容,计划后续会逐步跟大家分享。
进入今天的主题。写这一部分内容主要还是有朋友问起如何导入的问题,所以今天就来说说数据导入部分。
针对数十亿数据的批量写入,可以从三个方面去优化,分为是硬件、Elasticsearch 优化、脚本优化。具体需要根据具体实际情况来权衡。
硬件部分,其实也不需要花费很大,主要考虑的是磁盘写入性能。硬件就是在硬盘上下功夫,有条件的可以使用 SSD 做 Raid,我比较偏好 Raid10,但是相对比较费钱。如果不考虑组Raid,那么单块的 SSD 硬盘,选择支持NVME协议 M.2 接口的 SSD,读写性能已经足够。再差一些的可以选择专业些的STAT接口的SSD,其性能也要比机械硬盘强上几倍。具体选购大家自行去网上查下文章即可。我目前使用的主要有Intel和三星的 SSD。
Elasticsearch 的优化之前内容里有提到过,就是修改一些配置。另外还需要注意的就是版本上的差异,自从 Elasticsearch 7 以后的默认应该都是一个分片和副本,由于涉及数据量很大,分片数量要做调整。
之前的配置就是在写入过程中修改刷新时间和副本数,以提升写入的速度。然后等写入完成,再修改回去即可。
`{` `"index": {` `"refresh_interval": -1,` `"number_of_replicas": 0` `}``}`
最后就是导入脚本,其实现在导入数据已经相当的简单,从原来的Bulk到现在的 streaming_bulk 和 parallel_bulk 这种更简单便捷的方式。直接按照官方文档给出的示例就可以快速开发出来。顺便说一下,建议大家不了解 python 迭代器、生成器、yield 等内容的可以去学习一下。详细的内容可以看官方文档。地址:https://elasticsearch-py.readthedocs.io/en/master/helpers.html
最后,上代码:
数据结构内容例子,数据的获取大家自行解决,结构不同根据实际情况做调整即可。
`{"timestamp":"1640909596","name":"02k0ue.yumenggou.com","type":"cname","value":"ziyuan.baidu.com."}``{"timestamp":"1640909798","name":"02k1.com","type":"mx","value":"0 mail.02k1.com."}``{"timestamp":"1640909561","name":"02k1.friendlyvote.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909566","name":"02k1289.jlszhkjyxgs.com","type":"cname","value":"ziyuan.baidu.com."}``{"timestamp":"1640909554","name":"02k16t25.coinstimes.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909583","name":"02k1799.elegancehouse.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909540","name":"02k199jx8.xiangyishu.com","type":"cname","value":"overdue.aliyun.com."}``{"timestamp":"1640909539","name":"02k1w.txgc010.com","type":"cname","value":"sg01.pylist-blog.com."}``{"timestamp":"1640909559","name":"02k2.friendlyvote.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909548","name":"02k262.huiyanji.com","type":"cname","value":"ziyuan.baidu.com."}`
代码示例
`try:` `import simplejson as json``except ImportError:` `import json`` ``from elasticsearch import helpers`` ``es = elasticsearch.Elasticsearch(hosts=[{"host": "127.0.0.1", "port": "9200"}], retry_on_timeout=True,max_retries=10)`` ``def importdns(filename):` `with open(filename,"r",encoding='utf-8') as fp:` `for line in fp:` `if line:` `temp = json.loads(line)` `# 中间可以加入自己需要做处理的代码。看各自的需求` `temp["_index"] = "rapiddns-" + temp["type"]` `yield temp`` ``path = "dns_data.json"`` ``for ok, response in helpers.streaming_bulk(client=es,max_retries=5,chunk_size=5000,actions=importdns(path,)):` `if not ok:` `print(response)`` `
本文为原创内容,转载请标明出处。
关注我,不迷路。