Wednesday, July 6, 2011

NServiceBus - Single process, multiple input

My default NServiceBus will listen to one single MSMQ and dispatch incoming messages to handlers loaded by the bus. It's possible to run multiple threads for consuming messages from MSMQ. There is a problem with this. Say we have two services, one that updates a database, another service communicating with some "slow" hardware over telnet. If you now have one thread receiving messages and dispatch this to one of the two handlers the following can happen: You send 1000 telnet messages, and then a database update message. The issue now is that all of the telnet messages will be handeled before the database update message will be processed. Can you increase the number of workerthreads on the transport? For database messages, most likely. For the telnet messages that might not be the case.

The trick here is to have different queues for the telnet and database messages. And then we get the first problem, there can only be a single <MsmqTransportConfig/> in the application configuration file.

It is possible to create multiple instances in the application:


var bus = Configure.With....CreateBus().Start();


Here we will create an application with two input busses and one sender bus. We will do explicit type mapping for message routing and handlers.



But we need to tell the bus what queue to listen on. This can be done by implementing the IConfigurationSource. In our example:


sealed class MyConfigurationSource : IConfigurationSource
{
private readonly string _inputQueue;

public MyConfigurationSource(string inputQueue)
{
_inputQueue = inputQueue;
}

public T GetConfiguration<T>() where T : class
{
if (typeof(T) == typeof(MsmqTransportConfig))
return new MsmqTransportConfig
{
InputQueue = _inputQueue
} as T;

return ConfigurationManager.GetSection(typeof(T).Name) as T;
}
}


We can then add this to the construction phase of the bus:


private static void AddReceiverBus(
string inputQueue,
Type handler)
{
var types = typeof(UnicastBus).Assembly.GetTypes().Union(
new[] { handler });

Configure.With(types)
.CustomConfigurationSource(new MyConfigurationSource(inputQueue))
.StructureMapBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.MsmqSubscriptionStorage()
.UnicastBus()
.ImpersonateSender(false)
.LoadMessageHandlers()
.CreateBus()
.Start();
}


With(types) tells NServiceBus what handlers to load. In this example a single type.

We can then create a new receiver bus listening at a given queue, with a given handler.


AddReceiverBus(
"SomeMessage.Input",
typeof(SomeMessageHandler));




AddMapping is an custom extension method that will tell the bus where a given messagetype should be sent. This is used for the "sender" bus


public static class MyMappingExtension
{
public static ConfigUnicastBus AddMapping(
this ConfigUnicastBus config,
Hashtable mapping)
{
config.RunCustomAction(() =>
Configure.Instance
.Configurer
.ConfigureProperty<UnicastBus>(
x => x.MessageOwners, mapping)
);

return config;
}
}


And the hashtable is populated this way:


var mapping = new Hashtable
{
{ typeof(SomeMessage).AssemblyQualifiedName, "SomeMessage.Input" },
{ typeof(AnotherMessage).AssemblyQualifiedName, "AnotherMessage.Input" }
};


The "sender" bus is almost the same as the receiver, but there are no handlers that will be loaded:


private static IBus CreateSenderBus(Hashtable mapping)
{
return Configure.With()
.StructureMapBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.UnicastBus()
.ImpersonateSender(false)
.AddMapping(mapping)
.CreateBus()
.Start();
}

...

StructureMap.ObjectFactory.Inject(
CreateSenderBus(mapping));


We can now get the bus out of StructureMap and send a message


private static void SendMessageOnTheBus()
{
var bus = StructureMap.ObjectFactory.GetInstance<IBus>();

bus.Send(new SomeMessage { Text = "Hello World!" });
}


The message will then be routed to the SomeMessageHandler class and with this implementation it will be routed to AnotherMessageHandler. Take a second look at the code, this is a very nice way to upgrade your message format from A to B. You create a new class, move the logic from handler A to handler B, and make handler A translate message A to message B.


public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
public void Handle(SomeMessage message)
{
var bus = StructureMap.ObjectFactory.GetInstance<IBus>();
bus.Send(new AnotherMessage { Text = message.Text });
}
}


... that will write the content to the logging framework.


public class AnotherMessageHandler : IMessageHandler<AnotherMessage>
{
private static readonly ILog Logger = LogManager.GetLogger(
typeof(AnotherMessageHandler));

public void Handle(AnotherMessage message)
{
Logger.DebugFormat("Message received: {0}", message.Text);
}
}




The code is also available on github:

https://github.com/mteinum/linq-enumerable

Friday, July 1, 2011

NServiceBus - Part 2

Continuing writing down my findings getting NServiceBus up and running.

Overal structure, simple:


There are two config sections that are important regarding NServiceBus

* MsmqTransportConfig
* UnicastBusConfig

MsmqTransportConfig


This section have information about the messages going in to the current process.

UnicastBusConfig


This section is used to say where a message should be sent.

Example:

The following section says that a specific message should be sent to a queue named testqueue on the 10.21.50.11 machine.


