手机网站用什么制作,shopify可以做企业网站嘛,一键做网站的软件,wordpress 搬家后无法打开Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分#xff0c;比如术语、架构、编程模型、编程指南、基本的…Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分和实际的生产应用联系更为密切以及有一定开发难度的内容。 5、Flink 监控系列 本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明一般不会介绍知识点的信息更多的是提供一个一个可以具体使用的示例。本专栏不再分目录通过链接即可看出介绍的内容。
两专栏的所有文章入口点击Flink 系列文章汇总索引 文章目录 Flink 系列文章一、maven依赖二、redis环境及相关内容说明三、redis作为Flink的source异步数据交互示例1、maven依赖2、redis异步交互数据实现1、读取redis数据时以string进行输出2、读取redis数据时以pojo进行输出 3、使用示例4、验证1、准备redis环境数据2、启动应用程序并观察控制台输出 本文主要介绍Flink 的redis作为数据源的异步读取使用示例。
如果需要了解更多内容可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外没有其他依赖。
本文依赖redis环境好用。
本专题分为以下几篇文章 【flink番外篇】3、fflink的source内置、mysql、kafka、redis、clickhouse介绍及示例1 - File、Socket、Collection 【flink番外篇】3、fflink的source内置、mysql、kafka、redis、clickhouse介绍及示例2- 自定义、mysql 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例3- kafka 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例4- redis -异步读取 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例5- clickhouse 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例 - 完整版
一、maven依赖
本文依赖见【flink番外篇】3、flink的source介绍及示例1- File、Socket、Collection不再赘述。
如果有新增的maven依赖则会在示例时加以说明避免篇幅的过大。
二、redis环境及相关内容说明
1、本示例是需要redis环境的至于redis是集群或单击和本示例关系不大。 关于redis的更多信息请参考其官网。
2、本示例是以redis作为外部数据进行异步交互的例子也是实际中应用中常见的例子。关于异步数据交互参考文章55、Flink之用于外部数据访问的异步 I/O
三、redis作为Flink的source异步数据交互示例
本示例是模拟根据外部数据用户姓名查询redis中用户的个人信息。 本示例外部数据就以flink的集合作为示例redis数据中存储的为hash表下面验证中会有具体展示。
1、maven依赖
dependencygroupIdorg.apache.bahir/groupIdartifactIdflink-connector-redis_2.12/artifactIdversion1.1.0/versionexclusionsexclusionartifactIdflink-streaming-java_2.12/artifactIdgroupIdorg.apache.flink/groupId/exclusionexclusionartifactIdflink-runtime_2.12/artifactIdgroupIdorg.apache.flink/groupId/exclusionexclusionartifactIdflink-core/artifactIdgroupIdorg.apache.flink/groupId/exclusionexclusionartifactIdflink-java/artifactIdgroupIdorg.apache.flink/groupId/exclusion/exclusions
/dependency2、redis异步交互数据实现
1、读取redis数据时以string进行输出
package org.datastreamapi.source.custom.redis;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import com.sun.jdi.IntegerValue;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** author alanchan**/
public class CustomRedisSource extends RichAsyncFunctionString, String {private JedisPoolConfig config null;private static String ADDR 192.168.10.41;private static int PORT 6379;// 等待可用连接的最大时间单位是毫秒默认是-1表示永不超时private static int TIMEOUT 10000;private JedisPool jedisPool null;private Jedis jedis null;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config new JedisPoolConfig();jedisPool new JedisPool(config, ADDR, PORT, TIMEOUT);jedis jedisPool.getResource();}Overridepublic void asyncInvoke(String input, ResultFutureString resultFuture) throws Exception {// 文件中读取的内容System.out.println(输入参数input----: input);// 发起一个异步请求返回结果CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {String[] arrayData input.split(,);String name arrayData[1];String value jedis.hget(AsyncReadUser_Redis, name);System.out.println(查询结果output----: value);return value;}}).thenAccept((String dbResult) - {// 设置请求完成时的回调将结果返回resultFuture.complete(Collections.singleton(dbResult));});}// 连接超时的时候调用的方法Overridepublic void timeout(String input, ResultFutureString resultFuture) throws Exception {System.out.println(redis connect timeout!);}Overridepublic void close() throws Exception {super.close();if (jedis.isConnected()) {jedis.close();}}DataAllArgsConstructorNoArgsConstructorstatic class User {private int id;private String name;private int age;private double balance;User(String value) {String[] str value.split(,);this.setId(Integer.valueOf(str[0]));this.setName(str[1]);this.setAge(Integer.valueOf(str[2]));this.setBalance(Double.valueOf(str[0]));}}}
2、读取redis数据时以pojo进行输出
package org.datastreamapi.source.custom.redis;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** author alanchan**/
public class CustomRedisSource2 extends RichAsyncFunctionString, User {private JedisPoolConfig config null;private static String ADDR 192.168.10.41;private static int PORT 6379;// 等待可用连接的最大时间单位是毫秒默认是-1表示永不超时private static int TIMEOUT 10000;private JedisPool jedisPool null;private Jedis jedis null;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config new JedisPoolConfig();jedisPool new JedisPool(config, ADDR, PORT, TIMEOUT);jedis jedisPool.getResource();}Overridepublic void asyncInvoke(String input, ResultFutureUser resultFuture) throws Exception {System.out.println(输入查询条件 input);CompletableFuture.supplyAsync(new SupplierUser() {Overridepublic User get() {String[] arrayData input.split(,);String name arrayData[1];String value jedis.hget(AsyncReadUser_Redis, name);System.out.println(查询redis结果 value);return new User(value);}}).thenAccept((User dbResult) - {// 设置请求完成时的回调将结果返回resultFuture.complete(Collections.singleton(dbResult));});}// 连接超时的时候调用的方法Overridepublic void timeout(String input, ResultFutureUser resultFuture) throws Exception {System.out.println(redis connect timeout!);}Overridepublic void close() throws Exception {super.close();if (jedis.isConnected()) {jedis.close();}}}
3、使用示例
package org.datastreamapi.source.custom.redis;import java.util.concurrent.TimeUnit;import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;/*** author alanchan**/
public class TestCustomRedisSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// id,nameDataStreamSourceString lines env.fromElements(1,alan, 2,alanchan, 3,alanchanchn, 4,alan_chan, 5,alan_chan_chn);SingleOutputStreamOperatorString result AsyncDataStream.orderedWait(lines, new CustomRedisSource(), 10, TimeUnit.SECONDS, 1);SingleOutputStreamOperatorUser result2 AsyncDataStream.orderedWait(lines, new CustomRedisSource2(), 10, TimeUnit.SECONDS, 1);result.print(result--).setParallelism(1);result2.print(result2--).setParallelism(1);env.execute();}
}
4、验证
1、准备redis环境数据
hset AsyncReadUser_Redis alan 1,alan,18,20,alan.chan.chn163.com
hset AsyncReadUser_Redis alanchan 2,alanchan,19,25,alan.chan.chn163.com
hset AsyncReadUser_Redis alanchanchn 3,alanchanchn,20,30,alan.chan.chn163.com
hset AsyncReadUser_Redis alan_chan 4,alan_chan,27,20,alan.chan.chn163.com
hset AsyncReadUser_Redis alan_chan_chn 5,alan_chan_chn,36,10,alan.chan.chn163.com127.0.0.1:6379 hset AsyncReadUser_Redis alan 1,alan,18,20,alan.chan.chn163.com
(integer) 1
127.0.0.1:6379 hset AsyncReadUser_Redis alanchan 2,alanchan,19,25,alan.chan.chn163.com
(integer) 1
127.0.0.1:6379 hset AsyncReadUser_Redis alanchanchn 3,alanchanchn,20,30,alan.chan.chn163.com
(integer) 1
127.0.0.1:6379 hset AsyncReadUser_Redis alan_chan 4,alan_chan,27,20,alan.chan.chn163.com
(integer) 1
127.0.0.1:6379 hset AsyncReadUser_Redis alan_chan_chn 5,alan_chan_chn,36,10,alan.chan.chn163.com
(integer) 1
127.0.0.1:6379 hgetall AsyncReadUser_Redis1) alan2) 1,alan,18,20,alan.chan.chn163.com3) alanchan4) 2,alanchan,19,25,alan.chan.chn163.com5) alanchanchn6) 3,alanchanchn,20,30,alan.chan.chn163.com7) alan_chan8) 4,alan_chan,27,20,alan.chan.chn163.com9) alan_chan_chn
10) 5,alan_chan_chn,36,10,alan.chan.chn163.com
2、启动应用程序并观察控制台输出
输入查询条件5,alan_chan_chn
输入参数input----:2,alanchan
输入参数input----:5,alan_chan_chn
输入查询条件3,alanchanchn
输入查询条件1,alan
输入参数input----:1,alan
输入查询条件2,alanchan
输入查询条件4,alan_chan
输入参数input----:4,alan_chan
输入参数input----:3,alanchanchn
查询结果output----:3,alanchanchn,20,30,alan.chan.chn163.com
查询redis结果1,alan,18,20,alan.chan.chn163.com
查询结果output----:1,alan,18,20,alan.chan.chn163.com
查询redis结果4,alan_chan,27,20,alan.chan.chn163.com
查询redis结果2,alanchan,19,25,alan.chan.chn163.com
查询结果output----:2,alanchan,19,25,alan.chan.chn163.com
查询redis结果3,alanchanchn,20,30,alan.chan.chn163.com
查询结果output----:4,alan_chan,27,20,alan.chan.chn163.com
查询结果output----:5,alan_chan_chn,36,10,alan.chan.chn163.com
查询redis结果5,alan_chan_chn,36,10,alan.chan.chn163.com
result-- 4,alan_chan,27,20,alan.chan.chn163.com
result-- 5,alan_chan_chn,36,10,alan.chan.chn163.com
result-- 3,alanchanchn,20,30,alan.chan.chn163.com
result-- 2,alanchan,19,25,alan.chan.chn163.com
result-- 1,alan,18,20,alan.chan.chn163.com
result2-- CustomRedisSource.User(id4, namealan_chan, age27, balance4.0)
result2-- CustomRedisSource.User(id1, namealan, age18, balance1.0)
result2-- CustomRedisSource.User(id3, namealanchanchn, age20, balance3.0)
result2-- CustomRedisSource.User(id5, namealan_chan_chn, age36, balance5.0)
result2-- CustomRedisSource.User(id2, namealanchan, age19, balance2.0)
以上本文主要介绍Flink 的redis作为数据源的异步读取使用示例。
如果需要了解更多内容可以在本人Flink 专栏中了解更新系统的内容。
本专题分为以下几篇文章 【flink番外篇】3、fflink的source内置、mysql、kafka、redis、clickhouse介绍及示例1 - File、Socket、Collection 【flink番外篇】3、fflink的source内置、mysql、kafka、redis、clickhouse介绍及示例2- 自定义、mysql 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例3- kafka 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例4- redis -异步读取 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例5- clickhouse 【flink番外篇】3、flink的source内置、mysql、kafka、redis、clickhouse介绍及示例 - 完整版