当前位置:首页 » 《随便一记》 » 正文

通过FlinkCDC将MySQL中变更的数据写入到kafka_不会打球的摄影师不是好程序员

29 人参与  2022年06月02日 08:36  分类 : 《随便一记》  评论

点击全文阅读


文章目录

  • 前言
  • 一、CDC的种类?
  • 二、通过FlinkCDC将数据从MySQL导入到Kafka
    • 1.核心代码
    • 2.工具类
    • 三、结果展示
  • 3.可能会出现的错误及解决
    • 解决:


前言

CDC的Change Data Capture(变更数据捕获)的缩写

FlinkCDC的核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。


一、CDC的种类?

CDC 主要分为基于查询和基于 Binlog 两种方式,简述两者的区别:
基于查询的CDC基于binlog的CDC
常见的组件SqoopMaxwell、Canal、Debezium
思想BatchStreaming
延迟性
是否可以捕获所有数据变化

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。

二、通过FlinkCDC将数据从MySQL导入到Kafka

1.核心代码

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.flinkcdc构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.base_trademark")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3.打印数据并将数据写入kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //4.启动任务
        env.execute("FlinkCDC");

    }
}

2.工具类

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic){

        return new FlinkKafkaProducer<String>("192.168.2.101:9092", topic, new SimpleStringSchema());
    }
}

三、结果展示

我在MySQL数据库中每变更一条数据,在IDEA和kafka这边都可以检测到:
IDEA:
在这里插入图片描述
Kafka:
在这里插入图片描述

3.可能会出现的错误及解决

org.apache.kafka.common.errors.TimeoutException: Topic ods_base_database not present in metadata after 60000 ms

解决:

1.vi kafka/config.server.properties
在这里插入图片描述
修改这三个地方,切记用IP地址,之前用的hadoop101一直出错

2.重启kafka,重启zookeeper,问题解决!


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/41208.html

数据  解决  变更  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

最新文章

  • 高分_夏晚歌陆秋(震惊!玄学大佬被读心了)(夏晚歌陆秋)全本完整阅读
  • 空有相思无尽处小说(燕婉娴容珩)后续+番外精编之作无删减_(燕婉娴容珩)空有相思无尽处小说结局+番外看点十足_笔趣阁
  • 大盛的繁荣景象尽收于眼底孟莹姝陆鼎鸿小说无删减阅读无干扰(孟莹姝陆鼎鸿)番外+续集
  • (头条)竹马抑郁后,未婚妻疯狂报复我小说(沈星眠陆景行)整本免费版阅读无广告(竹马抑郁后,未婚妻疯狂报复我)
  • 与君来生再相爱后续+完结_小说后续在线阅读_无删减免费完结_
  • 却道无情胜有情后续更新+番外_墨宸灵珠阿宸内容精选_小说后续在线阅读_无删减免费完结_
  • 新章速递可我不想再等你了是什么小说(沈疏月傅沉舟)完本阅读无广告(可我不想再等你了)
  • 春风不渡无心人最后结局(林至南霍以峦)_春风不渡无心人最后结局
  • 折光于渊向晦而明++后续(孟裕张雅)_折光于渊向晦而明++后续
  • 救助身价六十万的流浪狗后,室友疯了虐心反转_娜娜张琳爱犬后续更新+番外_小说后续在线阅读_无删减免费完结_
  • 送走老公的白月光,我转身嫁给了死对头完整文本_恩泽维生素薛泽恩免费看_小说后续在线阅读_无删减免费完结_
  • 所爱已隔人山人海看点十足(温掬月周既明)全书浏览_所爱已隔人山人海看点十足全书浏览

    关于我们 | 我要投稿 | 免责申明

    Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1