后端KafkaKafka多个TOPIC线路发送邮件
cmyang1. 背景
在做消息中心的邮件模块时,反馈偶现邮件发送延迟较高的问题,可能会出现发送的邮件,几个小时后才能收到。经过排查发现是,通过API调用邮箱服务器发送邮件时比较慢,大概1~2秒发一份邮件,而且由于公司邮箱服务器的问题,经常超时。当有某个服务发送了大量邮件时,比如全员的通知邮件,会导致Kafka的发送队列长时间排队堆积,后面其他业务发送的邮件会在很长时间的排队后才能收到。针对该问题,消息中心做了邮件发送的优化。
2. 原始方案
- 原始方案中通过两层异步解决一次发送中多个发送渠道发送的并发问题。
- 邮件发送全部由单个email的TOPIC发送,当单次发送邮件数量太大,会影响其他服务的业务邮件发送时间
3. 方法优化
将原来单个topic发送通道改为多条线路同时发送。
改为多条线路后也会带来其他的问题
- 改为多路后,只是缓解了单路带来的压力,并没有彻底解决大量发送时导致其他业务的延迟问题
- 通过将同一个发送账号的发送任务放在同一个队列
- 不同的账号走不同的通道
- 这样即使有一个业务发送的邮件数很多,也只会占用其中一个TOPIC,其他业务的发送任务来了,可以使用其他线路来发送
- 因为不确定会有多少个发送账号,不可能每个发送账号都创建一个单独的TOPIC,这里需要考虑TOPIC线路服用的问题
- 初始设置3个TOPIC线路,L1,L2,L3 ,3路并行发送
- 每个发送任务只从3个线路中选择占用其中一个线路来进行发送任务
- 每个发送任务的发送邮件数量不一样,会导致每个线路的排队程度不一样,当有一个新任务来时,需要找到最空闲的线路
- 需要一个获取最优线路的方法
- 从3个TOPIC中实时获取消息的消费情况,找到消息堆积最少的一个线路来发送
4. 代码改造
4.1 kafka获取消费情况
- 获取TOPIC最大的offset,可以同通过kafkaConsumer来获取
1 2 3 4
| TopicPartition topic1 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_1, 0); TopicPartition topic2 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_2, 0); TopicPartition topic3 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_3, 0); Map<TopicPartition, Long> maxTopicsOffset = kafkaConsumer.endOffsets(CollUtil.newArrayList(topic1, topic2, topic3));
|
- 获取TOPIC当前消费的offset,可以通过adminClient来获取
1 2 3 4
| String groupId = kafkaProperties.getConsumer().getGroupId(); ListConsumerGroupOffsetsResult currentOffsets = adminClient.listConsumerGroupOffsets(groupId); Map<TopicPartition, OffsetAndMetadata> currentTopicsOffset = currentOffsets.partitionsToOffsetAndMetadata().get(); long offset = currentTopicsOffset.get(topic1).offset()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
| import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.exceptions.ExceptionUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.List; import java.util.Map;
@Service @Slf4j public class EmailSendService implements HandleMqMsgService<SendEmailMsgDTO> {
@Resource private MsgEmailMapper msgEmailMapper; @Resource private AccountEmailMapper accountEmailMapper; @Resource private AdminClient adminClient; @Resource private KafkaConsumer<String, String> kafkaConsumer; @Resource private KafkaProperties kafkaProperties; @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private MqService mqService;
@Override public void handleMsg(SendEmailMsgDTO sendEmailMsgDTO, ConsumerInfoDTO consumerInfo) { MsgEmailDO msgEmailDO = msgEmailMapper.selectById(sendEmailMsgDTO.getEmailMsgId()); if(msgEmailDO == null) { ServiceAssert.getThrow(ErrorCodeEnum.EMAIL_MSG_NOT_FOUND); } if(SendStateEnum.SUCCESS.name().equals(msgEmailDO.getState())) { return; }
AccountEmailDO accountEmailDO = accountEmailMapper.selectById(msgEmailDO.getAccount()); ServiceAssert.nullThrow(accountEmailDO, ErrorCodeEnum.EMAIL_ACCOUNT_NOT_FOUND);
List<String> receiverUsers = MsgUtil.strToList(msgEmailDO.getReceiver()); String[] receiverEmailArr = MsgUtil.getEmail(receiverUsers);
String sendLine = getSendLine(accountEmailDO.getUsername());
if(CommonStateEnum.isY(msgEmailDO.getSingleSend())) { for (String eachReceiver : receiverEmailArr) { sendToLine(sendLine, msgEmailDO.getId(), new String[]{eachReceiver}); } } else { sendToLine(sendLine, msgEmailDO.getId(), receiverEmailArr); }
msgEmailDO.setState(SendStateEnum.SUCCESS.name()); msgEmailDO.setSendTime(DateUtil.date()); msgEmailMapper.updateById(msgEmailDO); }
@Override public void handleException(Exception e, SendEmailMsgDTO sendEmailMsgDTO, ConsumerInfoDTO consumerInfo) { MsgEmailDO msgEmailDO = msgEmailMapper.selectById(sendEmailMsgDTO.getEmailMsgId()); msgEmailDO.setState(SendStateEnum.FAIL.name()); msgEmailDO.setRecord(ExceptionUtil.stacktraceToString(e)); msgEmailMapper.updateById(msgEmailDO); }
private void sendToLine(String topic, Long msgId, String[] receiverEmails) { SendEmailLineMsgDTO sendEmailLineMsgDTO = new SendEmailLineMsgDTO(); sendEmailLineMsgDTO.setMsgId(msgId); sendEmailLineMsgDTO.setReceiverEmails(receiverEmails); sendEmailLineMsgDTO.setRetryNum(0); mqService.sendMsg(topic, JSONUtil.toJsonStr(sendEmailLineMsgDTO)); }
@SneakyThrows private String getSendLine(String emailAccount) { Map<Object, Object> lineMap = redisTemplate.opsForHash().entries(CustomConstant.REDIS_EMAIL_LINE); if(CollUtil.isNotEmpty(lineMap)) { for (Map.Entry<Object, Object> entry : lineMap.entrySet()) { String latestAccount = (String) entry.getValue(); if(StrUtil.equals(latestAccount, emailAccount)) { return (String) entry.getKey(); } } } String leisureLineFromOffset = findLeisureLineFromOffset(); redisTemplate.opsForHash().put(CustomConstant.REDIS_EMAIL_LINE, leisureLineFromOffset, emailAccount); return leisureLineFromOffset; }
@SneakyThrows private String findLeisureLineFromOffset() { String groupId = kafkaProperties.getConsumer().getGroupId(); ListConsumerGroupOffsetsResult currentOffsets = adminClient.listConsumerGroupOffsets(groupId); TopicPartition topic1 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_1, 0); TopicPartition topic2 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_2, 0); TopicPartition topic3 = new TopicPartition(CustomConstant.MQ_TOPIC_MESSAGE_SEND_EMAIL_LINE_3, 0); Map<TopicPartition, Long> maxTopicsOffset = kafkaConsumer.endOffsets(CollUtil.newArrayList(topic1, topic2, topic3));
Map<TopicPartition, OffsetAndMetadata> currentTopicsOffset = currentOffsets.partitionsToOffsetAndMetadata().get();
if(currentTopicsOffset.get(topic1) == null) { return topic1.topic(); } long l1 = maxTopicsOffset.get(topic1) - currentTopicsOffset.get(topic1).offset(); if(l1 == 0) { return topic1.topic(); }
if(currentTopicsOffset.get(topic2) == null) { return topic2.topic(); } long l2 = maxTopicsOffset.get(topic2) - currentTopicsOffset.get(topic2).offset(); if(l2 == 0) { return topic2.topic(); }
if(currentTopicsOffset.get(topic3) == null) { return topic3.topic(); } long l3 = maxTopicsOffset.get(topic3) - currentTopicsOffset.get(topic3).offset(); if(l3 == 0) { return topic3.topic(); }
long min = NumberUtil.min(l1, l2, l3); if(l1 == min) { return topic1.topic(); } if(l2 == min) { return topic2.topic(); } if(l3 == min) { return topic3.topic(); }
return topic1.topic(); } }
|