为Spring-boot提供消息队列能力的starter, 并提供了VM线程的轻量级实现。 项目地址: https://github.com/wangyuheng/embedded-mq-spring-boot-starter
什么是消息队列 消息队列是用于存放消息的容器,可供消费者取出消息进行消费。
观察者模式
观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。
Observer
本来的意思是观察者 ,但具体的实现中并不是主动去观察,而是被动的接收来自Subject
的通知,所以更合适的名字应该是”消息投递”。 而且通知的模式还存在一个弊端 : 通知及多个ConcreteObserver
的消费程序仍在一个同步线程内,所以只是代码结构层面的解耦,底层还是一个事务内。 为了解决这个弊端 ,将消息的发送及N个消费程序拆分为N+1个事务,所以引入消息队列用于存储Subject
。
领域模型设计
代码实现
LinkedBlockingQueue
作为存储Message的容器。
Store用于存储消息。为了兼容多个Consumer
,每个Consumer
指定一个唯一标识作为Partition Key
,对应唯一的一个LinkedBlockingQueue
。 e.g. Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();
Producer
通过 Transport
将消息发送只多个Partition Key
的LinkedBlockingQueue
队列中
每个Consumer
开启一个线程,通过轮询方式从LinkedBlockingQueue
队列中消费消息。
代码片段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();@Override public void append (Message message, Partition partition) { initQueue(partition); messageQueueMap.get(partition).add(message); } @Override public LinkedBlockingQueue<Message> findByPartition (Partition partition) { initQueue(partition); return messageQueueMap.get(partition); } private void initQueue (Partition partition) { if (!messageQueueMap.containsKey(partition)) { synchronized (this ) { if (!messageQueueMap.containsKey(partition)) { messageQueueMap.put(partition, new LinkedBlockingQueue<>()); } } } }
1 2 3 4 5 6 7 public void transfer (Message message) { final String topic = message.getTopic(); topicClientIdMap.get(topic).forEach(clientId -> { Partition partition = new Partition(clientId, topic); store.append(message, partition); }); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public synchronized void start (Store store) { if (!initialized.get()) { synchronized (this ) { SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); taskExecutor.setDaemon(true ); taskExecutor.execute(new ConsumerListener(this .getMessageHandler(), store.findByPartition(this .generatePartition()))); initialized.set(true ); } } } public void shutdown () { liveToggle.set(false ); } public void pause () { runToggle.set(false ); } public void restart () { runToggle.set(true ); } class ConsumerListener implements Runnable { private MessageHandler handler; private LinkedBlockingQueue<Message> queue; ConsumerListener(MessageHandler handler, LinkedBlockingQueue<Message> queue) { this .handler = handler; this .queue = queue; } @Override public void run () { while (true ) { try { if (!liveToggle.get()) { break ; } if (runToggle.get()) { Message message = queue.poll(); if (null == message) { Thread.sleep(100 ); } else { handler.handle(message); } } else { Thread.sleep(100 ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
使用LinkedBlockingQueue
却未使用take
方法的原因是为了灵活控制消费线程的启停。
spring集成 为了方便使用,通过annotation
的形式与spring框架进行集成。
示例
1 2 3 4 @Consumer(topic = CONSUMER_TOPIC, id = CUSTOM_CONSUMER_ID) public void consumerMessage (Message message) { consumerRecordMap.get(CUSTOM_CONSUMER_ID).add(message); }
1 2 3 4 5 6 @Autowired private DefaultProducer<String> producer;public void sendMessage () { producer.send(new Message<>(CUSTOM_TOPIC, "This is a message!" )); }
代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class ConsumerBeanDefinitionRegistryPostProcessor implements BeanPostProcessor , ApplicationContextAware { private ConfigurableApplicationContext applicationContext; @Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException { this .applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public Object postProcessAfterInitialization (Object bean, String beanName) throws BeansException { Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); Method[] methods = ReflectionUtils.getAllDeclaredMethods(targetClass); for (Method method : methods) { if (AnnotatedElementUtils.hasAnnotation(method, Consumer.class)) { final String topic = method.getAnnotation(Consumer.class).topic(); final String id = StringUtils.isEmpty(method.getAnnotation(Consumer.class).id()) ? beanName + method.getName() : method.getAnnotation(Consumer.class).id(); final BeanFactory beanFactory = applicationContext.getBeanFactory(); final Store store = beanFactory.getBean(Store.class); final MessageHandler messageHandler = message -> ReflectionUtils.invokeMethod(method, bean, message); final BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerCluster.class, () -> { ConsumerCluster consumerCluster = new ConsumerCluster(); consumerCluster.setId(id); consumerCluster.setTopic(topic); consumerCluster.setMessageHandler(messageHandler); consumerCluster.start(store); return consumerCluster; }); BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition(); ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(beanName + method.getName() + "Listener" , beanDefinition); } } return bean; } }
其他
如何跨应用消费?通过Mysql、Redis等公共存储替换Store及Transport实现。Mysql需要考虑行锁。