kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

   2023-02-10 学习力0
核心提示: Kafka API 简单用法本篇会用到以下依赖:(本人包和这个不同,去maven里查找)dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion0.10.2.0/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka

 

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

Kafka API 简单用法

本篇会用到以下依赖:(本人包和这个不同,去maven里查找)

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.0</version>
</dependency>

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

package com.yjsj.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OldProducer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("metadata.broker.list", "master:9092");
properties.put("request.required.acks", "1");
properties.put("serializer.class",
"kafka.serializer.StringEncoder");

Producer<Integer, String> producer = new Producer<Integer,String>(new
ProducerConfig(properties));
KeyedMessage<Integer, String> message = new KeyedMessage<Integer,
String>("first", "hello world");
producer.send(message );
}
}

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

package com.yjsj.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NewProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "master:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// key 序列化
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// value 序列化
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first",

Integer.toString(i), "hello world-" + i));
}
producer.close();
}
}

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first",

"hello" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception
exception) {
if (metadata != null) {
System.out.println(metadata.partition() + "---" +
metadata.offset());
}
}
});
}
kafkaProducer.close();
}
}

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

package com.yjsj.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster) {
// 控制分区
return 0;
}
@Override
public void close() {
}
}

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

package com.yjsj.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class PartitionerProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "node1:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 增加服务端请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小

props.put("buffer.memory", 33554432);
// key 序列化
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// value 序列化
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// 自定义分区
props.put("partitioner.class", "com.yjsj.kafka.CustomPartitioner");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("first", "1",
"hadoop"));
producer.close();
}
}

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

package com.yjsj.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {

public static void main(String[] args) {
Properties props = new Properties();
// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "master:9092");
// 制定 consumer group
props.put("group.id", "test");
// 是否自动确认 offset
props.put("enable.auto.commit", "true");
// 自动确认 offset 的时间间隔
props.put("auto.commit.interval.ms", "1000");
// key 的序列化类
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// value 的序列化类
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// 定义 consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的 topic, 可同时订阅多个
consumer.subscribe(Arrays.asList("first", "second","third"));
while (true) {
// 读取数据,读取超时时间为 100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
}

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

kafka的api操作(官网http://kafka.apache.org/documentation/#producerapi)

package com.yjsj.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定义输入的 topic
String from = "first";
// 定义输出的 topic
String to = "second";
// 设置参数
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
StreamsConfig config = new StreamsConfig(settings);
// 构建拓扑

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[],
byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具体分析处理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
// 创建 kafka stream
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}

具体业务处理

package com.yjsj.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”则只保留该标记后面的内容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 输出到下一个 topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());

}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}

 
反对 0举报 0 评论 0
 

免责声明:本文仅代表作者个人观点,与乐学笔记(本网)无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
    本网站有部分内容均转载自其它媒体,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责,若因作品内容、知识产权、版权和其他问题,请及时提供相关证明等材料并与我们留言联系,本网站将在规定时间内给予删除等相关处理.

  • Apache80端口被占用解决办法
    Apache80端口被占用解决办法
    1. win+R,输入 cmd,打开命令行窗口2. 命令行输入netstat -ano3. 找到80端口及对应进程 4. 在任务管理器中的进程处查看与上述80端口对应的PID相同的进程,并关闭。如果没有PID,选择“查看”--"选择列"--勾选“PID”。4. 关闭进程。如果进程为上述系统服务,
    03-08
  • Hadoop中mapreduce运行WordCount程序报错Error:
    这个问题是因为map的方法参数与继承mapper定义的参数类型不一致导致的,应该将Mapper的key参数类型设置成Object,就可以解决这个问题 
    03-08
  • 使用 Apache Hudi 实现 SCD-2(渐变维度)
    使用 Apache Hudi 实现 SCD-2(渐变维度)
    数据是当今分析世界的宝贵资产。 在向最终用户提供数据时,跟踪数据在一段时间内的变化非常重要。 渐变维度 (SCD) 是随时间推移存储和管理当前和历史数据的维度。 在 SCD 的类型中,我们将特别关注类型 2(SCD 2),它保留了值的完整历史。 每条记录都包含有
    03-08
  • 一个用 Python 分析 Apache 日志的故事
    一个用 Python 分析 Apache 日志的故事
    介绍不久前,公司里有人告诉我“我想知道 Apache 的访问日志是否可以用来做一些事情”。数据分析,Apache,发文章,我是初学者,写的不好请见谅。访问日志分析导入模块我正在使用以下模块。# データの処理import pandas as pdimport numpy as np# グラフ表示i
    03-08
  • [转]用apache反向代理解决单外网ip对应内网多个
    用apache反向代理解决单外网ip对应内网多个web主机的问题  转载一个有独立外网IP,需内网服务器对外发布的例子,是应用apache虚拟主机的。来源地址:http://www.itshantou.com/Servers/web/06/10/44219.html    几年前开始在学校的服务器上建网站,那时
    02-10
  • Apache service named reported the following
    apache启动失败报错:The Apache service named reported the following error: AH00451: no listening sockets available, shutting down . The Apache service named reported the following error: (OS 10055)由于系统缓冲区空间不足或队列已满,不能执行
    02-10
  • struts布局管理---SiteMesh一个优于Apache Tile
    1. SiteMesh的基本原理       一个请求到服务器后,如果该请求需要sitemesh装饰,服务器先解释被请求的资源,然后根据配置文件 获得用于该请求的装饰器,最后用装饰器装饰被请求资源,将结果一同返回给客户端浏览器。 2. 如何使用SiteMesh    这里以st
    02-10
  • linux 安装 apache2.2.31
     Linux下安装和配置Apache 概要:本文介绍在CentOS5.4 Linux中安装和配置Apache2.2.14,并且实现Apache和Tomcat6的整合。文章分为三部分,分别是删除系统自带的Apache、安装Apache2.2.14和配置Apache2.2.14。 文章中介绍的知识也可以在其它版本的Linux中
    02-10
  • Apache CXF使用Jetty发布WebService
    Apache CXF使用Jetty发布WebService
    一、概述Apache CXF提供了用于方便地构建和开发WebService的可靠基础架构。它允许创建高性能和可扩展的服务,可以部署在Tomcat和基于Spring的轻量级容器中,也可以部署在更高级的服务器上,例如Jboss、WebSphere或WebLogic。 CXF提供了以下功能:WebService
    02-10
  • apache下ab.exe使用方法。。 apache ab工具
    自己在cmd中写了半天的路径也没有写对。。最后网上的一个哥们告诉我说没有共同语言了。。。毛线啊 差距确实很大!大能猫死panda早晚干掉你,叫你丫整天嘲讽我!比如我的ab.exe在D盘的wamp文件夹下apache文件夹下bin文件夹下。那么在cmd中可以这么写:"D:\wamp
    02-10
点击排行