A thin wrapper around the low-level C API of the ∅MQ messaging framework, for the D programming language.
Errors are reported by means of exceptions. The ZmqException class provides
a standard textual message for any error condition, but it also
provides access to the errno code set by the C function
that reported the error.
Functions are marked @safe, pure and nothrow where possible, thus easing their use in high-level D code.
Names mirror those of the C API: for example, zmq_msg_send() becomes zmqd.Message.send() and so on. Thus,
the library should feel both familiar to ∅MQ users and natural to D
users.
nothrow @safe Tuple!(int, "major", int, "minor", int, "patch") zmqVersion();
Reports the ∅MQ library version.
Returns a std.typecons.Tuple with three integer fields that represent the
three versioning levels: major, minor and patch.
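As a minimal sketch of how the returned tuple might be used, the following program prints the version of the ∅MQ library it is linked against. It assumes the module is importable as zmqd; the output format is only illustrative.

```d
import std.stdio : writefln;
import zmqd : zmqVersion;

void main()
{
    // The result is a Tuple whose fields can be read by name...
    auto ver = zmqVersion();
    writefln("Linked against ZeroMQ %d.%d.%d", ver.major, ver.minor, ver.patch);

    // ...or by position, as with any std.typecons.Tuple.
    assert(ver[0] == ver.major);
    assert(ver[1] == ver.minor);
    assert(ver[2] == ver.patch);
}
```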
struct Context;
An object that encapsulates a ∅MQ context.
Socket will use a default global context if not explicitly
provided with one. See defaultContext() for details.
A default-initialized Context is not a valid ∅MQ context; it
must always be explicitly initialized with Context.opCall():
Context ctx;      // Not a valid context yet
ctx = Context();  // ...but now it is.
Context objects can be passed around by value, and two copies will
refer to the same context. The underlying context is managed using
reference counting, so that when the last copy of a Context goes
out of scope, the context is automatically destroyed.
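The value semantics described above can be sketched as follows. The helper function limitSockets is hypothetical and exists only to show a Context being passed by value; the comparison of handles relies on the statement that copies refer to the same context.

```d
import zmqd;

// Hypothetical helper: receives a copy of the Context, but the copy
// refers to the same underlying context as the caller's object.
void limitSockets(Context c)
{
    c.maxSockets = 256;
}

void main()
{
    auto ctx = Context();
    auto copy = ctx;

    // Both objects wrap the same underlying context pointer.
    assert(copy.handle == ctx.handle);

    // A change made through the copy is visible through the original.
    limitSockets(ctx);
    assert(ctx.maxSockets == 256);
}
```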
static @safe Context opCall();
Creates a new ∅MQ context.
Returns a Context object that encapsulates the new context.
ZmqException if ∅MQ reports an error.
auto ctx = Context();
assert (ctx.initialized);
@safe void destroy();
Destroys the ∅MQ context.
ZmqException if ∅MQ reports an error.
auto ctx = Context();
assert (ctx.initialized);
ctx.destroy();
assert (!ctx.initialized);
@property @safe int ioThreads();
@property @safe void ioThreads(int value);
The number of I/O threads.
ZmqException if ∅MQ reports an error.
auto ctx = Context();
ctx.ioThreads = 3;
assert (ctx.ioThreads == 3);
@property @safe int maxSockets();
@property @safe void maxSockets(int value);
The maximum number of sockets.
ZmqException if ∅MQ reports an error.
auto ctx = Context();
ctx.maxSockets = 512;
assert (ctx.maxSockets == 512);
inout pure nothrow @property @safe inout(void)* handle();
The void* pointer used by the underlying C API to refer to the context.
If the Context has not been initialized, this is null.
const pure nothrow @property @safe bool initialized();
Whether this Context object has been initialized, i.e. whether it
refers to a valid ∅MQ context.
Context ctx;
assert (!ctx.initialized);
ctx = Context();
assert (ctx.initialized);
ctx.destroy();
assert (!ctx.initialized);
@trusted Context defaultContext();
A global context which is used by default by all sockets, unless they are explicitly constructed with a different context.
You should create and use exactly one context in your process. […] If at runtime a process has two contexts, these are like separate ∅MQ instances. If that's explicitly what you want, OK, but otherwise remember: Do one zmq_ctx_new() at the start of your main line code, and one zmq_ctx_destroy() at the end.
By using defaultContext(), this is exactly what you achieve. The
context is created the first time the function is called, and is
automatically destroyed when the program ends.
ZmqException if ∅MQ reports an error.
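A short sketch of the behaviour described above: a socket constructed without a context and one constructed with defaultContext() end up on the same global context. The handle comparison assumes that repeated calls return copies of the same Context.

```d
import zmqd;

void main()
{
    // The first call creates the global context; later calls return it again.
    auto ctx = defaultContext();
    assert(ctx.initialized);
    assert(defaultContext().handle == ctx.handle);

    // These two sockets live on the same (default) context.
    auto implicitSck = Socket(SocketType.push);
    auto explicitSck = Socket(defaultContext(), SocketType.pull);
    assert(implicitSck.initialized);
    assert(explicitSck.initialized);
}
```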
enum SocketType: int;
The various socket types.
These are described in the zmq_socket() reference.
req: Corresponds to ZMQ_REQ
rep: Corresponds to ZMQ_REP
dealer: Corresponds to ZMQ_DEALER
router: Corresponds to ZMQ_ROUTER
pub: Corresponds to ZMQ_PUB
sub: Corresponds to ZMQ_SUB
xpub: Corresponds to ZMQ_XPUB
xsub: Corresponds to ZMQ_XSUB
push: Corresponds to ZMQ_PUSH
pull: Corresponds to ZMQ_PULL
pair: Corresponds to ZMQ_PAIR
struct Socket;
An object that encapsulates a ∅MQ socket.
A default-initialized Socket is not a valid ∅MQ socket; it
must always be explicitly initialized with a constructor (see
Socket.this()):
Socket s;                     // Not a valid socket yet
s = Socket(SocketType.push);  // ...but now it is.
Socket objects can be passed around by value, and two copies will
refer to the same socket. The underlying socket is managed using
reference counting, so that when the last copy of a Socket goes
out of scope, the socket is automatically closed.
this(SocketType type);
this(Context context, SocketType type);
Creates a new ∅MQ socket.
If context is not specified, the default context (as returned
by defaultContext()) is used.
ZmqException if ∅MQ reports an error.
auto sck = Socket(SocketType.push);
assert (sck.initialized);

auto ctx = Context();
auto sck = Socket(ctx, SocketType.push);
assert (sck.initialized);
@safe void close();
Closes the ∅MQ socket.
ZmqException if ∅MQ reports an error.
auto s = Socket(SocketType.pair);
assert (s.initialized);
s.close();
assert (!s.initialized);
@safe void bind(const char[] endpoint);
Starts accepting incoming connections on endpoint.
ZmqException if ∅MQ reports an error.
auto s = Socket(SocketType.pub);
s.bind("inproc://zmqd_bind_example");
@safe void unbind(const char[] endpoint);
Stops accepting incoming connections on endpoint.
ZmqException if ∅MQ reports an error.
auto s = Socket(SocketType.pub);
s.bind("ipc://zmqd_unbind_example");
// Do some work...
s.unbind("ipc://zmqd_unbind_example");
@safe void connect(const char[] endpoint);
Creates an outgoing connection to endpoint.
ZmqException if ∅MQ reports an error.
auto s = Socket(SocketType.sub);
s.connect("ipc://zmqd_connect_example");
@safe void disconnect(const char[] endpoint);
Disconnects the socket from endpoint.
ZmqException if ∅MQ reports an error.
auto s = Socket(SocketType.sub);
s.connect("ipc://zmqd_disconnect_example");
// Do some work...
s.disconnect("ipc://zmqd_disconnect_example");
@safe void send(const ubyte[] data, bool more = false);
@trusted void send(const char[] data, bool more = false);
@safe bool trySend(const ubyte[] data, bool more = false);
@trusted bool trySend(const char[] data, bool more = false);
Sends a message part.
send blocks until the message has been queued on the socket.
trySend performs the operation in non-blocking mode, and returns
a bool value that signifies whether the message was queued on the
socket.
The char[] overload is a convenience function that simply casts
the string argument to ubyte[].
ZmqException if ∅MQ reports an error.
auto sck = Socket(SocketType.pub);
sck.send(cast(ubyte[]) [11, 226, 92]);
sck.send("Hello World!");
@safe void send(ref Message msg, bool more = false);
@safe bool trySend(ref Message msg, bool more = false);
Sends a message part.
send blocks until the message has been queued on the socket.
trySend performs the operation in non-blocking mode, and returns
a bool value that signifies whether the message was queued on the
socket.
ZmqException if ∅MQ reports an error.
auto sck = Socket(SocketType.pub);
auto msg = Message(12);
msg.data.asString()[] = "Hello World!";
sck.send(msg);
@safe size_t receive(ubyte[] data);
@safe Tuple!(size_t, bool) tryReceive(ubyte[] data);
Receives a message part.
receive blocks until the request can be satisfied, and returns the
number of bytes in the message.
tryReceive performs the operation in non-blocking mode, and returns
a std.typecons.Tuple which contains the size of the message along
with a bool value that signifies whether a message was received.
(If the latter is false, the former is always set to zero.)
ZmqException if ∅MQ reports an error.
// Sender
auto snd = Socket(SocketType.req);
snd.connect("ipc://zmqd_receive_example");
snd.send("Hello World!");
// Receiver
import std.string: representation;
auto rcv = Socket(SocketType.rep);
rcv.bind("ipc://zmqd_receive_example");
char[256] buf;
immutable len = rcv.receive(buf.representation);
assert (buf[0 .. len] == "Hello World!");
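The non-blocking variant can be sketched as a simple polling loop. The endpoint name and the 10 ms sleep are arbitrary choices for this illustration; the first check relies on the documented guarantee that the size is zero when nothing was received.

```d
import core.thread : Thread;
import core.time : msecs;
import zmqd;

void main()
{
    auto rcv = Socket(SocketType.pull);
    rcv.bind("inproc://zmqd_tryreceive_sketch");

    ubyte[64] buf;

    // Nothing has been sent yet, so tryReceive returns immediately.
    auto result = rcv.tryReceive(buf[]);
    assert(!result[1]);      // no message was received...
    assert(result[0] == 0);  // ...so the reported size is zero

    auto snd = Socket(SocketType.push);
    snd.connect("inproc://zmqd_tryreceive_sketch");
    snd.send("ping");

    // Poll until the message shows up, doing other work in between.
    for (;;)
    {
        result = rcv.tryReceive(buf[]);
        if (result[1]) break;
        Thread.sleep(10.msecs);
    }
    assert(buf[0 .. result[0]].asString() == "ping");
}
```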
@safe size_t receive(ref Message msg);
@safe Tuple!(size_t, bool) tryReceive(ref Message msg);
Receives a message part.
receive blocks until the request can be satisfied, and returns the
number of bytes in the message.
tryReceive performs the operation in non-blocking mode, and returns
a std.typecons.Tuple which contains the size of the message along
with a bool value that signifies whether a message was received.
(If the latter is false, the former is always set to zero.)
ZmqException if ∅MQ reports an error.
// Sender
auto snd = Socket(SocketType.req);
snd.connect("ipc://zmqd_msg_receive_example");
snd.send("Hello World!");
// Receiver
import std.string: representation;
auto rcv = Socket(SocketType.rep);
rcv.bind("ipc://zmqd_msg_receive_example");
auto msg = Message();
rcv.receive(msg);
assert (msg.data.asString() == "Hello World!");
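The more parameter of send and the more property of Socket work together for multipart messages. The following is a sketch only, using a PAIR socket pair over an arbitrarily named inproc endpoint; it collects message parts until the socket reports that none remain.

```d
import zmqd;

void main()
{
    auto a = Socket(SocketType.pair);
    a.bind("inproc://zmqd_multipart_sketch");
    auto b = Socket(SocketType.pair);
    b.connect("inproc://zmqd_multipart_sketch");

    // Send a two-part message: every part but the last sets more = true.
    a.send("header", true);
    a.send("body");

    // Receive parts until Socket.more reports that none are left.
    string[] parts;
    do
    {
        auto msg = Message();
        b.receive(msg);
        parts ~= msg.data.asString().idup;  // copy before msg is released
    }
    while (b.more);

    assert(parts == ["header", "body"]);
}
```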
@property @safe SocketType type();
The socket type.
ZmqException if ∅MQ reports an error.
Corresponds to zmq_getsockopt() with ZMQ_TYPE.
auto sck = Socket(SocketType.xpub);
assert (sck.type == SocketType.xpub);
@property @safe bool more();
Whether there are more message data parts to follow.
ZmqException if ∅MQ reports an error.
Corresponds to zmq_getsockopt() with ZMQ_RCVMORE.
@property @safe int sendHWM();
@property @safe void sendHWM(int value);
@property @safe int receiveHWM();
@property @safe void receiveHWM(int value);
@property @safe ulong threadAffinity();
@property @safe void threadAffinity(ulong value);
@property @trusted ubyte[] identity();
@property @safe void identity(const ubyte[] value);
@property @safe void identity(const char[] value);
@property @safe int rate();
@property @safe void rate(int value);
@property @safe int recoveryInterval();
@property @safe void recoveryInterval(int value);
@property @safe int sendBufferSize();
@property @safe void sendBufferSize(int value);
@property @safe int receiveBufferSize();
@property @safe void receiveBufferSize(int value);
@property @safe int linger();
@property @safe void linger(int value);
@property @safe int reconnectionInterval();
@property @safe void reconnectionInterval(int value);
@property @safe int maxReconnectionInterval();
@property @safe void maxReconnectionInterval(int value);
@property @safe int backlog();
@property @safe void backlog(int value);
@property @safe long maxMsgSize();
@property @safe void maxMsgSize(long value);
@property @safe int multicastHops();
@property @safe void multicastHops(int value);
@property @safe int receiveTimeout();
@property @safe void receiveTimeout(int value);
@property @safe int sendTimeout();
@property @safe void sendTimeout(int value);
@property @safe bool ipv4Only();
@property @safe void ipv4Only(bool value);
@property @safe bool delayAttachOnConnect();
@property @safe void delayAttachOnConnect(bool value);
@property @safe FD fd();
@property @safe int events();
@property @trusted char[] lastEndpoint();
Misc. socket properties.
Each of these corresponds to a socket option that is read or set with
zmq_getsockopt() and zmq_setsockopt(). For
example, identity corresponds to ZMQ_IDENTITY,
receiveBufferSize corresponds to ZMQ_RCVBUF, etc.
For convenience, the setter for the identity property
accepts strings. To retrieve a string with the getter, use
the asString() function.
sck.identity = "foobar";
assert (sck.identity.asString() == "foobar");
The fd property is an int on POSIX and a SOCKET
on Windows.
The ZMQ_SUBSCRIBE and ZMQ_UNSUBSCRIBE options are
treated differently from the others; see Socket.subscribe()
and Socket.unsubscribe().
ZmqException if ∅MQ reports an error.
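As a brief illustration of the property syntax, the sketch below sets a few options and reads them back. The particular values are arbitrary; units and valid ranges are those of the corresponding ∅MQ options.

```d
import zmqd;

void main()
{
    auto sck = Socket(SocketType.push);

    // Each assignment goes through zmq_setsockopt() under the hood.
    sck.linger = 0;         // discard pending messages on close
    sck.sendHWM = 100;      // high-water mark for outbound messages
    sck.sendTimeout = 250;  // give up on a blocking send after 250 ms

    // Each read goes through zmq_getsockopt().
    assert(sck.linger == 0);
    assert(sck.sendHWM == 100);
    assert(sck.sendTimeout == 250);
}
```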
@safe void subscribe(const ubyte[] filterPrefix);
@safe void subscribe(const char[] filterPrefix);
Establishes a message filter.
ZmqException if ∅MQ reports an error.
Corresponds to zmq_setsockopt() with ZMQ_SUBSCRIBE.
// Create a subscriber that accepts all messages that start with
// the prefixes "foo" or "bar".
auto sck = Socket(SocketType.sub);
sck.subscribe("foo");
sck.subscribe("bar");
@safe void unsubscribe(const ubyte[] filterPrefix);
@safe void unsubscribe(const char[] filterPrefix);
Removes a message filter.
ZmqException if ∅MQ reports an error.
Corresponds to zmq_setsockopt() with ZMQ_UNSUBSCRIBE.
// Subscribe to messages that start with "foo" or "bar".
auto sck = Socket(SocketType.sub);
sck.subscribe("foo");
sck.subscribe("bar");
// ...
// From now on, only accept messages that start with "bar"
sck.unsubscribe("foo");
@safe void monitor(const char[] endpoint, EventType events = EventType.all);
Spawns a PAIR socket that publishes socket state changes (events) over the INPROC transport to the given endpoint.
The events to publish are selected by combining EventType flags in the events parameter.
ZmqException if ∅MQ reports an error.
See also receiveEvent(), which receives and parses event messages.
auto sck = Socket(SocketType.pub);
sck.monitor("inproc://zmqd_monitor_unittest",
EventType.accepted | EventType.closed);
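To show how monitor() and receiveEvent() might fit together, here is a sketch that watches a socket for the listening event. It assumes a POSIX system (for the ipc transport) and a ∅MQ library whose monitor messages use the zmq_event_t layout this module expects; the endpoint names are arbitrary.

```d
import std.stdio : writefln;
import zmqd;

void main()
{
    // Publish "listening" events for sck on an inproc endpoint.
    auto sck = Socket(SocketType.pub);
    sck.monitor("inproc://zmqd_monitor_sketch", EventType.listening);

    // Events are read from a PAIR socket connected to that endpoint.
    auto mon = Socket(SocketType.pair);
    mon.connect("inproc://zmqd_monitor_sketch");

    // Binding the monitored socket should trigger a listening event.
    sck.bind("ipc://zmqd_monitor_sketch");

    auto ev = receiveEvent(mon);  // blocks until an event arrives
    assert(ev.type == EventType.listening);

    // address and fd are valid for listening events.
    writefln("listening on %s (fd %s)", ev.address, ev.fd);
}
```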
inout pure nothrow @property @safe inout(void)* handle();
The void* pointer used by the underlying C API to refer to the socket.
If the Socket has not been initialized, this is null.
const pure nothrow @property @safe bool initialized();
Whether this Socket object has been initialized, i.e. whether it
refers to a valid ∅MQ socket.
Socket sck;
assert (!sck.initialized);
sck = Socket(SocketType.sub);
assert (sck.initialized);
sck.close();
assert (!sck.initialized);
nothrow @safe void proxy(ref Socket frontend, ref Socket backend);
nothrow @safe void proxy(ref Socket frontend, ref Socket backend, ref Socket capture);
Starts the built-in ∅MQ proxy.
struct Message;
An object that encapsulates a ∅MQ message.
This struct is a wrapper around a zmq_msg_t object. Unlike
Context and Socket, it does not perform reference
counting, because ∅MQ messages have a form of reference counting of
their own. A Message cannot be copied by normal assignment; use
Message.copy() for this.
A default-initialized Message is not a valid ∅MQ message; it
must always be explicitly initialized with Message.opCall() or
Message.this():
Message msg1;               // Invalid message
auto msg2 = Message();      // Empty message
auto msg3 = Message(1024);  // 1K message
When a Message goes out of scope, zmq_msg_close() is
called on the underlying zmq_msg_t.
static @safe Message opCall();
Initialises an empty ∅MQ message.
ZmqException if ∅MQ reports an error.
auto msg = Message();
assert(msg.size == 0);
this(size_t size);
Initialises a ∅MQ message of a specified size.
ZmqException if ∅MQ reports an error.
auto msg = Message(123);
assert(msg.size == 123);
@safe void close();
Releases the ∅MQ message.
The message is released automatically when the Message
object is destroyed, so it is often not necessary to call this method
manually.
ZmqException if ∅MQ reports an error.
@safe Message copy();
@safe void copyTo(ref Message dest);
Copies message content to another message.
copy() returns a new Message object, while copyTo(dest)
copies the contents of this Message into dest. dest must
be a valid (i.e. initialised) Message.
ZmqException if ∅MQ reports an error.
import std.string: representation;
auto msg1 = Message(3);
msg1.data[] = "foo".representation;
auto msg2 = msg1.copy();
assert (msg2.data.asString() == "foo");
@safe Message move();
@safe void moveTo(ref Message dest);
Moves message content to another message.
move() returns a new Message object, while moveTo(dest)
moves the contents of this Message to dest. dest must
be a valid (i.e. initialised) Message.
ZmqException if ∅MQ reports an error.
import std.string: representation;
auto msg1 = Message(3);
msg1.data[] = "foo".representation;
auto msg2 = msg1.move();
assert (msg1.size == 0);
assert (msg2.data.asString() == "foo");
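The copyTo() and moveTo() variants write into an existing, initialised Message instead of returning a new one. A minimal sketch of the difference between the two, with variable names chosen only for this example:

```d
import std.string : representation;
import zmqd;

void main()
{
    auto src = Message(3);
    src.data[] = "foo".representation;

    // copyTo: the destination receives the content, the source keeps it.
    auto copyDest = Message();
    src.copyTo(copyDest);
    assert(copyDest.data.asString() == "foo");
    assert(src.data.asString() == "foo");

    // moveTo: the destination receives the content, the source is emptied.
    auto moveDest = Message();
    src.moveTo(moveDest);
    assert(moveDest.data.asString() == "foo");
    assert(src.size == 0);
}
```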
nothrow @property @safe size_t size();
The message content size in bytes.
auto msg = Message(123);
assert(msg.size == 123);
nothrow @property @trusted ubyte[] data();
Retrieves the message content.
import std.string: representation;
auto msg = Message(3);
assert(msg.data.length == 3);
msg.data[] = "foo".representation; // Slice operator -> array copy.
assert(msg.data.asString() == "foo");
nothrow @property @safe bool more();
Whether there are more message parts to retrieve.
inout pure nothrow @property @safe inout(zmq_msg_t)* handle();
A pointer to the underlying zmq_msg_t.
enum EventType: int;
Socket event types.
These are used together with Socket.monitor(), and are described
in the zmq_socket_monitor() reference.
connected: Corresponds to ZMQ_EVENT_CONNECTED.
connectDelayed: Corresponds to ZMQ_EVENT_CONNECT_DELAYED.
connectRetried: Corresponds to ZMQ_EVENT_CONNECT_RETRIED.
listening: Corresponds to ZMQ_EVENT_LISTENING.
bindFailed: Corresponds to ZMQ_EVENT_BIND_FAILED.
accepted: Corresponds to ZMQ_EVENT_ACCEPTED.
acceptFailed: Corresponds to ZMQ_EVENT_ACCEPT_FAILED.
closed: Corresponds to ZMQ_EVENT_CLOSED.
closeFailed: Corresponds to ZMQ_EVENT_CLOSE_FAILED.
disconnected: Corresponds to ZMQ_EVENT_DISCONNECTED.
all: Corresponds to ZMQ_EVENT_ALL.
@system Event receiveEvent(Socket socket);
Receives a message on the given socket and interprets it as a socket state change event.
socket must be a PAIR socket which is connected to an endpoint
created via a Socket.monitor() call. receiveEvent() receives
one message on the socket, parses its contents according to the
specification in the zmq_socket_monitor() reference,
and returns the event information as an Event object.
The function checks the size of the received message against
zmq_event_t.sizeof and that the value of the
zmq_event_t.event field is valid. If this is not the case,
an InvalidEventException is thrown.
ZmqException if ∅MQ reports an error.
InvalidEventException if the received message could not
be interpreted as an event message.
See also Socket.monitor(), for monitoring socket state changes.
struct Event;
Information about a socket state change.
const pure nothrow @property @safe EventType type();
The event type.
Corresponds to zmq_event_t.event.
const pure nothrow @property @safe string address();
The peer address.
Corresponds to zmq_event_t.data.xyz.addr, where xyz is the event-specific union.
const pure nothrow @property @safe Socket.FD fd();
The socket file descriptor.
Only valid if Event.type is one of:
connected, listening, accepted, closed or disconnected.
Error if the property is called for a wrong event type.
Corresponds to zmq_event_t.data.xyz.fd, where xyz is the event-specific union.
const pure nothrow @property @safe int errno();
The errno code for the error which triggered the event.
Only valid if Event.type is one of:
connectDelayed, bindFailed, acceptFailed or closeFailed.
Error if the property is called for a wrong event type.
Corresponds to zmq_event_t.data.xyz.err, where xyz is the event-specific union.
const pure nothrow @property @safe int interval();
The reconnect interval.
Only valid if Event.type is
connectRetried.
Error if the property is called for a wrong event type.
Corresponds to zmq_event_t.data.connect_retried.interval.
pure @safe inout(char)[] asString(inout(ubyte)[] data);
Utility function which interprets and validates a byte array as a UTF-8 string.
Message data is handled as ubyte[] arrays, but very often,
the message data contains plain text. asString() allows for easy and
safe interpretation of raw data as characters. It checks that data is
a valid UTF-8 encoded string, and returns a char[] array that refers to
the same memory region.
std.utf.UTFException if data is not a valid UTF-8 string.
See also std.string.representation, which performs the opposite operation.
auto s1 = Socket(SocketType.pair);
s1.bind("ipc://zmqd_asString_example");
auto s2 = Socket(SocketType.pair);
s2.connect("ipc://zmqd_asString_example");
auto msg = Message(12);
msg.data.asString()[] = "Hello World!";
s1.send(msg);
ubyte[12] buf;
s2.receive(buf);
assert(buf.asString() == "Hello World!");
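The two properties described above (the result aliases the input, and invalid UTF-8 is rejected) can be checked without any sockets. This is a small sketch; the byte values are chosen only for illustration.

```d
import std.exception : assertThrown;
import std.utf : UTFException;
import zmqd : asString;

void main()
{
    // 72 and 105 are the UTF-8 (and ASCII) codes for 'H' and 'i'.
    ubyte[] good = [72, 105];
    assert(good.asString() == "Hi");

    // The returned char[] refers to the same memory as the input.
    auto view = good.asString();
    view[0] = 'h';
    assert(good[0] == 104);  // 104 is the code for 'h'

    // 0xFF can never appear in valid UTF-8, so validation fails.
    ubyte[] bad = [0xFF, 0xFE];
    assertThrown!UTFException(bad.asString());
}
```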
class ZmqException: object.Exception;
A class for exceptions thrown when any of the underlying ∅MQ C functions report an error.
The exception carries the error message returned by
zmq_strerror(), as well as the errno code set by the ∅MQ
function which reported the error.
immutable int errno;
The errno code that was set by the ∅MQ function that reported
the error.
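A sketch of how a caller might inspect a ZmqException. It assumes that binding to a string that is not a valid endpoint makes ∅MQ report an error; the malformed endpoint is made up for this example.

```d
import std.stdio : writefln;
import zmqd;

void main()
{
    auto sck = Socket(SocketType.pub);
    try
    {
        // Not a "transport://address" string, so bind is expected to fail.
        sck.bind("this-is-not-an-endpoint");
        assert(false, "expected bind() to fail");
    }
    catch (ZmqException e)
    {
        // msg holds the textual description, errno the numeric code.
        writefln("ZeroMQ error %d: %s", e.errno, e.msg);
        assert(e.errno != 0);
    }
}
```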
class InvalidEventException: object.Exception;
Exception thrown by receiveEvent() on failure to interpret a
received message as an event description.