General ZeroMq
ZeroMq is a library based on socket communication. Its big advantage over plain socket programming is that ZeroMq uses internal queues and provides various messaging patterns out of the box while hiding the details.
A central concept in ZeroMq is, surprisingly, the socket. Sockets have a certain type and are usually used in pairs; each pair enables a common communication pattern, such as:
- Publish / Subscribe (PUB / SUB)
- Request / Response (REQ / REP)
- Push / Pull (PUSH / PULL)
The server, or more generally the most stable peer, creates a socket and binds it. The bind assigns an address, also called an endpoint, to the socket. Multiple other peers can then connect to this endpoint using the opposite socket type. For example, for Request / Response the server binds a REP socket to a certain address and the clients connect via a REQ socket to that address.
The following transport types are supported:
- inproc
- ipc
- tcp
- pgm, epgm
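To give a feel for what such addresses look like, here is a small sketch that lists one example endpoint per transport and splits off the transport part. The concrete values (the inproc name, the ipc path, the interface name and the multicast group) are made up for illustration; only the general transport://address shape is taken from ZeroMq. The helper TransportOf is hypothetical and not part of any ZeroMq API.

```csharp
using System;

public static class EndpointExamples
{
    // One example endpoint per transport. All concrete values are illustrative assumptions.
    public static readonly string[] Endpoints =
    {
        "inproc://workers",              // between threads of one process
        "ipc:///tmp/zmq-demo.ipc",       // between processes on one machine
        "tcp://127.0.0.1:5000",          // TCP, as used in the rest of this post
        "epgm://eth0;239.192.1.1:5555"   // multicast: interface;group:port
    };

    // Returns the transport part of an endpoint, e.g. "tcp" for "tcp://127.0.0.1:5000".
    public static string TransportOf(string endpoint)
    {
        int separator = endpoint.IndexOf("://", StringComparison.Ordinal);
        if (separator <= 0)
            throw new ArgumentException("Endpoint must look like transport://address", "endpoint");
        return endpoint.Substring(0, separator);
    }

    public static void Main()
    {
        foreach (var endpoint in Endpoints)
            Console.WriteLine("{0,-8} {1}", TransportOf(endpoint), endpoint);
    }
}
```

All examples in this post stick to the tcp transport on 127.0.0.1:5000.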
As ZeroMq uses internal queues, it is generally not important whether a socket is already bound before a client connects or whether a client connects before the socket is bound. However, an address can only be bound by one peer at a time; a second bind to the same address throws an exception.
Basically, all messages in these examples are simple plain strings that are encoded to bytes when they are sent.
Getting Started
I will discuss the most common communication patterns in the following sections. To get started, create a new Visual Studio solution and get the clrzmq package using NuGet.
Make sure to select "Include prerelease" to get the latest version.
The general usage pattern of the ZeroMq API is always as follows:
- Create a ZeroMq context by calling var ctx = ZmqContext.Create()
- Create a socket from the context instance by calling var socket = ctx.CreateSocket(socketType)
- Bind / connect the socket to a given address by calling socket.Bind("tcp://127.0.0.1:5000") or socket.Connect("tcp://127.0.0.1:5000")
- Send / receive messages by calling socket.Send(message, encoding) or socket.Receive(encoding)
The following snippet illustrates this basic protocol:
using (var context = ZmqContext.Create())
{
    using (var socket = context.CreateSocket(SocketType.REQ))
    {
        // Either connect to an endpoint, or bind to it instead:
        socket.Connect("tcp://127.0.0.1:5000");   // or: socket.Bind("tcp://127.0.0.1:5000");

        // Either send a message, or receive one instead:
        socket.Send("Hello", Encoding.UTF8);      // or: var msg = socket.Receive(Encoding.UTF8);
    }
}
Request / Response
The server binds to a REP typed socket, the clients connect using a REQ socket. Here is the server code:
using (var context = ZmqContext.Create())
{
    using (var socket = context.CreateSocket(SocketType.REP))
    {
        socket.Bind("tcp://127.0.0.1:5000");
        while (true)
        {
            // Block until a request arrives, then answer it.
            var receivedMessage = socket.Receive(Encoding.UTF8);
            Console.WriteLine("Received message: '{0}'", receivedMessage);
            var replyMessage = String.Format("This is the reply to message '{0}'", receivedMessage);
            Console.WriteLine("Sending reply: '{0}'", replyMessage);
            socket.Send(replyMessage, Encoding.UTF8);
        }
    }
}
The client looks like this:
using (var context = ZmqContext.Create())
{
    using (var socket = context.CreateSocket(SocketType.REQ))
    {
        socket.Connect("tcp://127.0.0.1:5000");
        while (true)
        {
            Console.WriteLine("\nEnter message (Enter to quit)");
            var sendMessage = Console.ReadLine();
            if (String.IsNullOrEmpty(sendMessage))
                return;
            Console.WriteLine("Sending message '{0}'", sendMessage);
            socket.Send(sendMessage, Encoding.UTF8);
            // Every request must be followed by receiving its reply.
            var replyMessage = socket.Receive(Encoding.UTF8);
            Console.WriteLine("Received reply message '{0}'", replyMessage);
        }
    }
}
My findings with Request / Response:
- For the client it does not matter whether the service is up and running when the client starts.
- The client blocks at send until the server is present (a timeout can be specified).
- There is no way of finding out whether the server is present or whether at least one client is connected, except by waiting for the timeout to expire.
- Requests and responses must come in pairs. Two subsequent requests fail, because receiving the response in between is mandatory.
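The lock-step rule from the last finding can be pictured as a tiny state machine. The following is only a toy model written for this post, not the clrzmq API: the class ReqStateModel and its methods are hypothetical, and the real library reports the error in its own way.

```csharp
using System;

// Toy model of the REQ socket's lock-step rule. This is NOT the clrzmq API,
// just an illustration of the strict send / receive alternation.
public class ReqStateModel
{
    private bool _awaitingReply;

    public void Send(string message)
    {
        if (_awaitingReply)
            throw new InvalidOperationException("Cannot send: the previous request has not been answered yet.");
        _awaitingReply = true;
    }

    public void Receive()
    {
        if (!_awaitingReply)
            throw new InvalidOperationException("Cannot receive: no request has been sent.");
        _awaitingReply = false;
    }
}

public static class ReqStateDemo
{
    public static void Main()
    {
        var req = new ReqStateModel();
        req.Send("first");
        req.Receive();          // fine: request followed by its reply
        req.Send("second");
        try
        {
            req.Send("third");  // second request in a row is rejected
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Rejected: " + ex.Message);
        }
    }
}
```

The REP side follows the mirrored rule: it has to receive a request before it may send a reply.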
Push / Pull
This pattern is called Pipeline in the documentation. There it is used with a source of messages that distributes them evenly among all connected peers. The peers do some processing and then send the results to a third party, the message sink. The worker peers use two sockets: one to receive messages from the source and one to send messages to the sink. For my own testing I have chosen an easier setup with a pushing sender and a pulling receiver.
The server:
private static void Push()
{
    int messageNumber = 0;
    using (var ctx = ZmqContext.Create())
    {
        using (var socket = ctx.CreateSocket(SocketType.PUSH))
        {
            socket.Bind("tcp://127.0.0.1:5000");
            while (true)
            {
                var msg = String.Format("{0}: pushing message {1}", DateTime.Now, messageNumber++);
                Console.WriteLine("Pushing: " + msg);
                socket.Send(msg, Encoding.UTF8);
                Thread.Sleep(1000);
            }
        }
    }
}
And the client:
private static void Pull()
{
    using (var ctx = ZmqContext.Create())
    {
        using (var socket = ctx.CreateSocket(SocketType.PULL))
        {
            // The pulling side connects to the endpoint the pusher has bound.
            socket.Connect("tcp://127.0.0.1:5000");
            while (true)
            {
                var msg = socket.Receive(Encoding.UTF8);
                Console.WriteLine("Pulled: " + msg);
            }
        }
    }
}
My findings with Push / Pull:
- Sending on a PUSH socket blocks until at least one pulling client is present (a timeout can be defined).
- Messages are distributed round-robin to connected peers.
- As only a single bind to an endpoint is allowed, only one process can push at a time.
- Connecting to a PUSH socket and then sending messages leads to lost messages.
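To make the round-robin finding concrete, here is a toy model of how messages end up at the pulling peers. It is plain C# written for this post and does not use ZeroMq at all; the class RoundRobinModel and its Distribute method are hypothetical, and the sketch assumes that all workers are connected before the first message is pushed.

```csharp
using System;
using System.Collections.Generic;

// Toy model of round-robin distribution; not the clrzmq API.
public static class RoundRobinModel
{
    // Assigns message i to worker (i mod workerCount) and returns one list per worker.
    public static List<List<string>> Distribute(IList<string> messages, int workerCount)
    {
        if (workerCount <= 0)
            throw new ArgumentOutOfRangeException("workerCount");

        var perWorker = new List<List<string>>();
        for (int w = 0; w < workerCount; w++)
            perWorker.Add(new List<string>());

        for (int i = 0; i < messages.Count; i++)
            perWorker[i % workerCount].Add(messages[i]);

        return perWorker;
    }

    public static void Main()
    {
        var messages = new[] { "msg 0", "msg 1", "msg 2", "msg 3", "msg 4" };
        var result = Distribute(messages, 2);
        for (int w = 0; w < result.Count; w++)
            Console.WriteLine("Worker {0}: {1}", w, String.Join(", ", result[w]));
    }
}
```

With five messages and two workers, the first worker gets messages 0, 2 and 4 and the second one gets messages 1 and 3. No message is delivered twice, which is the main difference to Publish / Subscribe.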
Publish / Subscribe
The server binds a PUB socket, the client subscribes via SUB. In contrast to the previous examples, calling Connect is not enough on the client: it also has to call Subscribe or SubscribeAll. When using Subscribe, the client passes a message prefix it is interested in; calling it several times subscribes to several prefixes.
Here is the server:
private static void Pub(IEnumerable<String> prefixes)
{
    // Fall back to some default topics if none were given.
    if (!prefixes.Any())
        prefixes = new[] { "Apple", "Microsoft", "Google" };
    var prefixMessageCounter = new Dictionary<String, int>();
    foreach (var prefix in prefixes)
    {
        prefixMessageCounter.Add(prefix, 0);
    }
    using (var ctx = ZmqContext.Create())
    {
        using (var socket = ctx.CreateSocket(SocketType.PUB))
        {
            socket.Bind("tcp://127.0.0.1:5000");
            while (true)
            {
                foreach (var prefix in prefixes)
                {
                    // Each message starts with its prefix so subscribers can filter on it.
                    var msg = String.Format("{0}: Message number {1}", prefix, prefixMessageCounter[prefix]++);
                    Console.WriteLine("Publishing: " + msg);
                    socket.Send(msg, Encoding.UTF8);
                    Thread.Sleep(1000);
                }
            }
        }
    }
}
And the client:
private static void Sub(IEnumerable<String> prefixes)
{
    using (var ctx = ZmqContext.Create())
    {
        using (var socket = ctx.CreateSocket(SocketType.SUB))
        {
            // Without any prefix subscribe to everything, otherwise once per prefix.
            if (!prefixes.Any())
            {
                socket.SubscribeAll();
            }
            else
            {
                foreach (var prefix in prefixes)
                    socket.Subscribe(Encoding.UTF8.GetBytes(prefix));
            }
            socket.Connect("tcp://127.0.0.1:5000");
            while (true)
            {
                var msg = socket.Receive(Encoding.UTF8);
                Console.WriteLine("Received: " + msg);
                Thread.Sleep(1000);
            }
        }
    }
}
My findings with Publish / Subscribe:
- If no subscriber is listening the message will get lost.
- Every subscriber gets every published message that matches its subscription; there is no round-robin distribution as with Push / Pull.
- There is no way to see if someone is subscribed.
- As only a single bind to an endpoint is allowed, only one process can publish at a time.
- Connecting to a PUB socket yields no error but has no effect either.
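The prefix filtering used by the subscriber can be modelled in a few lines. This is a toy model in plain C#, not the clrzmq API: the class PrefixFilterModel is hypothetical, and SubscribeAll is modelled here as a subscription to the empty prefix. The comparison is done byte by byte, so it is case sensitive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Toy model of SUB-side prefix filtering; not the clrzmq API.
public static class PrefixFilterModel
{
    // A message matches if its bytes start with the bytes of at least one subscribed prefix.
    // An empty prefix matches everything, which is how SubscribeAll is modelled here.
    public static bool Matches(string message, IEnumerable<string> subscriptions)
    {
        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
        return subscriptions.Any(prefix =>
        {
            byte[] prefixBytes = Encoding.UTF8.GetBytes(prefix);
            return prefixBytes.Length <= messageBytes.Length
                && prefixBytes.SequenceEqual(messageBytes.Take(prefixBytes.Length));
        });
    }

    public static void Main()
    {
        var subscriptions = new[] { "Apple", "Google" };
        var messages = new[] { "Apple: Message number 0", "Microsoft: Message number 0", "apple: lower case" };
        foreach (var msg in messages)
            Console.WriteLine("{0,-30} -> {1}", msg, Matches(msg, subscriptions) ? "delivered" : "filtered out");
    }
}
```

In this model a subscriber without any subscription receives nothing at all, which is why the client above calls SubscribeAll when no prefixes are given.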
Some "limitations" and the conclusion
While playing around with ZeroMq a few questions popped up in my mind, and I looked for answers on the web. Finally I came up with the following findings, which may be limitations in some cases:
- No HTTP communication
- No way of getting a peer's IP address on connect
- No way of seeing if a port is bound or if anyone is connected
Of course all these points have a rationale, which I understand. I write down these points to make it easier to see in which cases using ZeroMq might not be an option: namely if one of the points above is a requirement for you.
On the other hand, when you need to quickly implement messaging using a well-defined communication pattern, ZeroMq is great. The code is very easy, as you can see above. And it is blazing fast. Furthermore, there are a lot of advanced communication patterns that I have not covered. Consult the ZeroMq documentation for these.
socket.Send("Have fun with ZeroMq!", Encoding.UTF8);