当前位置: 首页 > news >正文

顺德网站建设如何镇江唐唐网络科技有限公司

顺德网站建设如何,镇江唐唐网络科技有限公司,织梦dedecms网站内容页,免费建站建设网站目录 举个例子 连接器 下载连接器#xff08;connector#xff09;和格式#xff08;format#xff09;jar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API#xff0c;通过额外的函数扩展了TableEnvironment。 下面代码演示两…目录 举个例子 连接器 下载连接器connector和格式formatjar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.common.typeinfo import Typesenv StreamExecutionEnvironment.get_execution_environment() t_env StreamTableEnvironment.create(env) # create a DataStream ds env.from_collection([Alice, Bob, John], Types.STRING())# interpret the insert-only DataStream as a Table t t_env.from_data_stream(ds)# register the Table object as a view and query it t_env.create_temporary_view(InputTable, t) res_table t_env.sql_query(SELECT UPPER(f0) FROM InputTable)# interpret the insert-only Table as a DataStream again res_ds t_env.to_data_stream(res_table)# add a printing sink and execute in DataStream API res_ds.print()env.execute() TableEnvironment将采用StreamExecutionEnvironment所有的配置选项。 建议在转换为Table API之前设置DataStream API的所有配置选项如下代码。 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.datastream.checkpointing_mode import CheckpointingMode# create Python DataStream API env StreamExecutionEnvironment.get_execution_environment()# set various configuration early env.set_max_parallelism(256)env.get_config().add_default_kryo_serializer(type_class_name, serializer_class_name) env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)# then switch to Python Table API t_env StreamTableEnvironment.create(env) # set configuration early t_env.get_config().set_local_timezone(Europe/Berlin)# start defining your pipelines in both APIs... 连接器 下载连接器connector和格式formatjar 包 由于Flink是一个基于 Java/Scala 的项目连接器connector和格式format的实现是作为 jar 包存在的 要在 PyFlink 作业中使用首先需要将其指定为作业的依赖。 如果使用第三方JAR可以在Python Table API中指定JAR如下所示 table_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar) or table_env.get_config().get_configuration().set_string(pipeline.classpaths, file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar) 依赖管理 需要在Python API程序中使用依赖项。例如Python用户自定义函数中使用第三方Python库。此外在用机器学习模型预测等场景中用户可能希望在Python自定义函数中加载机器学习模型。 当PyFlink作业在本地执行时可以将第三方Python库安装到本地Python环境中将机器学习模型下载到本地等等。 然而当用户想要将PyFlink任务提交到远程集群时这种方法并不奏效。 除了Table API 在Python DataStream API中则如下配置 stream_execution_environment.add_jars(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) stream_execution_environment.add_jars(file:///E:/my/jar/path/connector1.jar, file:///E:/my/jar/path/connector2.jar) # NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the # URLs are accessible on both the client and the cluster. stream_execution_environment.add_classpaths(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) 如何使用连接器 在 PyFlink Table API 中DDL 是定义 source 和 sink 比较推荐的方式这可以通过 TableEnvironment 中的 execute_sql() 方法来完成然后就可以在作业中使用这张表了。 --下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。 from pyflink.table import TableEnvironment, Environmentsettingsdef log_processing():env_settings Environmentsettings.in_streaming_mode()t_env TableEnvironment.create(env_settings)# specify connector and format jarst_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)source_ddl CREATE TABLE source_table(a VARCHAR,b INT) WITH (connector kafka,topic source_topic,properties.bootstrap.servers kafka:9092,properties.group.id test_3,scan.startup.mode latest-offset,format json)sink_ddl CREATE TABLE sink_table(a VARCHAR) WITH (connector kafka,topic sink_topic,properties.bootstrap.servers kafka:9092,format json)t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query(SELECT a FROM source_table) \.execute_insert(sink_table).wait()if __name__ __main__:log_processing()
http://www.yutouwan.com/news/312696/

相关文章:

  • 12306的网站多少钱做的深圳网络营销模式
  • 中国航天建设集团有限公司网站合肥比较好的设计公司
  • 江苏集团网站建设自助建站之星
  • 地方网站怎么做的做租凭网站是经营性吗
  • 网站模块怎么恢复巴中城乡建设官方网站
  • 佛山行业网站设计我想网上做网站
  • 重庆制作网站域名注册服务商网站
  • 系统开发北京网站建设菏泽做网站建设找哪家
  • thinkphp做网站快吗如何创建一家公司
  • 部门网站建设管理经验交流材料wordpress撰写设置
  • 福州网站建设熊掌号正规接单赚佣金的app
  • 跨境电商平台网站建设做网站 被谷歌收录
  • 建设美团网站wordpress get_most_viewed
  • 百度网站公司信息推广怎么做的网站后台登陆界面模板
  • 备案网站查询深圳旅游网站开发
  • 做艺人资料卡的网站最近实时热点新闻事件
  • 村级网站建设 不断增强苏州360推广 网站建设
  • 宜兴市做网站外贸服装接单网站
  • 网站上线 邮件群发模板网站设计就业培训
  • 阿里云主机网站开发WordPress网站被恶意登录
  • 郑州h5网站建设好用的ppt模板网站
  • 栖霞企业网站建设网站开发老是弹广告
  • 建设网站群的意义动态个人网站模板下载
  • 电子商务网站建设与维护题库目前流行的网站开发技术
  • 电商平台网站建设合同建站与备案的具体流程是什么
  • html5网站布局教程安阳市网站建设
  • 网站对于一个企业的优势展厅设计费取费标准一览表
  • 安徽省住房和城乡建设厅门户网站企业网站源码模板
  • 罗湖网站的建设长沙个人做网站
  • 武威 网站建设电商网站通用左侧弹出导航