Spring AMQPでオレオレErrorHandlerを使ってみるテスト

@RabbitListenerを使っている時に、デフォルトのErrorHandlerではなく、独自処理を実装したErrorHandlerを使う方法を調べてみたので自分用メモ。

設計も汚いし、とにかく動くところまでしか確認してない。

package hello;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;

/**
 * Consumer-side application: registers a listener container factory that uses a
 * custom {@link ErrorHandler} and a Jackson-based message handler method factory
 * for {@code @RabbitListener} endpoints.
 */
@SpringBootApplication
@EnableRabbit
public class Application implements RabbitListenerConfigurer {

    @Autowired
    ConnectionFactory connectionFactory;

    /** Adapter that dispatches to {@code receiveMessage} on the given receiver. */
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "receiveMessage");
        return adapter;
    }

    /** Handler method factory whose payload conversion is done by {@link #jackson2Converter()}. */
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory methodFactory = new DefaultMessageHandlerMethodFactory();
        methodFactory.setMessageConverter(jackson2Converter());
        return methodFactory;
    }

    /** Jackson converter used for listener method payloads. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * Installs a container factory carrying the custom error handler, together
     * with the Jackson-enabled handler method factory, on the registrar.
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setErrorHandler(errorHandler());
        containerFactory.setConnectionFactory(connectionFactory);

        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        registrar.setContainerFactory(containerFactory);
    }

    /** Creates the error handler that the listener containers will use. */
    public ErrorHandler errorHandler() {
        ErrorHandler handler = new MyHandler();
        return handler;
    }

    /**
     * Error handler that runs its own processing first (currently just a log
     * line) and then hands the error to a {@link ConditionalRejectingErrorHandler}.
     */
    @Slf4j
    static class MyHandler implements ErrorHandler {

        // Delegate that receives every error after the custom step has run.
        ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();

        @Override
        public void handleError(Throwable t) {
            // Placeholder for application-specific error processing.
            log.error("==do something for error==");
            this.errorHandler.handleError(t);
        }

    }

}

/**
 * Listener bean consuming messages from the {@code spring-boot} queue.
 */
@Service
@Slf4j
class Receiver {

    /**
     * Logs each received payload.
     *
     * @param dto the converted message payload
     */
    @RabbitListener(queues = "spring-boot")
    public void receiveMessage(SampleDto dto) {
        // Parameterized logging: no eager string building, and a null payload
        // is logged as "null" instead of raising a NullPointerException.
        log.info("Received <{}>", dto);
    }

}

/**
 * Key/value payload accepted by {@code Receiver#receiveMessage}.
 * Constructors, accessors and the other boilerplate members are supplied by
 * the Lombok annotations below rather than written by hand.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
class SampleDto {

    /** Key part of the pair. */
    private String key;
    /** Value part of the pair. */
    private String value;
}

build.gradleはこんな感じ

// Classpath for the build script itself: the Spring Boot Gradle plugin.
buildscript {
    repositories { mavenCentral() }
    dependencies {
        classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE'
    }
}

// Plugins, applied in the same order as before.
apply(plugin: 'java')
apply(plugin: 'eclipse')
apply(plugin: 'idea')
apply(plugin: 'spring-boot')

// Name and version of the produced jar.
jar.baseName = 'messaging-rabbitmq-oreore'
jar.version = '0.1.0'

repositories { mavenCentral() }

// Compile for Java 8.
sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-amqp'
    compile 'org.projectlombok:lombok:1.16.10'

    testCompile 'junit:junit'
}

// Gradle wrapper pinned to 2.3.
task wrapper(type: Wrapper) {
    gradleVersion = '2.3'
}

ext {
    mainClass = 'hello.Application'
}

あとテスト用に作ったpublisherも貼っておく

package hello;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;

/**
 * Test publisher: declares the {@code spring-boot} queue, the {@code direct}
 * exchange and their binding, then publishes every command-line argument.
 * Arguments that parse as a JSON {@link SampleDto} are sent as JSON; all
 * others are sent as plain strings.
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.exit(SpringApplication.run(Application.class, args));
    }

    /** Durable queue named {@code spring-boot}. */
    @Bean
    Queue queue() {
        return new Queue("spring-boot", true);
    }

    /** Direct exchange named {@code direct}. */
    @Bean
    DirectExchange exchange() {
        return new DirectExchange("direct");
    }

    /** Binds the queue to the exchange, using the queue name as routing key. */
    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).withQueueName();
    }

    /** Jackson converter injected into {@link Sender} for JSON sends. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Autowired
    private Sender sender;

    /**
     * Publishes each argument in order.
     *
     * @param args raw command-line arguments; each one becomes one message
     * @throws Exception if sending a message fails
     */
    @Override
    public void run(String... args) throws Exception {
        // One mapper serves every argument; no need to rebuild it per iteration.
        ObjectMapper mapper = new ObjectMapper();
        for (String arg : args) {
            SampleDto dto;
            try {
                dto = mapper.readValue(arg, SampleDto.class);
            } catch (IOException ignored) {
                // Not a JSON SampleDto: fall back to sending the raw text.
                dto = null;
            }
            // Only parse failures (or a JSON "null" literal) select the string
            // path; a failing JSON send now propagates instead of being
            // silently retried as a string message.
            if (dto == null) {
                sender.sendToRabbitmqAsString(arg);
            } else {
                sender.sendToRabbitmqAsJson(dto);
            }
        }
    }
}

/**
 * Publishes messages to the {@code direct} exchange with routing key
 * {@code spring-boot}, either as JSON or as plain strings.
 *
 * <p>Both template fields are autowired by type, so when the context holds a
 * single {@link RabbitMessagingTemplate} bean (none is declared in this file)
 * they refer to the same instance. For that reason the injected templates are
 * never reconfigured here.
 */
@Service
class Sender {

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplateAsJson;

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplateAsString;

    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    /**
     * Sends the DTO converted by the Jackson message converter.
     *
     * @param dto payload to publish
     */
    public void sendToRabbitmqAsJson(final SampleDto dto) {

        // Use a dedicated template on top of the same RabbitTemplate. Calling
        // setMessageConverter on the injected bean would also switch the
        // converter used by sendToRabbitmqAsString whenever both fields point
        // at one shared instance.
        RabbitMessagingTemplate jsonTemplate =
                new RabbitMessagingTemplate(this.rabbitMessagingTemplateAsJson.getRabbitTemplate());
        jsonTemplate.setMessageConverter(this.mappingJackson2MessageConverter);
        // Run the same initialization callback the container applies to template beans.
        jsonTemplate.afterPropertiesSet();

        jsonTemplate.convertAndSend("direct", "spring-boot", dto);

    }

    /**
     * Sends the text using the injected template's own converter.
     *
     * @param message text to publish
     */
    public void sendToRabbitmqAsString(final String message) {

        this.rabbitMessagingTemplateAsString.convertAndSend("direct", "spring-boot", message);

    }
}

/**
 * Key/value payload that the publisher parses from a command-line argument
 * and sends as JSON. Constructors, accessors and the other boilerplate members
 * are supplied by the Lombok annotations below rather than written by hand.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
class SampleDto {

    /** Key part of the pair. */
    private String key;
    /** Value part of the pair. */
    private String value;
}