Dynamically Initializing Kafka Consumer Instances

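  • I. Kafka environment setup
  • II. Dynamically initializing consumers
    • 1. Topic definition
    • 2. Handler method factory
    • 3. Argument resolver (copied from the Spring Kafka source)
    • 4. Consumer interface and implementation
    • 5. Dynamic initialization
      • 1. Key classes
      • 2. Dynamic initialization implementation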

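I. Kafka environment setup

Reference: Kafka Setup and Testing

II. Dynamically initializing consumers

1. Topic definition

Dynamic initialization means creating consumers without annotations or configuration files. Define a Topic object that carries the consumer's parameters.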

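package com.demo.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author
 * @date 2023-02-08 15:06
 * @since 1.8
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Topic {
    // Listener (endpoint) id, used to look the container up in the registry
    private String id;
    // Name of the Kafka topic to consume
    private String topic;
    // Partition count; the controller passes it to the endpoint as its concurrency
    private Integer partitions;
    // Consumer group id
    private String group = "test";
    // Prefix for the consumer client id
    private String clientPrefix;
}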

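2. Handler method factory

This class is taken directly from the Spring Kafka source, where the original implementation is a private class.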

package com.demo.manual;

import org.springframework.context.ApplicationContext;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * @author
 * @date 2023-02-08 14:18
 * @since 1.8
 */
public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {

    private ApplicationContext applicationContext;

    private Validator validator;

    private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();

    private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();

    private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;

    public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {
        this.validator = validator;
        this.applicationContext = applicationContext;
    }

    public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
        this.handlerMethodFactory = kafkaHandlerMethodFactory1;
    }

    private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {
        if (this.handlerMethodFactory == null) {
            this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();
        }
        return this.handlerMethodFactory;
    }

    private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
        if (this.validator != null) {
            defaultFactory.setValidator(this.validator);
        }
        defaultFactory.setBeanFactory(this.applicationContext);
        this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));
        this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());
        defaultFactory.setConversionService(this.defaultFormattingConversionService);
        GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
        defaultFactory.setMessageConverter(messageConverter);

        List<HandlerMethodArgumentResolver> customArgumentsResolver =
                new ArrayList<>(Collections.unmodifiableList(this.customMethodArgumentResolvers));
        // Has to be at the end - look at PayloadMethodArgumentResolver documentation
        customArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));
        defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);

        defaultFactory.afterPropertiesSet();
        return defaultFactory;
    }

    @Override
    public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
        return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
    }

    private static class BytesToStringConverter implements Converter<byte[], String> {

        private final Charset charset;

        BytesToStringConverter(Charset charset) {
            this.charset = charset;
        }

        @Override
        public String convert(byte[] source) {
            return new String(source, this.charset);
        }
    }

    private final class BytesToNumberConverter implements ConditionalGenericConverter {

        BytesToNumberConverter() {
        }

        @Override
        @Nullable
        public Set<ConvertiblePair> getConvertibleTypes() {
            HashSet<ConvertiblePair> pairs = new HashSet<>();
            pairs.add(new ConvertiblePair(byte[].class, long.class));
            pairs.add(new ConvertiblePair(byte[].class, int.class));
            pairs.add(new ConvertiblePair(byte[].class, short.class));
            pairs.add(new ConvertiblePair(byte[].class, byte.class));
            pairs.add(new ConvertiblePair(byte[].class, Long.class));
            pairs.add(new ConvertiblePair(byte[].class, Integer.class));
            pairs.add(new ConvertiblePair(byte[].class, Short.class));
            pairs.add(new ConvertiblePair(byte[].class, Byte.class));
            return pairs;
        }

        @Override
        @Nullable
        public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
            byte[] bytes = (byte[]) source;
            if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
                Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR
                return ByteBuffer.wrap(bytes).getLong();
            }
            else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {
                Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONAR
                return ByteBuffer.wrap(bytes).getInt();
            }
            else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {
                Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short"); // NOSONAR
                return ByteBuffer.wrap(bytes).getShort();
            }
            else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {
                Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte"); // NOSONAR
                return ByteBuffer.wrap(bytes).get();
            }
            return null;
        }

        @Override
        public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
            if (sourceType.getType().equals(byte[].class)) {
                Class<?> target = targetType.getType();
                return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR
                        || target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)
                        || target.equals(Short.class) || target.equals(Byte.class);
            }
            else {
                return false;
            }
        }
    }
}
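The two converters registered above turn a raw byte[] payload into a String or a number. The sketch below isolates that conversion logic using only the JDK, so it can be run without Spring; the class name BytesConversionDemo and its static helpers are hypothetical and exist only for illustration. It relies on the fact that ByteBuffer reads in big-endian order by default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: mirrors the conversion logic of the converters above, without Spring.
final class BytesConversionDemo {

    // Read the first 8 bytes as a big-endian long (ByteBuffer's default byte order).
    static long toLong(byte[] bytes) {
        if (bytes.length < 8) {
            throw new IllegalStateException("At least 8 bytes needed to convert a byte[] to a long");
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Read the first 4 bytes as a big-endian int.
    static int toInt(byte[] bytes) {
        if (bytes.length < 4) {
            throw new IllegalStateException("At least 4 bytes needed to convert a byte[] to an integer");
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Decode the bytes as UTF-8 text, as BytesToStringConverter does with its charset.
    static String toText(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] encoded = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        System.out.println(toLong(encoded));
        System.out.println(toInt(new byte[] {0, 0, 1, 0}));
        System.out.println(toText("kafka".getBytes(StandardCharsets.UTF_8)));
    }
}
```

A producer that writes numbers in a different byte order or as text would need a different conversion; this sketch only covers the big-endian binary case handled by the copied class.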

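3. Argument resolver (copied from the Spring Kafka source)

This class is taken directly from the Spring Kafka source, where the original implementation is a private class.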

package com.demo.manual;

import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.Validator;

import java.util.List;

/**
 * @author
 * @date 2023-02-08 14:36
 * @since 1.8
 */
public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {

    NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
        super(messageConverter, validator);
    }

    @Override
    public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
        Object resolved = super.resolveArgument(parameter, message);
        /*
         * Replace KafkaNull list elements with null.
         */
        if (resolved instanceof List) {
            List<?> list = ((List<?>) resolved);
            for (int i = 0; i < list.size(); i++) {
                if (list.get(i) instanceof KafkaNull) {
                    list.set(i, null);
                }
            }
        }
        return resolved;
    }

    @Override
    protected boolean isEmptyPayload(Object payload) {
        return payload == null || payload instanceof KafkaNull;
    }

}
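The resolver's job is small: wherever a placeholder object stands in for a missing record value, hand the listener a real null instead. The JDK-only sketch below shows the same idea with a stand-in sentinel; NULL_MARKER and the class NullMarkerDemo are hypothetical substitutes for KafkaNull and the resolver, used here only so the example runs without Spring Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: replaces a sentinel object with null, as the resolver does for KafkaNull.
final class NullMarkerDemo {

    // Stand-in for org.springframework.kafka.support.KafkaNull (assumption for this sketch).
    static final Object NULL_MARKER = new Object();

    // Replace every sentinel element in a batch payload with null, in place.
    static List<Object> replaceMarkers(List<Object> payloads) {
        for (int i = 0; i < payloads.size(); i++) {
            if (payloads.get(i) == NULL_MARKER) {
                payloads.set(i, null);
            }
        }
        return payloads;
    }

    // A payload counts as empty when it is null or the sentinel.
    static boolean isEmptyPayload(Object payload) {
        return payload == null || payload == NULL_MARKER;
    }

    public static void main(String[] args) {
        List<Object> batch = new ArrayList<>();
        batch.add("a");
        batch.add(NULL_MARKER);
        batch.add("b");
        System.out.println(replaceMarkers(batch));
    }
}
```

The list must be mutable and must permit null elements for the in-place replacement to work, which is why the sketch uses an ArrayList.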

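4. Consumer interface and implementation

The interface and its implementation give all topics a single entry point for data handling. Inside the implementation you can then dispatch on the topic and call the matching parsing method.

Interface: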

package com.demo.manual;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @author
 * @date 2023-02-08 13:46
 * @since 1.8
 */
public interface Handler {
    void deal(ConsumerRecord<String, String> cRecord);
}

Implementation:

package com.demo.manual;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @author
 * @date 2023-02-08 11:49
 * @since 1.8
 */
@Slf4j
public class ManualHandler implements Handler {

    @Override
    public void deal(ConsumerRecord<String, String> cRecord) {
        log.info("  Topic:{} Partition:{} Content:{}", cRecord.topic(), cRecord.partition(), cRecord.value());
    }
}
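Dispatching on the topic inside the handler, as described at the start of this section, could look roughly like the sketch below. It is JDK-only so that it runs on its own: the record is reduced to a topic and a value string, and the class TopicDispatchDemo and both parser lambdas are hypothetical. Only the topic names come from this article.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical demo class: one entry point that routes each record to a per-topic parser.
final class TopicDispatchDemo {

    // Topic name -> parsing function (both parsers are made up for illustration).
    private final Map<String, Function<String, String>> parsers = new HashMap<>();

    TopicDispatchDemo() {
        parsers.put("test-topic-new.1", value -> "trimmed:" + value.trim());
        parsers.put("test-topic-new.2", value -> "upper:" + value.toUpperCase(Locale.ROOT));
    }

    // Single entry point, analogous to Handler.deal: pick the parser by topic.
    String deal(String topic, String value) {
        Function<String, String> parser = parsers.get(topic);
        if (parser == null) {
            return "unhandled:" + topic;
        }
        return parser.apply(value);
    }

    public static void main(String[] args) {
        TopicDispatchDemo handler = new TopicDispatchDemo();
        System.out.println(handler.deal("test-topic-new.1", "  hello  "));
        System.out.println(handler.deal("test-topic-new.2", "abc"));
        System.out.println(handler.deal("other-topic", "x"));
    }
}
```

In the real handler the same lookup would key on cRecord.topic() and pass cRecord.value() to the parser. A map keeps the routing table in one place as topics are added.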

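5. Dynamic initialization

1. Key classes

Here consumers are created, stopped, and resumed through HTTP endpoints; adapt the design to your actual use case.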

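Key class | Description
--- | ---
KafkaListenerEndpointRegistry | Spring's registry of Kafka listener containers; a listener container can be looked up by id to pause or resume consumption
ConcurrentKafkaListenerContainerFactory | Listener container factory; for its definition, see section 2.3 of the article referenced above
ConsumerAwareListenerErrorHandler | Handler for consumption errors; for its definition, see section 2.3 of the article referenced above
ApplicationContext | Spring application context; used to initialize MessageHandlerMethodFactory
MethodKafkaListenerEndpoint | Kafka listener endpoint configuration; see the source code for the detailed logic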

The main class and method Spring Boot relies on to initialize Kafka consumers automatically:

Class: org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor

    /**
     * Related source code, for reference only.
     * Finds beans that carry the @KafkaListener annotation and initializes them.
     */
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
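The core of the method above is a reflective scan: walk the bean's methods and collect those that carry a listener annotation. The sketch below reproduces just that step with plain JDK reflection. The @Listener annotation, the Bean class, and AnnotationScanDemo are all hypothetical stand-ins; the real processor uses Spring's MethodIntrospector and also handles class-level annotations, proxies, and repeated annotations, none of which is modeled here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class: finds annotated methods the way a bean post-processor might.
final class AnnotationScanDemo {

    // Stand-in for @KafkaListener (assumption for this sketch); must be retained at runtime.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Listener {
        String topic();
    }

    // Example bean with one annotated and one plain method.
    public static final class Bean {
        @Listener(topic = "test-topic-new.1")
        public void onMessage(String value) {
        }

        public void helper() {
        }
    }

    // Return method name -> topic for every method annotated with @Listener.
    static Map<String, String> scan(Class<?> type) {
        Map<String, String> found = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                found.put(method.getName(), listener.topic());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(Bean.class));
    }
}
```

Dynamic initialization skips this scan entirely: instead of discovering the method through an annotation, the controller in the next section hands the bean and method to the endpoint directly.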

2. Dynamic initialization implementation

package com.demo.controller;

import com.demo.entity.Topic;
import com.demo.manual.MessageHandlerMethodFactory;
import com.demo.manual.ManualHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author
 * @date 2023-02-07 13:40
 * @since 1.8
 */
@Slf4j
@RestController
@RequestMapping("/listener")
public class ListenerController {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @Autowired
    @Qualifier("batchTestContainerFactory")
    ConcurrentKafkaListenerContainerFactory<String, String> batchTestContainerFactory;

    @Autowired
    ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;

    @Autowired
    ApplicationContext applicationContext;

    MessageHandlerMethodFactory factory;

    @PostConstruct
    private void init() {
        factory = new MessageHandlerMethodFactory(null, applicationContext);
    }

    // Topics that may be started on demand, keyed by listener id
    static Map<String, Topic> map = new ConcurrentHashMap<>();

    static {
        map.put("test_manual_1_id", new Topic("test_manual_1_id", "test-topic-new.1", 2, "mygroup", "test_manual_1_batch"));
        map.put("test_manual_2_id", new Topic("test_manual_2_id", "test-topic-new.2", 1, "mygroup", "test_manual_2_batch"));
    }

    /**
     * Stop consuming. Decide for yourself whether stopping should also remove
     * the instance from the listener registry; the registry is backed by a
     * Map<String, MessageListenerContainer>.
     * @param id listener id
     */
    @GetMapping("/close")
    public void close(String id) {
        MessageListenerContainer container = registry.unregisterListenerContainer(id);
        // Nothing is registered under an unknown id, so guard against null
        if (container != null) {
            container.destroy();
        }
    }

    /**
     * Start consuming. If the listener is already registered and is not
     * running, start and resume it. If it does not exist, define a consumer,
     * register it with the registry, and start it.
     * @param id listener id
     * @throws NoSuchMethodException if the handler method cannot be found
     */
    @GetMapping("/open")
    public void open(String id) throws NoSuchMethodException {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (null != container) {
            if (!container.isRunning()) {
                container.start();
                container.resume();
            }
        } else {
            // Create a new consumer for the corresponding Topic
            Topic topic = map.get(id);
            if (null == topic) {
                return;
            }
            ManualHandler bean = new ManualHandler();
            MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
            endpoint.setMessageHandlerMethodFactory(factory);
            endpoint.setBean(bean);
            // Look the handler method up by name: getDeclaredMethods() returns methods in no guaranteed order
            Method method = bean.getClass().getMethod("deal", ConsumerRecord.class);
            endpoint.setMethod(checkProxy(method, bean));
            endpoint.setId(topic.getId());
            endpoint.setTopics(topic.getTopic());
            endpoint.setGroupId(topic.getGroup());
            endpoint.setClientIdPrefix(topic.getClientPrefix());
            endpoint.setConcurrency(topic.getPartitions());
            endpoint.setErrorHandler(consumerAwareListenerErrorHandler);
            registry.registerListenerContainer(endpoint, batchTestContainerFactory);
            container = registry.getListenerContainer(id);
            container.start();
        }
    }

    /**
     * Copied from the Spring source.
     * @param methodArg candidate handler method
     * @param bean handler bean, possibly a JDK dynamic proxy
     * @return the method to invoke on the bean
     */
    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                // Found a @KafkaListener method on the target class for this JDK proxy ->
                // is it also present on the proxy itself?
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    }
                    catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {
                        // NOSONAR
                    }
                }
            }
            catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            }
            catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "@KafkaListener method '%s' found on bean target class '%s', " +
                        "but not found in any interface(s) for bean JDK proxy. Either " +
                        "pull the method up to an interface or switch to subclass (CGLIB) " +
                        "proxies by setting proxy-target-class/proxyTargetClass " +
                        "attribute to 'true'", method.getName(),
                        method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }
}
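The checkProxy step matters because a Method object taken from the implementation class cannot be invoked on a JDK dynamic proxy: the proxy implements the interfaces but is not an instance of the class. The JDK-only sketch below illustrates the idea under simplified assumptions. ProxyMethodDemo, its nested Handler and ManualHandler types, and the resolve helper are hypothetical, and it uses java.lang.reflect.Proxy directly where Spring would go through AopUtils and Advised.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical demo class: resolves a handler method against the interface when the bean is a JDK proxy.
final class ProxyMethodDemo {

    public interface Handler {
        String deal(String value);
    }

    public static final class ManualHandler implements Handler {
        @Override
        public String deal(String value) {
            return "handled:" + value;
        }
    }

    // For a JDK proxy, swap the class-level method for the matching interface method.
    static Method resolve(Method candidate, Object bean) throws NoSuchMethodException {
        if (!Proxy.isProxyClass(bean.getClass())) {
            return candidate;
        }
        for (Class<?> iface : bean.getClass().getInterfaces()) {
            try {
                return iface.getMethod(candidate.getName(), candidate.getParameterTypes());
            } catch (NoSuchMethodException notOnThisInterface) {
                // Try the next proxied interface.
            }
        }
        throw new NoSuchMethodException(candidate.getName() + " not found on any proxied interface");
    }

    // Wrap a handler in a JDK dynamic proxy that simply delegates to the target.
    static Handler proxyOf(Handler target) {
        InvocationHandler delegate = (proxy, method, args) -> method.invoke(target, args);
        return (Handler) Proxy.newProxyInstance(
                Handler.class.getClassLoader(), new Class<?>[] {Handler.class}, delegate);
    }

    public static void main(String[] args) throws Exception {
        Method onClass = ManualHandler.class.getMethod("deal", String.class);
        Handler proxy = proxyOf(new ManualHandler());
        Method resolved = resolve(onClass, proxy);
        System.out.println(resolved.getDeclaringClass().getSimpleName());
        System.out.println(resolved.invoke(proxy, "x"));
    }
}
```

In the controller above the bean is created with new ManualHandler(), so it is never a proxy and checkProxy returns the method unchanged. The check becomes relevant once the handler is a Spring-managed bean wrapped by interface-based AOP.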