当前位置: 首页 > news >正文

网站开发做什么的h5是什么网站上面做的

网站开发做什么的,h5是什么网站上面做的,设计有什么网站推荐,公司官网怎么编辑大家好#xff0c;我们是 BTC.com 团队。2020 年#xff0c;我们有幸接触到了 Flink 和 PyFlink 生态#xff0c;从团队自身需求出发#xff0c;完善了团队内实时计算的任务和需求#xff0c;搭建了流批一体的计算环境。 在实现实时计算的过程中#xff0c;我们在实践中…大家好我们是 BTC.com 团队。2020 年我们有幸接触到了 Flink 和 PyFlink 生态从团队自身需求出发完善了团队内实时计算的任务和需求搭建了流批一体的计算环境。 在实现实时计算的过程中我们在实践中收获了一些经验在此分享一些这方面的心路历程。主要分享的大纲如下 困惑 • 描述 • 思考 • 行动流批一体的架构架构效果Zeppelin、PyFlink on K8S 等实践ZeppelinPyFlink on K8S区块链领域实践展望 • 总结 01 困惑 • 描述 • 思考 • 行动 作为工程师我们每天都在不断地了解需求研发业务。 有一天我们被拉到了一次团队总结会议上收到了以下的需求 销售总监 A 我们想要知道销售的历史和实时转化率、销售额能不能统计一下实时的 TOP5 的商品还有就是大促时候用户实时访问、商品实时浏览量 TOP5 的情况呢可以根据他历史访问的记录实时推荐相关的吗 市场总监 B 我们想要知道市场推广的效果每次活动的实时数据不然我们的市场投放无法准确评估效果及时反馈啊。 研发总监 C 有些用户的 Bug 无法复现日志可以再实时一点吗传统日志分析需要一定的梳理可不可以直接清洗 / 处理相关的数据 采购总监 D 这些年是不是流行数字化采购这边想预测采购需求做一下实时分类和管理支出预测未来供应来源完善一下成本。这个有办法做吗还有有些供应商不太稳定啊能监控到他们的情况吗 运维总监 E 网站有时候访问比较慢没有地方可以看到实时的机器情况搞个什么监控大屏这个有办法解决吗 部门领导 F 可以实现上面的人的需求吗。 做以上的了解之后才发现大家对于数据需求的渴望程度使用方不仅需要历史的数据而且还需要实时性的数据。 在电商、金融、制造等行业数据有着迅猛的增长诸多的企业面临着新的挑战数据分析的实时处理框架比如说做一些实时数据分析报表、实时数据处理计算等。 和大多数企业类似在此之前我们是没有实时计算这方面的经验和积累的。这时就开始困惑了怎样可以更好地做上面的需求在成本和效果之间取得平衡如何设计相关的架构 穷则思变在有了困惑以后我们就开始准备梳理已有的条件和我们到底需要什么。 我们的业务范围主要在区块链浏览器与数据服务、区块链矿池、多币种钱包等。在区块链浏览器的业务里BTC.com 目前已是全球领先的区块链数据服务平台矿池业务在业内排行第一区块链浏览器也是全球前三大浏览器之一。 首先我们通过 parser 解析区块链上的数据得到各方面的数据信息可以分析出每个币种的地址活跃度、地址交易情况、交易流向、参与程度等内容。目前BTC.com 区块链浏览器与行业内各大矿池和交易所等公司都有相关合作可以更好地实现一些数据的统计、整理、归纳、输出等。 面向的用户不仅有专业的区块链开发人员也有各样的 B 端和 C 端用户C 端用户可以进行区块链地址的标注智能合约的运行查看智能合约相关内容等以及链上数据的检索和查看。B 端用户则有更专业的支持和指导提供 API、区块链节点等一些的定制以及交易加速、链上的业务合作、数据定制等。 从数据量级来讲截至目前比特币大概有 5 亿笔交易3000 多万地址22 亿输出output每笔交易的输出并且还在不断增长中。以太坊的话则更多。而 BTC.com 的矿池和区块链浏览器都支持多币种各币种的总数据量级约为几十 T。 矿池是矿工购买矿机设备后连接到的服务平台矿工可以通过连接矿池从而获取更稳定的收益。这是一个需要保证 7 * 24 小时稳定的服务里面有矿机不断地提交其计算好的矿池下发的任务的解矿池将达到网络难度的解进行广播。这个过程也可以认为是近乎是实时的矿机通过提交到服务器服务器内部再提交到 Kafka 消息队列同时有一些组件监听这些消息进行消费。而这些提交上来的解可以从中分析出矿机的工作状态、算力、连接情况等。 在业务上我们需要进行历史数据和实时数据的计算。 历史数据要关联一些币价历史交易信息而这些交易信息需要一直保存是一种典型的批处理任务。 每当有新区块的确认就有一些数据可以得到处理和分析比如某个地址在这个区块里发生了一笔交易那么可以从其交易流向去分析是什么样的交易挖掘交易相关性。或者是在这个区块里有一些特殊的交易比如 segwit 的交易、比如闪电网络的交易就是有一些这个币种特有的东西可以进行解析分析和统计。并且在新区块确认时的难度预测也有所变化。 还有就是大额交易的监控通过新区块的确认和未确认交易锁定一些大额交易结合地址的一些标注锁定交易流向更好地进行数据分析。 还有是一些区块链方面的 OLAP 方面的需求。 总结了在数据统计方面的需求和问题以后我们就开始进行思考什么是最合适的架构如何让人员参与少、成本低 解决问题无非就是提出假设通过度量然后刷新认知。 在浏览了一些资料以后我们认为大部分的计算框架都是通过输入进行处理然后得到输出。首先我们要获取到数据这里数据可以从 MySQL 也可以从 Kafka然后进行计算这里计算可以是聚合也可以是 TOP 5 类型的在实时的话可能还会有窗口类型的。在计算完之后将结果做下发下发到消息渠道和存储发送到微信或者钉钉落地到 MySQL 等。 团队一开始尝试了 Spark搭建了 Yarn使用了 Airflow 作为调度框架通过做 MySQL 的集成导入开发了一些批处理任务有着离线任务的特点数据固定、量大、计算周期长需要做一些复杂操作。 在一些批处理任务上这种架构是稳定的但是随着业务的发展有了越来越多的实时的需求并且实时的数据并不能保证按顺序到达按时间戳排序消息的时间字段是允许前后有差距的。在数据模型上需求驱动式的开发成本相对来说Spark 的方式对于当时来说较高对于状态的处理不是很好导致影响一部分的效率。 其实在 2019 年的时候就有在调研一些实时计算的事情关注到了 Flink 框架当时还是以 Java 为主整体框架概念上和 Spark 不同认为批处理是一种特殊的流但是因为团队没有 Java 方面的基因和沉淀使用 Flink 作为实时计算的架构在当时就暂告一个段落。 在 2020 年初的时候不管是 Flink 社区 还是 InfoQ还是 B 站都有在推广 PyFlink而且当时尤其是程鹤群[1]和孙金城[2]的视频以及孙金城老师的博客[3]的印象深刻。于是就想尝试 PyFlink其有着流批一体的优势而且还支持 Python 的一些函数支持 pandas甚至以后还可以支持 Tensorflow、Keras这对我们的吸引力是巨大的。在之后就在构思我们的在 PyFlink 上的流批一体的架构。 02 流批一体的架构 架构 首先我们要梳理数据要清楚数据从哪里来。在以 Spark 为主的时期数据是定期从数据源加载增量数据通过一定的转换逻辑然后写入目的地由于数据量和业务需要延迟通常在小时级别而实时的话需要尽可能短的延迟因此将数据源进行了分类整体分成了几部分一部分是传统的数据我们存放在 MySQL 持久化做保存这部分之后可以直接作为批处理的计算也可以导入 Hive做进一步的计算。实时的部分实际上是有很多思路一种方式是通过 MySQL 的 Binlog 做解析还有就是 MySQL 的 CDC 功能在多方考量下最后我们选择了 Kafka不仅是因为其是优秀的分布式流式平台而且团队也有对其的技术沉淀。 并且实际上在本地开发的时候安装 Kafka 也比较方便只需要 Brew Install Kafka而且通过 Conduktor 客户端也可以方便的看到每个 Topic 的情况。于是就对现有的 Parser 进行改造使其支持 Kafka在当收到新的区块时会立即向 Kafka 发送一个消息然后进行处理。 大概是在 2018 年的时候团队将整体的业务迁移到了 Kubernetes 上在业务不断发展的过程中其对开发和运维上来说减轻了很多负担所以建议有一定规模的业务最好是迁移到 Kubernetes其对成本的优化DevOps以及高可用的支持都是其他平台和传统方式无法比拟的。 在开发作业的过程中我们在尽可能的使用 Flink SQL同时结合一些 Java、Python 的 UDF、UDAF、UDTF。每个作业通过初始化类似于以下的语句形成一定的模式 self.source_ddl CREATE TABLE SourceTable (xxx int) WITH self.sink_ddl CREATE TABLE SinkTable (xxx int) WITH self.transform_ddl INSERT INTO SinkTableSELECT udf(xxx)FROM SourceTableGROUP BY FROM_UNIXTIME(timestamp, yyyyMMdd)在未来的话会针对性地将数据进行分层按照业界通用的 ODS、DWD、DWS、ADS分出原始层明细层和汇总层进一步做好数据的治理。 效果 最终我们团队基于 PyFlink 开发快速地完成了已有的任务部分是批处理作业处理过去几天的数据部分是实时作业根据 Kafka 的消息进行消费目前还算比较稳定。 部署时选择了 Kubernetes具体下面会进行分享。在 K8S 部署了 Jobmanager 和 Taskmanager并且使用 Kubernetes 的 job 功能作为批处理作业的部署之后考虑接入一些监控平台比如 Prometheus 之类的。 在成本方面由于是使用的 Kubernetes 集群因此在机器上只有扩展主机的成本在这种方式上成本要比传统的 Yarn 部署方式要低并且之后 Kuberntes 会支持原生部署在扩展 Jobmanager 和 Taskmanager 上面会更加方便。 03 Zeppelin、PyFlink on K8S 等实践 Zeppelin 是我们用来进行数据探索和逻辑验证有些数据在本地不是真实数据利用 Zeppelin 连接实际的链上数据进行计算的逻辑验证当验证完成后便可转换成生产需要的代码进行部署。 一、Kubernetes 上搭建 PyFlink 和 Zeppelin 1. 整理后的部署 Demo 在 github可以参阅相关链接[4]。2. 关于配置文件修改以下配置的作用。 (1). 修改 configmap 的 flink-conf.yaml 文件的 taskmanager 配置。 taskmanager.numberOfTaskSlots: 10 调整 Taskmanager 可以调整运行的 job 的数量。 (2). 在 Zeppelin 的 dockerfile 中修改 zeppelin-site.xml 文件。 cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml; \ sed -i s#value127.0.0.1/value#value0.0.0.0/value#g conf/zeppelin-site.xml; \ sed -i s#valueauto/value#valuelocal/value#g conf/zeppelin-site.xml 修改请求来源为 0.0.0.0如果是线上环境建议开启白名单加上 auth 认证。修改 interpreter 的启动模式为 localauto 会导致在集群启动时以 K8s 的模式启动目前 K8s 模式只支持 Sparklocal 模式可以理解为Zeppelin 将在本地启动一个连接 Flink 的 interpreter 进程。Zeppelin 和在本地提交 Flink 作业类似也需要 PyFlink 的基础环境所以需要将 Flink 对应版本的 jar 包放入镜像内。 3. Zeppelin 的 ingress 中添加 websocket 配置。 nginx.ingress.kubernetes.io/configuration-snippet: |proxy_set_header Upgrade websocket;proxy_set_header Connection Upgrade; Zeppelin 在浏览器需要和 server 端建立 socket 连接需要在 ingress 添加 websocket 配置。 4.Flink 和 Zeppelin 数据持久化的作用。 volumeMounts: - mountPath: /zeppelin/notebook/name: data volumes: - name: datapersistentVolumeClaim:claimName: zeppelin-pvc --- apiVersion: v1 kind: PersistentVolumeClaim metadata:name: zeppelin-pvc spec:storageClassName: efs-scaccessModes:- ReadWriteOnceresources:requests:storage: 1Gi 对 Flink 的 /opt/flink/lib 目录做持久化的目的是当我们需要新的 jar 包时可以直接进入 Flink 的 pod 进行下载并存放到 lib 目录保证 jobmanager 和 taskmanager 的 jar 版本一致同时也无需更换镜像。Zeppelin 的任务作业代码会存放在 /zeppelin/notebook/ 目录下目的是方便保存编写好的代码。 5. Flink 命令提交 job 作业的方式。 (1). 本地安装 PyFlinkPython 需要3.5及以上版本。 $ pip3 install apache-flink1.11.1 (2). 测试 Demo def word_count():env StreamExecutionEnvironment.get_execution_environment()t_env StreamTableEnvironment.create(env,environment_settingsEnvironmentSettings.new_instance().use_blink_planner().build())sink_ddl create table Results (word VARCHAR, count BIGINT) with ( connector print)t_env.sql_update(sink_ddl)elements [(word, 1) for word in content.split( )]# 这里也可以通过 Flink SQLt_env.from_elements(elements, [word, count]) \.group_by(word) \.select(word, count(1) as count) \.insert_into(Results)t_env.execute(word_count)if __name__ __main__:logging.basicConfig(streamsys.stdout, levellogging.INFO, format%(message)s)word_count() 或者是实时处理的 Demo def handle_kafka_message():s_env StreamExecutionEnvironment.get_execution_environment()# s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)s_env.set_parallelism(1)st_env StreamTableEnvironment \.create(s_env, environment_settingsEnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())source_ddl CREATE TABLE SourceTable (word string) WITH (connector.type kafka,connector.version universal,connector.topic Topic,connector.properties.bootstrap.servers localhost:9092,connector.properties.zookeeper.connect localhost:2121,format.type json,format.derive-schema true)sink_ddl create table Results (word VARCHAR) with (connector print)st_env.sql_update(sink_ddl)st_env.sql_update(source_ddl)st_env.from_path(source).insert_into(sink)st_env.execute(KafkaTest)if __name__ __main__:handle_kafka_message() (3). 本地测试 Flink 命令提交 job 作业。 $ flink run -m localhost:8081 -py word_count.py python/table/batch/word_count.py Job has been submitted with JobID 0a31b61c2f974bcc3f344f57829fc5d5 Program execution finished Job with JobID 0a31b61c2f974bcc3f344f57829fc5d5 has finished. Job Runtime: 741 ms (4). 如果存在多个 Python 文件可以先 zip 打包后再进行提交作业。 $ zip -r flinkdemo.zip ./* $ flink run -m localhost:8081 -pyfs flinkdemo.zip -pym main (5). Kubernetes 通过集群的 CronJob 定时调度来提交 Job之后会做自研一些 UI 后台界面做作业管理与监控。 04 在区块链领域实践 随着区块链技术的越来越成熟应用越来越多行业标准化、规范化的趋势也开始显现也越来越依赖于云计算、大数据毕竟是数字经济的产物。BTC.com 也在扎根于区块链技术基础设施为各类公司各类应用提供数据和业务上的支持。 近些年有个词火遍了 IT 业界中台不管是大公司还是创业公司都喜欢扯上这个概念号称自己业务中台数据中台等。我们的理解中中台是一种整合各方面资源的能力从传统的单兵作战到提升武器装备后勤保障提升作战能力。在数据上打破数据孤岛在需求快速变化的前台和日趋稳定的后台中取得平衡。而中台更重要的是服务最终还是要回馈到客户回馈到合作伙伴。 在区块链领域BTC.com 有着深厚的行业技术积累可以提供各方面数据化的能力。比如在利用机器学习进行链上数据的预估预估 eth 的 gas price还有最佳手续费等利用 keras 深度学习的能力进行一些回归计算在之后也会将 Flink、机器学习和区块链结合起来对外提供更多预测类和规范化分类的数据样本之前是在用定时任务不断训练模型与 Flink 结合之后会更加实时。在这方面以后也会提供更多的课题比如币价与 Defi舆情市场等的关系区块链地址与交易的标注和分类。甚至于将机器学习训练的模型放于 IPFS 网络中通过去中心化的代币进行训练提供方便调用样本和模型的能力。 在目前BTC.com 推出了一些通过数据挖掘实现的能力包括交易推送、OLAP 链上分析报表等改善和提升相关行业和开发者实际的体验。我们在各种链上都有监控节点监控各区块链网络的可用性、去中心化程度监控智能合约。在接入一些联盟链、隐私加密货币可以为联盟链、隐私加密货币提供这方面的数据能力。 BTC.com 将为区块链产业生态发展做出更多努力以科技公司的本质以技术发展为第一驱动力以市场和客户为导向开发创新和融合应用做好基础设施。 05 展望与总结 从实时计算的趋势到流批一体的架构通过对 PyFlink 和 Flink 的学习稳定在线上运行了多种作业任务对接了实际业务需求。并且搭建了 Zeppelin 平台使得业务开发上更加方便。在计算上尽可能地依赖 SQL方便各方面的集成与调试。 在社区方面PyFlink 也是没有令我们失望的较快的响应能力不断完善的文档。在 Confluence[5]上也可以看到一些 Flink Improvement Proposals其中也有一些是 PyFlink 相关的在不远的将来还会支持 Pandas UDAFDataStream APIML API也期望在之后可以支持 Joblistener总之在这里也非常感谢相关团队。 未来的展望总结起来就是通过业务实现数据的价值化。而数据中台的终局是将数据变现。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.huolong8.cn/news/441277/

