Wednesday, July 6, 2011

NServiceBus - Single process, multiple input

By default, NServiceBus listens to a single MSMQ queue and dispatches incoming messages to the handlers loaded by the bus. It can run multiple threads that consume messages from the queue, but a single input queue still causes a problem. Say we have two services: one that updates a database, and another that communicates with some "slow" hardware over telnet. If one thread receives the messages and dispatches them to one of the two handlers, the following can happen: you send 1000 telnet messages, and then a database update message. All of the telnet messages will be handled before the database update message is processed. Can you increase the number of worker threads on the transport? For database messages, most likely. For the telnet messages, that might not be the case.

The trick here is to use separate queues for the telnet and database messages. That leads to the first problem: there can only be a single <MsmqTransportConfig/> element in the application configuration file.

It is, however, possible to create multiple bus instances in the same application:


var bus = Configure.With....CreateBus().Start();


Here we will create an application with two input buses and one sender bus, using explicit type mapping for message routing and handlers.



But we need to tell each bus which queue to listen on. This can be done by implementing IConfigurationSource. In our example:


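using System.Configuration;
using NServiceBus.Config;
using NServiceBus.Config.ConfigurationSource;

sealed class MyConfigurationSource : IConfigurationSource
{
    private readonly string _inputQueue;

    public MyConfigurationSource(string inputQueue)
    {
        _inputQueue = inputQueue;
    }

    public T GetConfiguration<T>() where T : class
    {
        // Supply the transport settings in code, so each bus gets its own input queue.
        if (typeof(T) == typeof(MsmqTransportConfig))
            return new MsmqTransportConfig
            {
                InputQueue = _inputQueue
            } as T;

        // Every other section is still read from the application configuration file.
        return ConfigurationManager.GetSection(typeof(T).Name) as T;
    }
}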


We can then add this to the construction phase of the bus:


private static void AddReceiverBus(
    string inputQueue,
    Type handler)
{
    var types = typeof(UnicastBus).Assembly.GetTypes().Union(
        new[] { handler });

    Configure.With(types)
        .CustomConfigurationSource(new MyConfigurationSource(inputQueue))
        .StructureMapBuilder()
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
        .MsmqSubscriptionStorage()
        .UnicastBus()
            .ImpersonateSender(false)
            .LoadMessageHandlers()
        .CreateBus()
        .Start();
}


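With(types) tells NServiceBus which types to scan, and therefore which handlers to load. In this example that is the types from the NServiceBus assembly that contains UnicastBus, plus a single handler type.

We can then create a new receiver bus listening on a given queue, with a given handler: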


AddReceiverBus(
    "SomeMessage.Input",
    typeof(SomeMessageHandler));




AddMapping is a custom extension method that tells the bus where a given message type should be sent. It is used for the "sender" bus:


public static class MyMappingExtension
{
    public static ConfigUnicastBus AddMapping(
        this ConfigUnicastBus config,
        Hashtable mapping)
    {
        config.RunCustomAction(() =>
            Configure.Instance
                .Configurer
                .ConfigureProperty<UnicastBus>(
                    x => x.MessageOwners, mapping)
        );

        return config;
    }
}


And the hashtable is populated this way:


var mapping = new Hashtable
{
    { typeof(SomeMessage).AssemblyQualifiedName, "SomeMessage.Input" },
    { typeof(AnotherMessage).AssemblyQualifiedName, "AnotherMessage.Input" }
};
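
For comparison, roughly the same routing could be written in the application configuration file with a UnicastBusConfig section. This is only a sketch: it assumes both message classes live in a namespace and assembly named NServiceBusDemo, which this post does not state.

```xml
<!-- Assumption: SomeMessage and AnotherMessage are defined in NServiceBusDemo.dll -->
<UnicastBusConfig>
  <MessageEndpointMappings>
    <add Messages="NServiceBusDemo.SomeMessage, NServiceBusDemo"
         Endpoint="SomeMessage.Input"/>
    <add Messages="NServiceBusDemo.AnotherMessage, NServiceBusDemo"
         Endpoint="AnotherMessage.Input"/>
  </MessageEndpointMappings>
</UnicastBusConfig>
```

Building the Hashtable in code keeps the routing next to the bus construction, which is convenient when several buses share one process.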


The "sender" bus is almost the same as the receiver, but there are no handlers that will be loaded:


private static IBus CreateSenderBus(Hashtable mapping)
{
    return Configure.With()
        .StructureMapBuilder()
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
        .UnicastBus()
            .ImpersonateSender(false)
            .AddMapping(mapping)
        .CreateBus()
        .Start();
}

...

StructureMap.ObjectFactory.Inject(
    CreateSenderBus(mapping));


We can now get the bus out of StructureMap and send a message:


private static void SendMessageOnTheBus()
{
    var bus = StructureMap.ObjectFactory.GetInstance<IBus>();

    bus.Send(new SomeMessage { Text = "Hello World!" });
}


The message is then routed to the SomeMessageHandler class, which in this implementation forwards the content as an AnotherMessage that is routed to AnotherMessageHandler. Take a second look at the code: this is a very nice way to upgrade your message format from A to B. You create a new message class, move the logic from handler A to handler B, and make handler A translate message A to message B.


public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    public void Handle(SomeMessage message)
    {
        var bus = StructureMap.ObjectFactory.GetInstance<IBus>();
        bus.Send(new AnotherMessage { Text = message.Text });
    }
}


AnotherMessageHandler then writes the content to the logging framework:


public class AnotherMessageHandler : IMessageHandler<AnotherMessage>
{
    private static readonly ILog Logger = LogManager.GetLogger(
        typeof(AnotherMessageHandler));

    public void Handle(AnotherMessage message)
    {
        Logger.DebugFormat("Message received: {0}", message.Text);
    }
}
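
To see how the pieces fit together, here is a rough sketch of a Main method that wires up the two receiver buses and the sender bus in the order they are introduced in this post. It is not a standalone program: it assumes AddReceiverBus, CreateSenderBus and SendMessageOnTheBus from above are static members of the same class, and the second AddReceiverBus call is inferred from the mapping table rather than shown earlier.

```csharp
using System;
using System.Collections;

static class Program
{
    // Assumption: AddReceiverBus, CreateSenderBus and SendMessageOnTheBus
    // (shown earlier in this post) are pasted into this class.

    private static void Main()
    {
        // One receiver bus per input queue, each loading a single handler.
        AddReceiverBus("SomeMessage.Input", typeof(SomeMessageHandler));
        AddReceiverBus("AnotherMessage.Input", typeof(AnotherMessageHandler));

        // Routing table for the sender bus: message type -> destination queue.
        var mapping = new Hashtable
        {
            { typeof(SomeMessage).AssemblyQualifiedName, "SomeMessage.Input" },
            { typeof(AnotherMessage).AssemblyQualifiedName, "AnotherMessage.Input" }
        };

        // Register the sender bus so the handlers can resolve IBus from StructureMap.
        StructureMap.ObjectFactory.Inject(CreateSenderBus(mapping));

        SendMessageOnTheBus();

        // Keep the process alive while the receiver buses do their work.
        Console.ReadLine();
    }
}
```

With this layout a backlog in SomeMessage.Input does not delay messages waiting in AnotherMessage.Input, because each queue has its own bus.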




The code is also available on GitHub:

https://github.com/mteinum/linq-enumerable

Friday, July 1, 2011

NServiceBus - Part 2

Continuing to write down my findings from getting NServiceBus up and running.

Overall structure, simplified:


There are two config sections that are important for NServiceBus:

* MsmqTransportConfig
* UnicastBusConfig

MsmqTransportConfig


This section has information about the messages coming in to the current process.
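
As a sketch of what this typically looks like in app.config: the section is registered under configSections and then declared once. The type strings assume NServiceBus 2.x, where the config classes live in NServiceBus.Core; the error queue name, thread count and retry count are illustrative values, not taken from this post.

```xml
<configuration>
  <configSections>
    <!-- Assumption: NServiceBus 2.x assembly and type names -->
    <section name="MsmqTransportConfig"
             type="NServiceBus.Config.MsmqTransportConfig, NServiceBus.Core"/>
    <section name="UnicastBusConfig"
             type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
  </configSections>

  <!-- The queue this process reads from; the other values are illustrative -->
  <MsmqTransportConfig
    InputQueue="NServiceBusDemoInputQueue"
    ErrorQueue="error"
    NumberOfWorkerThreads="1"
    MaxRetries="5"/>
</configuration>
```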

UnicastBusConfig


This section is used to say where a message should be sent.

Example:

The following section says that a specific message should be sent to a queue named testqueue on the 10.21.50.11 machine.


<UnicastBusConfig>
  <MessageEndpointMappings>
    <add Messages="NServiceBusDemo.SomeMessage, NServiceBusDemo"
         Endpoint="testqueue@10.21.50.11"/>
  </MessageEndpointMappings>
</UnicastBusConfig>


The following section says that all messages in the NServiceBusDemo assembly should be sent to the queue named testqueue on the local machine.


<UnicastBusConfig>
  <MessageEndpointMappings>
    <add Messages="NServiceBusDemo" Endpoint="testqueue"/>
  </MessageEndpointMappings>
</UnicastBusConfig>


Message Format



This class is used as the message for this example:


[Serializable]
public class SomeMessage : IMessage
{
    public string Text;
}


By default there are two formatters available for serializing messages to MSMQ: binary and XML. For both serializers, a Label is set on the MSMQ Message with the following data:


<CorrId></CorrId><WinIdName>NO\mteinum</WinIdName>


The ResponseQueue on the Message is set to the input queue defined in MsmqTransportConfig.

For my example:

DIRECT=OS:MORTEN\private$\NServiceBusDemoInputQueue


The body is the output from the XML.MessageSerializer:


<?xml version="1.0"?>
<Messages xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:xsd="http://www.w3.org/2001/XMLSchema"
          xmlns="http://tempuri.net/NServiceBusDemo">
  <SomeMessage>
    <Text>FooBar</Text>
  </SomeMessage>
</Messages>

If we are sending two messages, one of SomeMessage and another of type AnotherMessage, the serialized stream will look like:

<?xml version="1.0"?>
<Messages xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:xsd="http://www.w3.org/2001/XMLSchema"
          xmlns="http://tempuri.net/NServiceBusDemo">
  <SomeMessage>
    <Text>Hello World!</Text>
  </SomeMessage>
  <AnotherMessage>
    <Text>Hello World!</Text>
  </AnotherMessage>
</Messages>


Custom formatter


SOAP ...? :) Is it possible? Sure :)

First we need to create a serializer class implementing the NServiceBus.Serialization.IMessageSerializer interface.

And since we want a SOAP formatted message we will use the formatter from the System.Runtime.Serialization.Formatters.Soap namespace.

And the implementation:


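using System.IO;
using System.Runtime.Serialization.Formatters;
using System.Runtime.Serialization.Formatters.Soap;
using NServiceBus;
using NServiceBus.Serialization;

public class SoapMessageSerializer : IMessageSerializer
{
    private readonly SoapFormatter _formatter;

    public SoapMessageSerializer()
    {
        _formatter = new SoapFormatter
        {
            AssemblyFormat = FormatterAssemblyStyle.Simple,
            TypeFormat = FormatterTypeStyle.TypesWhenNeeded
        };
    }

    // Writes the whole message array to the stream as one SOAP envelope.
    public void Serialize(IMessage[] messages, Stream stream)
    {
        _formatter.Serialize(stream, messages);
    }

    // Reads the envelope back into the message array.
    public IMessage[] Deserialize(Stream stream)
    {
        return (IMessage[])_formatter.Deserialize(stream);
    }
}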


We then need to configure the bus to use this serializer, which requires an extension class:


public static class ConfigureSoapSerializer
{
    public static Configure SoapSerializer(this Configure config)
    {
        config.Configurer.ConfigureComponent(
            typeof(SoapMessageSerializer),
            ComponentCallModelEnum.Singleton);

        return config;
    }
}


We can then start the bus using this extension method:


var bus = Configure.With()
    .Log4Net()
    .StructureMapBuilder()
    .SoapSerializer()
    .MsmqTransport()
    .UnicastBus()
        .ImpersonateSender(false)
    .CreateBus()
    .Start();

bus.Send(new SomeMessage { Text = "Hello World!" });


And the message on the queue:


<SOAP-ENV:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                   xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"
                   xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
                   xmlns:clr="http://schemas.microsoft.com/soap/encoding/clr/1.0"
                   SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
  <SOAP-ENV:Body>
    <SOAP-ENC:Array SOAP-ENC:arrayType="a1:IMessage[1]"
                    xmlns:a1="http://schemas.microsoft.com/clr/nsassem/NServiceBus/NServiceBus">
      <item href="#ref-3" />
    </SOAP-ENC:Array>
    <a2:SomeMessage id="ref-3"
                    xmlns:a2="http://schemas.microsoft.com/clr/nsassem/NServiceBusDemo/NServiceBusDemo">
      <Text id="ref-5">Hello World!</Text>
    </a2:SomeMessage>
  </SOAP-ENV:Body>
</SOAP-ENV:Envelope>


Happy coding!