Service Bus Topics and Rules

Service Bus Topics and Rules

The goal of this post is to create a Publisher application and a Subscriber application, where the Subscriber will use Rules to decide which messages it is interested in.

Prerequisites

To be able to follow along with this post, you must have a Service Bus Namespace set up. You will need the Service Bus connection string later. We will be creating a Topic and a Subscription as part of the application later, but feel free to do this yourself if you prefer.

The Publisher

The publisher will be a simple console application, sending messages to you chosen Topic on your Service Bus Namespace. We set a simple User Property on the published messages, that we can later use when setting up our Rules on the Subscriber side. After creating the console application, the first thing we’ll do is install the Microsoft.Azure.ServiceBus nuget package. When that is done, we’ll start writing our code.

const string ServiceBusConnectionString = "<your-connection-string>";
const string TopicPath = "<your-topic-name>";
static TopicClient topicClient;

static async Task Main(string[] args)
{
    var numberOfMessages = 10;
    topicClient = new TopicClient(ServiceBusConnectionString, TopicPath);

    await SendMessagesAsync(numberOfMessages);
    await topicClient.CloseAsync();
}

The main function is quite simple. We create a new TopicClient with our configured Connection String and Topic Path. We then call the method SendMessagesAsync, telling it how many messages to send, and finally we close the TopicClient. Simple enough. Let’s take a look at SendMessagesAsync.

static async Task SendMessagesAsync(int numberOfMessages)
{
    try
    {
        for (int i = 0; i < numberOfMessages; i++)
        {
            var messageBody = $"Message {i}";
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));

            message.UserProperties.Add("IsEven", i % 2 == 0);
            await topicClient.SendAsync(message);
        }
    }
    catch (Exception exception)
    {
        Console.WriteLine($"Error sending message. Exception: {exception.Message}");
    }
}

In here, there is a bit more going on. We do a simple for-loop to send the right number of messages. For each iteration, we create a new Message, with a Message Body telling us what number this message has. We also add a very simple user property, “IsEven”, which will be true or false depending on the message number. Finally, we send the message to the Topic, using the TopicClient. As you can see, sending a message is pretty straightforward.

The Subscriber

Now, the Subscriber is where the fun stuff happens. This will also be a simple console application, so go ahead and create a new project for this. Like before, we will need the Microsoft.Azure.ServiceBus nuget package installed. Then, let’s head on to our Main function.

const string ServiceBusConnectionString = "<your-connection-string>";
const string TopicName = "<your-topic-name>";
static SubscriptionClient subscriptionClient;
static ManagementClient managementClient;

static async Task Main(string[] args)
{
    managementClient = new ManagementClient(ServiceBusConnectionString);

    await CreateTopicIfNotExistsAsync(TopicName);
    await CreateSubscriptionIfNotExistsAsync(TopicName, "EvenNumbers");

    subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, "EvenNumbers");
    await RemoveDefaultRuleAsync();

    var isEvenFilter = CreateFilter("IsEven", true);
    var ruleDescription = new RuleDescription("MyFilter", isEvenFilter);
    await AddRuleIfNotExistsAsync(ruleDescription);

    var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        AutoComplete = false
    };

    subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

    Console.ReadKey();
}

There’s a bit more going on here, but we will go through it all. We start of by creating the necessary resources, if they don’t already exist. Then we do some cleaning up, and finally we create our Rules and start listening to our subscription.

static async Task CreateTopicIfNotExistsAsync(string topicName)
{
    try
    {
        await managementClient.GetTopicAsync(topicName);
    }
    catch (MessagingEntityNotFoundException)
    {
        await managementClient.CreateTopicAsync(topicName);
    }
}

static async Task CreateSubscriptionIfNotExistsAsync(string topicName, string subscriptionName)
{
    try
    {
        await managementClient.GetSubscriptionAsync(topicName, subscriptionName);
    }
    catch (MessagingEntityNotFoundException)
    {
        await managementClient.CreateSubscriptionAsync(new SubscriptionDescription(topicName, subscriptionName));
    }
}

These functions are quite similar to eachother and will create our Topic and Subscription if they do not already exist.

static async Task RemoveDefaultRuleAsync()
{
    var rules = await subscriptionClient.GetRulesAsync();
    var defaultRule = rules.SingleOrDefault(rule => rule.Name == RuleDescription.DefaultRuleName);

    if (defaultRule != null)
    {
        await subscriptionClient.RemoveRuleAsync(defaultRule.Name);
    }
}

Depending on how you create your Subscription, a Default Rule might be added. This is configured to accept everything, so we need to delete this if it exists, for our Rule to work as intended.

static CorrelationFilter CreateFilter(string propertyName, object value)
{
    var filter = new CorrelationFilter();
    filter.Properties[propertyName] = value;

    return filter;
}

There are different types of filters, each suitable for different use cases. I choose a CorrelationFilter in this case. Here we just specify that a certain property should match a certain value. In our case, the property name is IsEven, and the value is true. More info on filters can be found here: docs.microsoft.com/sv-se/azure/service-bus-..

static async Task AddRuleIfNotExistsAsync(RuleDescription ruleDescription)
{
    try
    {
        await subscriptionClient.AddRuleAsync(ruleDescription);
    }
    catch (ServiceBusException exception)
    {
        Console.WriteLine($"Could not add rule '{ruleDescription.Name}'. Exception: {exception.Message}.");
    }
}

Here we try to add the rule, if it already exists the exception will let us know. Now we have a subscription with a rule added, all we have left is to setup our MessageHandler.

static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
    var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

    Console.WriteLine("Exception context for troubleshooting:");
    Console.WriteLine($"- Endpoint: {context.Endpoint}");

    Console.WriteLine($"- Entity Path: {context.EntityPath}");
    Console.WriteLine($"- Executing Action: {context.Action}");

    return Task.CompletedTask;
}

If we should encounter an exception when trying to handle our Messages, this ExceptionHandler will take care of it. In this case, we will just log it in the Console.

static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}

Finally, the MessageHandler itself is simple. It will write the message Sequence Number and body to the Console, and then Complete the message.

Putting it all together.

Now, all we need to do is fill in our config variables in each project, and then run our console applications. I would suggest running the Subscriber application first, to be sure that everything is up and running before publishing. You should see in the Console of the Subscriber application, that only messages with an even number in the message body is handled by this Subscription.

If you have any thoughts or questions, please let me know. This is my first blog post here, so any helpful feedback would be appreciated.

Cover photo by Nubia Navarro (nubikini) from Pexels