长亭百川云 - 文章详情

如何导入数十亿DNS数据到Elasticsearch中

RapidDNS

56

2024-07-14

大家好,我是BaCde。转眼间距离上次更新已经是一年前了。计划了一些输出的内容因为各种原因一直没能完成。会尽量抽出时间来写写最近积累的一些内容,计划后续会逐步跟大家分享。

进入今天的主题。写这一部分内容主要还是有朋友问起如何导入的问题,所以今天就来说说数据导入部分。

针对数十亿数据的批量写入,可以从三个方面去优化,分为是硬件、Elasticsearch 优化、脚本优化。具体需要根据具体实际情况来权衡。

硬件部分,其实也不需要花费很大,主要考虑的是磁盘写入性能。硬件就是在硬盘上下功夫,有条件的可以使用 SSD 做 Raid,我比较偏好 Raid10,但是相对比较费钱。如果不考虑组Raid,那么单块的 SSD 硬盘,选择支持NVME协议 M.2 接口的 SSD,读写性能已经足够。再差一些的可以选择专业些的STAT接口的SSD,其性能也要比机械硬盘强上几倍。具体选购大家自行去网上查下文章即可。我目前使用的主要有Intel和三星的 SSD。

Elasticsearch 的优化之前内容里有提到过,就是修改一些配置。另外还需要注意的就是版本上的差异,自从 Elasticsearch 7 以后的默认应该都是一个分片和副本,由于涉及数据量很大,分片数量要做调整。

之前的配置就是在写入过程中修改刷新时间和副本数,以提升写入的速度。然后等写入完成,再修改回去即可。

`{`  `"index": {`    `"refresh_interval": -1,`    `"number_of_replicas": 0`  `}``}`

最后就是导入脚本,其实现在导入数据已经相当的简单,从原来的Bulk到现在的 streaming_bulk 和 parallel_bulk 这种更简单便捷的方式。直接按照官方文档给出的示例就可以快速开发出来。顺便说一下,建议大家不了解 python 迭代器、生成器、yield 等内容的可以去学习一下。详细的内容可以看官方文档。地址:https://elasticsearch-py.readthedocs.io/en/master/helpers.html

最后,上代码:

数据结构内容例子,数据的获取大家自行解决,结构不同根据实际情况做调整即可。

`{"timestamp":"1640909596","name":"02k0ue.yumenggou.com","type":"cname","value":"ziyuan.baidu.com."}``{"timestamp":"1640909798","name":"02k1.com","type":"mx","value":"0 mail.02k1.com."}``{"timestamp":"1640909561","name":"02k1.friendlyvote.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909566","name":"02k1289.jlszhkjyxgs.com","type":"cname","value":"ziyuan.baidu.com."}``{"timestamp":"1640909554","name":"02k16t25.coinstimes.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909583","name":"02k1799.elegancehouse.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909540","name":"02k199jx8.xiangyishu.com","type":"cname","value":"overdue.aliyun.com."}``{"timestamp":"1640909539","name":"02k1w.txgc010.com","type":"cname","value":"sg01.pylist-blog.com."}``{"timestamp":"1640909559","name":"02k2.friendlyvote.com","type":"cname","value":"traff-1.hugedomains.com."}``{"timestamp":"1640909548","name":"02k262.huiyanji.com","type":"cname","value":"ziyuan.baidu.com."}`

代码示例

`try:`    `import simplejson as json``except ImportError:`    `import json``   ``from elasticsearch import helpers``   ``es = elasticsearch.Elasticsearch(hosts=[{"host": "127.0.0.1", "port": "9200"}], retry_on_timeout=True,max_retries=10)``   ``def importdns(filename):`    `with open(filename,"r",encoding='utf-8') as fp:`        `for line in fp:`            `if line:`                `temp = json.loads(line)`                `# 中间可以加入自己需要做处理的代码。看各自的需求`                `temp["_index"] = "rapiddns-" + temp["type"]`                `yield temp``   ``path = "dns_data.json"``   ``for ok, response in helpers.streaming_bulk(client=es,max_retries=5,chunk_size=5000,actions=importdns(path,)):`    `if not ok:`        `print(response)``   `

本文为原创内容,转载请标明出处。

关注我,不迷路。

相关推荐
关注或联系我们
添加百川云公众号,移动管理云安全产品
咨询热线:
4000-327-707
百川公众号
百川公众号
百川云客服
百川云客服

Copyright ©2024 北京长亭科技有限公司
icon
京ICP备 2024055124号-2