Kafka多个TOPIC线路发送邮件

1. 背景

在做消息中心的邮件模块时,反馈偶现邮件发送延迟较高的问题,可能会出现发送的邮件,几个小时后才能收到。经过排查发现是,通过API调用邮箱服务器发送邮件时比较慢,大概1~2秒发一份邮件,而且由于公司邮箱服务器的问题,经常超时。当有某个服务发送了大量邮件时,比如全员的通知邮件,会导致Kafka的发送队列长时间排队堆积,后面其他业务发送的邮件会在很长时间的排队后才能收到。针对该问题,消息中心做了邮件发送的优化。

2. 原始方案

image-20231030115420001

  • 原始方案中通过两层异步解决一次发送中多个发送渠道发送的并发问题。
  • 邮件发送全部由单个email的TOPIC发送,当单次发送邮件数量太大,会影响其他服务的业务邮件发送时间

3. 方法优化

将原来单个topic发送通道改为多条线路同时发送。

改为多条线路后也会带来其他的问题

  • 改为多路后,只是缓解了单路带来的压力,并没有彻底解决大量发送时导致其他业务的延迟问题
    • 通过将同一个发送账号的发送任务放在同一个队列
    • 不同的账号走不同的通道
    • 这样即使有一个业务发送的邮件数很多,也只会占用其中一个TOPIC,其他业务的发送任务来了,可以使用其他线路来发送
  • 因为不确定会有多少个发送账号,不可能每个发送账号都创建一个单独的TOPIC,这里需要考虑TOPIC线路服用的问题
    • 初始设置3个TOPIC线路,L1,L2,L3 ,3路并行发送
    • 每个发送任务只从3个线路中选择占用其中一个线路来进行发送任务
  • 每个发送任务的发送邮件数量不一样,会导致每个线路的排队程度不一样,当有一个新任务来时,需要找到最空闲的线路
    • 需要一个获取最优线路的方法
    • 从3个TOPIC中实时获取消息的消费情况,找到消息堆积最少的一个线路来发送

image-20231030134739461

4. 代码改造

4.1 kafka获取消费情况

  • 获取TOPIC最大的offset,可以同通过kafkaConsumer来获取
1
2
3
4
TopicPartition topic1 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_1, 0);
TopicPartition topic2 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_2, 0);
TopicPartition topic3 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_3, 0);
Map<TopicPartition, Long> maxTopicsOffset = kafkaConsumer.endOffsets(CollUtil.newArrayList(topic1, topic2, topic3));
  • 获取TOPIC当前消费的offset,可以通过adminClient来获取
1
2
3
4
String groupId = kafkaProperties.getConsumer().getGroupId();
ListConsumerGroupOffsetsResult currentOffsets = adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> currentTopicsOffset = currentOffsets.partitionsToOffsetAndMetadata().get();
long offset = currentTopicsOffset.get(topic1).offset()
  • 完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

