Azure Service Bus

If the URM-DB Dienst is used for batch calculation processes (for example for the NPR, or for producing UPOs), you must use the Azure Service Bus that we have set up. The Azure Service Bus includes an input queue and an output queue. You place messages on the input queue; they are then picked up and submitted to the URM-DB batch web service. The response to each call is placed on the output queue, from which you can then retrieve the messages.

Parameters

To place messages on the input queue and to read messages from the output queue, you need the following data:

  • queueName: the name of the queue on which messages are placed, or from which messages are retrieved
  • serviceBusConnection: one serviceBusConnection for the input queue and one for the output queue, which give you the rights to place messages on the input queue and to retrieve messages from the output queue

You will receive the required data from ActuIT when you connect to the URM-DB Dienst.
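As a minimal sketch of how the two parameters fit together, the snippet below creates a client for one queue with the Microsoft.Azure.ServiceBus package, the same package the samples on this page use. The class and method names (QueueClients, Create) are illustrative and not part of the URM-DB Dienst.

```csharp
using Microsoft.Azure.ServiceBus;

static class QueueClients
{
    // Creates a client for a single queue.
    // serviceBusConnection and queueName are the values received from ActuIT.
    public static IQueueClient Create(string serviceBusConnection, string queueName)
    {
        // PeekLock only matters when receiving: a message stays on the queue
        // until the receiver explicitly completes it.
        return new QueueClient(serviceBusConnection, queueName, ReceiveMode.PeekLock);
    }
}
```

You would call Create once with the serviceBusConnection and queueName of the input queue (for sending) and once with those of the output queue (for receiving). Creating the client does not yet open a network connection; that happens on the first send or receive.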

In addition to this data, you must define a message in order to place it on the input queue. This message must conform to the specifications described on the page https://www.actuit.nl/Producten/Support/URM-DB-Dienst/URM-DB-webservice-batch.

Ports

To reach the Service Bus, the following ports must be opened:

  • TCP 5671
  • TCP 5672
  • TCP 9354
  • TCP 443
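If you are unsure whether your firewall allows these ports, a quick connectivity check can help. The sketch below tries to open a TCP connection to each port in the list above. It is an illustration only: the class name PortCheck is made up, the host name must be passed as a command-line argument (the host in the Endpoint part of your serviceBusConnection), and an async Main requires C# 7.1 or later.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

static class PortCheck
{
    // Returns true when a TCP connection to host:port is established within the timeout.
    public static async Task<bool> CanConnectAsync(string host, int port, TimeSpan timeout)
    {
        using (var client = new TcpClient())
        {
            try
            {
                Task connect = client.ConnectAsync(host, port);
                Task finished = await Task.WhenAny(connect, Task.Delay(timeout));
                if (finished != connect) return false; // timed out
                await connect;                          // rethrows a connection error, if any
                return true;
            }
            catch (SocketException)
            {
                return false;
            }
        }
    }

    static async Task Main(string[] args)
    {
        string host = args[0]; // host name of your Service Bus namespace
        foreach (int port in new[] { 5671, 5672, 9354, 443 })
        {
            bool open = await CanConnectAsync(host, port, TimeSpan.FromSeconds(5));
            Console.WriteLine($"TCP {port}: {(open ? "reachable" : "blocked or unreachable")}");
        }
    }
}
```

A successful connection only shows that the port is not blocked on the network path; it says nothing about whether your serviceBusConnection is valid.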

Sample code - placing messages on the queue

The sample code below places messages on an input queue:

        // Assumes that queueClient is an IQueueClient field created with the serviceBusConnection
        // and queueName of the input queue, and that ReadMessage(i) returns the body of message i.
        static async Task SendMessagesAsync(int numberOfMessagesToSend)
        {
            try
            {
                for (var i = 1; i <= numberOfMessagesToSend; i++)
                {
                    // Create a new message to send to the queue.
                    string messageBody = ReadMessage(i);
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                    // Write the body of the message to the console.
                    Console.WriteLine($"Sending message: {messageBody}");

                    // Send the message to the queue.
                    await queueClient.SendAsync(message);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
            }
        }

Sample code - retrieving messages from the queue

The sample code below retrieves messages from an output queue:

using System;
using System.Collections.Generic;
using System.IO;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Linq;
using ActuIT.Futurama.Engine;
using Microsoft.Azure.ServiceBus;

namespace Futurama.ServiceBusQueueFormulaLibrary
{
    public class CustomFormulas : ActuIT.Futurama.IoCInterfaces.IExternalFormulaLibrary
    {
        public static Dictionary<string, IQueueClient> queueClients;

        public XDocument GetFormulasDocument()
        {
            Stream formulaxml = Assembly.GetExecutingAssembly().GetManifestResourceStream("Futurama.ServiceBusQueueFormulaLibrary.FormulaLibrary.xml");
            return System.Xml.Linq.XDocument.Load(formulaxml);
        }

        public string GetLibraryName()
        {
            return "Various formules for adding and retrieving items from an Azure Service Bus queue";
        }

        private static string GetQueueIdentifier(string serviceBusConnectionString, string queueName)
        {
            return $"{serviceBusConnectionString}_{queueName}";
        }

        private static IQueueClient getQueue(string serviceBusConnectionString, string queueName)
        {
            IQueueClient queueclient = null;

            // Determine a string to identify this queue.
            string queueidentifier = GetQueueIdentifier(serviceBusConnectionString, queueName);
            if (queueClients == null)
            {
                queueClients = new Dictionary<string, IQueueClient>();
            }

            // Get the queue client from the static cache for better performance.
            if (queueClients.ContainsKey(queueidentifier))
            {
                queueclient = queueClients[queueidentifier];
            }
            else
            {
                // PeekLock: a message stays on the queue until it is explicitly completed.
                queueclient = new QueueClient(serviceBusConnectionString, queueName, ReceiveMode.PeekLock);
                queueClients.Add(queueidentifier, queueclient);
            }
            return queueclient;
        }

        public static int AddToQueue(string serviceBusConnectionString, string queueName, string messageBody)
        {
            try
            {
                IQueueClient queueclient = getQueue(serviceBusConnectionString, queueName);
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                // Send the message to the queue.
                queueclient.SendAsync(message).GetAwaiter().GetResult();
                return 0;
            }
            catch (Exception ex)
            {
                System.Diagnostics.Debug.WriteLine($"An error occurred: {ex}");
                return -1;
            }
        }

        public static stringn GetFromQueue(string serviceBusConnectionString, string queueName, int maxNumberOfItems)
        {
            try
            {
                IQueueClient queueClient = getQueue(serviceBusConnectionString, queueName);
                stringn res = null;
                using (RetrievalJob job = new RetrievalJob(queueClient, maxNumberOfItems))
                {
                    // Block until the batch is full or the timeout has expired.
                    // (An un-awaited Task.Delay would not pause the loop.)
                    while (!job.Done) Thread.Sleep(10);

                    res = (stringn)matrix.Create(AllowedTypes.@string, job.Results.Count);

                    // Copy the received messages into the result.
                    for (int i = 0; i < job.Results.Count; i++)
                        res[i] = job.Results[i];

                    if (job.Exception != null)
                    {
                        System.Diagnostics.Debug.WriteLine($"An error occurred: {job.Exception}");
                    }
                }

                // The client is closed after every retrieval, so remove it from the cache as well.
                if (!queueClient.IsClosedOrClosing)
                    queueClient.CloseAsync().GetAwaiter().GetResult();
                string key = GetQueueIdentifier(serviceBusConnectionString, queueName);
                queueClients.Remove(key);
                return res;
            }
            catch (Exception ex)
            {
                System.Diagnostics.Debug.WriteLine($"An error occurred: {ex}");
                return null;
            }
        }
    }

    /// <summary>
    /// A job to retrieve messages from the Service Bus queue
    /// </summary>
    internal class RetrievalJob : IDisposable
    {
        #region Declare

        // Timeouts in seconds
        private const int initialtimeout = 10;  // The initial timeout to connect to Azure and retrieve the first result
        private const int slidingtimeout = 2;   // Once a result is retrieved, subsequent results must be retrieved in less time

        internal List<string> Results = new List<string>();

        #endregion

        #region Public Members

        internal bool Done
        {
            get
            {
                return this.Results.Count >= this.MaxRetrievalSize || DateTime.Now > this.Timeout;
            }
        }

        internal Exception Exception { get; set; } = null;

        private int MaxRetrievalSize { get; }

        private IQueueClient QueueClient { get; }

        private DateTime Timeout { get; set; }

        #endregion

        #region Constructor

        //  Creates a new job
        public RetrievalJob(IQueueClient queueClient, int maxMessages)
        {
            QueueClient = queueClient;
            MaxRetrievalSize = maxMessages;

            // Allow initialtimeout (10) seconds to connect and receive the first message.
            Timeout = DateTime.Now.AddSeconds(initialtimeout);

            // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
            var messageHandlerOptions = new MessageHandlerOptions(async (exceptionEvent) =>
            {
                // Store the exception so the caller can inspect it.
                Exception = exceptionEvent.Exception;
                await Task.CompletedTask;
            })
            {
                // Maximum number of concurrent calls to the callback ReceiveMessagesAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,

                // Indicates whether the message pump should automatically complete the messages after returning from user callback.
                // False below indicates the complete operation is handled by the user callback as in ReceiveMessagesAsync().
                AutoComplete = false
            };

            QueueClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);
        }

        #endregion

        /// <summary>
        /// The event handler for the queue
        /// </summary>
        /// <param name="message"></param>
        /// <param name="token"></param>
        /// <returns></returns>
        internal async Task ReceiveMessagesAsync(Message message, CancellationToken token)
        {
            Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");

            var result = Encoding.UTF8.GetString(message.Body);

            await QueueClient.CompleteAsync(message.SystemProperties.LockToken)
                .ContinueWith(t => {
                    this.Results.Add(result);
                    // Check whether the batch is full.
                    if (Results.Count >= MaxRetrievalSize)
                        QueueClient.CloseAsync();
                });

            // Extend the timeout by 2 seconds (slidingtimeout).
            this.Timeout = DateTime.Now.AddSeconds(slidingtimeout);
        }

        //  Close on disposal
        public void Dispose()
        {
            if (!QueueClient.IsClosedOrClosing)
                QueueClient.CloseAsync();
        }
    }
}
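
If you do not need the Futurama formula library wrapper, a batch of messages can also be read without a message handler and polling loop. The sketch below is a different technique from the sample above, not a replacement supplied by ActuIT: it uses MessageReceiver.ReceiveAsync from the same Microsoft.Azure.ServiceBus package to request up to a given number of messages in one call. The class name OutputQueueReader and the parameter names are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

static class OutputQueueReader
{
    // Receives up to maxMessages from the queue, waiting at most waitTime for messages to arrive.
    public static async Task<List<string>> ReceiveBatchAsync(
        string serviceBusConnection, string queueName, int maxMessages, TimeSpan waitTime)
    {
        var bodies = new List<string>();
        var receiver = new MessageReceiver(serviceBusConnection, queueName, ReceiveMode.PeekLock);
        try
        {
            // The result may be null when no message arrived within waitTime.
            IList<Message> messages = await receiver.ReceiveAsync(maxMessages, waitTime);
            if (messages == null) return bodies;

            foreach (Message message in messages)
            {
                bodies.Add(Encoding.UTF8.GetString(message.Body));
                // Complete the message so that it is removed from the queue.
                await receiver.CompleteAsync(message.SystemProperties.LockToken);
            }
            return bodies;
        }
        finally
        {
            await receiver.CloseAsync();
        }
    }
}
```

A single call may return fewer messages than requested even when more are waiting on the queue, so callers that need a full batch would call ReceiveBatchAsync repeatedly until it returns an empty list.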

Updated: 30-08-2019