Using IBM MQ with .Net 5

This is the next post in a series covering my experience of migrating to the cloud, the first post of which is here. I have previously worked with WPF, so I am familiar with publish/subscribe patterns for synchronising composite interfaces within loosely coupled applications. In the cloud, applications are made up of loosely coupled services which may need to communicate and be synchronised with each other. This post provides an example of using IBM MQ to allow these services to communicate with each other as publishers and subscribers.

IBM MQ

IBM MQ really is an enterprise application, so from a developer's point of view it can be difficult to work with. Hosting MQ in a Docker container provides a much better developer experience. Thankfully, IBM provides a tutorial on how to Get an IBM MQ queue for development in a container.

Once MQ is running in a Docker container you will have the following configuration:

  • QueueManagerName = "QM1"
  • UserId = "app"
  • Password = "passw0rd"
  • Port = 1414
  • Channel = "DEV.APP.SVRCONN"
  • HostName = "localhost(1414)"
  • Admin user = "admin"
  • Admin password = "passw0rd"
  • MQ Console = localhost:9443/ibmmq/console/#

Queues and Topics

Out of the box MQ will have a number of queues. With MQ running in the container, open the console in a browser, log in with the admin username and password, then navigate to the Manage menu item:

[Image: MQConsole.PNG]

A queue is a pipeline where a message comes in and goes to just one consumer. A topic is a pipeline where a message comes in and goes to every subscriber.

To create a topic, on the Manage page click on the Topic tab and then the Create button:

[Image: MQTopic.PNG]

Give the topic a name and a topic string, in this case BroadCast, then click on the Create button:

[Image: MSCreateTopic.PNG]

To add a user that can read and write messages, on the topic details page click on the View Configuration link. On the configuration page, click on the Security tab and then the Add button:

[Image: MQTopicScreen.PNG]

On the Add queue manager access properties pane, enter a user name, in this case app, then tick the Ability to modify the origin of messages and Permissions to operate privileged commands, then click on the Add button:

[Image: MQTopicSecurity.PNG]
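
The difference between a queue and a topic can be illustrated without MQ at all. The sketch below is a deliberately simplified in-memory model: InMemoryQueue and InMemoryTopic are hypothetical names made up for this illustration, and the round-robin choice of consumer is an assumption for the demo rather than a description of how IBM MQ selects a consumer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins, for illustration only; not part of IBM MQ.
public class InMemoryQueue
{
    private readonly List<Action<string>> _consumers = new List<Action<string>>();
    private int _next;

    public void AddConsumer(Action<string> consumer) => _consumers.Add(consumer);

    // Point-to-point: each message is handed to exactly one consumer.
    public void Send(string message)
    {
        if (_consumers.Count == 0) return;
        _consumers[_next](message);
        _next = (_next + 1) % _consumers.Count;
    }
}

public class InMemoryTopic
{
    private readonly List<Action<string>> _subscribers = new List<Action<string>>();

    public void Subscribe(Action<string> subscriber) => _subscribers.Add(subscriber);

    // Publish/subscribe: every subscriber receives its own copy of the message.
    public void Publish(string message)
    {
        foreach (var subscriber in _subscribers)
        {
            subscriber(message);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var queue = new InMemoryQueue();
        queue.AddConsumer(m => Console.WriteLine($"Queue consumer 1: {m}"));
        queue.AddConsumer(m => Console.WriteLine($"Queue consumer 2: {m}"));
        queue.Send("hello");    // only one of the two consumers prints

        var topic = new InMemoryTopic();
        topic.Subscribe(m => Console.WriteLine($"Topic subscriber 1: {m}"));
        topic.Subscribe(m => Console.WriteLine($"Topic subscriber 2: {m}"));
        topic.Publish("hello"); // both subscribers print
    }
}
```

This is why the example in this post uses a topic: every subscriber service should see each published message.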

The MQ Adapter

The MQ Adapter is a .NET Standard class library that uses the IBMMQDotnetClient NuGet package, which provides the MQ classes for .NET Standard:

[Image: MQAdapterNuget.PNG]

The adapter provides three classes:

  • MQAdapterConnection implements the IMQAdapterConnection interface that defines the details required to connect to MQ;
      public class MQAdapterConnection : IMQAdapterConnection
      {
          public string HostName { get; set; }
          public int Port { get; set; }
          public string Channel { get; set; }
          public string UserId { get; set; }
          public string Password { get; set; }
          public string QueueManagerName { get; set; }
      }
    
  • MQConnectionStatus implements the IMQConnectionStatus interface that defines the details provided when the MQ connection status changes;

      public enum Status
      {
          Connected,
          Disconnected
      }
    
      public class MQConnectionStatus : IMQConnectionStatus
      {
          public MQConnectionStatus(string queueManager, string topicQueueName, Status status)
          {
              Status = status;
              QueueManager = queueManager;
              TopicQueueName = topicQueueName;
          }
    
          public Status Status { get; set; }
          public string QueueManager { get; set; }
          public string TopicQueueName { get; set; }
    
          public override string ToString()
          {
              return $"IBM MQ Adapter Connection Status Changed, Status: {Status}, QueueManager: {QueueManager}, TopicQueueName: {TopicQueueName}.";
          }
      }
    
  • MQAdapterService implements the IMQAdapterService interface which provides methods to send messages to a queue or topic and listen to a queue or topic.

      public interface IMQAdapterService
      {
          bool SendMessageToQueue(string queueName, object objToSend);
          bool SendMessageToTopic(string topicName, object objToSend);
    
          Task ListenToTopic(string topicString, Action<MQMessage> messageHandler, Action<IMQConnectionStatus> connectionStatusChangedHandler);
          Task ListenToQueue(string queueName, Action<MQMessage> messageHandler, Action<IMQConnectionStatus> connectionStatusChangedHandler);
      }
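
The implementation of MQAdapterService is not listed in this post. As a rough idea of what a method like SendMessageToTopic might do internally, the sketch below publishes a string to a topic using the IBM.WMQ classes from the IBMMQDotnetClient package. TopicPublisher is a hypothetical helper, not the adapter's actual code, and splitting the host and port into separate connection properties is an assumption made for this example. It needs the MQ container described above to be running.

```csharp
using System;
using System.Collections;
using IBM.WMQ;

// Hypothetical helper for illustration; not the adapter's actual implementation.
public static class TopicPublisher
{
    public static bool Publish(string topicString, string text)
    {
        // Connection properties for a managed client connection,
        // using the development defaults from the container.
        var properties = new Hashtable
        {
            { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED },
            { MQC.HOST_NAME_PROPERTY, "localhost" },
            { MQC.PORT_PROPERTY, 1414 },
            { MQC.CHANNEL_PROPERTY, "DEV.APP.SVRCONN" },
            { MQC.USER_ID_PROPERTY, "app" },
            { MQC.PASSWORD_PROPERTY, "passw0rd" }
        };

        MQQueueManager queueManager = null;
        MQTopic topic = null;
        try
        {
            queueManager = new MQQueueManager("QM1", properties);

            // Open the topic string for publication.
            topic = queueManager.AccessTopic(
                topicString,
                null,
                MQC.MQTOPIC_OPEN_AS_PUBLICATION,
                MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING);

            var message = new MQMessage { Format = MQC.MQFMT_STRING };
            message.WriteString(text);
            topic.Put(message);
            return true;
        }
        catch (MQException ex)
        {
            // ReasonCode identifies the MQ failure, e.g. 2035 for "not authorized".
            Console.WriteLine($"Publish failed, reason code {ex.ReasonCode}: {ex.Message}");
            return false;
        }
        finally
        {
            topic?.Close();
            queueManager?.Disconnect();
        }
    }
}
```

Calling TopicPublisher.Publish("BroadCast", "hello") would return true if the message was put to the topic and false if MQ reported an error, which mirrors the bool returned by the interface methods above.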
    

Publishing and Subscribing

To demonstrate sending and receiving messages there are a number of ASP.NET projects: one acts as a publisher and the others act as subscribers. To take on the roles of publisher and subscriber, the projects only need to reference the MQAdapter class library:

[Image: MQAdapterProjectReference.PNG]

Publisher

The publisher creates a message and sends it to the MQ topic. To do this, in the ConfigureServices(...) method of the Startup.cs file, the MQAdapterConnection is added to the services collection as a singleton with the connection details of the MQ instance running in the Docker container created above. The final step is to add the MQAdapterService to the collection as well:

          public void ConfigureServices(IServiceCollection services)
          {
              services.AddSingleton<IMQAdapterConnection>(new MQAdapterConnection
              {
                  QueueManagerName = "QM1",
                  UserId = "app",
                  Password = "passw0rd",
                  Port = 1414,
                  Channel = "DEV.APP.SVRCONN",
                  HostName = "localhost(1414)"
              });
    
              services.AddTransient<IMQAdapterService, MQAdapterService>();
    
              services.AddControllersWithViews();
          }
    

The MQAdapterService is used to publish a message, so the constructor of the HomeController is updated to allow an implementation of the IMQAdapterService to be injected:

          private readonly IMQAdapterService _mqAdapterService;
          public HomeController(ILogger<HomeController> logger, IMQAdapterService mqAdapterService)
          {
              _logger = logger;
              _mqAdapterService = mqAdapterService;
          }
    

A publish form is added to the Index.cshtml page of the Home view. It will POST a string to the Publish method of the HomeController:

          <form asp-action="Publish">
              <div class="form-group">
                  <input class="form-control" id="message" name="message" autofocus />
              </div>
    
              <button class="btn btn-primary" name="button">Submit</button>
          </form>
    

[Image: MQPublish.PNG]

The Publish(...) method accepts a string and uses the adapter service to publish it as a message to the BroadCast topic created earlier:

          [HttpPost]
          public IActionResult Publish(string message)
          {
              _mqAdapterService.SendMessageToTopic("BroadCast", message);
              return View(nameof(Index));
          }
    

Subscriber

The subscriber listens to the MQ topic and reads the messages as they are received. In this example they are stored in an in-memory database managed by Entity Framework, which is added in the ConfigureServices(...) method of the Startup.cs file. For a subscriber to process messages as they arrive, it needs to listen to the topic continuously. To do this, an MQSubscriberService is registered in the services collection as a hosted service. In the same way as the publisher, the MQAdapterConnection and MQAdapterService are also added to the services collection:

          public void ConfigureServices(IServiceCollection services)
          {
              services.AddDbContext<ApplicationDbContext>(opt => opt.UseInMemoryDatabase("SubscriberOne"));
    
              services.AddSingleton<IMQAdapterConnection>(new MQAdapterConnection
              {
                  QueueManagerName = "QM1",
                  UserId = "app",
                  Password = "passw0rd",
                  Port = 1414,
                  Channel = "DEV.APP.SVRCONN",
                  HostName = "localhost(1414)"
              });
    
              services.AddSingleton<IMQAdapterService, MQAdapterService>();
              services.AddHostedService<MQSubscriberService>();
          }
    

The MQSubscriberService inherits from BackgroundService and uses the MQAdapterService to listen to MQ. When a message is received it is saved to the database using the ApplicationDbContext. In this example any changes in the status of the MQ connection are simply sent to the logger:

    public class MQSubscriberService : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<MQSubscriberService> _logger;
        private readonly IMQAdapterService _mqAdapterService;

        public MQSubscriberService(IServiceProvider sp, ILogger<MQSubscriberService> logger, IMQAdapterService mqAdapterService)
        {
            _serviceProvider = sp;
            _logger = logger;
            _mqAdapterService = mqAdapterService;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await ListenForMessages(cancellationToken);
                await Task.Delay(TimeSpan.FromMinutes(5), cancellationToken);
            }
        }

        private async Task ListenForMessages(CancellationToken cancellationToken)
        {
            await Task.Run(() => _mqAdapterService.ListenToTopic("BroadCast", MessageHandler, ConnectionStatusChangedHandler), cancellationToken);
        }

        private void ConnectionStatusChangedHandler(IMQConnectionStatus connectionStatus)
        {
            _logger.LogInformation(connectionStatus.ToString());
        }

        private void MessageHandler(MQMessage mqMessage)
        {
            var msg = mqMessage.ReadString(mqMessage.DataLength);
            var message = new Message
            {
                Data = msg
            };
            using (var scope = _serviceProvider.CreateScope())
            {
                var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
                dbContext.Messages.Add(message);
                dbContext.SaveChanges();
            }

            _logger.LogInformation("Received message: {Message}", msg);
        }
    }
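
The loop in ExecuteAsync can be read as a simple reconnect pattern: listen until the listener returns, wait, then listen again until the host shuts down. The sketch below shows that pattern in isolation as a console app. It assumes that ListenToTopic completes when the connection is lost, which the adapter code shown in this post does not confirm. FakeListenAsync is a hypothetical stand-in for the adapter call, and the delays are shortened so the demo finishes quickly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryLoopDemo
{
    // Hypothetical stand-in for ListenToTopic: completes when the "connection" drops.
    private static Task FakeListenAsync(int attempt)
    {
        Console.WriteLine($"Listening (attempt {attempt})...");
        return Task.Delay(TimeSpan.FromMilliseconds(50));
    }

    // Returns the number of times listening was started before cancellation.
    public static async Task<int> RunAsync(TimeSpan retryDelay, CancellationToken cancellationToken)
    {
        var attempts = 0;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                attempts++;
                await FakeListenAsync(attempts);

                // Wait before trying to listen again; the post uses five minutes here.
                await Task.Delay(retryDelay, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Task.Delay throws when the token is cancelled; treat that as a normal stop.
        }

        return attempts;
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
        var attempts = await RunAsync(TimeSpan.FromMilliseconds(100), cts.Token);
        Console.WriteLine($"Stopped after {attempts} attempts");
    }
}
```

In the hosted service the cancellation token is supplied by the host, so the loop ends when the application shuts down.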

The Index(...) method of the HomeController has been modified to retrieve the list of the messages that have been saved to the ApplicationDbContext:

        public IActionResult Index()
        {
            var messages = _dbContext.Messages.ToList();
            return View(messages);
        }

The Index.cshtml page of the Home view is updated to display the list of messages. This page will not update in real time, so it has to be refreshed to show newly processed messages:

@model List<Message>

<div class="text-center">
    <h1>Messages</h1>
    @if (Model != null && Model.Any())
    {
        <div>
        @foreach (var message in Model)
        {
            <div>@message.Data</div>
        }
        </div>
    }
</div>

After publishing a message, refreshing the Subscriber will list the messages in memory:

[Image: MQSubscriber.PNG]

I have to mention two posts that helped me come to this solution. The first, on Communication with IBM MQ in .Net Core 3.0, got me started with MQ; the second, on Implementing a Worker Service in ASP.NET Core, helped me work out how to create the MQSubscriberService.

I hope you find this post useful. The source code can be found here on GitHub.