<UnicastBusConfig>
<MessageEndpointMappings>
<add Messages="NServiceBusDemo.SomeMessage, NServiceBusDemo"
Endpoint="testqueue@10.21.50.11"/>
</MessageEndpointMappings>
</UnicastBusConfig>


The following section says that all messages in the NServiceBusDemo assembly should be sent to the queue name testqueue on the local machine.


<UnicastBusConfig>
<MessageEndpointMappings>
<add Messages="NServiceBusDemo" Endpoint="testqueue"/>
</MessageEndpointMappings>
</UnicastBusConfig>


Message Format



This class is used as the message for this example


[Serializable]
public class SomeMessage : IMessage
{
public string Text;
}


By default there are two formatters available for serializing the messages to MSMQ. Binary and Xml. For both serializers a Label is set on the Message (MSMQ) with the following data:


<CorrId></CorrId><WinIdName>NO\mteinum</WinIdName>


The ResponseQueue on the Message is set to the inputqueue defined in MsmqTransportConfig

For my example:

DIRECT=OS:MORTEN\private$\NServiceBusDemoInputQueue


The body is the output from the XML.MessageSerializer:


<?xml version="1.0"?>
<Messages xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns="http://tempuri.net/NServiceBusDemo">
<SomeMessage>
<Text>FooBar</Text>
</SomeMessage>
</Messages>

If we are sending two messages, one of SomeMessage and another of type AnotherMessage, the serialized stream will look like:

<?xml version="1.0"?>
<Messages xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns="http://tempuri.net/NServiceBusDemo">
<SomeMessage>
<Text>Hello World!</Text>
</SomeMessage>
<AnotherMessage>
<Text>Hello World!</Text>
</AnotherMessage>
</Messages>


Custom formatter


SOAP ...? :) Is it possible? sure :)

First we need to create a serializer class implementing the NServiceBus.Serialization.IMessageSerializer interface.

And since we want a SOAP formatted message we will use the formatter from the System.Runtime.Serialization.Formatters.Soap namespace.

And the implementation:


public class SoapMessageSerializer : IMessageSerializer
{
private readonly SoapFormatter _formatter;

public SoapMessageSerializer()
{
_formatter = new SoapFormatter
{
AssemblyFormat = FormatterAssemblyStyle.Simple,
TypeFormat = FormatterTypeStyle.TypesWhenNeeded
};
}

public void Serialize(IMessage[] messages, Stream stream)
{
_formatter.Serialize(stream, messages);
}

public IMessage[] Deserialize(Stream stream)
{
return (IMessage[])_formatter.Deserialize(stream);
}
}


We then need to configure the bus to use this, and we then need an extension class:


public static class ConfigureSoapSerializer
{
public static Configure SoapSerializer(this Configure config)
{
config.Configurer.ConfigureComponent(
typeof(SoapMessageSerializer),
ComponentCallModelEnum.Singleton);

return config;
}
}


We can then start the bus using this extension method


var bus = Configure.With()
.Log4Net()
.StructureMapBuilder()
.SoapSerializer()
.MsmqTransport()
.UnicastBus()
.ImpersonateSender(false)
.CreateBus()
.Start();

bus.Send(new SomeMessage { Text = "Hello World!" });


And the message on the queue:


<SOAP-ENV:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"
xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:clr="http://schemas.microsoft.com/soap/encoding/clr/1.0"
SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<SOAP-ENV:Body>
<SOAP-ENC:Array SOAP-ENC:arrayType="a1:IMessage[1]"
xmlns:a1="http://schemas.microsoft.com/clr/nsassem/NServiceBus/NServiceBus">
<item href="#ref-3" />
</SOAP-ENC:Array>
<a2:SomeMessage id="ref-3"
xmlns:a2="http://schemas.microsoft.com/clr/nsassem/NServiceBusDemo/NServiceBusDemo">
<Text id="ref-5">Hello World!
</a2:SomeMessage>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>


Happy coding!

Friday, June 10, 2011

Starting up with NServiceBus

After many years with a inhouse made API on top of MSMQ, its time to evaluate other frameworks that gives us more out of the box.

NServiceBus looks like a great candidate.

  • Easy install

  • Extensible

  • Strongly typed messages using generics

  • Support for unit testing


The first problem was to use the StructureMap container in a strong name solution. The NServiceBus.ObjectBuilder.StructureMap262.dll extension have no stong name so I had to decompile and assemble it again with a keypair.

This is done using ildasm/ilasm.

sn -k test.snk

ildasm NServiceBus.ObjectBuilder.StructureMap262.dll /out=NServiceBus.ObjectBuilder.StructureMap262.il

ilasm NServiceBus.ObjectBuilder.StructureMap262.il /dll /key=test.snk /output=NServiceBus.ObjectBuilder.StructureMap262-sn.dll


Then its over to programming.

Message
We define a message with some properties that are sent from the client to the handler/server/service

public class PingMessage : NServiceBus.IMessage
{
public string Text;
}


Handler
Then a handler to consume these messages:

public class PingMessageHandler : NServiceBus.IHandleMessages<PingMessage>
{
public ILog Logger = LogManager.GetLogger(typeof(PingMessageHandler));

public void Handle(PingMessage message)
{
Logger.Debug(message.Text);
}
}


Unit testing
Normally I would have made Logger static readonly but we want to access this in our unit test

[TestFixture]
public class PingMessageHandlerFixture
{
[Test]
public void SendMessage()
{
Test.Initialize(typeof(PingMessage).Assembly);
var logger = A.Fake<log4net.ILog>();

Test.Handler<PingMessageHandler>()
.WithExternalDependencies(x => x.Logger = logger)
.OnMessage<PingMessage>(x => x.Text = "hello world");

A.CallTo(() => logger.Debug("hello world")).MustHaveHappened();
}
}


And the test goes green!

Great start :)

References:
www.nservicebus.com
fakeiteasy

Tuesday, May 31, 2011

Take me back!

After iterating the tree down, its time do go the opposite way: up!


public static IEnumerable<T> ToRoot<T>(
T item, Func<T, T> selector)
{
if (Equals(item, default(T))) yield break;
yield return item;

foreach (var x in ToRoot(selector(item), selector))
yield return x;
}


And how to use it:

 
static IEnumerable<int> CreateList(ProductDataSet.ProductRow row)
{
return ToRoot(row, x => x.ProductRowParent).Select(x => x.ProductId);
}

Iterate down a tree

Very often I see code like:


var list = new List<foo>();
AddRecursive(list, foo);
return list;


Nothing wrong with that except that there will be a lot of duplicate code after a while.

After some testing I was able to create an enumerator walking the tree.


public static IEnumerable<T> Recursive<T>(
T node, Func<T, IEnumerable<T>> selector)
{
yield return node;
var children = selector(node);
if (children == null) yield break;

foreach (var child in children.SelectMany(x => Recursive(x, selector)))
{
yield return child;
}
}


Let say you have this class:


public class Node {
public int Id;
public Node[] Children;
}


You can now find a given node using this statement


var node = Recursive(root, x => x.Children)
.FirstOrDefault(x => x.Id == 10);

Norwegian SSN validator

The Norwegian social security number consist of the birthday, then three digits with information about gender an century. The two last digits are checksums. Total 11 digits.

Can you do a validation of this using only two lines of code? Sure :D


public bool IsValid(string ssn)
{
var input = ssn.Select(x => x - '0').ToArray();
return (new[] {
new[] {3, 7, 6, 1, 8, 9, 4, 5, 2},
new[] {5, 4, 3, 2, 7, 6, 5, 4, 3, 2} })
.Select(x => new
{
Last = input[x.Length],
Check = x.Select((t, i) => input[i] * t).Sum() % 11 })
.All(z => (z.Last == (11 - z.Check) % 10)
|| (z.Last == 0 && z.Check == 0));
}


Specification: http://no.wikipedia.org/wiki/Fødselsnummer

Sequence inside a sequence

Is was looking at some code with smell. The method description was clear, but the implementation was not.

The code was, for a given sequence to determine if another sequence is present.

After getting some characterization tests up and running, it was time to do some refactoring.

From 38 lines of code, I was down to this:

for (;;)


public static bool Contains(List<T> x, IList<T> y)
{
for (int index = -1; (index = x.IndexOf(y[0], index + 1)) >= 0;)
{
if (x.Skip(index).Take(y.Count).SequenceEqual(y))
return true;
}

return false;
}


I thought I was done, but there was room for making this smaller. First out was to try to split up the algorithm into multiple parts

yield


public static bool Contains(List<T> x, IList<T> y)
{
return FindStarts(x, y[0])
.Any(index => x.Skip(index).Take(y.Count).SequenceEqual(y));
}

private static IEnumerable<int> FindStarts(List<T> x, T item)
{
for (int index = -1; (index = x.IndexOf(item, index + 1)) >= 0; )
yield return index;
}


Then combine these two methods:


public static bool Contains(IEnumerable<T> x, IList<T> y)
{
return x.Select((value, index) => new { Value = value, Index = index })
.Where(z => z.Value.Equals(y[0]))
.Select(z => z.Index)
.Any(index => x.Skip(index).Take(y.Count).SequenceEqual(y));
}


Another possibility is check each element in the source sequence and drop the IndexOf(,..) logic:

Range


public static bool Contains(IEnumerable<T> x, IList<T> y)
{
return Enumerable.Range(0, x.Count() - 1)
.Any(i => x.Skip(i).Take(y.Count).SequenceEqual(y));
}


And the final one using:

SkipWhile


public static bool Contains(IEnumerable<T> x, IList<T> y)
{
return x.SkipWhile((value, index) => (value.Equals(y[0])
&& x.Skip(index).Take(y.Count).SequenceEqual(y)) == false).Any();
}

Welcome!

This blog is about the System.Linq.Enumerable class in .NET API. The Enumerable class is an extension library primary for linq queries.

There is a corresponding fan page on facebook for this blog:

http://www.facebook.com/pages/SystemLinqEnumerable