Wednesday, July 6, 2011

NServiceBus - Single process, multiple input

By default, NServiceBus listens on a single MSMQ queue and dispatches incoming messages to the handlers loaded by the bus. It is possible to run multiple threads that consume messages from the queue, but there is a problem with this. Say we have two services: one that updates a database, and another that communicates with some "slow" hardware over telnet. If a single thread receives the messages and dispatches them to one of the two handlers, the following can happen: you send 1000 telnet messages and then one database update message. All of the telnet messages will be handled before the database update message is processed. Can you increase the number of worker threads on the transport? For the database messages, most likely. For the telnet messages, that might not be an option.
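The effect can be illustrated without NServiceBus at all. The following is a minimal sketch in plain C#, assuming a single consumer that takes messages strictly in arrival order. The class name HeadOfLineDemo and the message labels "telnet" and "db" are made up for this illustration and are not part of NServiceBus.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HeadOfLineDemo
{
    // Counts how many messages a single in-order consumer must handle
    // before it reaches the first "db" message in the queue.
    public static int MessagesAheadOfDb(IEnumerable<string> queue)
    {
        return queue.TakeWhile(message => message != "db").Count();
    }

    public static void Main()
    {
        // One shared queue: 1000 telnet messages, then one database update.
        var shared = new Queue<string>(Enumerable.Repeat("telnet", 1000));
        shared.Enqueue("db");

        // A queue of its own for the database message.
        var dbQueue = new Queue<string>();
        dbQueue.Enqueue("db");

        Console.WriteLine("Shared queue: {0} messages ahead", MessagesAheadOfDb(shared));
        Console.WriteLine("Own queue:    {0} messages ahead", MessagesAheadOfDb(dbQueue));
    }
}
```

On the shared queue the database message waits behind all 1000 telnet messages. If it had a queue of its own, nothing would be ahead of it.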

The trick is to have separate queues for the telnet and database messages. This leads to the first problem: there can only be a single <MsmqTransportConfig/> section in the application configuration file.
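For reference, this is roughly what that single section looks like in app.config for the NServiceBus 2.x line. It is a sketch, not taken from the original project: the queue names and the numeric values are placeholders chosen for illustration.

```xml
<configuration>
  <configSections>
    <section name="MsmqTransportConfig"
             type="NServiceBus.Config.MsmqTransportConfig, NServiceBus.Core" />
  </configSections>

  <!-- Only one input queue can be named here (placeholder values). -->
  <MsmqTransportConfig InputQueue="SomeMessage.Input"
                       ErrorQueue="error"
                       NumberOfWorkerThreads="1"
                       MaxRetries="5" />
</configuration>
```

Because the section holds exactly one InputQueue, a second bus in the same process has to get its queue name from somewhere else.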

It is possible to create multiple bus instances in the application:


var bus = Configure.With....CreateBus().Start();


Here we will create an application with two input busses and one sender bus. We will do explicit type mapping for message routing and handlers.



We need to tell each bus which queue to listen on. This can be done by implementing IConfigurationSource. In our example:


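using System.Configuration;
using NServiceBus.Config;
using NServiceBus.Config.ConfigurationSource;

// Supplies the input queue in code; every other section comes from app.config.
sealed class MyConfigurationSource : IConfigurationSource
{
    private readonly string _inputQueue;

    public MyConfigurationSource(string inputQueue)
    {
        _inputQueue = inputQueue;
    }

    public T GetConfiguration<T>() where T : class
    {
        // Override only the transport configuration.
        if (typeof(T) == typeof(MsmqTransportConfig))
            return new MsmqTransportConfig
            {
                InputQueue = _inputQueue
            } as T;

        // Fall back to the application configuration file.
        return ConfigurationManager.GetSection(typeof(T).Name) as T;
    }
}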


We can then add this to the construction phase of the bus:


// Requires: using System; using System.Linq; using NServiceBus; using NServiceBus.Unicast;
private static void AddReceiverBus(
    string inputQueue,
    Type handler)
{
    // Scan the NServiceBus core types plus the one handler for this bus.
    var types = typeof(UnicastBus).Assembly.GetTypes().Union(
        new[] { handler });

    Configure.With(types)
        .CustomConfigurationSource(new MyConfigurationSource(inputQueue))
        .StructureMapBuilder()
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
        .MsmqSubscriptionStorage()
        .UnicastBus()
            .ImpersonateSender(false)
            .LoadMessageHandlers()
        .CreateBus()
        .Start();
}


With(types) tells NServiceBus which types to scan, and therefore which handlers to load. In this example that is the NServiceBus core types plus a single handler type.

We can then create a new receiver bus listening on a given queue, with a given handler:


AddReceiverBus(
    "SomeMessage.Input",
    typeof(SomeMessageHandler));




AddMapping is a custom extension method that tells the bus where a given message type should be sent. It is used for the "sender" bus:


public static class MyMappingExtension
{
    public static ConfigUnicastBus AddMapping(
        this ConfigUnicastBus config,
        Hashtable mapping)
    {
        // Set the routing table (message type name -> destination queue) on the bus.
        config.RunCustomAction(() =>
            Configure.Instance
                .Configurer
                .ConfigureProperty<UnicastBus>(
                    x => x.MessageOwners, mapping)
        );

        return config;
    }
}


And the hashtable is populated this way:


var mapping = new Hashtable
{
    { typeof(SomeMessage).AssemblyQualifiedName, "SomeMessage.Input" },
    { typeof(AnotherMessage).AssemblyQualifiedName, "AnotherMessage.Input" }
};


The "sender" bus is almost the same as the receiver, but no handlers are loaded:


private static IBus CreateSenderBus(Hashtable mapping)
{
    return Configure.With()
        .StructureMapBuilder()
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
        .UnicastBus()
            .ImpersonateSender(false)
            .AddMapping(mapping)
        .CreateBus()
        .Start();
}

...

StructureMap.ObjectFactory.Inject(
    CreateSenderBus(mapping));


We can now get the bus out of StructureMap and send a message:


private static void SendMessageOnTheBus()
{
    var bus = StructureMap.ObjectFactory.GetInstance<IBus>();

    bus.Send(new SomeMessage { Text = "Hello World!" });
}


The message will then be routed to the SomeMessageHandler class, which in this implementation forwards its content as an AnotherMessage that ends up in AnotherMessageHandler. Take a second look at the code: this is a very nice way to upgrade your message format from A to B. You create a new class, move the logic from handler A to handler B, and make handler A translate message A to message B.


public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    public void Handle(SomeMessage message)
    {
        var bus = StructureMap.ObjectFactory.GetInstance<IBus>();
        bus.Send(new AnotherMessage { Text = message.Text });
    }
}


AnotherMessageHandler then writes the content to the logging framework:


public class AnotherMessageHandler : IMessageHandler<AnotherMessage>
{
    private static readonly ILog Logger = LogManager.GetLogger(
        typeof(AnotherMessageHandler));

    public void Handle(AnotherMessage message)
    {
        Logger.DebugFormat("Message received: {0}", message.Text);
    }
}
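The pieces above could be wired together roughly as follows. This is a sketch under assumptions, not the code from the repository: the shape of the two message classes is inferred from the Text property used in the handlers, the Program class and its Main method are hypothetical, and the second AddReceiverBus call is implied by the "AnotherMessage.Input" entry in the mapping but not shown in the post. It uses only the methods defined earlier and needs NServiceBus, StructureMap and MSMQ to run.

```csharp
using System;
using System.Collections;
using NServiceBus;

// Assumed message contracts: only the Text property is known from the post.
public class SomeMessage : IMessage
{
    public string Text { get; set; }
}

public class AnotherMessage : IMessage
{
    public string Text { get; set; }
}

internal static class Program
{
    // AddReceiverBus, CreateSenderBus and SendMessageOnTheBus from this post
    // are assumed to be members of this same (hypothetical) class.

    private static void Main()
    {
        // Two input buses, one queue and one handler each.
        AddReceiverBus("SomeMessage.Input", typeof(SomeMessageHandler));
        AddReceiverBus("AnotherMessage.Input", typeof(AnotherMessageHandler));

        // One sender bus that knows which queue owns which message type.
        var mapping = new Hashtable
        {
            { typeof(SomeMessage).AssemblyQualifiedName, "SomeMessage.Input" },
            { typeof(AnotherMessage).AssemblyQualifiedName, "AnotherMessage.Input" }
        };
        StructureMap.ObjectFactory.Inject(CreateSenderBus(mapping));

        SendMessageOnTheBus();

        // Keep the process alive while the receiver buses work in the background.
        Console.ReadLine();
    }
}
```

The calls follow the order in which the post introduces them: receivers first, then the sender, then the first message.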




The code is also available on github:

https://github.com/mteinum/linq-enumerable

2 comments:

  1. The NSB recommendation is to not run multiple buses in the same process, so that they can be scaled separately. The generic host that comes with NSB makes it very easy to host and install multiple NSB endpoints. Any particular reason that you want to keep them in the same process?

  2. I should have mentioned that ;)

    Currently I'm looking at what I 'can do' and will move on to the 'should do' later on.

    I can see a reason if I have, let's say, ten times as many endpoints and they all access some read-only configuration data, for example a huge dataset loaded from a database.
