Error testing with Spring Cloud Stream Test

April 2019

We are using spring-cloud-stream to manage messages between our applications.

We have custom bindings:

public interface InboundChannels {

  String TASKS = "domainTasksInboundChannel";
  String EVENTS = "eventsInboundChannel";

  @Input(TASKS)
  SubscribableChannel tasks();

  @Input(EVENTS)
  SubscribableChannel events();

}

public interface OutboundChannels {

  String TASKS = "domainTasksOutboundChannel";
  String EVENTS = "eventsOutboundChannel";

  @Output(TASKS)
  MessageChannel tasks();

  @Output(EVENTS)
  MessageChannel events();

}

There are processors that consume tasks and generate events:

@EnableBinding({InboundChannels.class, OutboundChannels.class})
public class TasksProcessor {

  private final UserService userService;
  private final MessageChannel eventsChannel;

  public TasksProcessor(
        UserService userService,
        @Qualifier(OutboundChannels.EVENTS) MessageChannel eventsChannel
  ) {
    this.userService = userService;
    this.eventsChannel = eventsChannel;
  }

  // Handles only messages whose TYPE header equals CREATE_USER.
  @StreamListener(value = TASKS, condition = "headers['" + TYPE + "']=='" + CREATE_USER + "'")
  public void createUser(Message<User> message)  {
    final User user = message.getPayload();
    userService.save(user)
            .subscribe(created -> {
                // Publish a USER_CREATED event on the outbound events channel.
                Message<User> successMessage = fromMessage(message, Events.USER_CREATED, created).build();
                eventsChannel.send(successMessage);
            });
  }

}

Now we want to test it using spring-cloud-stream-test-support and its amazing features:

@DirtiesContext
@SpringBootTest
@RunWith(SpringRunner.class)
public class TasksProcessorTest {

  private User user;

  @Autowired
  private InboundChannels inboundChannels;

  @Autowired
  private OutboundChannels outboundChannels;

  @Autowired
  private MessageCollector collector;

  @Before
  public void setup() {
    user = new User(BigInteger.ONE, "[email protected]");
  }

  @Test
  public void createUserTest() {
    final Message<User> msg = create(CREATE_USER, user).build();
    outboundChannels.tasks().send(msg);
    final Message<?> incomingEvent = collector.forChannel(inboundChannels.events()).poll();
    final String type = (String) incomingEvent.getHeaders().get(TYPE);
    assertThat(type).isEqualToIgnoringCase(USER_CREATED);
  }

}

application.properties

##
# Spring AMQP configuration
##
spring.rabbitmq.host=rabbitmq
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# Events channels
spring.cloud.stream.bindings.eventsOutboundChannel.destination=events
spring.cloud.stream.bindings.eventsInboundChannel.destination=events

spring.cloud.stream.bindings.domainTasksOutboundChannel.destination=domainTasks

spring.cloud.stream.bindings.domainTasksInboundChannel.destination=domainTasks

spring.cloud.stream.bindings.userTasksInboundChannel.group=domainServiceInstances
spring.cloud.stream.bindings.eventsInboundChannel.group=domainServiceInstances

But then we get this error:

java.lang.IllegalArgumentException: Channel [eventsInboundChannel] was not bound by class org.springframework.cloud.stream.test.binder.TestSupportBinder

What are we doing wrong?

1 answer

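In the .subscribe() callback you call eventsChannel.send(successMessage), and that eventsChannel is the one qualified with OutboundChannels.EVENTS. In the test case, however, you ask the collector for inboundChannels.events(). The test binder only collects messages from channels bound as outputs, so an input channel such as eventsInboundChannel is never registered with the MessageCollector, which is what the exception is telling you.

If you use collector.forChannel(outboundChannels.events()) instead, that should work for you. For the same reason, since the test binder has no broker to carry a message from domainTasksOutboundChannel to domainTasksInboundChannel, you will probably also need to send the task with inboundChannels.tasks().send(msg) so that it actually reaches the @StreamListener.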
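
To illustrate why the collector accepts the output channel but rejects the input one, here is a toy model in plain Java. It is only a sketch of the idea and not the real spring-cloud-stream code: ToyTestBinder, bindInput, bindOutput and send are hypothetical names, channels are identified by plain strings, and payloads are strings rather than Message objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model (hypothetical, NOT the real TestSupportBinder): only channels
// bound as outputs get a queue that a test can read from.
class ToyTestBinder {

    // One queue per output channel, keyed by channel name.
    private final Map<String, BlockingQueue<String>> outputQueues = new HashMap<>();

    // Binding an output registers a queue that records what is sent to it.
    void bindOutput(String channel) {
        outputQueues.put(channel, new LinkedBlockingQueue<>());
    }

    // Binding an input registers nothing to collect from in this model.
    void bindInput(String channel) {
        // intentionally empty
    }

    // Simulates the application sending a payload on an output channel.
    void send(String channel, String payload) {
        forChannel(channel).add(payload);
    }

    // Plays the role of the collector lookup: unknown channels are rejected.
    BlockingQueue<String> forChannel(String channel) {
        BlockingQueue<String> queue = outputQueues.get(channel);
        if (queue == null) {
            throw new IllegalArgumentException(
                    "Channel [" + channel + "] was not bound as an output");
        }
        return queue;
    }

    public static void main(String[] args) {
        ToyTestBinder binder = new ToyTestBinder();
        binder.bindInput("eventsInboundChannel");
        binder.bindOutput("eventsOutboundChannel");

        // The processor publishes its event on the outbound channel.
        binder.send("eventsOutboundChannel", "USER_CREATED");

        // Reading from the output channel finds the event.
        System.out.println(binder.forChannel("eventsOutboundChannel").poll());

        // Reading from the input channel fails, like the error in the question.
        try {
            binder.forChannel("eventsInboundChannel");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real test the equivalent change is to poll collector.forChannel(outboundChannels.events()). Because createUser publishes the event from a reactive subscribe callback, polling with a timeout, for example poll(1, TimeUnit.SECONDS), is likely safer than a bare poll().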