水善利万物而不争，处众人之所恶，故几于道💦

目录
1. 从Java的集合中读取数据
2. 从本地文件中读取数据
3. 从HDFS中读取数据
4. 从Socket中读取数据
5. 从Kafka中读取数据
6. 自定义Source

官方文档 - Flink1.13

1. 从Java的集合中读取数据
fromCollection(waterSensors)
/**
 * Builds a stream from a fixed in-memory list of {@code WaterSensor} records via
 * {@code fromCollection} and prints every record.
 */
public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Port of Flink's REST endpoint (also serves the web UI).
    // NOTE: ports below 1024 are privileged on Linux; if binding fails there, use e.g. 10000.
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    List<WaterSensor> waterSensors = Arrays.asList(
            new WaterSensor("ws_001", 1577844001L, 45),
            new WaterSensor("ws_002", 1577844015L, 43),
            new WaterSensor("ws_003", 1577844020L, 42));
    env.fromCollection(waterSensors).print();

    try {
        env.execute();
    } catch (Exception e) {
        // Propagate the failure (keeping the cause) instead of printing it and exiting normally.
        throw new IllegalStateException("Flink job failed", e);
    }
}
运行结果
2. 从本地文件中读取数据
readTextFile("input/words.txt")：支持相对路径和绝对路径。
/**
 * Reads {@code input/words.txt} line by line with {@code readTextFile} and prints each line.
 * The path may be relative (to the working directory) or absolute.
 */
public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Port of Flink's REST endpoint (also serves the web UI).
    // NOTE: ports below 1024 are privileged on Linux; if binding fails there, use e.g. 10000.
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("input/words.txt").print();

    try {
        env.execute();
    } catch (Exception e) {
        // Propagate the failure (keeping the cause) instead of printing it and exiting normally.
        throw new IllegalStateException("Flink job failed", e);
    }
}
运行结果
3. 从HDFS中读取数据
readTextFile("hdfs://hadoop101:8020/flink/data/words.txt")
要先在pom文件中添加hadoop-client依赖：
dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/version
/dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(hdfs://hadoop101:8020/flink/data/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}运行结果
4. 从Socket中读取数据
socketTextStream("hadoop101", 9999)：这个输入源不支持多个并行度（其并行度只能为1）。
/**
 * Reads text lines from the socket {@code hadoop101:9999} with {@code socketTextStream} and prints
 * each line.
 */
public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Port of Flink's REST endpoint (also serves the web UI).
    // NOTE: ports below 1024 are privileged on Linux; if binding fails there, use e.g. 10000.
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    // Read from a socket port. Start a server first: on Windows `nc -lp 9999`, on Linux `nc -lk 9999`.
    env.socketTextStream("hadoop101", 9999).print();

    try {
        env.execute();
    } catch (Exception e) {
        // Propagate the failure (keeping the cause) instead of printing it and exiting normally.
        throw new IllegalStateException("Flink job failed", e);
    }
}
运行结果
5. 从Kafka中读取数据
addSource(new FlinkKafkaConsumer<>("flink_source_kafka", new SimpleStringSchema(), properties))
第一个参数是topic
第二个参数是反序列化器（这里是SimpleStringSchema），用于在Kafka和Flink之间转换数据。官方注释：The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.（反序列化器用于在Kafka的字节消息和Flink的对象之间进行转换。）
第三个参数是Kafka的配置。
/**
 * Consumes the Kafka topic {@code flink_source_kafka} as plain strings via
 * {@code FlinkKafkaConsumer} and prints each message.
 */
public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Port of Flink's REST endpoint (also serves the web UI).
    // NOTE: ports below 1024 are privileged on Linux; if binding fails there, use e.g. 10000.
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    Properties properties = new Properties();
    // Kafka cluster (bootstrap server) addresses
    properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
    // Consumer group this job belongs to
    properties.setProperty("group.id", "flink_consumer_group");

    // Arguments: topic, deserializer (Kafka bytes -> String), Kafka client properties.
    env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka", new SimpleStringSchema(), properties))
            .print();

    try {
        env.execute();
    } catch (Exception e) {
        // Propagate the failure (keeping the cause) instead of printing it and exiting normally.
        throw new IllegalStateException("Flink job failed", e);
    }
}
运行结果
6. 自定义Source
addSource(new XXXX())：大多数情况下，前面的数据源已经能够满足需要，但是难免会存在特殊的场合，所以Flink也提供了自定义数据源的方式。
public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}自定义数据源需要定义一个类然后实现SourceFunction接口然后实现其中的两个方法run和cancelrun方法包含具体读数据的逻辑当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止
public class RandomWatersensor implements SourceFunctionWaterSensor {private Boolean running true;Overridepublic void run(SourceContextWaterSensor sourceContext) throws Exception {Random random new Random();while (running){sourceContext.collect(new WaterSensor(sensor random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {running false;}}运行结果 demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource(hadoop102, 9999)).print();env.execute();}public static class MySource implements SourceFunctionWaterSensor {private String host;private int port;private volatile boolean isRunning true;private Socket socket;public MySource(String host, int port) {this.host host;this.port port;}Overridepublic void run(SourceContextWaterSensor ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket new Socket(host, port);BufferedReader reader new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line null;while (isRunning (line reader.readLine()) ! null) {String[] split line.split(,);ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {isRunning false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/