MongoDB and Elasticsearch Cluster Deployment
Deployment environment
Assume we have two servers, 10.2.2.164 and 10.2.2.61. 10.2.2.61 hosts the MongoDB primary, the Elasticsearch master node, and mongo-connector; 10.2.2.164 hosts the Elasticsearch data shards.
Installing MongoDB
Reference configuration file mongod.yml:
systemLog:
  destination: file
  path: logs/mongod.log
  logAppend: true
  logRotate: rename
  timeStampFormat: iso8601-local
# storage engine settings
storage:
  # journal settings
  journal:
    enabled: true
  # data file location
  dbPath: data
  # one directory per database
  directoryPerDB: true
  # storage engine
  engine: wiredTiger
  # WiredTiger engine settings
  wiredTiger:
    engineConfig:
      # maximum cache used by WiredTiger (tune to the server's memory)
      cacheSizeGB: 4
      # also store indexes in a per-database directory
      directoryForIndexes: true
    # collection compression settings
    collectionConfig:
      blockCompressor: snappy
    # index settings
    indexConfig:
      prefixCompression: true
# network settings
net:
  bindIp: 0.0.0.0
  port: 27017
  maxIncomingConnections: 65536
  wireObjectCheck: true
  ipv6: false
# process settings
processManagement:
  fork: true
# slow query profiling
operationProfiling:
  slowOpThresholdMs: 100
  mode: slowOp
# replica set settings
replication:
  oplogSizeMB: 51200
  replSetName: rs0
  enableMajorityReadConcern: false
Field reference: http://www.ywnds.com/?p=6502
First create the data and logs directories, then start the server with mongod -f mongod.yml. You can run a single mongod as the primary to begin with and add or remove members later as needed. Initialize the replica set with the following JS script (init.js):
config = {
  _id: "rs0",
  version: 1,
  members: [{
    _id: 0,
    host: "10.2.2.61:27017",
    priority: 10
  }]
}
rs.initiate(config)
Then run:
mongo localhost:27017/cmnews --quiet init.js
The primary and secondaries synchronize via the oplog: after a write completes on the primary, an entry is written to the special local.oplog.rs collection, and the secondaries continuously pull new oplog entries from the primary and apply them. The oplog is a log of operations stored in a capped collection; in a replica set, making it too small can leave secondaries unable to keep up with the primary. By default it is 5% of the free disk space, and because the collection is capped, its size cannot simply be changed after MongoDB has started.
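For a quick check of the oplog window and replication lag, the standard mongo shell helpers can be used:
rs.printReplicationInfo()        // configured oplog size and the time range it covers
rs.printSlaveReplicationInfo()   // how far each secondary lags behind the primary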
If a newly added member falls far behind the primary and needs a full resync, you can remove it from the replica set, wipe its data files, and add it back:
> rs.remove("10.2.2.164:27017")
> rs.add("10.2.2.164:27017")
Authentication between replica set members requires a keyfile; without one, adding a member may fail.
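A minimal sketch of setting one up (the keyfile path here is an assumption; copy the same file to every member and keep it mode 600):
openssl rand -base64 756 > /home/zhangkai/data/mongodb/keyfile
chmod 600 /home/zhangkai/data/mongodb/keyfile
Then add the following to mongod.yml on every member:
security:
  keyFile: /home/zhangkai/data/mongodb/keyfile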
Installing the shell enhancement tool mongo-hacker
npm install -g mongo-hacker
Secondaries cannot be read from directly; add this line to ~/.mongorc.js:
rs.slaveOk();
db.channel_info.find({}, {_id:0, id: 1}).pretty()
Read/write splitting:
import pymongo
db = pymongo.MongoClient('127.0.0.1:27017', replicaSet='rs0', readPreference='secondaryPreferred').cmnews
Enabling log rotation
> use admin
> db.runCommand({ logRotate: 1 })
Installing JDK 8
Download: http://www.oracle.com/technetwork/java/javase/downloads/index.html
Extract it to /home/zhangkai/data/tools/opt/jdk1.8.0_121 and add the following lines to ~/.bashrc:
export JAVA_HOME=/home/zhangkai/data/tools/opt/jdk1.8.0_121
export JRE_HOME=$JAVA_HOME/jre
export CLASS_PATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
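Reload the shell configuration and check that the JDK is picked up:
source ~/.bashrc
java -version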
Installing Elasticsearch
Installation guide: http://www.wklken.me/posts/2016/06/29/deploy-es.html
Download: https://www.elastic.co/downloads/elasticsearch
Extract it to /home/zhangkai/data/tools/opt/elasticsearch-5.3.0 and edit config/jvm.options to raise the JVM heap:
-Xms4g
-Xmx4g
-Xss512k
Edit config/elasticsearch.yml:
cluster.name: escluster
node.name: node-61
node.master: true
node.data: true
path.data: data
path.logs: logs
network.host: 0.0.0.0
#http.host: 0.0.0.0
http.port: 9200
http.enabled: true
http.max_content_length: 100mb
#transport.host: 127.0.0.1
transport.tcp.port: 9300
transport.tcp.compress: true
discovery.zen.ping_timeout: 30s
client.transport.ping_timeout: 60s
discovery.zen.fd.ping_timeout: 30s
discovery.zen.fd.ping_interval: 5s
discovery.zen.fd.ping_retries: 10
discovery.zen.minimum_master_nodes: 1
discovery.zen.ping.unicast.hosts: ["10.2.2.61:9300", "10.2.2.164:9300"]
#bootstrap.mlockall: true
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
action.auto_create_index: true
#action.disable_delete_all_indices: true
action.destructive_requires_name: true
Since version 2.0, ES defaults to unicast discovery; discovery.zen.ping.unicast.hosts lists the node addresses, using the transport addresses.
ES 5.x no longer allows index-level settings in the config file; set them through the REST API instead:
curl -H 'Content-Type:application/json' -XPUT 'http://localhost:9200/_all/_settings?pretty&preserve_existing=true' -d '{
"index.number_of_shards" : 4,
"index.number_of_replicas" : 0
}'
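If you want these defaults to apply to indices that have not been created yet, an index template is one option (a sketch; the template name and pattern are assumptions):
curl -H 'Content-Type:application/json' -XPUT 'http://localhost:9200/_template/default_settings?pretty' -d '{
  "template": "*",
  "settings": {
    "index.number_of_shards": 4,
    "index.number_of_replicas": 0
  }
}'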
Copy this configuration to the deployment path on both the 61 and 164 machines. On 164, change node.name to node-164 and set node.master: false so that only one master-eligible node exists and split-brain cannot occur (see the sketch below), then start the deployment.
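A sketch of the overrides on 10.2.2.164; everything else stays identical to the file above:
node.name: node-164
node.master: false
node.data: true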
Elasticsearch can run in two modes, Development and Production. By default network.host is the local IP (127.0.0.1). For a cluster deployment, network.host must be set to an external IP; ES then considers itself to be in Production mode and enforces the bootstrap checks at startup, for example:
max number of threads [1024] for user [zhangkai] is too low, increase to at least [2048]
max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
In Development mode these are only warnings; in Production mode they are errors that fail the bootstrap checks. Fix them as follows:
ulimit -u 4096
echo 'vm.max_map_count = 655360' | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
Edit /etc/security/limits.conf and add:
zhangkai soft nofile 65536
zhangkai hard nofile 131072
zhangkai soft nproc 4096
zhangkai hard nproc 8192
zhangkai - memlock unlimited
Note: these changes only take effect after logging in again; verify with ulimit -n.
To stay in Development mode while still allowing HTTP access from outside, set http.host to the external IP separately and leave network.host at its default, as sketched below.
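A sketch of that variant for the 10.2.2.61 machine (the IP is this example's address):
# network.host left at its default (127.0.0.1), so the node stays in Development mode
http.host: 10.2.2.61
http.port: 9200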
Start it in the foreground with bin/elasticsearch; once everything looks right, start it in the background with bin/elasticsearch -d. Then run curl http://127.0.0.1:9200; if it returns something like the following, the node is up:
{
  "name" : "node-164",
  "cluster_name" : "escluster",
  "cluster_uuid" : "ofWCU22DSbuGrzne5AvZWQ",
  "version" : {
    "number" : "5.3.0",
    "build_hash" : "3adb13b",
    "build_date" : "2017-03-23T03:31:50.652Z",
    "build_snapshot" : false,
    "lucene_version" : "6.4.1"
  },
  "tagline" : "You Know, for Search"
}
Common commands
# check cluster health
curl localhost:9200/_cluster/health?pretty
# list shard information
curl localhost:9200/_cat/shards?pretty
# show the master node
curl localhost:9200/_cat/master?pretty
# show the mapping of a type
curl localhost:9200/cmnews/video_info/_mapping?pretty
# show index settings
curl localhost:9200/cmnews/_settings?pretty
# update index settings
curl -H 'Content-Type:application/json' -XPUT localhost:9200/cmnews/_settings -d '
{
"index": {
"number_of_replicas": 0
}
}'
# show a type's mapping (alternative form)
curl -H 'Content-Type:application/json' -XGET localhost:9200/cmnews/_mapping/channel_info?pretty
# reindex into a new index
curl -H 'Content-Type:application/json' -XPOST localhost:9200/_reindex?pretty -d'
{
"source": {
"index": "cmnews"
},
"dest": {
"index": "cmnews_v1"
}
}'
# delete an index
curl -XDELETE localhost:9200/cmnews
Changing field types
Changing a field's type in Elasticsearch is painful because it requires a reindex; the smooth way to upgrade is to use index aliases.
- First create an index with a version suffix, e.g. cmnews_v1.
- Create an alias pointing to that index and access it through the alias cmnews:
curl -H 'Content-Type:application/json' -XPOST localhost:9200/_aliases?pretty -d '
{
"actions": [
{ "add": { "index": "cmnews_v1", "alias": "cmnews" }}
]
}'
- Create a new index, e.g. cmnews_v2, load the new data into it, and switch the alias to point to v2:
curl -H 'Content-Type:application/json' -XPOST localhost:9200/_aliases?pretty -d '
{
"actions": [
{ "remove": {"index": "cmnews_v1", "alias": "cmnews" }},
{ "add": { "index": "cmnews_v2", "alias": "cmnews" }}
]
}'
- Delete the old cmnews_v1.
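At any point you can confirm which index the alias currently points to:
curl localhost:9200/_alias/cmnews?pretty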
Fixing unassigned shards
Choose the number of shards and replicas based on your data volume (ideally keep at least one replica). If UNASSIGNED shards appear, try the following.
Look up the nodes' unique IDs:
curl -H 'Content-Type:application/json' localhost:9200/_nodes/process?pretty
Reallocate a replica shard:
curl -H 'Content-Type:application/json' -XPOST 'localhost:9200/_cluster/reroute?pretty&retry_failed=true' -d '{
  "commands" : [ {
    "allocate_replica" : {
      "index" : "vine",
      "shard" : 3,
      "node" : "3jHjZKNaQM-4bGDUONpSBg"
    }
  } ]
}'
To force-allocate a primary shard (this accepts possible data loss):
curl -H 'Content-Type:application/json' -XPOST 'localhost:9200/_cluster/reroute?pretty' -d'
{
  "commands" : [ {
    "allocate_stale_primary" : {
      "index" : "cmnews_v1",
      "shard" : 0,
      "node" : "lQaOnaHsQsO83Z2FVfWc6Q",
      "accept_data_loss" : true
    }
  } ]
}
'
Restarting the cluster
First disable automatic shard allocation:
curl -H 'Content-Type:application/json' -XPUT localhost:9200/_cluster/settings?pretty -d'
{
"transient" : {
"cluster.routing.allocation.enable" : "none"
}
}'
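Optionally, issue a synced flush before stopping each node; it speeds up shard recovery when the node rejoins (standard ES 5.x API):
curl -XPOST localhost:9200/_flush/synced?pretty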
After restarting (or upgrading) the nodes, re-enable allocation:
curl -H 'Content-Type:application/json' -XPUT localhost:9200/_cluster/settings?pretty -d'
{
"transient" : {
"cluster.routing.allocation.enable" : "all"
}
}'
Installing Kibana
Kibana is the visualization tool for ES. Download: https://www.elastic.co/downloads/kibana
Extract it to /home/zhangkai/data/tools/opt/kibana-5.3.0-linux-x86_64 and edit config/kibana.yml to point elasticsearch.url at the cluster:
elasticsearch.url: "http://10.2.2.164:9200"
server.host: "0.0.0.0"
Installing X-Pack
Both Elasticsearch and Kibana need the plugin installed:
bin/elasticsearch-plugin install x-pack
bin/kibana-plugin install x-pack
To disable the security features (optional), add the following line to both elasticsearch.yml and kibana.yml:
xpack.security.enabled: false
Elasticsearch backup/restore tool
https://github.com/medcl/elasticsearch-migration
Installing mongo-connector
Install the MongoDB-to-Elasticsearch sync tool:
pip install mongo-connector 'elastic2-doc-manager[elastic5]'
Notes
If a document is not found in ES during syncing, mongo-connector raises an error outright. Change line 625 of mongo_connector/doc_managers/elastic2_doc_manager.py to:
if ES_doc.get('found'):
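To find where pip installed that file, a one-liner like the following works (a helper, not part of mongo-connector itself):
python -c "import mongo_connector.doc_managers.elastic2_doc_manager as m; print(m.__file__)"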
mongo-connector reads the MongoDB oplog to sync data into ES. It creates one index per MongoDB database, and each collection becomes a type within that index. Before syncing, it is best to create the Elasticsearch mapping first and set the field types explicitly.
Example settings.json:
{
  "settings": {
    "refresh_interval": "5s",
    "number_of_shards": 4,
    "number_of_replicas": 0
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": false
      }
    },
    "channel_info": {
      "_all": {
        "enabled": false
      },
      "properties": {
        "id": {
          "type": "keyword",
          "store": true
        },
        "tags": {
          "type": "text",
          "store": true,
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "publishedAt": {
          "type": "date",
          "format": "yyyy-MM-dd'T'HH:mm:ss||yyyy-MM-dd HH:mm:ss||epoch_millis",
          "store": true
        },
        "regionRestriction": {
          "type": "object",
          "properties": {
            "allowed": {
              "type": "keyword"
            },
            "blocked": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}
Create the index and alias:
curl -H 'Content-Type:application/json' -XDELETE localhost:9200/cmnews_v1?pretty
curl -H 'Content-Type:application/json' -XDELETE localhost:9200/mongodb_meta?pretty
curl -H 'Content-Type:application/json' -XPUT localhost:9200/cmnews_v1?pretty -d @settings.json
curl -H 'Content-Type:application/json' -XPOST localhost:9200/_aliases?pretty -d @aliases.json
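The aliases.json referenced above is not listed in these notes; a minimal version consistent with the alias commands earlier would be:
{
  "actions": [
    { "add": { "index": "cmnews_v1", "alias": "cmnews" } }
  ]
}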
Notes
If there is no oplog.timestamp file, mongo-connector performs a full initial sync, which takes a long time. For the first sync it is best to disable replicas (and index refresh) so the import finishes faster, then restore them afterwards:
curl -H 'Content-Type:application/json' -XPUT localhost:9200/mongodb_meta?pretty -d '
{
"settings": {
"refresh_interval": "-1",
"number_of_shards": 4,
"number_of_replicas": 0
}
}'
After the sync completes, change the settings back:
curl -H 'Content-Type:application/json' -XPUT localhost:9200/mongodb_meta/_settings -d '
{
"index": {
"refresh_interval": "5s",
"number_of_replicas": 1
}
}'
If the oplog is too small, the oplog entry that mongo-connector saved as its checkpoint may already have been overwritten, at which point it stops syncing. Make sure the oplog is large enough that entries are not overwritten before they are consumed:
2017-04-13 02:55:00,599 [DEBUG] mongo_connector.oplog_manager:830 - OplogThread: reading last checkpoint as Timestamp(1492023299, 12222)
2017-04-13 02:55:00,609 [DEBUG] mongo_connector.oplog_manager:704 - OplogThread: Oldest oplog entry has timestamp Timestamp(1492024942, 7177).
2017-04-13 02:55:00,610 [ERROR] mongo_connector.oplog_manager:202 - OplogThread: Last entry no longer in oplog cannot recover! Collection(Database(MongoClient(host=['10.2.2.164:27017'], document_class=dict, tz_aware=False, connect=True, replicaset=u'rs0'), u'local'), u'oplog.rs')
Re-syncing missed data
mongo-connector may miss documents while syncing. The following script, sync.py, checks for and fixes such gaps:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Author: zhangkai
Last modified: 2017-04-23 12:29
'''
import os
import bson
import json
import pymongo
import math
import tornado.gen
import tornado.ioloop
import tornado.httpclient
import tornado.curl_httpclient
import tornado.queues
import tornado.locks
import logging

logging.basicConfig(level=logging.INFO,
                    format="[%(levelname)s %(asctime)s %(module)s:%(lineno)d] %(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S")


class Sync(object):
    '''
    export command:
    mongoexport -d cmnews -c video_info --fields='_id' --sort='{_id:1}' --type='csv' --noHeaderLine -o data/cmnews.video_info.csv
    cat data/cmnews.video_info.csv | awk -F '[()]' '{print $2}' > data/cmnews.video_info.init.ids
    '''

    def __init__(self, database, collection):
        self.http = tornado.curl_httpclient.CurlAsyncHTTPClient(max_clients=10)
        self.queue = tornado.queues.Queue()
        self.lock = tornado.locks.Lock()
        self.table = pymongo.MongoClient()[database][collection]
        self.logger = logging
        self.missing_file = "data/%s.%s.missing.ids" % (database, collection)
        self.database = database
        self.collection = collection

    @tornado.gen.coroutine
    def worker(self):
        # consume batches of ids from the queue and look them up in ES
        while not self.queue.empty():
            try:
                # abort if the cluster has unassigned shards: search results would be incomplete
                resp = yield self.http.fetch('http://10.2.2.164:9200/_cat/shards', raise_error=False)
                if not (resp.code == 200 and resp.body.find('UNASSIGNED') == -1):
                    self.logger.error('UNASSIGNED shards found')
                    raise tornado.gen.Return(os._exit(1))
                ids = yield self.queue.get()
                url = 'http://10.2.2.164:9200/%s/%s/_search' % (self.database, self.collection)
                body = {
                    "size": len(ids),
                    "_source": "",
                    "query": {
                        "terms": {
                            "_id": ids
                        }
                    }
                }
                resp = yield self.http.fetch(url, method="POST", body=json.dumps(body), raise_error=False)
                if resp.code == 200:
                    self.logger.info("%s fetch succeed", ids[0])
                    ret = json.loads(resp.body)
                    if len(ids) != ret['hits']['total']:
                        # record ids that exist in MongoDB but are missing from ES
                        exists_ids = map(lambda x: x['_id'], ret['hits']['hits'])
                        missing_ids = set(ids) - set(exists_ids)
                        self.logger.info('missing_ids: %s', len(missing_ids))
                        with (yield self.lock.acquire()):
                            self.missing_ids |= missing_ids
                else:
                    self.logger.error("%s fetch failed, code: %s, error: %s", ids[0], resp.code, resp.error)
                    self.queue.put(ids)
            except Exception as e:
                self.logger.exception(e)
            finally:
                self.queue.task_done()

    @tornado.gen.coroutine
    def start(self):
        self.missing_ids = set()
        ids = map(lambda line: line.strip(), open(self.missing_file))
        self.logger.info('total ids: %s', len(ids))
        # enqueue the ids in batches of 1000
        for i in xrange(int(math.ceil(len(ids) / 1000.0))):
            self.logger.info('reading %s', i * 1000)
            yield self.queue.put(ids[i * 1000:(i + 1) * 1000])
        # spawn 10 concurrent workers and wait for the queue to drain
        for _ in xrange(10):
            self.worker()
        yield self.queue.join()
        # overwrite the file with only the ids missing from ES
        with open(self.missing_file, 'w') as fp:
            fp.write('\n'.join(self.missing_ids))

    def solve(self):
        # rewrite the missing documents in MongoDB so mongo-connector picks them up from the oplog again
        missing_ids = map(lambda line: line.strip(), open(self.missing_file))
        if missing_ids:
            self.logger.info("%s.%s missing %s", self.database, self.collection, len(missing_ids))
            total_ids = map(lambda id: bson.ObjectId(id), missing_ids)
            for i in xrange(int(math.ceil(len(total_ids) / 1000.0))):
                self.logger.info('syncing %s.%s processing: %s', self.database, self.collection, i * 1000)
                ids = total_ids[i * 1000:(i + 1) * 1000]
                cursor = self.table.find({'_id': {'$in': ids}})
                docs = [c for c in cursor]
                if docs:
                    self.table.delete_many({'_id': {'$in': ids}})
                    self.table.insert_many(docs)
            return False
        else:
            return True


if __name__ == '__main__':
    client = Sync('cmnews', 'channel_info')
    tornado.ioloop.IOLoop.current().run_sync(client.start)
The ID-dump script start.sh:
#!/bin/bash
set -e -x

database="cmnews"
collections="channel_info video_info"

for collection in $collections
do
    missing_file=data/$database.$collection.missing.ids
    checking_file=data/$database.$collection.checking.id
    if test -e $checking_file; then
        # resume from the last checked _id
        id=`cat $checking_file`
        mongoexport -d $database -c $collection \
            --query="{_id: {\$gte: ObjectId('$id')}}" \
            --fields='_id' \
            --sort='{_id:1}' \
            --type='csv' --noHeaderLine \
            -o $missing_file
    else
        mongoexport -d $database -c $collection \
            --fields='_id' \
            --sort='{_id:1}' \
            --type='csv' --noHeaderLine \
            -o $missing_file
    fi
    num=`cat $missing_file | wc -l`
    if [[ $num -gt 0 ]]; then
        # strip the ObjectId(...) wrapper and remember the last id for the next run
        sed 's/ObjectId(\(.*\))/\1/g' -i $missing_file
        tail -n1 $missing_file > $checking_file
    fi
done
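A possible workflow with the two scripts above (file names follow the defaults in the scripts):
bash start.sh    # dump the current _id lists from MongoDB into data/*.missing.ids
python sync.py   # check those ids against ES and rewrite the files with only the missing ones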
Installing supervisor
pip install supervisor
mkdir -p /home/zhangkai/tools/runtime/supervisor
echo_supervisord_conf > /home/zhangkai/tools/runtime/supervisor/supervisord.conf
supervisord.conf configuration:
[program:mongo-connector]
command = mongo-connector -m localhost:27017 -t localhost:9200 -d elastic2_doc_manager --continue-on-error
directory = /home/zhangkai/data/tools/runtime/connector
user = zhangkai
autostart = true
autorestart = true
redirect_stderr = true
stdout_logfile = /home/zhangkai/data/tools/runtime/connector/logs/out.log
[program:elasticsearch]
command = /home/zhangkai/data/tools/runtime/elasticsearch-5.3.0/bin/elasticsearch
directory = /home/zhangkai/data/tools/runtime/elasticsearch-5.3.0
user = zhangkai
autostart = true
autorestart = true
redirect_stderr = true
stdout_logfile = /home/zhangkai/data/tools/runtime/elasticsearch-5.3.0/logs/out.log
[program:kibana]
command = /home/zhangkai/data/tools/runtime/kibana-5.3.0/bin/kibana
directory = /home/zhangkai/data/tools/runtime/kibana-5.3.0
user = zhangkai
autostart = true
autorestart = true
redirect_stderr = true
stdout_logfile = /home/zhangkai/data/tools/runtime/kibana-5.3.0/logs/out.log
Start supervisor to begin syncing:
supervisord -c /home/zhangkai/tools/runtime/supervisor/supervisord.conf
supervisorctl -c /home/zhangkai/tools/runtime/supervisor/supervisord.conf
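The processes can then be managed with the usual supervisorctl subcommands, for example:
supervisorctl -c /home/zhangkai/tools/runtime/supervisor/supervisord.conf status
supervisorctl -c /home/zhangkai/tools/runtime/supervisor/supervisord.conf restart mongo-connector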