# Kafka_深入探秘者(3):kafka 消费者

Kafka_深入探秘者(3):kafka 消费者

一、kafka 消费者、消费组

1、Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有4个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:

消费者1.png在这里插入图片描述

2、Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。

对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

消费者2.png在这里插入图片描述

二、kafka 消息接收参数设置

1、kafka 消息接收 必要参数设置

  • 1)(生产者 和 消费者的 key , value 保持一致)
  • 2)制定连接 Kafka 集群所需的 broker 地址清单,可以设置一个或者多个的名称,生产者 和 消费者的 bootstrap 保持一致。
  • 3)消费者隶属于的消费组 group.id,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义。
  • 4)指定 Kafkaconsumer 对应的客户端 client.id,默认为空,如果不设置 Kafkaconsumer 会自动生成一个非空字符串。

2、示例代码:


public static Properties initconfig(){

	Properties props =new Properties();
	
	//1)与 KafkaProducer 中设置保持一致(生产乾消费者保持一致)	
	props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
	
	props.put("value.deserializer","org.apache.kafka.common.serialization.stringDeserializer");

	//2)必填参数,该参数和 KafkaProducer 中的相同,制定连接 Kafka 集群所需的 broker 地址清单,可以设置一个或者多个的名称
	props.put("bootstrap.servers",brokerList);

	//3)消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义
	props.put("group.id",groupId);

	//4)指定 Kafkaconsumer 对应的客户端 ID,默认为空,如果不设置 Kafkaconsumer 会自动生成一个非空字符串
	props.put("client.id","consumer.client.id.demo");
	return props;
}

三、kafka 订阅主题和分区

1、kafka 订阅主题和分区

创建完消费者后我们便可以订阅主题了,只需要通过调用 subscribe() 方法即可,这个方法接收一个主题列表


KafkaConsumer<String, String>consumer = new Kafkaconsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

2、另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接 Kafka 与其他系统时非常有用。比如订阅所有的测试主题:


//订阅所有以 heima 开头的主题
consumer.subscribe(Pattern.compile("heima*"));

3、指定订阅的分区


//指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition("topic",0)));

4、kafka 反序列化


//与 KafkaProducer 中设置保持一致(生产者消费者保持一致)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

四、kafka 重复消费、消息丢失

1、位移提交

对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中的位置。

当我们调用 poll() 时,该方法会返回我们没有消费的消息。当消息从 broker 返回消费者时,broker 并不跟踪这些消息是否被消费者接收到; Kafka 让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交 (commit)。

2、kafka 消息 重复消费

kafka重复消费消息.png

3、kafka 消息丢失

kafka消息丢失.png

五、kafka 同步、异步提交

1、kafka 消息 自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将 enable.auto.commit 设置为 true,那么消费者会在 poll 方法调用后每隔 5 秒 (由 auto.commit.interval.ms 指定) 提交一次位移。和很多其他操作一样,自动提交也是由 poll() 方法来驱动的;在调用 poll() 时,消费者判断是否到达提交时间,如果是则提交上一次 poll 返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者 poll 消息后,应用正在处理消息,在 3 秒后 Kafka 进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

2、kafka 消息 同步提交

在 kafka_learn 工程中,创建 CheckOffsetAndcommit.java 类,进行 同步提交 测试。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\CheckOffsetAndcommit.java
 *
 *  2024-6-22 创建 CheckOffsetAndcommit.java 类 测试同步提交
 */
package djh.it.kafka.learn.chapter3;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckOffsetAndcommit {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    private static final String groupId = "group.heima";

    private static AtomicBoolean running = new AtomicBoolean(true);

    public static Properties initConfig() {

        Properties properties = new Properties();

        //1)设置 key 序列化器 -- 优化代码
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //3)设置值序列化器 -- 优化代码
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //4)设置集群地址 -- 优化代码
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 手动提交开启
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return properties;
    }

    public static void main( String[] args ) {

        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        TopicPartition tp = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(tp));
        long lastConsumedOffset = -1;
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            if(records.isEmpty()){
                break;
            }
            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
            lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();

            consumer.commitSync();  //同步提交消费位移

        }
        System.out.println("comsumed offset is " + lastConsumedOffset);
        OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
        System.out.println("commited offset is " + offsetAndMetadata.offset());
        long positition = consumer.position(tp);
        System.out.println("the offset of the next record is " + positition);
    }
}

同步提交.png

