当前位置: 首页 > news >正文

深圳海外医疗网站建设wordpress数据库地址

深圳海外医疗网站建设,wordpress数据库地址,比较冷门的视频网站做搬运,网站如何增加百度权重的方法水善利万物而不争#xff0c;处众人之所恶#xff0c;故几于道#x1f4a6; 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 … 水善利万物而不争处众人之所恶故几于道 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 fromCollection(waterSensors) public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ListWaterSensor waterSensors Arrays.asList(new WaterSensor(ws_001, 1577844001L, 45),new WaterSensor(ws_002, 1577844015L, 43),new WaterSensor(ws_003, 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 2. 从本地文件中读取数据 readTextFile(“input/words.txt”)支持相对路径和绝对路径 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(input/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}运行结果 3. 从HDFS中读取数据 readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”) 要先在pom文件中添加hadoop-client依赖 dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(hdfs://hadoop101:8020/flink/data/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 4. 从Socket中读取数据 socketTextStream(“hadoop101”,9999)这个输入源不支持多个并行度。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据 windows中 nc -lp 9999 Linux nc -lk 9999env.socketTextStream(hadoop101,9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 5. 从Kafka中读取数据 addSource(new FlinkKafkaConsumer(“flink_source_kafka”,new SimpleStringSchema(),properties)) 第一个参数是topic 第二个参数是序列化器序列化器就是在Kafka和flink之间转换数据 - 官方注释The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。 第三个参数是Kafka的配置。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties new Properties();// 设置集群地址properties.setProperty(bootstrap.servers, hadoop101:9092,hadoop102:9092,hadoop103:9092);// 设置所属消费者组properties.setProperty(group.id, flink_consumer_group);env.addSource(new FlinkKafkaConsumer(flink_source_kafka,new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 6. 自定义Source addSource(new XXXX()) 大多数情况下前面的数据源已经能够满足需要但是难免会存在特殊情况的场合所以flink也提供了能自定义数据源的方式. public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}} }自定义数据源需要定义一个类然后实现SourceFunction接口然后实现其中的两个方法run和cancelrun方法包含具体读数据的逻辑当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止 public class RandomWatersensor implements SourceFunctionWaterSensor {private Boolean running true;Overridepublic void run(SourceContextWaterSensor sourceContext) throws Exception {Random random new Random();while (running){sourceContext.collect(new WaterSensor(sensor random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {running false;}}运行结果 demo2 - 自定义从socket中读取数据 public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource(hadoop102, 9999)).print();env.execute();}public static class MySource implements SourceFunctionWaterSensor {private String host;private int port;private volatile boolean isRunning true;private Socket socket;public MySource(String host, int port) {this.host host;this.port port;}Overridepublic void run(SourceContextWaterSensor ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket new Socket(host, port);BufferedReader reader new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line null;while (isRunning (line reader.readLine()) ! null) {String[] split line.split(,);ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {isRunning false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}} } /* sensor_1,1607527992000,20 sensor_1,1607527993000,40 sensor_1,1607527994000,50*/
http://www.huolong8.cn/news/442715/

相关文章:

  • 福建厦门网站建设网站背景大小
  • 毕设做网站是不是太low网易梦幻西游网页版
  • 安康免费做网站公司网站标题优化技巧
  • 哪个网可以网站备案做效果图常用的网站有哪些软件
  • 宿州外贸网站建设公司返利网网站框架目录
  • 长春个人做网站免费网站推广软件哪个好
  • 建设网站还不如搬砖wordpress 切换中文
  • 怎样给自己的店子做网站六安做网站
  • 超级链接网站模板常州模板建站代理
  • 我想注册网站我怎么做做网站有什么好的推荐
  • 成都网站商城建设公众号开发需要学什么
  • 车床加工东莞网站建设杭州津伟网络科技有限公司
  • 化妆培训学校网站源码 下载做电影网站需要的服务器配置
  • 网站 方案建站怎么赚钱
  • 渭南哪里做网站石家庄物流网站建设
  • 阿里巴巴做外贸的网站东莞英文建站公司
  • 用ip地址做网站临沂网站建设设计公司
  • 企业网站建设的思路房屋不动产查询官网
  • 找生产厂家的网站百度sem竞价推广电子书
  • 漳州网站建设去博大a优南京百度seo
  • 和建设银行类似的网站中文手机编程软件app
  • 网站外包项目两人合伙做网站但不准备开公司
  • 梁山专做网站的公司小说网站做编辑
  • 宁夏建设监理协会网站可以做图片视频的网站
  • 河北城乡建设网站临沭县建设局官方网站
  • 沧州南皮网站建设call_user_func_array() wordpress
  • 什么网站容易做百度权重戴尔公司网站建设成功的关键
  • 三河市建设厅公示网站python编程下载
  • 广告网站怎么做网页小游戏插件不支持
  • 太和县建设局网站学会计哪个培训机构比较正规