Sending Kafka Messages That Carry the User Token

1. Background

During development, services need to read the current user, current tenant, and similar information from the context, and pass that user information on to downstream services when making Feign calls. All of this relies on the user token being carried when the browser issues the request: the security service validates the token, builds the user object, and finally places the user information into the context.

  • Problem: a Kafka listener is not triggered by a user's HTTP request, so there is no token. Even if a token were passed along, the security layer would not automatically populate the context with the user information, so the executing user cannot be resolved and SecurityUtils.getUserId() throws an exception.

  • Approach:

    • When the user's HTTP request is received, extract the user information from the request headers and add the user token, tenant id, user id, etc. to the Kafka message headers
    • On the consumer side, read the user information back out of the message headers
    • Following the security module's rules, call auth with the token to resolve the user information and put it into the context manually; also store the token in the thread-local used by Feign requests so it can be propagated downstream
  • Notes:

    • Kafka is high-throughput middleware; adding the user-information lookup and parsing to every message would hurt performance significantly
    • Therefore the original Kafka classes and methods must not be modified directly. Instead, a separate set of components is provided: use them only when user information needs to be propagated, and keep using the original classes and methods otherwise
  • Idea:

    • Besides the message body, Kafka messages also have headers, so the user information can simply be attached as extra header entries without touching the message body
    • The Kafka producer client (used by spring-kafka) provides the ProducerInterceptor interface; implementing it allows the message, including its headers and body, to be modified before it is sent
    • The client also provides the consumer-side ConsumerInterceptor interface, invoked on poll; but since it operates on whole batches of records it does not fit the "one token per message" scenario, so it is not used here
    • On the consumer side, AOP is used instead: an aspect on the @KafkaListener method extracts the user information from the message before the method executes and sets it into the context; listeners that do not need parsing simply omit the aspect annotation and keep the original performance
    • The existing KafkaTemplate is left unchanged; a new AuthKafkaTemplate is added for sending messages with user headers. Both templates are registered in the Spring container and can be used side by side

2. Usage Demo

  • Add the dependency
<dependency>
    <groupId>com.xxx.web.cloud</groupId>
    <artifactId>xxx-boot-starter-kafka</artifactId>
    <version>1.2.0-SNAPSHOT</version>
</dependency>
  • Add the Kafka configuration file (same as in the earlier article)
  • Send a message
@RestController
@RequestMapping("/kafka")
public class KafkaController extends BaseController {

    @Autowired
    private AuthKafkaTemplate<String, String> authKafkaTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public Result<String> sendMsg() {
        String msg = Thread.currentThread().getId() + "testMsg_" + IdUtil.simpleUUID();
        // Send a message that carries the user information
        authKafkaTemplate.send("topic1", msg);
        // Send a message without user information
        kafkaTemplate.send("topic1", msg);
        return success("success", msg);
    }
}
  • Producer-side notes:
    • AuthKafkaTemplate<String, String> sends messages that carry the user information
    • KafkaTemplate<String, String> sends messages without user information
  • Consumer side
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic1")
    @AuthRequired
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // business code...
    }
}
  • Consumer-side notes
    • @AuthRequired marks a Kafka listener method whose user information should be parsed from the message headers and placed into the context
    • Without the annotation, no parsing is done and performance is unaffected
    • containerFactory = "ackContainerFactory" no longer needs to be specified; it has been made the default (see the sketch below)
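For reference, one common way to make a custom factory the default (so that @KafkaListener methods no longer need containerFactory = "ackContainerFactory") is to register it under spring-kafka's default bean name, kafkaListenerContainerFactory. The class below is only a minimal sketch of that idea, not necessarily how the starter actually implements it:

package com.xxx.starter.kafka.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class DefaultContainerFactoryConfig {

    // spring-kafka looks up the bean named "kafkaListenerContainerFactory" by default,
    // so registering the manual-ack factory under that name makes it the implicit default
    // for every @KafkaListener that does not specify containerFactory explicitly.
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}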

3. Implementation

  • Producer side: AuthKafkaTemplate
package com.xxx.starter.kafka.config.auth;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Custom Kafka template for sending messages with the token attached
 * @author iseven.yang
 * @date 2022/12/16 9:16
 */
public class AuthKafkaTemplate<K, V> extends KafkaTemplate<K, V> {

    public AuthKafkaTemplate(ProducerFactory<K, V> producerFactory) {
        super(producerFactory);
    }

    public AuthKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        super(producerFactory, autoFlush);
    }
}

  • Producer interceptor that runs before sending
package com.xxx.starter.kafka.config.auth;

import com.xxx.web.cloud.common.security.util.AuthUtils;
import com.xxx.web.cloud.common.security.util.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Adds the token headers to the message before it is sent
 * @author iseven.yang
 * @date 2022/12/16 9:20
 */
@Slf4j
public class AuthProducerInterceptor implements ProducerInterceptor<Object, Object> {

    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        try {
            addHeaderUserInfo(record);
        } catch (Exception e) {
            log.error("Failed to add user info to message headers", e);
        }
        return record;
    }

    /**
     * Add the user information to the message headers
     * @param record the record about to be sent
     */
    private void addHeaderUserInfo(ProducerRecord<Object, Object> record) {
        Headers headers = record.headers();
        // Get the current HTTP request; if the message is not sent from a web request thread, there is nothing to add
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (requestAttributes == null) {
            return;
        }
        HttpServletRequest request = requestAttributes.getRequest();
        String token = AuthUtils.extractToken(request);
        headers.add("userToken", token.getBytes(StandardCharsets.UTF_8));
        headers.add("tenantId", SecurityUtils.getTenantId().toString().getBytes(StandardCharsets.UTF_8));
        headers.add("userId", SecurityUtils.getUserId().toString().getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

  • Register the producer beans in the Spring container
package com.xxx.starter.kafka.config;

import com.xxx.starter.kafka.config.auth.AuthKafkaTemplate;
import com.xxx.starter.kafka.config.auth.AuthProducerInterceptor;
import lombok.AllArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.web.client.RestTemplate;

import java.util.Map;


@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfiguration {

    private static final int DEFAULT_PARTITION_NUM = 3;

    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(DEFAULT_PARTITION_NUM);
        return factory;
    }

    @Bean("authKafkaProducerFactory")
    public ProducerFactory<String, String> authKafkaProducerFactory() {
        Map<String, Object> map = kafkaProperties.buildProducerProperties();
        // Register the token interceptor that adds the user information to the message headers
        map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, AuthProducerInterceptor.class.getName());
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(map);
        String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return factory;
    }

    @Bean("authKafkaTemplate")
    public AuthKafkaTemplate<String, String> authKafkaTemplate(ProducerFactory<String, String> authKafkaProducerFactory) {
        AuthKafkaTemplate<String, String> authKafkaTemplate = new AuthKafkaTemplate<>(authKafkaProducerFactory);
        authKafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
        return authKafkaTemplate;
    }

    @Bean("authRequestRestTemplate")
    public RestTemplate restTemplate() {
        // Switch to a pooled connection factory if performance becomes an issue
        SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
        requestFactory.setReadTimeout(5000);
        requestFactory.setConnectTimeout(3000);
        return new RestTemplate(requestFactory);
    }
}

  • Consumer-side annotation
package com.xxx.starter.kafka.config.auth;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Used together with a Kafka listener method.
 * With this annotation, the user information is read from the message headers
 * and stored in the thread-local context.
 * @author iseven.yang
 * @date 2022/12/16 9:23
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AuthRequired {
}

  • Consumer-side aspect implementation
package com.xxx.starter.kafka.config.auth;

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.ResourceServerTokenServices;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * AOP handler that reads the user information from the message headers
 * for methods annotated with @AuthRequired
 * @author iseven.yang
 * @date 2022/12/16 9:26
 */
@Aspect
@Component
@Slf4j
public class AuthPre {

    @Resource
    private ResourceServerTokenServices tokenServices;

    @Before("@annotation(com.xxx.starter.kafka.config.auth.AuthRequired)")
    public void before(JoinPoint joinPoint) {
        // Never break the business logic: catch every exception here
        try {
            addUserInfo(joinPoint);
        } catch (Exception e) {
            log.error("Failed to set user info", e);
        }
    }

    /**
     * Extract the user information from the consumer record and set it into the context
     * @param joinPoint the intercepted listener invocation
     */
    private void addUserInfo(JoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof ConsumerRecord) {
                ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord<?, ?>) arg;
                // Read the message headers
                Headers headers = consumerRecord.headers();
                String token = null;
                String tenantId = null;
                String userId = null;
                for (Header header : headers) {
                    String key = header.key();
                    if ("userToken".equals(key)) {
                        token = new String(header.value(), StandardCharsets.UTF_8);
                    }
                    if ("tenantId".equals(key)) {
                        tenantId = new String(header.value(), StandardCharsets.UTF_8);
                    }
                    if ("userId".equals(key)) {
                        userId = new String(header.value(), StandardCharsets.UTF_8);
                    }
                }
                if (StrUtil.isAllBlank(token, tenantId, userId)) {
                    log.error("The producer did not add the user info headers!");
                    return;
                }
                generateUserInfo(token);
            }
        }
    }

    /**
     * Resolve the user from the token and put the authentication into the security context
     * @param token the user token taken from the message headers
     */
    private void generateUserInfo(String token) {
        OAuth2Authentication auth = tokenServices.loadAuthentication(token);
        SecurityContextHolder.getContext().setAuthentication(auth);
    }
}
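
One caveat about the aspect above (this is an observation, not part of the original starter code): listener container threads are pooled and reused, and the aspect never clears what it puts into SecurityContextHolder, so one message's user could leak into the next message processed on the same thread. A minimal sketch of a companion advice that clears the context after the listener returns; the class name AuthPost is hypothetical:

package com.xxx.starter.kafka.config.auth;

import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

/**
 * Hypothetical companion advice: clears the security context once the
 * listener method has finished, so the reused consumer thread does not
 * carry the previous message's user into the next message.
 */
@Aspect
@Component
public class AuthPost {

    @After("@annotation(com.xxx.starter.kafka.config.auth.AuthRequired)")
    public void after() {
        SecurityContextHolder.clearContext();
    }
}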

4. Problem Summary

  • How to build the user object in the context once the token is available
    • SecurityContextHolder.getContext().setAuthentication(tokenServices.loadAuthentication(token))
  • How to propagate the token to downstream services via Feign
    • Previously this was done with oAuth2ClientContext.setAccessToken(new DefaultOAuth2AccessToken(token)); however, in the company framework the oAuth2ClientContext used here and the Spring-managed bean are two different objects, so that approach does not work. The actual implementation sets the token as a request header on the outgoing call via restTemplate; see the article on user tokens in Spring for details. A hedged sketch of the same idea follows below.
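
As an illustration of that header-based propagation, here is a minimal sketch using a Feign RequestInterceptor instead of the framework's restTemplate mechanism: the listener-side code stores the token in a thread-local, and the interceptor copies it onto every outgoing Feign request made from that thread. The names KafkaAuthFeignInterceptor and TOKEN_HOLDER are assumptions for this sketch, not classes from the actual framework.

package com.xxx.starter.kafka.config.auth;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.stereotype.Component;

/**
 * Hypothetical sketch: propagate the token taken from the Kafka message headers
 * to downstream services by attaching it to every outgoing Feign request.
 */
@Component
public class KafkaAuthFeignInterceptor implements RequestInterceptor {

    // Assumed thread-local: filled by the @AuthRequired aspect before the listener runs
    // and cleared again after the listener returns.
    public static final ThreadLocal<String> TOKEN_HOLDER = new ThreadLocal<>();

    @Override
    public void apply(RequestTemplate template) {
        String token = TOKEN_HOLDER.get();
        if (token != null) {
            // Standard OAuth2 bearer header expected by the downstream resource servers
            template.header("Authorization", "Bearer " + token);
        }
    }
}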