3、kafka 消息 异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的 API。

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个异步提交 commitA,此时的提交位移为 2000,随后又发起了一个异步提交 commitB 且位移为 3000; commitA 提交失败但 commitB 提交成功,此时 commitA 进行重试并成功的话,会将实际上将已经提交的位移从 3000 回滚到 2000,导致消息重复消费。

六、kafka 指定位移消费

1、kafka 指定位移消费

消息的拉取是根据 poll() 方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。
seek() 方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费,

2、在 kafka_learn 工程中,创建 SeekDemo.java 类,进行 指定位移消费 测试。


/**
 *  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\SeekDemo.java
 *
 *  2024-6-22 创建 SeekDemo.java 类,进行 指定位移消费 测试。
 */
package djh.it.kafka.learn.chapter3;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;

public class SeekDemo extends ConsumerClientConfig{

    public static void main(String[] args){

        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        //timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待
        consumer.poll(Duration.ofMillis(2000));

        //获取消费者所分配到的分区
        Set<TopicPartition> assignment= consumer.assignment();
        System.out.println(assignment);

        for(TopicPartition tp : assignment){
            //参数partition表示分区,offset表示指定从分区的哪个位置开始消费
            consumer.seek(tp,10);
        }

        //consumer.seek(new TopicPartition(topic,0), 10);

        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //consume the record.

            for(ConsumerRecord<String, String> record :records){
                System.out.println(record.offset()+ ":" + record.value());
            }
        }
    }
}

3、在 kafka_learn 工程中,创建 公共类 KafkaContext.java


/**
 *  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaContext.java
 *
 *  2024-6-22 创建公共类 KafkaContext.java
 */
package djh.it.kafka.learn.chapter3;

public class KafkaContext {
    // 172.18.30.110:9092 填写你自己的 虚拟机 IP 地址和端口号
    public static String brokerList = "172.18.30.110:9092";
    public static String topic = "heima";
    public static String groupId = "group.heima";
}

4、在 kafka_learn 工程中,创建 公共类 ConsumerClientConfig.java


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\ConsumerClientConfig.java
 *
 *  2024-6-22 创建公共类 ConsumerClientConfig.java
 */
package djh.it.kafka.learn.chapter3;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerClientConfig extends KafkaContext{

    public static Properties initConfig(){
        Properties props = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //2)设置值序列化器 -- 优化代码
        //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //3)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        //4)消费组
        //properties.put("group.id", groupId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        //kafka 消费者找不到消费的位移时,从什么位置开始消费,默认:latest :末尾开始消费 earliest : 从头开始
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        //是否启用自动位移提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        return props;
    }
}

5、在 kafka_learn 工程中,运行 SeekDemo.java 类,进行 指定位移消费 测试

指定位移消费测试.png

七、kafka 再均衡

1、再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。

2、在 kafka_learn 工程中,创建 再均衡监听器 类 CommitSyncInRebalance.java


/**
 *  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\CommitSyncInRebalance.java
 *
 *  2024-6-22 创建 再均衡监听器 类 CommitSyncInRebalance.java
 */
package djh.it.kafka.learn.chapter3;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;


public class CommitSyncInRebalance extends ConsumerClientConfig {

    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static void main( String[] args ) {

        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        Map<TopicPartition, OffsetAndMetadata> currentoffsets = new HashMap<>();
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener(){
            @Override
            public void onPartitionsRevoked( Collection<TopicPartition> partitions){
                //尽量避免重复消费
                consumer.commitSync(currentoffsets);
            }
            @Override
            public void onPartitionsAssigned( Collection<TopicPartition> partitions){
                //do nothing.
            }
        });

        try{
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ":" + record.value());

                    //异步提交消费位移,在发生再均衡动作之前可以通过再均衡临听器的 onPartitionsRevoked 回调执行 commitsvnc 方法同步提交位移。
                    currentoffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                //异步提交
                consumer.commitAsync(currentoffsets, null);
            }
        } finally {
             consumer.close();
        }
    }
}

八、kafka 消费者拦截器

1、消费者拦截器

消费者也有相应的拦截器概念,消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。

2、消费者拦截器 使用场景:

对消费消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那就视为无效,不需要再被处理。

3、在 kafka_learn 工程中,创建 消费者拦截器 类 ConsumerInterceptorTTL.java


/**
 *  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\ConsumerInterceptorTTL.java
 *
 *  2024-6-22 创建 消费者拦截器 类 ConsumerInterceptorTTL.java
 */
package djh.it.kafka.learn.chapter3;

