MongoDB and Elasticsearch Cluster Deployment

2018-10-10

Deployment environment

Suppose we have two servers, 10.2.2.164 and 10.2.2.61. We deploy the MongoDB primary, the Elasticsearch master node, and mongo-connector on 10.2.2.61, and an Elasticsearch data node holding shards on 10.2.2.164.

Installing MongoDB

A reference mongod.yml configuration:

    systemLog:
      destination: file
      path: logs/mongod.log
      logAppend: true
      logRotate: rename
      timeStampFormat: iso8601-local
    # Storage engine parameters
    storage:
      # Journal settings
      journal:
        enabled: true
      # Data file location
      dbPath: data
      # One directory per database
      directoryPerDB: true
      # Storage engine
      engine: wiredTiger
      # WiredTiger settings
      wiredTiger:
        engineConfig:
          # Maximum cache used by WiredTiger (tune to your server)
          cacheSizeGB: 4
          # Store indexes in per-database directories as well
          directoryForIndexes: true
        # Collection compression
        collectionConfig:
          blockCompressor: snappy
        # Index compression
        indexConfig:
          prefixCompression: true
    # Network settings
    net:
      bindIp: 0.0.0.0
      port: 27017
      maxIncomingConnections: 65536
      wireObjectCheck: true
      ipv6: false
    # Process settings
    processManagement:
      fork: true
    # Slow query profiling
    operationProfiling:
      slowOpThresholdMs: 100
      mode: slowOp
    # Replica set parameters
    replication:
      oplogSizeMB: 51200
      replSetName: rs0
      enableMajorityReadConcern: false

Field reference: http://www.ywnds.com/?p=6502

First create the data and logs directories, then start the server with mongod -f mongod.yml. You can begin with a single mongod as the primary and grow or shrink the cluster later as needed. Initialize the replica set with the following JS script:

    config = {
        _id: "rs0",
        version: 1,
        members: [{
            _id: 0,
            host: "10.2.2.61:27017",
            priority: 10
        }]
    }
    rs.initiate(config)

Then run:

    mongo localhost:27017/cmnews --quiet init.js

The Primary and Secondaries synchronize through the oplog: after a write completes on the Primary, an entry is written to the special local.oplog.rs collection, and each Secondary continuously pulls new entries from the Primary and applies them. The oplog is a record of operations stored in a capped collection; in a replica set, sizing it too small can leave a Secondary unable to keep up with the Primary. By default it is 5% of disk space, and because the collection is capped, its size cannot simply be changed once MongoDB has started.
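How large an oplog you need depends on write volume. A rough back-of-the-envelope check (the entry size and write rate below are hypothetical, not measured from this cluster):

```python
def oplog_window_hours(oplog_size_mb, avg_entry_bytes, writes_per_sec):
    """Estimate how many hours of operations a capped oplog can hold.

    If a Secondary lags by more than this window, the entries it still
    needs have been overwritten and it must do a full resync.
    """
    capacity_entries = oplog_size_mb * 1024.0 * 1024.0 / avg_entry_bytes
    return capacity_entries / writes_per_sec / 3600.0

# With the 51200 MB oplog configured above, 500-byte entries and
# 2000 writes/sec give a window of roughly 14.9 hours.
window = oplog_window_hours(51200, 500, 2000)
```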

If a secondary has fallen so far behind the primary that it needs a full resync, you can remove it from the set, wipe its data files, and add it back:

    > rs.remove("10.2.2.164:27017")
    > rs.add("10.2.2.164:27017")

Authentication between cluster members must use a keyfile; without one, adding members may fail.

Installing the shell enhancement mongo-hacker

    npm install -g mongo-hacker

Secondaries refuse reads by default; add a line to ~/.mongorc.js to allow them, after which queries work as usual:

    rs.slaveOk();
    db.channel_info.find({}, {_id:0, id: 1}).pretty()

Read/write splitting:

    import pymongo
    db = pymongo.MongoClient('127.0.0.1:27017', replicaSet='rs0', readPreference='secondaryPreferred').cmnews

Enabling log rotation

    > use admin
    > db.runCommand( { logRotate : 1 } )

Installing JDK 8

Download it from http://www.oracle.com/technetwork/java/javase/downloads/index.html

Extract it to /home/zhangkai/data/tools/opt/jdk1.8.0_121 and add the following lines to ~/.bashrc:

    export JAVA_HOME=/home/zhangkai/data/tools/opt/jdk1.8.0_121
    export JRE_HOME=$JAVA_HOME/jre
    export CLASS_PATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

Installing Elasticsearch

Installation guide: http://www.wklken.me/posts/2016/06/29/deploy-es.html

Download it from https://www.elastic.co/downloads/elasticsearch

Extract it to /home/zhangkai/data/tools/opt/elasticsearch-5.3.0, then edit config/jvm.options to raise the JVM heap:

    -Xms4g
    -Xmx4g
    -Xss512k

Edit config/elasticsearch.yml:

    cluster.name: escluster
    node.name: node-61
    node.master: true
    node.data: true
    path.data: data
    path.logs: logs
    network.host: 0.0.0.0
    #http.host: 0.0.0.0
    http.port: 9200
    http.enabled: true
    http.max_content_length: 100mb
    #transport.host: 127.0.0.1
    transport.tcp.port: 9300
    transport.tcp.compress: true
    discovery.zen.ping_timeout: 30s
    client.transport.ping_timeout: 60s
    discovery.zen.fd.ping_timeout: 30s
    discovery.zen.fd.ping_interval: 5s
    discovery.zen.fd.ping_retries: 10
    discovery.zen.minimum_master_nodes: 1
    discovery.zen.ping.unicast.hosts: ["10.2.2.61:9300", "10.2.2.164:9300"]
    #bootstrap.mlockall: true
    bootstrap.memory_lock: false
    bootstrap.system_call_filter: false
    action.auto_create_index: true
    #action.disable_delete_all_indices: true
    action.destructive_requires_name: true

Since version 2.0, Elasticsearch's default discovery mechanism is unicast; discovery.zen.ping.unicast.hosts lists the cluster nodes by their transport addresses.

ES 5.x no longer accepts index-level settings in the config file; set them through the REST API instead:

    curl -H 'Content-Type:application/json' -XPUT 'http://localhost:9200/_all/_settings?pretty&preserve_existing=true' -d '{
        "index.number_of_shards" : 4,
        "index.number_of_replicas" : 0
    }'

Copy this configuration to the deployment directory on both 61 and 164. On 164, change node.name to node-164 and set node.master: false to prevent split-brain, then start the nodes.

Elasticsearch deployments run in one of two modes, Development and Production. By default network.host is the local IP (127.0.0.1); a cluster deployment needs network.host set to an external IP, at which point Elasticsearch treats the deployment as Production and enforces its bootstrap checks at startup. Typical failures look like:

    max number of threads [1024] for user [zhangkai] is too low, increase to at least [2048]
    max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
    max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

In Development mode these are mere warnings; in Production they are errors that fail the bootstrap checks. Fix them as follows (note that `sudo echo ... >> file` would run the redirection without privileges, hence tee):

    ulimit -u 4096
    echo 'vm.max_map_count = 655360' | sudo tee -a /etc/sysctl.conf
    sudo sysctl -p

Edit /etc/security/limits.conf and add:

    zhangkai soft nofile 65536
    zhangkai hard nofile 131072
    zhangkai soft nproc 4096
    zhangkai hard nproc 8192
    zhangkai - memlock unlimited

Note: you must log in again for these limits to take effect; verify with ulimit -n.

To run in Development mode while still serving HTTP externally, set only http.host to the external IP and leave network.host at its default.

Start in the foreground with bin/elasticsearch; once it looks healthy, restart in the background with bin/elasticsearch -d. Then run curl http://127.0.0.1:9200; output like the following means the node is up:

    {
      "name" : "node-164",
      "cluster_name" : "escluster",
      "cluster_uuid" : "ofWCU22DSbuGrzne5AvZWQ",
      "version" : {
        "number" : "5.3.0",
        "build_hash" : "3adb13b",
        "build_date" : "2017-03-23T03:31:50.652Z",
        "build_snapshot" : false,
        "lucene_version" : "6.4.1"
      },
      "tagline" : "You Know, for Search"
    }
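A quick programmatic sanity check of this response can be sketched as follows (the sample body is abridged; the cluster and node names follow the configuration above):

```python
import json

# Abridged response body as returned by `curl http://127.0.0.1:9200`
body = '''{"name": "node-164", "cluster_name": "escluster",
           "version": {"number": "5.3.0"},
           "tagline": "You Know, for Search"}'''

info = json.loads(body)
# `name` is the node's name, `cluster_name` the cluster it joined
assert info["cluster_name"] == "escluster"
assert info["version"]["number"].startswith("5.3")
```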

Common commands

    # Check cluster health
    curl localhost:9200/_cluster/health?pretty
    # List shards
    curl localhost:9200/_cat/shards?pretty
    # Show the master node
    curl localhost:9200/_cat/master?pretty
    # Show a type's mapping
    curl localhost:9200/cmnews/video_info/_mapping?pretty
    # Show index settings
    curl localhost:9200/cmnews/_settings?pretty
    # Update settings
    curl -H 'Content-Type:application/json' -XPUT localhost:9200/cmnews/_settings -d '
    {
        "index": {
            "number_of_replicas": 0
        }
    }'
    # Show a mapping
    curl -H 'Content-Type:application/json' -XGET localhost:9200/cmnews/_mapping/channel_info?pretty
    # Reindex
    curl -H 'Content-Type:application/json' -XPOST localhost:9200/_reindex?pretty -d'
    {
        "source": {
            "index": "cmnews"
        },
        "dest": {
            "index": "cmnews_v1"
        }
    }'
    # Delete an index
    curl -XDELETE localhost:9200/cmnews

Changing field types

Changing a field's type in Elasticsearch is awkward because it requires a reindex; the smooth way to migrate is via index aliases.

  1. Create an index with a version suffix, e.g. cmnews_v1.
  2. Create an alias pointing at it, and access the index only through the alias cmnews:

         curl -H 'Content-Type:application/json' -XPOST localhost:9200/_aliases?pretty -d '
         {
             "actions": [
                 { "add": { "index": "cmnews_v1", "alias": "cmnews" }}
             ]
         }'

  3. Create a new index, e.g. cmnews_v2, load the new data into it, then switch the alias to v2:

         curl -H 'Content-Type:application/json' -XPOST localhost:9200/_aliases?pretty -d '
         {
             "actions": [
                 { "remove": {"index": "cmnews_v1", "alias": "cmnews" }},
                 { "add": { "index": "cmnews_v2", "alias": "cmnews" }}
             ]
         }'

  4. Delete the old cmnews_v1.
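The two `_aliases` calls above can be wrapped in a small helper that builds the atomic swap payload (a sketch; posting it to `/_aliases` is left to whatever HTTP client you use):

```python
def alias_swap(alias, new_index, old_index=None):
    """Build an _aliases request body that repoints `alias` at `new_index`.

    Elasticsearch applies all actions in one request atomically, so
    readers never see the alias missing between the remove and the add.
    """
    actions = []
    if old_index is not None:
        actions.append({"remove": {"index": old_index, "alias": alias}})
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}

# Step 2: create the initial alias; step 3: switch it from v1 to v2
create_body = alias_swap("cmnews", "cmnews_v1")
switch_body = alias_swap("cmnews", "cmnews_v2", old_index="cmnews_v1")
```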

Fixing unassigned shards

Pick shard and replica counts that suit your data volume (ideally keep at least one replica). If shards become UNASSIGNED, try the following:

  1. Find the node's unique identifier:

         curl -H 'Content-Type:application/json' localhost:9200/_nodes/process?pretty

  2. Re-allocate the replica shard:

         curl -H 'Content-Type:application/json' -XPOST 'localhost:9200/_cluster/reroute?pretty&retry_failed=true' -d '{
             "commands" : [ {
                 "allocate_replica" : {
                     "index" : "vine",
                     "shard" : 3,
                     "node" : "3jHjZKNaQM-4bGDUONpSBg"
                 }
             } ]
         }'

  3. To force-allocate a stale primary shard (accepting possible data loss):

         curl -H 'Content-Type:application/json' -XPOST 'localhost:9200/_cluster/reroute?pretty' -d '
         {
             "commands" : [ {
                 "allocate_stale_primary" : {
                     "index" : "cmnews_v1",
                     "shard" : 0,
                     "node" : "lQaOnaHsQsO83Z2FVfWc6Q",
                     "accept_data_loss": true
                 }
             } ]
         }'
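To find the shards that need rerouting, the plain-text `_cat/shards` output can be filtered with a few lines of Python (a sketch; the sample rows below are made up):

```python
def unassigned_shards(cat_output):
    """Parse `_cat/shards` text output and return (index, shard, prirep)
    tuples for every shard in the UNASSIGNED state."""
    found = []
    for line in cat_output.strip().splitlines():
        fields = line.split()
        # columns: index shard prirep state [docs store ip node]
        if len(fields) >= 4 and fields[3] == "UNASSIGNED":
            found.append((fields[0], int(fields[1]), fields[2]))
    return found

sample = """\
cmnews_v1 0 p STARTED 1000 1mb 10.2.2.61 node-61
cmnews_v1 1 r UNASSIGNED
vine 3 r UNASSIGNED
"""
# -> [('cmnews_v1', 1, 'r'), ('vine', 3, 'r')]
```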

Restarting the cluster

First disable automatic shard allocation:

    curl -H 'Content-Type:application/json' -XPUT localhost:9200/_cluster/settings?pretty -d'
    {
        "transient" : {
            "cluster.routing.allocation.enable" : "none"
        }
    }'

After restarting the upgraded node, re-enable allocation:

    curl -H 'Content-Type:application/json' -XPUT localhost:9200/_cluster/settings?pretty -d'
    {
        "transient" : {
            "cluster.routing.allocation.enable" : "all"
        }
    }'

Installing Kibana

Kibana is the visualization tool for ES. Download it from https://www.elastic.co/downloads/kibana

Extract it to /home/zhangkai/data/tools/opt/kibana-5.3.0-linux-x86_64, then edit config/kibana.yml and set elasticsearch.url:

    elasticsearch.url: "http://10.2.2.164:9200"
    server.host: "0.0.0.0"

Installing X-Pack

Both Elasticsearch and Kibana need the plugin:

    bin/elasticsearch-plugin install x-pack
    bin/kibana-plugin install x-pack

To disable the security features (optional), add the following line to both elasticsearch.yml and kibana.yml:

    xpack.security.enabled: false

Elasticsearch backup and restore tool

https://github.com/medcl/elasticsearch-migration

Installing mongo-connector

Install the MongoDB-to-Elasticsearch sync tool with:

    pip install mongo-connector 'elastic2-doc-manager[elastic5]'

Notes

When syncing, mongo-connector raises an error if a document is not found. Change line 625 of mongo_connector/doc_managers/elastic2_doc_manager.py to if ES_doc.get('found'):.

mongo-connector replays MongoDB's oplog to synchronize data into ES. It creates one index per MongoDB database, and each collection becomes a type within that index. Before syncing, it is best to create the Elasticsearch mappings first so field types are set explicitly.
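The namespace convention can be illustrated with a tiny helper (a sketch of the default mapping, not mongo-connector's actual code):

```python
def namespace_to_es(namespace):
    """Map a MongoDB namespace "database.collection" to the ES 5.x
    (index, type) pair mongo-connector writes to: the database name
    becomes the index, the collection name becomes the type."""
    database, _, collection = namespace.partition(".")
    # ES index names must be lowercase
    return database.lower(), collection

# e.g. cmnews.channel_info -> type channel_info inside index cmnews
```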

A settings.json example:

    {
        "settings": {
            "refresh_interval": "5s",
            "number_of_shards": 4,
            "number_of_replicas": 0
        },
        "mappings": {
            "_default_": {
                "_all": {
                    "enabled": false
                }
            },
            "channel_info": {
                "_all": {
                    "enabled": false
                },
                "properties": {
                    "id": {
                        "type": "keyword",
                        "store": true
                    },
                    "tags": {
                        "type": "text",
                        "store": true,
                        "fields": {
                            "keyword": {
                                "type": "keyword"
                            }
                        }
                    },
                    "publishedAt": {
                        "type": "date",
                        "format": "yyyy-MM-dd'T'HH:mm:ss||yyyy-MM-dd HH:mm:ss||epoch_millis",
                        "store": true
                    },
                    "regionRestriction": {
                        "type": "object",
                        "properties": {
                            "allowed": {
                                "type": "keyword"
                            },
                            "blocked": {
                                "type": "keyword"
                            }
                        }
                    }
                }
            }
        }
    }

Create the index and alias:

    curl -H 'Content-Type:application/json' -XDELETE localhost:9200/cmnews_v1?pretty
    curl -H 'Content-Type:application/json' -XDELETE localhost:9200/mongodb_meta?pretty
    curl -H 'Content-Type:application/json' -XPUT localhost:9200/cmnews_v1?pretty -d @settings.json
    curl -H 'Content-Type:application/json' -XPOST localhost:9200/_aliases?pretty -d @aliases.json

Notes

Without an oplog.timestamp file, mongo-connector performs a full initial sync, which takes a long time. For that first sync it helps to disable refresh and replicas so it finishes faster, then restore them afterwards:

    curl -H 'Content-Type:application/json' -XPUT localhost:9200/mongodb_meta?pretty -d '
    {
        "settings": {
            "refresh_interval": "-1",
            "number_of_shards": 4,
            "number_of_replicas": 0
        }
    }'

Once the sync completes, change the settings back:

    curl -H 'Content-Type:application/json' -XPUT localhost:9200/mongodb_meta/_settings -d '
    {
        "index": {
            "refresh_interval": "5s",
            "number_of_replicas": 1
        }
    }'

If the oplog is sized too small, the entry mongo-connector saved as its resume point may already have been overwritten, in which case it stops syncing. Keep the oplog large enough that entries are not rotated out before they are consumed:

    2017-04-13 02:55:00,599 [DEBUG] mongo_connector.oplog_manager:830 - OplogThread: reading last checkpoint as Timestamp(1492023299, 12222)
    2017-04-13 02:55:00,609 [DEBUG] mongo_connector.oplog_manager:704 - OplogThread: Oldest oplog entry has timestamp Timestamp(1492024942, 7177).
    2017-04-13 02:55:00,610 [ERROR] mongo_connector.oplog_manager:202 - OplogThread: Last entry no longer in oplog cannot recover! Collection(Database(MongoClient(host=['10.2.2.164:27017'], document_class=dict, tz_aware=False, connect=True, replicaset=u'rs0'), u'local'), u'oplog.rs')
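The condition behind this error can be checked by comparing mongo-connector's saved checkpoint with the oldest entry still present in the oplog. A minimal sketch, representing BSON Timestamps as (seconds, increment) tuples:

```python
def checkpoint_recoverable(last_checkpoint, oldest_oplog_entry):
    """Return True if the saved checkpoint is still covered by the oplog.

    Tuples compare lexicographically, matching how MongoDB orders
    BSON Timestamps (seconds first, then the increment).
    """
    return last_checkpoint >= oldest_oplog_entry

# The log above: the checkpoint (1492023299, 12222) predates the oldest
# surviving entry (1492024942, 7177), so the connector cannot recover.
ok = checkpoint_recoverable((1492023299, 12222), (1492024942, 7177))
```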

Syncing after missed data

mongo-connector can occasionally miss documents during sync. The following script, sync.py, checks for and repairs such gaps:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    '''
    Author: zhangkai
    Last modified: 2017-04-23 12:29
    '''
    import os
    import bson
    import json
    import pymongo
    import math
    import tornado.gen
    import tornado.ioloop
    import tornado.httpclient
    import tornado.curl_httpclient
    import tornado.queues
    import tornado.locks
    import logging

    logging.basicConfig(level=logging.INFO,
                        format="[%(levelname)s %(asctime)s %(module)s:%(lineno)d] %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S")


    class Sync(object):
        '''
        export command:
        mongoexport -d cmnews -c video_info --fields='_id' --sort='{_id:1}' --type='csv' --noHeaderLine -o data/cmnews.video_info.csv
        cat data/cmnews.video_info.csv | awk -F '[()]' '{print $2}' > data/cmnews.video_info.init.ids
        '''

        def __init__(self, database, collection):
            self.http = tornado.curl_httpclient.CurlAsyncHTTPClient(max_clients=10)
            self.queue = tornado.queues.Queue()
            self.lock = tornado.locks.Lock()
            self.table = pymongo.MongoClient()[database][collection]
            self.logger = logging
            self.missing_file = "data/%s.%s.missing.ids" % (database, collection)
            self.database = database
            self.collection = collection

        @tornado.gen.coroutine
        def worker(self):
            while not self.queue.empty():
                try:
                    resp = yield self.http.fetch('http://10.2.2.164:9200/_cat/shards', raise_error=False)
                    if not (resp.code == 200 and resp.body.find('UNASSIGNED') == -1):
                        self.logger.error('UNASSIGNED shards found')
                        raise tornado.gen.Return(os._exit(1))
                    ids = yield self.queue.get()
                    url = 'http://10.2.2.164:9200/%s/%s/_search' % (self.database, self.collection)
                    body = {
                        "size": len(ids),
                        "_source": "",
                        "query": {
                            "terms": {
                                "_id": ids
                            }
                        }
                    }
                    resp = yield self.http.fetch(url, method="POST", body=json.dumps(body), raise_error=False)
                    if resp.code == 200:
                        self.logger.info("%s fetch succeed", ids[0])
                        ret = json.loads(resp.body)
                        if len(ids) != ret['hits']['total']:
                            exists_ids = map(lambda x: x['_id'], ret['hits']['hits'])
                            missing_ids = set(ids) - set(exists_ids)
                            self.logger.info('missing_ids: %s', len(missing_ids))
                            with (yield self.lock.acquire()):
                                self.missing_ids |= missing_ids
                    else:
                        self.logger.error("%s fetch failed, code: %s, error: %s", ids[0], resp.code, resp.error)
                        self.queue.put(ids)
                except Exception as e:
                    self.logger.exception(e)
                finally:
                    self.queue.task_done()

        @tornado.gen.coroutine
        def start(self):
            self.missing_ids = set()
            ids = map(lambda line: line.strip(), open(self.missing_file))
            self.logger.info('total ids: %s', len(ids))
            for i in xrange(int(math.ceil(len(ids) / 1000.0))):
                self.logger.info('reading %s', i * 1000)
                yield self.queue.put(ids[i * 1000:(i + 1) * 1000])
            for _ in xrange(10):
                self.worker()
            yield self.queue.join()
            with open(self.missing_file, 'w') as fp:
                fp.write('\n'.join(self.missing_ids))

        def solve(self):
            missing_ids = map(lambda line: line.strip(), open(self.missing_file))
            if missing_ids:
                self.logger.info("%s.%s missing %s", self.database, self.collection, len(missing_ids))
                total_ids = map(lambda id: bson.ObjectId(id), missing_ids)
                for i in xrange(int(math.ceil(len(total_ids) / 1000.0))):
                    self.logger.info('syncing %s.%s processing: %s', self.database, self.collection, i * 1000)
                    ids = total_ids[i * 1000:(i + 1) * 1000]
                    cursor = self.table.find({'_id': {'$in': ids}})
                    docs = [c for c in cursor]
                    if docs:
                        # delete and re-insert so mongo-connector replays the docs via the oplog
                        self.table.delete_many({'_id': {'$in': ids}})
                        self.table.insert_many(docs)
                return False
            else:
                return True


    if __name__ == '__main__':
        client = Sync('cmnews', 'channel_info')
        tornado.ioloop.IOLoop.current().run_sync(client.start)

The ID-list generation script start.sh:

    set -e -x
    database="cmnews"
    collections="channel_info video_info"
    for collection in $collections
    do
        missing_file=data/$database.$collection.missing.ids
        checking_file=data/$database.$collection.checking.id
        if test -e $checking_file; then
            id=`cat $checking_file`
            mongoexport -d $database -c $collection \
                --query="{_id: {\$gte: ObjectId('$id')}}" \
                --fields='_id' \
                --sort='{_id:1}' \
                --type='csv' --noHeaderLine \
                -o $missing_file
        else
            mongoexport -d $database -c $collection \
                --fields='_id' \
                --sort='{_id:1}' \
                --type='csv' --noHeaderLine \
                -o $missing_file
        fi
        num=`cat $missing_file | wc -l`
        if [[ $num -gt 0 ]]; then
            sed 's/ObjectId(\(.*\))/\1/g' -i $missing_file
            tail -n1 $missing_file > $checking_file
        fi
    done

Installing supervisor

    pip install supervisor
    mkdir -p /home/zhangkai/tools/runtime/supervisor
    echo_supervisord_conf > /home/zhangkai/tools/runtime/supervisor/supervisord.conf

supervisord.conf configuration:

    [program:mongo-connector]
    command = mongo-connector -m localhost:27017 -t localhost:9200 -d elastic2_doc_manager --continue-on-error
    directory = /home/zhangkai/data/tools/runtime/connector
    user = zhangkai
    autostart = true
    autorestart = true
    redirect_stderr = true
    stdout_logfile = /home/zhangkai/data/tools/runtime/connector/logs/out.log

    [program:elasticsearch]
    command = /home/zhangkai/data/tools/runtime/elasticsearch-5.3.0/bin/elasticsearch
    directory = /home/zhangkai/data/tools/runtime/elasticsearch-5.3.0
    user = zhangkai
    autostart = true
    autorestart = true
    redirect_stderr = true
    stdout_logfile = /home/zhangkai/data/tools/runtime/elasticsearch-5.3.0/logs/out.log

    [program:kibana]
    command = /home/zhangkai/data/tools/runtime/kibana-5.3.0/bin/kibana
    directory = /home/zhangkai/data/tools/runtime/kibana-5.3.0
    user = zhangkai
    autostart = true
    autorestart = true
    redirect_stderr = true
    stdout_logfile = /home/zhangkai/data/tools/runtime/kibana-5.3.0/logs/out.log

Start supervisor to begin syncing:

    supervisord -c /home/zhangkai/tools/runtime/supervisor/supervisord.conf
    supervisorctl -c /home/zhangkai/tools/runtime/supervisor/supervisord.conf