后端Kafka发送携带用户token的消息
cmyang

1. 背景
在项目开发过程中,需要从上下文中获取当前用户,当前租户等信息,并且在feign调用时,能把用户信息传递到下游服务,这一切的前提是用户在浏览器发起请求时携带了用户token,经过security服务验证封装,最终将用户信息放在了上下文中。
2. 使用DEMO
1 2 3 4 5
| <dependency> <groupId>com.xxx.web.cloud</groupId> <artifactId>xxx-boot-starter-kafka</artifactId> <version>1.2.0-SNAPSHOT</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @RestController @RequestMapping("/kafka") public class KafkaController extends BaseController {
@Autowired private AuthKafkaTemplate<String, String> authKafkaTemplate;
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/send") public Result<String> sendMsg() { String msg = Thread.currentThread().getId()+"testMsg_"+ IdUtil.simpleUUID(); authKafkaTemplate.send("topic1", msg); kafkaTemplate.send("topic1", msg); return success("成功", msg); } }
|
- 发送端说明:
AuthKafkaTemplate<String, String>
用于发送带用户的消息
KafkaTemplate<String, String>
用于发送不带用户的消息
- 消费端
1 2 3 4 5 6 7 8 9
| @Component public class KafkaConsumer {
@KafkaListener(topics = "topic1") @AuthRequired public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) { } }
|
- 消费端说明
@AuthRequired
用于标识该kafka监听方法需要从请求头里解析用户信息,放在上下文中
- 如果不加注解,就不解析,不影响性能
containerFactory = "ackContainerFactory"
可以不用加,已改为默认值
3. 代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.xxx.starter.kafka.config.auth;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;
/**
 * Marker subclass of {@link KafkaTemplate}. It adds no behaviour of its own:
 * its sole purpose is to give the auth-aware producer (the one whose factory
 * registers {@code AuthProducerInterceptor}) a distinct bean type, so it can
 * be injected separately from the plain {@code KafkaTemplate}.
 */
public class AuthKafkaTemplate<K, V> extends KafkaTemplate<K, V> {

    public AuthKafkaTemplate(ProducerFactory<K, V> producerFactory) {
        super(producerFactory);
    }

    public AuthKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        super(producerFactory, autoFlush);
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package com.xxx.starter.kafka.config.auth;
import com.xxx.web.cloud.common.security.util.AuthUtils; import com.xxx.web.cloud.common.security.util.SecurityUtils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Headers; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest; import java.nio.charset.StandardCharsets; import java.util.Map;
@Slf4j public class AuthProducerInterceptor implements ProducerInterceptor<Object, Object> {
@Override public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) { try { addHeaderUserInfo(record); } catch (Exception e) { log.error("添加用户信息失败", e); } return record; }
private void addHeaderUserInfo(ProducerRecord<Object, Object> record) { Headers headers = record.headers(); ServletRequestAttributes requestAttributes= (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); String token = AuthUtils.extractToken(request); headers.add("userToken", token.getBytes(StandardCharsets.UTF_8)); headers.add("tenantId", SecurityUtils.getTenantId().toString().getBytes(StandardCharsets.UTF_8)); headers.add("userId", SecurityUtils.getUserId().toString().getBytes(StandardCharsets.UTF_8)); }
@Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override public void close() {
}
@Override public void configure(Map<String, ?> configs) {
} }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| package com.xxx.starter.kafka.config;
import com.xxx.starter.kafka.config.auth.AuthKafkaTemplate;
import com.xxx.starter.kafka.config.auth.AuthProducerInterceptor;

import lombok.AllArgsConstructor;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.web.client.RestTemplate;

import java.util.Map;
@Configuration @EnableConfigurationProperties({KafkaProperties.class}) @EnableKafka @AllArgsConstructor public class KafkaConfiguration { private final KafkaProperties kafkaProperties; private final Integer DEFAULT_PARTITION_NUM = 3; @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
@Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()); }
@Bean public ConsumerFactory<Object, Object> consumerFactory() { return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); }
@Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setConcurrency(DEFAULT_PARTITION_NUM); return factory; }
@Bean("authKafkaProducerFactory") public ProducerFactory<String, String> authKafkaProducerFactory() { Map<String, Object> map = kafkaProperties.buildProducerProperties(); map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, AuthProducerInterceptor.class.getName()); DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(map); String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; }
@Bean("authKafkaTemplate") public AuthKafkaTemplate<String, String> authKafkaTemplate(ProducerFactory<String, String> authKafkaProducerFactory) { AuthKafkaTemplate<String, String> authKafkaTemplate = new AuthKafkaTemplate<>(authKafkaProducerFactory); authKafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic()); return authKafkaTemplate; }
@Bean("authRequestRestTemplate") public RestTemplate restTemplate() { SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); requestFactory.setReadTimeout(5000); requestFactory.setConnectTimeout(3000); return new RestTemplate(requestFactory); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.xxx.starter.kafka.config.auth;
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;
/**
 * Marks a Kafka listener method whose incoming messages carry user-identity
 * headers. The AuthPre aspect intercepts annotated methods and rebuilds the
 * security context from the "userToken" header before the listener body runs.
 * Listeners without this annotation skip header parsing entirely.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AuthRequired {
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| package com.xxx.starter.kafka.config.auth;
import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.oauth2.provider.OAuth2Authentication; import org.springframework.security.oauth2.provider.token.ResourceServerTokenServices; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.nio.charset.StandardCharsets;
@Aspect @Component @Slf4j public class AuthPre {
@Resource private ResourceServerTokenServices tokenServices;
@Before("@annotation(com.xxx.starter.kafka.config.auth.AuthRequired)") public void before(JoinPoint joinPoint) { try { addUserInfo(joinPoint); } catch (Exception e) { log.error("设置用户信息失败", e); } }
private void addUserInfo(JoinPoint joinPoint) { Object[] args = joinPoint.getArgs(); for (Object arg : args) { if(arg instanceof ConsumerRecord) { ConsumerRecord consumerRecord = (ConsumerRecord) arg; Headers headers = consumerRecord.headers(); String token = null; String tenantId = null; String userId = null; for (Header header : headers) { String key = header.key(); if("userToken".equals(key)) { token = new String(header.value(), StandardCharsets.UTF_8); } if("tenantId".equals(key)) { tenantId = new String(header.value(), StandardCharsets.UTF_8); } if("userId".equals(key)) { userId = new String(header.value(), StandardCharsets.UTF_8); } } if(StrUtil.isAllBlank(token, tenantId, userId)) { log.error("发送端未添加用户信息消息头!!!"); return; } generateUserInfo(token); } } }
private void generateUserInfo(String token) { OAuth2Authentication auth = tokenServices.loadAuthentication(token); SecurityContextHolder.getContext().setAuthentication(auth); } }
|
4. 问题总结
- 拿到token后如何在上下文中构建用户对象
SecurityContextHolder.getContext().setAuthentication(tokenServices.loadAuthentication(token))
- 如何把token通过feign传递到下游服务
- 之前通过
oAuth2ClientContext.setAccessToken(new DefaultOAuth2AccessToken(token));
的方式,但是在公司框架中,手动new出来的oAuth2ClientContext和spring容器管理的bean是两个不同的对象,所以不能使用这种方式。实际采用的实现方式是通过restTemplate设置请求头来传递token,详见spring中的用户token相关文章。