import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {

    private static final long EXPIRE_INTERVAL = 10 * 1000;

    @Override
    public ConsumerRecords<String, String> onConsume( ConsumerRecords<String, String> records ) {
        System.out.println("before" + records);
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashedMap();
        for(TopicPartition tp : records.partitions()){
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            for(ConsumerRecord<String, String> record : tpRecords){
                //设置一个发送时间戳,超过一分钟的消息,超时,不能收到此消息
                if(now - record.timestamp() < EXPIRE_INTERVAL){
                    newTpRecords.add(record);
                }
            }
            if(!newTpRecords.isEmpty()){
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit( Map<TopicPartition, OffsetAndMetadata> offsets ) {
        offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
    }

    @Override
    public void close() {
    }

    @Override
    public void configure( Map<String, ?> configs ) {
    }
}

4、在 kafka_learn 工程中,创建 消费者 KafkaConsumerAnalysis.java 类,自定义分区器、自定义拦截器 分析,进行消费消息测试


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaConsumerAnalysis.java
 *
 *  2024-6-22 创建 消费者 KafkaConsumerAnalysis.java 类,自定义分区器、自定义拦截器 分析,进行消费消息测试
 */
package djh.it.kafka.learn.chapter3;

//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerAnalysis {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";
    private static final String topic = "heima";
    private static final String groupId = "group.heima";
    private static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig(){
        Properties props = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //2)设置值序列化器 -- 优化代码
        //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //3)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        //4)消费组
        //properties.put("group.id", groupId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        //指定 KafkaConsumer 对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串
        props.put("client.id", "consumer.client.id.demo");

        // 指定消费者拦截器
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());

        //kafka 消费者找不到消费的位移时,从什么位置开始消费,默认:latest :末尾开始消费 earliest : 从头开始
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//        //是否启用自动位移提交
//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        return props;
    }

    public static void main( String[] args ) throws InterruptedException{

        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));

        // 正则订阅主题
        //consumer.subscribe(Pattern.compile("heima"));

        // 指定订阅的分区
        //consumer.assign(Arrays.asList(new TopicPartition("heima", 0)));

        try{
            while (isRunning.get()){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for(ConsumerRecord<String, String> record : records){
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset() );
                    System.out.println("key = " + record.key() + ", value = " + record.value());
                    // do something to process record.
                }
            }
        }catch (Exception e){
            e.printStackTrace();
            //log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}

5、在 kafka_learn 工程中,创建 生产者 ProducerFastStart.java 类中,添加超时发送和不超时发送消息,进行测试。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java
 *
 *  2024-6-21 创建 生产者 ProducerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;

import org.apache.kafka.clients.producer.*;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    public static void main( String[] args ) {

        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数 -- 优化代码
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值序列化器 -- 优化代码
        //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-000", "hello,kafka");
        //设置一个发送时间戳倒退500毫秒的消息,不超时,能消费到此消息
        ProducerRecord<String,String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 500,"kafka-demo-001", "hello,kafka-> 5秒不超时");
        //设置一个发送时间戳倒退一分钟的消息,超时,不能收到此消息
        ProducerRecord<String,String> record3 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000,"kafka-demo-001", "hello,kafka->10秒超时");
        try{

            producer.send(record);
            producer.send(record2);  //发送时间戳倒退500毫秒的消息,不超时,能消费到此消息
            producer.send(record3);  //发送时间戳倒退一分钟的消息,超时,不能收到此消息

//            //发送类型--同步发送
//            Future<RecordMetadata> send = producer.send(record);
//            RecordMetadata recordMetadata = send.get();
//            System.out.println("topic: " + recordMetadata.topic());
//            System.out.println("partition: " + recordMetadata.partition());
//            System.out.println("offset: " + recordMetadata.offset());

//            //发送类型--异步发送
//            producer.send(record, new Callback() {
//                public void onCompletion(RecordMetadata metadata, Exception exception) {
//                    if (exception == null) {
//                        System.out.println("topic: " + metadata.topic());
//                        System.out.println("partition: " + metadata.partition());
//                        System.out.println("offset: " + metadata.offset());
//                    }
//                }
//            });
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

超时消息未接收到(超过1分钟).png

九、kafka 消费者 总结

1、kafka 消费者参数补充:

  • 1)fetch.min.bytes

这个参数允许消费者指定从 broker 读取消息时最小的数据量。当消费者从 broker 读取消息时,如果数据量小于这个阈值,broker 会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少 broker 和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻 broker 压力。

  • 2)fetch.max.wait.ms

