一站式服务:Kubernetes+Kafka+Flink

数据智能相依偎 2024-01-13 19:27:10
这是一个关于如何在 Kubernetes 中使用 Apache Kafka 连接器设置 Apache Flink 的实践教程。本教程的目标是将事件推送到 Kafka,在 Flink 中处理它,然后在单独的主题上将处理后的事件推送回 Kafka。本指南不会深入研究任何工具,因为有很多关于这些主题的优质资源。这里的重点只是让它启动并运行! 您可以通过克隆此git 存储库来进行操作 这就是我们要做的: 将 Kafka 和 Flink 部署到 Kubernetes将作业部署到 Flink生成一些数据微K8s在这些示例中使用了MicroK8 。按照他们的文档进行设置。 不要忘记启用一些必需的扩展: microk8s enable dns storage当 Kubernetes 在本地设置完毕后,您就可以开始了! 设置 Apache Kafka为了在 Kubernetes 上运行 Kafka,此设置中使用了Strimzi 。Strimzi 简化了 kafka 集群的整体管理。Strimzi 提供了一些操作符来管理 Kafka 和相关组件。就本指南而言,详细信息不太相关,但如果您有兴趣,可以在此处阅读有关 Strimzi 的更多信息: 概述快速入门指南将 Kafka 部署到 Kubernetes部署分两步完成: 安装Strimzi配置 Kafka 集群首先,进入k8s目录: cd k8s简单的! 安装Strimzi创建 Kafka 命名空间: kubectl create namespace kafka创建 Strimzi 集群算子: kubectl apply -f strimzi.yml --namespace kafka等待strimzi-cluster-operator启动(STATUS: Running): kubectl get pods --namespace kafka -w现在 Strimzi 应该安装到集群上。接下来我们将配置 Kafka 集群。 配置 Kafka 集群应用kafka-persistent-single.yml: kubectl apply -f kafka-persistent-single.yml --namespace kafka等待一切启动,可能需要几分钟: kubectl get pods --namespace kafka -w 验证 Kafka 设置对于这个特定的实验,我想探索如何从外部连接到 Kafka 集群。为此,NodePort在kafka-persistent-single.yml. 如果您有兴趣, Strimzi 有一篇关于访问 Kafka 的很好的博客文章。 首先,获取您的 Kubernetes 节点Name: kubectl get nodes接下来,获取您的节点InternalIP: # Replace with your node namekubectl get node -o=jsonpath='{range .status.addresses[*]}{.type}{"\t"}{.address}{"\n"}'获取 Kafka 外部引导服务的端口: kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'\n -n kafka现在你应该已经: 您的 Kubernetes 节点 IP 地址Kafka引导服务的端口如果您还没有可用的 Kafka CLI,则必须下载它,只需执行下载步骤就足够了。 最后,我们可以通过生成/消费一些消息来进行实际验证。打开两个终端窗口,然后浏览到 Kafka 安装文件夹。 在终端 1 中,我们将消费消息: # set the and bin/kafka-console-consumer.sh --bootstrap-server : --topic my-topic --from-beginning在终端 2 中,我们生成消息: # set the and bin/kafka-console-producer.sh --broker-list : --topic my-topic在终端2中发布一些消息,它们应该会在终端1中弹出。非常顺利。 将 Apache Flink 部署到 Kubernetes没有使用花哨的操作符来管理 Flink。相反,我们只是部署一个简单的 Flink yml。您可以在Apache Flink 主页上阅读有关 Flink 的更多信息。 再次浏览到k8s存储库的目录。 创建 Flink 命名空间: kubectl create namespace flink部署flink.yml到 Kubernetes 集群: kubectl apply -f flink.yml -n flink等待 Flink 正常启动: kubectl get pods --namespace flink -w现在 Flink 应该正在运行。 验证 Flink 设置ANodePort再次用于公开 Flink UI。要获取端口调用: kubectl get service flink-jobmanager-rest -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'\n -n flink使用此端口,您应该能够访问 Flink UI。进入浏览器并输入:您的地址字段。 将作业部署到 Flink将部署到 Flink 的作业是一个简单的 Flink 应用程序示例。它的作用是为所消费的事件添加前缀。 Flink 提供了一个模板工具来开始新的工作。我必须做一些小的修改才能符合我本地的 SBT 和 Scala 设置。您必须安装 SBT 和 Scala。这些是该项目中使用的版本: SBT版本1.3.12Scala 版本 2.12.11OpenJDK 13转到flink-job您的终端之一中的目录。然后构建一个 JAR 文件,只需运行: sbt assembly如果你幸运的话,它就会起作用。如果没有,您可能需要进行一些故障排除...确保您使用相同的版本。 组装完成后,您应该jar焕然一新target/scala-2.12/flink-job-assembly-0.1-SNAPSHOT.jar。 下一步是将作业提交给 Flink。您可以通过 Flink UI 使用“提交新作业”菜单选项来执行此操作。但我将展示如何使用 Flink CLI,因为从长远来看这更有用。对于本教程,请从此处下载“Apache Flink 1.10.1 for Scala 2.12” 。 解压包: tar xzf flink-1.10.1-bin-scala_2.12.tgzcd flink-1.10.1获取 Flink kubernetes NodePort: kubectl get service flink-jobmanager-rest -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'\n -n flink上传 flink-job jar: # set the and # set to the k8s-kafka-flink repobin/flink run -m : \ --class dev.chrisp.Job \ /k8s-kafka-flink/flink-job/target/scala-2.12/flink-job-assembly-0.1-SNAPSHOT.jar \ --input-topic input \ --output-topic output \ --bootstrap.servers my-cluster-kafka-bootstrap.kafka:9092 \ --zookeeper.connect my-cluster-zookeeper-client.kafka:2181 \ --group.id flinkFlink 工作的论据非常不言自明。 转到 Flink UI 并列出“正在运行的作业”。您应该看到一个处于“正在运行”状态的任务。如果您已经完成了这一步,您应该已经准备好处理数据了! 生成一些数据与 Kafka 验证相同,打开两个终端窗口,然后浏览到 Kafka 安装目录。 请注意,主题名称已更改。现在,input用于生产和output消费。 在终端 1 中,我们将消费消息: # set the and bin/kafka-console-consumer.sh --bootstrap-server : --topic output --from-beginning在终端 2 中,我们生成消息: # set the and bin/kafka-console-producer.sh --broker-list : --topic input当您生成消息时(只需在 Kafka 生产者提示中输入任何内容),您将看到该事件被推送到带有附加前缀的输出主题。 故障排除在 Kubernetes 中,您可以查看任何 pod 的日志: # get the pods name (use namespace kafka or flink)kubectl get pods --namespace kafka# get logskubectl get logs --namespace kafkaFlink 日志也可以通过 UI 获取。浏览到“任务管理器”或“作业管理器”,然后单击“日志”选项卡。 完毕!现在你有一个不错的流处理基线。现在轮到你对Flink作业进行一些更改了。在flink-job/src/main/scala/dev/chrisp/Job.scala文件中尽情发挥吧。
0 阅读:0

数据智能相依偎

简介:感谢大家的关注