.tech域名的网站,深圳网站建设搜q479185700,网站建设合同,个人博客html代码CanalKafka实现MySQL与Redis数据同步#xff08;一#xff09; 前言
在很多业务情况下#xff0c;我们都会在系统中加入redis缓存做查询优化。
如果数据库数据发生更新#xff0c;这时候就需要在业务代码中写一段同步更新redis的代码。
这种数据同步的代码跟业务代码糅合…CanalKafka实现MySQL与Redis数据同步一 前言
在很多业务情况下我们都会在系统中加入redis缓存做查询优化。
如果数据库数据发生更新这时候就需要在业务代码中写一段同步更新redis的代码。
这种数据同步的代码跟业务代码糅合在一起会不太优雅能不能把这些数据同步的代码抽出来形成一个独立的模块呢答案是可以的。
架构图
canal是一个伪装成slave订阅mysql的binlog实现数据同步的中间件。 canal最简单的使用方法是tcp模式。 实际上canal是支持直接发送到MQ的目前最新版是支持主流的三种MQKafka、RocketMQ、RabbitMQ。而canal的RabbitMQ模式目前是有一定的bug所以一般使用Kafka或者RocketMQ。 这里使用Kafka实现Redis与MySQL的数据同步。架构图如下 通过架构图我们很清晰知道要用到的组件MySQL、Canal、Kafka、ZooKeeper、Redis。
搭建Kafka
首先在官网下载安装包 解压打开/config/server.properties配置文件修改日志目录
首先启动ZooKeeper我用的是3.6.1版本 接着再启动Kafka在Kafka的bin目录下打开cmd输入命令
kafka-server-start.bat ../../config/server.properties可以看到ZooKeeper上注册了Kafka相关的配置信息 然后创建一个队列用于接收canal传送过来的数据使用命令
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic创建的队列名是canaltopic。 配置Cannal Server
canal官网下载相关安装包 找到canal.deployer-1.1.4/conf目录下的canal.properties配置文件
# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode kafka
# 解析器的线程数打开此配置不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize 16
# 配置MQ的服务地址这里配置的是kafka对应的地址和端口
canal.mq.servers 127.0.0.1:9092
# 配置instance在conf目录下要有example同名的目录可以配置多个
canal.destinations example然后配置instance找到/conf/example/instance.properties配置文件
## mysql serverId , v1.0.26 will autoGen(自动生成不需配置)
# canal.instance.mysql.slaveId0
# position info
canal.instance.master.address127.0.0.1:3306
# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog
canal.instance.master.journal.namemysql-bin.000006
canal.instance.master.position4596
# 账号密码
canal.instance.dbUsernamecanal
canal.instance.dbPasswordCanal****
canal.instance.connectionCharset UTF-8
#MQ队列名称
canal.mq.topiccanaltopic
#单队列模式的分区下标
canal.mq.partition0配置完成后就可以启动canal了。
测试
这时可以打开kafka的消费者窗口测试一下kafka是否收到消息。
使用命令进行监听消费
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic这里使用的是win10系统的cmd命令行win10系统默认的编码是GBK而Canal Server是UTF-8的编码所以控制台会出现乱码 在cmd命令行执行前切换到UTF-8编码即可使用命令行chcp 65001 然后再执行打开kafka消费端的命令就不乱码了 接下来就是启动Redis把数据同步到Redis就完事了。
封装Redis客户端
环境搭建完成后我们可以写代码了。
首先引入Kafka和Redis的maven依赖
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId
/dependency在application.yml文件增加以下配置
spring: redis:host: 127.0.0.1port: 6379database: 0password: 123456封装一个操作Redis的工具类
Component
public class RedisClient {/*** 获取redis模版*/Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 设置redis的key-value*/public void setString(String key, String value) {setString(key, value, null);}/*** 设置redis的key-value带过期时间*/public void setString(String key, String value, Long timeOut) {stringRedisTemplate.opsForValue().set(key, value);if (timeOut ! null) {stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);}}/*** 获取redis中key对应的值*/public String getString(String key) {return stringRedisTemplate.opsForValue().get(key);}/*** 删除redis中key对应的值*/public Boolean deleteKey(String key) {return stringRedisTemplate.delete(key);}
}