上面的 fetch.min.bvtes 参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为 500ms。

  • 3)max.partition.fetch.bytes

这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,Kafkaconsumer.poll(0) 返回记录列表时,每个分区的记录字节数最多为 1M。如果一个主题有 20 个分区,同时有5个消费者,那么每个消费者需要 4M 的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

  • 4)max.poll.records

这个参数控制一个 poll(0) 调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

2、kafka 消费者总结

  • kafka 消费者和消费组的概念,
  • 使用 KafkaConsumer,
  • kafka 消费者参数的配置,
  • kafka 订阅、
  • kafka 反序列化、
  • kafka 位移提交、
  • kafka 再均衡、
  • kafka 拦截器等。

上一节关联链接请点击
# Kafka_深入探秘者(2):kafka 生产者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/737721.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring+SpringMVC+MyBatis整合

目录 1.SSM介绍1.1 什么是SSM&#xff1f;1.2 SSM框架1.2.1 Spring1.2.2 SpringMVC1.2.3 MyBatis 2.SSM框架整合2.1 建库建表2.2 创建工程2.3 pom.xml2.4 log4j.properties2.5 db.properties2.6 applicationContext-dao.xml2.7.applicationContext-tx.xml2.8 applicationContex…

移远通信SC200L(展锐SL8541E)Linux系统修改分区大小

一、确定大小 由于默认的根文件分区大小仅500M&#xff0c;/lib目录移植个app都放不进去&#xff0c;这谁受得了&#xff1f; userdata分区却有6G&#xff0c;匀一点。 在 prebuilts/pac-binary/sl8541e/ 下有分区信息表 sl8541e-emmc-marlin2.xml&#xff1a; 找到system项&a…

MyBatis-Plus常用注解详解与实战应用

MyBatis-Plus 是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。它提供了大量的常用注解&#xff0c;使得开发者能够更方便地进行数据库操作。 MyBatis-Plus 提供的注解可以帮我们解决一些数据库与实体之间相…

高校新生如何选择最优手机流量卡?

一年一度的高考已经结束了&#xff0c;愿广大学子金榜题名&#xff0c;家长们都给孩子准备好了手机&#xff0c;那么手机流量卡应该如何选择呢&#xff1f; 高校新生在选择手机流量卡时&#xff0c;需要综合考量流量套餐、费用、网络覆盖、售后服务等多方面因素&#xff0c;以下…

怎么用Excel生成标签打印模板,自动生成二维码

环境&#xff1a; EXCEL2021 16.0 问题描述&#xff1a; 怎么用excel生成标签打印模板自动生成二维码 解决方案&#xff1a; 在Excel中生成标签打印模板并自动生成二维码&#xff0c;可以通过以下几个步骤完成&#xff1a; 1. 准备数据 首先&#xff0c;确保你的Excel表…

【PyTorch单点知识】神经元网络模型剪枝prune模块介绍(上)

文章目录 0. 前言1. 剪枝prune主要功能分类2. torch.nn.utils.prune中的方法介绍3. PyTorch实例3.1 BasePruningMethod3.2PruningContainer3.3 Identity3.4RandomUnstructured3.5L1Unstructured 4. 总结 0. 前言 按照国际惯例&#xff0c;首先声明&#xff1a;本文只是我自己学…

【AI大模型】驱动的未来:穿戴设备如何革新血液、皮肤检测与营养健康管理

文章目录 1. 引言2. 现状与挑战3. AI大模型与穿戴设备概述4. 数据采集与预处理4.1 数据集成与增强4.2 数据清洗与异常检测 5. 模型架构与训练5.1 高级模型架构5.2 模型训练与调优 6. 个性化营养建议系统6.1 营养建议生成优化6.2 用户反馈与系统优化 7. 关键血液成分与健康状况评…

Win11最适合打游戏的版本推荐:畅玩游戏,告别卡顿!

在Win11电脑操作中&#xff0c;用户不仅可以进行办公、学习等操作&#xff0c;也可以畅玩喜欢的游戏。如果喜欢打游戏的用户&#xff0c;就可以安装上适合打游戏的系统版本。但许多新手用户不知道去哪里找到最适合打游戏的Win11系统版本&#xff1f;以下小编就给大家带来这样的…

el-upload 上传图片及回显照片和预览图片,文件流和http线上链接格式操作

<div v-for"(info, index) in zsjzqwhxqList.helicopterTourInfoList" :key"info.id" >编辑上传图片// oss返回线上地址http链接格式&#xff1a;<el-form-itemlabel"巡视结果照片":label-width"formLabelWidth"><el…

