Go操作Kafka和Etcd方法详解

   2023-02-09 学习力0
核心提示:目录操作Kafkasarama下载及安装注意事项连接 kafka 发送消息连接 kafka 消费消息操作Etcd安装put和get操作watch操作安装报错:操作KafkaKafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、

操作Kafka

Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用 Go 语言发送和接收 kafka 消息。

sarama

Go 语言中连接 kafka 使用第三方库:github.com/Shopify/sar…

下载及安装

go get github.com/Shopify/sarama

注意事项

sarama v1.20 之后的版本加入了zstd压缩算法,需要用到 cgo,在 Windows 平台编译时会提示类似如下错误:

# github.com/DataDog/zstd
exec: "gcc":executable file not found in %PATH%

所以在 Windows 平台请使用 v1.19 版本的 sarama。

连接 kafka 发送消息

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

连接 kafka 消费消息

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
// kafka consumer
func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
}

操作Etcd

这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。

安装

go get go.etcd.io/etcd/clientv3

put和get操作

put命令用来设置键值对数据,get命令用来根据key获取值。

package main
import (
	"context"
	"fmt"
	"time"
	"go.etcd.io/etcd/clientv3"
)
func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"122.51.79.172:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "coolops", "test")
	cancel()
	if err != nil {
		fmt.Printf("put to etcd failed, err:%v\n", err)
		return
	}
	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "coolops")
	cancel()
	if err != nil {
		fmt.Printf("get from etcd failed, err:%v\n", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}

watch操作

watch用来获取未来更改的通知。

package main
import (
	"context"
	"fmt"
	"time"
	"go.etcd.io/etcd/clientv3"
)
func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"122.51.79.172:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// watch 操作,返回的是一个通道
	rch := cli.Watch(context.Background(), "coolops") // <-chan WatchResponse
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

安装报错:

go: finding github.com/coreos/pkg latest
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:114:78: undefined: resolver.BuildOption
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:182:31: undefined: resolver.ResolveNowOption
# github.com/coreos/etcd/clientv3/balancer/picker
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\err.go:37:44: undefined: balancer.PickOptions
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\roundrobin_balanced.go:55:54: undefined: balancer.PickOptions

解决: 将go.mod里的prpc改为1.26.0版本

google.golang.org/grpc v1.26.0

以上就是Go操作Kafka和Etcd方法详解的详细内容,更多关于Go操作Kafka Etcd的资料请关注其它相关文章!

原文地址:https://juejin.cn/post/7129376877449314335
 
标签: Go 操作 Kafka Etcd
反对 0举报 0 评论 0
 

免责声明:本文仅代表作者个人观点,与乐学笔记(本网)无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
    本网站有部分内容均转载自其它媒体,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责,若因作品内容、知识产权、版权和其他问题,请及时提供相关证明等材料并与我们留言联系,本网站将在规定时间内给予删除等相关处理.

  • 《黑马程序员》 category分类的使用(Objective
    分类的作用:在不改变原来类的基础上,可以给类增加一些方法。使用注意 : ①  分类只能增加方法,不可以增加成员变量                ②  分类的方法在实现中可以访问成员变量,不过成员变量必须手动实现。               
    03-16
  • intellij idea go go go!
    安装如下:1. intellij idea2. jdk-8u31-windows-x643.1. scala:Scala是一门多范式的编程语言,一种类似java的编程语言[1]  ,设计初衷是实现可伸缩的语言[2]  、并集成面向对象编程和函数式编程的各种特性。 2.IdeaVimEmulates Vim editorVim是一个类似
    03-08
  • 用 VSCode 搭建 Go 语言开发环境
    用 VSCode 搭建 Go 语言开发环境
    介绍这是来自 Funai Research Institute Digital 的 osw。由于我决定使用 Go 语言进行工作,所以我会在学习时将其记录在备忘录中。如果您可以参考,我将不胜感激。目标听众从现在开始学习 Go 语言的人我正在考虑使用 VSCode本次使用的环境这次,我们将基于以
    03-08
  • 我比较了 Go 和 C# 的速度
    我比较了 Go 和 C# 的速度
    我在 Go 和 C# 之间进行了速度比较。我通常使用 C#,但我有机会使用 Go,并且由于传闻 Go 速度很快,所以我实际测量了它。测量内容我在 Go 和 C# 中执行了一个简单的循环和判断过程,以查看整数 2 到 N 是否为质数。来源是Github参考。测量模式 逻辑内核 8 Wi
    03-08
  • Nginx动态路由的新姿势:使用Go取代lua nginx路由规则
    Nginx动态路由的新姿势:使用Go取代lua nginx路
    导语: 在Nitro 中, 我们需要一款专业的负载均衡器。 经过一番研究之后,Mihai Todor和我使用Go构建了基于Nginx、Redis 协议的路由器解决方案,其中nginx负责所有繁重工作,路由器本身并不承载流量。 这个解决方案过去一年在生产环境中运行顺畅。 以下是我
    03-08
  • 《zw版·Halcon-delphi系列原创教程》 Halcon分
    《zw版·Halcon-delphi系列原创教程》 Halcon分类函数012,polygon,多边形为方便阅读,在不影响说明的前提下,笔者对函数进行了简化::: 用符号“**”,替换:“procedure”:: 用大写字母“X”,替换:“IHUntypedObjectX”:: 省略了字符:“const”、“OleVa
    02-09
  • 设计模式之开放-封闭原则(引申出Objective-C中
    开放封闭原则(OCP原则The Open-Closed Principle)是面向对象的核心设计所在。它是说,软件开发实体(类、模块、函数等)应该可以扩展,但是不能修改。这个原则有两个特征,一个是说“对于扩展是开放的”,另一个是说“对于更改是封闭的”。我们在编写任何ap
    02-09
  • Objective-C——消息、Category和Protocol Objective-C
    Objective-C——消息、Category和Protocol Obje
    面向对象永远是个可以吐槽的话题,从开始提出到推崇备至,到充满质疑,一路走来让人唏嘘不已。面向对象的思想可谓历史悠久,20世纪70年代的Smalltalk可以说是面向对象语言的经典,直到今天我们依然将这门语言视为面向对象语言的基础。面向对象是大部分编程语
    02-09
  • Go语言使用goroutine及通道实现并发详解
    Go语言使用goroutine及通道实现并发详解
    目录使用通道接收数据阻塞接收数据非阻塞接收数据接收任意数据,忽略掉接收的数据循环接收数据使用通道接收数据在上一篇文章中介绍了通道以及使用通道发送数据,本篇接着了解通道的基本内容,如何使用通道接收数据;通道的接收同样使用"-"操作符;使用通道接
  • Go 数据结构之堆排序示例详解
    Go 数据结构之堆排序示例详解
    目录堆排序堆排序过程动画显示开始堆排序代码实现总结堆排序堆排序是一种树形选择排序算法。简单选择排序算法每次选择一个关键字最小的记录需要 O(n) 的时间,而堆排序选择一个关键字最小的记录需要 O(nlogn)的时间。堆可以看作一棵完全二叉树的顺序存储结构
点击排行