@Service
@Slf4j
public class EmailSendService implements HandleMqMsgService<SendEmailMsgDTO> {

@Resource
private MsgEmailMapper msgEmailMapper;
@Resource
private AccountEmailMapper accountEmailMapper;
@Resource
private AdminClient adminClient;
@Resource
private KafkaConsumer<String, String> kafkaConsumer;
@Resource
private KafkaProperties kafkaProperties;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private MqService mqService;

@Override
public void handleMsg(SendEmailMsgDTO sendEmailMsgDTO, ConsumerInfoDTO consumerInfo) {
//获取邮件对象
MsgEmailDO msgEmailDO = msgEmailMapper.selectById(sendEmailMsgDTO.getEmailMsgId());
if(msgEmailDO == null) {
ServiceAssert.getThrow(ErrorCodeEnum.EMAIL_MSG_NOT_FOUND);
}
if(SendStateEnum.SUCCESS.name().equals(msgEmailDO.getState())) {
// 已发送成功的邮件,无需再处理
return;
}

//查询邮件服务器
AccountEmailDO accountEmailDO = accountEmailMapper.selectById(msgEmailDO.getAccount());
ServiceAssert.nullThrow(accountEmailDO, ErrorCodeEnum.EMAIL_ACCOUNT_NOT_FOUND);

// 根据消息用户id获取用户详情
// 收件人id
List<String> receiverUsers = MsgUtil.strToList(msgEmailDO.getReceiver());
// 获取真实收件人邮箱
String[] receiverEmailArr = MsgUtil.getEmail(receiverUsers);

// 获取线路
String sendLine = getSendLine(accountEmailDO.getUsername());


if(CommonStateEnum.isY(msgEmailDO.getSingleSend())) {
for (String eachReceiver : receiverEmailArr) {
sendToLine(sendLine, msgEmailDO.getId(), new String[]{eachReceiver});
}
} else {
sendToLine(sendLine, msgEmailDO.getId(), receiverEmailArr);
}

//更新数据库
msgEmailDO.setState(SendStateEnum.SUCCESS.name());
msgEmailDO.setSendTime(DateUtil.date());
msgEmailMapper.updateById(msgEmailDO);
}

/**
* 处理异常
*
* @param e 异常
* @param msgContent 消息内容
*/
@Override
public void handleException(Exception e, SendEmailMsgDTO sendEmailMsgDTO, ConsumerInfoDTO consumerInfo) {
//获取邮件对象
MsgEmailDO msgEmailDO = msgEmailMapper.selectById(sendEmailMsgDTO.getEmailMsgId());
msgEmailDO.setState(SendStateEnum.FAIL.name());
msgEmailDO.setRecord(ExceptionUtil.stacktraceToString(e));
msgEmailMapper.updateById(msgEmailDO);
}

/**
* 推送到对应的线路
* @param topic
* @param msgId
* @param receiverEmails
*/
private void sendToLine(String topic, Long msgId, String[] receiverEmails) {
SendEmailLineMsgDTO sendEmailLineMsgDTO = new SendEmailLineMsgDTO();
sendEmailLineMsgDTO.setMsgId(msgId);
sendEmailLineMsgDTO.setReceiverEmails(receiverEmails);
sendEmailLineMsgDTO.setRetryNum(0);
mqService.sendMsg(topic, JSONUtil.toJsonStr(sendEmailLineMsgDTO));
}



/**
* 获取一个邮件发送的通道
* 这里有并发获取的问题,并发情况下,可能会分配到同一个topic,
* 并发问题发生概率较低,即使分配到同一个topic,也可以正常发送,加锁会降低效率
* 这里只解决大批量邮件发送,导致排队的大部分问题
* @param emailAccount
* @return topic
*/
@SneakyThrows
private String getSendLine(String emailAccount) {
// 获取每个topic最后发送的账号
Map<Object, Object> lineMap = redisTemplate.opsForHash().entries(CustomConstant.REDIS_EMAIL_LINE);
if(CollUtil.isNotEmpty(lineMap)) {
for (Map.Entry<Object, Object> entry : lineMap.entrySet()) {
String latestAccount = (String) entry.getValue();
// 如果该路线的最近一次发送也是该账号,那么接着使用该路线
if(StrUtil.equals(latestAccount, emailAccount)) {
return (String) entry.getKey();
}
}
}
// 从kafka中找空闲队列
String leisureLineFromOffset = findLeisureLineFromOffset();
redisTemplate.opsForHash().put(CustomConstant.REDIS_EMAIL_LINE, leisureLineFromOffset, emailAccount);
return leisureLineFromOffset;
}

/**
* 从kafka中找一个空闲队列线路
* @return
*/
@SneakyThrows
private String findLeisureLineFromOffset() {
// 获取最空闲的一个线路发送消息
// 获取当前offset
String groupId = kafkaProperties.getConsumer().getGroupId();
ListConsumerGroupOffsetsResult currentOffsets = adminClient.listConsumerGroupOffsets(groupId);
// 获取最大的offset
// 所有的邮件发送路线
TopicPartition topic1 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_1, 0);
TopicPartition topic2 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_2, 0);
TopicPartition topic3 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_3, 0);
Map<TopicPartition, Long> maxTopicsOffset = kafkaConsumer.endOffsets(CollUtil.newArrayList(topic1, topic2, topic3));

Map<TopicPartition, OffsetAndMetadata> currentTopicsOffset = currentOffsets.partitionsToOffsetAndMetadata().get();

// 线路1
if(currentTopicsOffset.get(topic1) == null) {
return topic1.topic();
}
long l1 = maxTopicsOffset.get(topic1) - currentTopicsOffset.get(topic1).offset();
if(l1 == 0) {
return topic1.topic();
}

// 线路2
if(currentTopicsOffset.get(topic2) == null) {
return topic2.topic();
}
long l2 = maxTopicsOffset.get(topic2) - currentTopicsOffset.get(topic2).offset();
if(l2 == 0) {
return topic2.topic();
}

// 线路3
if(currentTopicsOffset.get(topic3) == null) {
return topic3.topic();
}
long l3 = maxTopicsOffset.get(topic3) - currentTopicsOffset.get(topic3).offset();
if(l3 == 0) {
return topic3.topic();
}

long min = NumberUtil.min(l1, l2, l3);
if(l1 == min) {
return topic1.topic();
}
if(l2 == min) {
return topic2.topic();
}
if(l3 == min) {
return topic3.topic();
}

return topic1.topic();
}
}