Appium APP测试学习

1、安装client编程库(客户端) (1)如果遇到以下问题可以使用全路径安装 (2)安装后导致selenium升级&#xff0c;导致某些方法失效&#xff1a;如find_element_by_id。解决方法&#xff1a;卸载两个安装包&#xff0c;后面重新安装 2、安装appium Server:&#xff08;服务端&…

【论文阅读】场景生成及编辑3D定位论文阅读

<div id"content_views" class"htmledit_views" style"user-select: auto;"><div class"kdocs-document"> 前置知识 归纳偏置 关于归纳偏置的理解&#xff1a;首先推荐一篇解释归纳偏置非常好的博客&#xff1a;浅谈归纳…

【PostgreSQL】AUTO_EXPLAIN - 慢速查询的日志执行计划

本文为云贝教育 刘峰 原创&#xff0c;请尊重知识产权&#xff0c;转发请注明出处&#xff0c;不接受任何抄袭、演绎和未经注明出处的转载。 一、介绍 在本文中&#xff0c;我们将了解 PostgreSQL AUTO_EXPLAIN功能的工作原理&#xff0c;以及为什么应该使用它来收集在生产系统…

高考十字路口:24年考生如何权衡专业与学校的抉择?

文章目录 每日一句正能量前言专业解析理工科专业商科专业人文社科专业艺术与设计专业个人经验与思考过程结论 名校效应分析名校声誉与品牌效应资源获取学术氛围就业优势个人发展结论 好专业和好学校的权衡个人职业目标行业需求教育质量资源和机会学术氛围就业优势经济和地理位置…

SVM算法-人脸识别背后技术详解

引言 支持向量机&#xff08;SVM&#xff09;是一种强大的监督学习算法&#xff0c;广泛应用于分类和回归任务中。本文将详细介绍SVM算法在人脸识别任务中的应用&#xff0c;并通过代码示例来展示其背后的技术精髓。我们将分三大部分来展开&#xff0c;本部分将重点介绍SVM算法…

SpringBoot2+Vue3开发博客管理系统

项目介绍 博客管理系统&#xff0c;可以帮助使用者管理自己的经验文章、学习心得、知识文章、技术文章&#xff0c;以及对文章进行分类&#xff0c;打标签等功能。便于日后的复习和回忆。 架构介绍 博客管理系统采用前后端分离模式进行开发。前端主要使用技术&#xff1a;Vu…

网管工作实践_02_IP/MAC地址管理工具

1、ipconfig命令格式及参数 ipconfig是内置于Windows的TCP/IP应用程序&#xff0c;用于显示本地计算机网络适配器的MAC地址和IP地址等配置信息&#xff0c;这些信息一般用来榆验手动配置的TCP/IP设置是否正确。当在网络中使用 DHCP服务时&#xff0c;IPConfig可以检测计算机中分…

利用定时器1产生全双工软件串口

代码; /*《AVR专题精选》随书例程3.通信接口使用技巧项目&#xff1a;使用AVR定时器1和外中断实现全双工软件串口文件&#xff1a;softuart.c说明&#xff1a;软件串口驱动文件作者&#xff1a;邵子扬时间&#xff1a;2012年12月16日*/ #include "softuart.h"// 内部…

达梦数据守护集群部署

接上篇 达梦8单机规范化部署 https://blog.csdn.net/qq_25045631/article/details/139898690 1. 集群规划 在正式生产环境中&#xff0c;两台机器建议使用统一配置的服务器。使用千兆或千兆以上网络。 两台虚拟机各加一块网卡&#xff0c;仅主机模式&#xff0c;作为心跳网卡…

【日记】原来真的有人不适合谈恋爱(1194 字)

正文 21 日正是周五&#xff0c;夏至。全年当中&#xff0c;白天时长最长的一天。而恰好那天也是银行扣息的日子。所以很忙&#xff0c;我差点没能走掉。 所幸最终还是有惊无险。 到斯的家里&#xff0c;是晚上 9 点钟。比我想得要早。这个周周四&#xff0c;他过生日。但是那天…

提升效率新选择——运营必备API合集

在数字化、自动化的时代背景下&#xff0c;运营小工具API已经成为了众多企业不可或缺的重要组成部分。这些API工具为运营工作提供了极大的便利&#xff0c;不仅简化了工作流程&#xff0c;还提高了工作效率。 运营小工具API是针对运营工作特定需求而设计的接口&#xff0c;通过…