相关文章:

  • 网站备案要求网站使用说明书模板
  • 怎么做网站推广图片凡科网站案例
  • 中国网站开发公司排名营销型网站建设要求
  • 公司建设网站注意事项哪些企业网站做的不错
  • 广告网站建设最专业网站软件开发招聘
  • 国际网站平台有哪些提高wordpress+权重
  • 百度快照 如何抓取网站wordpress中文标签云插件
  • 什么样的网站可以做外链北京做百度网站
  • 国外网站怎么注册wordpress怎么添加二级域名
  • 急求聊城网站建设施工企业安全生产评价标准jgjt77破解版
  • 做文字的网站网站建设观点
  • 专业的模板建站企业seo是什么意思呢
  • 个人博客网站源码建设通官网app下载
  • 网站别人做的我自己怎么续费wordpress插件微信
  • 如何看网站的ftp.net 手机网站源码下载
  • 陕西建设分行网站外贸网站建站平台
  • 网站建设方案书域名备案国内做的好的网站
  • 阜阳做网站的网络公司网站建设的流程图示
  • 广西区建设厅网站网站站群建设
  • 电子商务做网站wordpress 自动备份
  • 网站建设需要注意那些点扬州seo招聘
  • c 做网站教程wordpress 文章合集
  • 网站建设工作会议上的讲话网站建设就找奇思网络
  • 网站建设目标分析wordpress 刷浏览量
  • 番禺定制型网站建设快懂百科登录入口
  • 可以做课程的网站公司免费网页怎么制作
  • 网站建设 利润地坪漆东莞网站建设技术支持
  • 网络推广工作好干吗网站seo主管招聘
  • 凡科网制作网站教程建立公司的流程
  • 沈阳营销型网站建设上海企业信用信息公示系统