原理
利用logstash的input插件jdbc input周期性地从关系型数据库拉取增量数据,然后利用logstash的output插件elasticsearch output将拉取到的数据写入es
logstash
安装
docker方式
1.编写Dockerfile
FROM logstash:7.4.2
#logstash容器里的后续命令都以root用户执行
USER root
#删除logstash原始镜像里的示例配置文件,如果不删除,容器启动后会加载该配置文件
RUN rm -f /usr/share/logstash/pipeline/logstash.conf
#将宿主机当前目录下的pipeline目录的所有文件拷贝到容器的/usr/share/logstash/pipeline/,该目录下的文件为logstash的输入,输出等配置
ADD pipeline/ /usr/share/logstash/pipeline/
#将宿主机当前目录下的config目录的所有文件拷贝到容器的/usr/share/logstash/config/,该目录下的文件为jvm,日志等配置
ADD config/ /usr/share/logstash/config/
#将宿主机当前目录下的lib目录下的mysql-connector-java-8.0.16.jar拷贝到容器的/usr/share/logstash/logstash-core/lib/jars/,logstash拉取数据依赖该jar包
ADD lib/mysql-connector-java-8.0.16.jar /usr/share/logstash/logstash-core/lib/jars/
2.构建镜像
docker build -t logstash_input_jdbc_output_es:7.4.2 .
3.创建并启动容器
docker run --name logstash -d -v ~/logstash/data/:/usr/share/logstash/data/logstash_jdbc_last_run/ logstash_input_jdbc_output_es:7.4.2
解压缩方式
可参考链接logstash解压安装,但需注意,在启动前,需要把mysql-connector-java-8.0.16.jar放到 /logstash_INSTALL_DIR/logstash-core/lib/jars/
配置
管道配置文件
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://172.17.47.45:3306/db_jypt_transport"
jdbc_user => "root"
jdbc_password => "zjxljyptmsq"
schedule => "* * * * *"
statement => "SELECT * FROM test WHERE update_time > :sql_last_value"
use_column_value => true
tracking_column => "update_time"
last_run_metadata_path => "/usr/share/logstash/data/logstash_jdbc_last_run/logstash_jdbc_last_run"
tracking_column_type => "timestamp"
}
}
output {
elasticsearch {
hosts => ["http://172.17.47.45:9200"]
index => "truck-%{+YYYY.MM.dd}"
document_id => "%{id}"
#user => "elastic"
#password => "changeme"
}
}
es
docker方式
拉取镜像
docker pull elasticsearch:7.4.2
创建并启动容器
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.4.2
解压缩方式
es的解压缩安装可参考链接https://www.elastic.co/guide/en/elasticsearch/reference/7.6/targz.html