zmqd

zmqd – a thin wrapper around the low-level C API of the ZeroMQ messaging framework.

Most functions in this module have a one-to-one relationship with functions in the underlying C API. Some adaptations have been made to make the API safer, easier and more pleasant to use; most importantly:

  • Errors are signalled by means of exceptions rather than return codes. The ZmqException class provides a standard textual message for any error condition, but it also provides access to the errno code set by the C function that reported the error.
  • Functions are marked with @safe, pure and nothrow as appropriate, thus facilitating their use in high-level D code.
  • Memory and resources (i.e. contexts, sockets and messages) are automatically managed, thus preventing leaks.
  • Context, socket and message options are implemented as properties.
The names of functions and types in zmqd are very similar to those in ZeroMQ, but they follow the D naming conventions. Thus, the library should feel both familiar to ZeroMQ users and natural to D users. A notable deviation from the C API is that message parts are consistently called "frames". For example, zmq_msg_send() becomes  zmqd.Frame.send() and so on. (Multipart messages were a late addition to ZeroMQ, and the "msg" function names were well established at that point. The library's developers have admitted that this is somewhat confusing, and the newer CZMQ API consistently uses "frame" in function names.)

Due to the close correspondence with the C API, this documentation has intentionally been kept sparse. There is really no reason to repeat the contents of the ZeroMQ reference manual here. Instead, the documentation for each function contains a "Corresponds to" section that links to the appropriate pages in the ZeroMQ reference. Any details given in the present documentation mostly concern the D-specific adaptations that have been made.

Also note that the examples generally only use the INPROC transport. The reason for this is that the examples double as unittests, and we want to avoid firewall troubles and other issues that could arise with the use of network protocols such as TCP, PGM, etc., and the IPC protocol is not supported on Windows. Anyway, they are only short snippets that demonstrate the syntax; for more comprehensive and realistic examples, please refer to the ZeroMQ Guide. Many of the examples in the Guide have been translated to D, and can be found in the examples subdirectory of the zmqd source repository.

Version
1.1.0 (compatible with ZeroMQ >= 4.0.0)
Authors
Lars T. Kyllingstad
License
zmqd is released under the terms of the Mozilla Public License v. 2.0.
Please refer to the ZeroMQ site for details about ZeroMQ licensing.

nothrow @safe Tuple!(int, "major", int, "minor", int, "patch")  zmqVersion();

Reports the ZeroMQ library version.

Returns
A std.typecons.Tuple with three integer fields that represent the three versioning levels: major, minor and patch.
Corresponds to:
zmq_version()

nothrow @safe bool  zmqHas(const char[] capability);

Checks for a ZeroMQ capability.

Corresponds to:
zmq_has()

enum  SocketType: int;

The various socket types.

These are described in the zmq_socket() reference.


 req

Corresponds to ZMQ_REQ


 rep

Corresponds to ZMQ_REP


Corresponds to ZMQ_DEALER


Corresponds to ZMQ_ROUTER


 pub

Corresponds to ZMQ_PUB


 sub

Corresponds to ZMQ_SUB


Corresponds to ZMQ_XPUB


Corresponds to ZMQ_XSUB


Corresponds to ZMQ_PUSH


Corresponds to ZMQ_PULL


Corresponds to ZMQ_PAIR


Corresponds to ZMQ_STREAM


enum  Security: int;

Security mechanisms.


NULL: No security or confidentiality.


PLAIN: Clear-text authentication.


CURVE: Secure authentication and confidentiality.


enum Duration  infiniteDuration;

A special core.time.Duration value used to signify an infinite timeout or time interval in certain contexts.

The places where this value may be used are:

Note:
Since core.time.Duration doesn't reserve such a special value, the actual value of  infiniteDuration is core.time.Duration.max.

struct  Socket;

An object that encapsulates a ZeroMQ socket.

A default-initialized  Socket is not a valid ZeroMQ socket; it must always be explicitly initialized with a constructor (see Socket.this()):

Socket s;                     // Not a valid socket yet
s = Socket(SocketType.push);  // ...but now it is.
This struct is noncopyable, which means that a socket is always uniquely managed by a single  Socket object. Functions that will inspect or use the socket, but not take ownership of it, should take a ref  Socket parameter. Use std.algorithm.move to move a  Socket to a different location (e.g. into a sink function that takes it by value, or into a new variable).

The socket is automatically closed when the  Socket object goes out of scope.

Linger period:
Note that  Socket by default sets the socket's linger period to zero. This deviates from the ZeroMQ default (which is an infinite linger period).

@safe this(SocketType type);
@safe this(Context context, SocketType type);

Creates a new ZeroMQ socket.

If context is not specified, the default context (as returned by defaultContext()) is used.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_socket()
Examples
With default context:
auto sck = Socket(SocketType.push);
assert (sck.initialized);
Examples
With explicit context:
auto ctx = Context();
auto sck = Socket(ctx, SocketType.push);
assert (sck.initialized);

@safe void  close();

Closes the ZeroMQ socket.

Note that the socket will be closed automatically upon destruction, so it is usually not necessary to call this method manually.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_close()
Examples
auto s = Socket(SocketType.pair);
assert (s.initialized);
s.close();
assert (!s.initialized);

@safe void  bind(const char[] endpoint);

Starts accepting incoming connections on endpoint.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_bind()
Examples
auto s = Socket(SocketType.pub);
s.bind("inproc://zmqd_bind_example");

@safe void  unbind(const char[] endpoint);

Stops accepting incoming connections on endpoint.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_unbind()
Examples
auto s = Socket(SocketType.pub);
s.bind("ipc://zmqd_unbind_example");
// Do some work...
s.unbind("ipc://zmqd_unbind_example");

@safe void  connect(const char[] endpoint);

Creates an outgoing connection to endpoint.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_connect()
Examples
auto s = Socket(SocketType.sub);
s.connect("inproc://zmqd_connect_example");

@safe void  disconnect(const char[] endpoint);

Disconnects the socket from endpoint.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_disconnect()
Examples
auto s = Socket(SocketType.sub);
s.connect("inproc://zmqd_disconnect_example");
// Do some work...
s.disconnect("inproc://zmqd_disconnect_example");

@safe void  send(const ubyte[] data, bool more = false);
@safe void  send(const char[] data, bool more = false);
@safe bool  trySend(const ubyte[] data, bool more = false);
@safe bool  trySend(const char[] data, bool more = false);

Sends a message frame.

send blocks until the frame has been queued on the socket.  trySend performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.

The more parameter specifies whether this is a multipart message and there are more frames to follow.

The char[] overload is a convenience function that simply casts the string argument to ubyte[].

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_send() (with the ZMQ_DONTWAIT flag, in the case of  trySend, and with the ZMQ_SNDMORE flag if more == true).
Examples
auto sck = Socket(SocketType.pub);
sck.send(cast(ubyte[]) [11, 226, 92]);
sck.send("Hello World!");

@safe void  send(ref Frame msg, bool more = false);
@safe bool  trySend(ref Frame msg, bool more = false);

Sends a message frame.

send blocks until the frame has been queued on the socket.  trySend performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.

The more parameter specifies whether this is a multipart message and there are more frames to follow.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_send() (with the ZMQ_DONTWAIT flag, in the case of  trySend, and with the ZMQ_SNDMORE flag if more == true).
Examples
auto sck = Socket(SocketType.pub);
auto msg = Frame(12);
msg.data.asString()[] = "Hello World!";
sck.send(msg);

@safe void  sendConst(immutable ubyte[] data, bool more = false);
@safe void  sendConst(string data, bool more = false);
@safe bool  trySendConst(immutable ubyte[] data, bool more = false);
@safe bool  trySendConst(string data, bool more = false);

Sends a constant-memory message frame.

sendConst blocks until the frame has been queued on the socket.  trySendConst performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.

The more parameter specifies whether this is a multipart message and there are more frames to follow.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_send_const() (with the ZMQ_DONTWAIT flag, in the case of trySend, and with the ZMQ_SNDMORE flag if more == true).
Examples
static immutable arr = cast(ubyte[]) [11, 226, 92];
auto sck = Socket(SocketType.pub);
sck.sendConst(arr);
sck.sendConst("Hello World!");

@safe size_t  receive(ubyte[] data);
@safe Tuple!(size_t, bool)  tryReceive(ubyte[] data);

Receives a message frame.

receive blocks until the request can be satisfied, and returns the number of bytes in the frame.  tryReceive performs the operation in non-blocking mode, and returns a std.typecons.Tuple which contains the size of the frame along with a bool value that signifies whether a frame was received. (If the latter is false, the former is always set to zero.)

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_recv() (with the ZMQ_DONTWAIT flag, in the case of  tryReceive).
Examples
// Sender
auto snd = Socket(SocketType.req);
snd.connect("inproc://zmqd_receive_example");
snd.send("Hello World!");

// Receiver
import std.string: representation;
auto rcv = Socket(SocketType.rep);
rcv.bind("inproc://zmqd_receive_example");
char[256] buf;
immutable len  = rcv.receive(buf.representation);
assert (buf[0 .. len] == "Hello World!");

@safe size_t  receive(ref Frame msg);
@safe Tuple!(size_t, bool)  tryReceive(ref Frame msg);

Receives a message frame.

receive blocks until the request can be satisfied, and returns the number of bytes in the frame.  tryReceive performs the operation in non-blocking mode, and returns a std.typecons.Tuple which contains the size of the frame along with a bool value that signifies whether a frame was received. (If the latter is false, the former is always set to zero.)

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_recv() (with the ZMQ_DONTWAIT flag, in the case of  tryReceive).
Examples
// Sender
auto snd = Socket(SocketType.req);
snd.connect("inproc://zmqd_msg_receive_example");
snd.send("Hello World!");

// Receiver
import std.string: representation;
auto rcv = Socket(SocketType.rep);
rcv.bind("inproc://zmqd_msg_receive_example");
auto msg = Frame();
rcv.receive(msg);
assert (msg.data.asString() == "Hello World!");

@property @safe bool  more();

Whether there are more message frames to follow.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_getsockopt() with ZMQ_RCVMORE.

@property @safe ulong  threadAffinity();
@property @safe void  threadAffinity(ulong value);
@property @safe ubyte[]  identity();
@safe ubyte[]  getIdentity(ubyte[] dest);
@property @safe void  identity(const ubyte[] value);
@property @safe void  identity(const char[] value);
@property @safe int  rate();
@property @safe void  rate(int value);
@property @safe Duration  recoveryInterval();
@property @safe void  recoveryInterval(Duration value);
@property @safe int  sendBufferSize();
@property @safe void  sendBufferSize(int value);
@property @safe int  receiveBufferSize();
@property @safe void  receiveBufferSize(int value);
@property @safe FD  fd();
@property @safe PollFlags  events();
@property @safe SocketType  type();
@property @safe Duration  linger();
@property @safe void  linger(Duration value);
@property @safe Duration  reconnectionInterval();
@property @safe void  reconnectionInterval(Duration value);
@property @safe int  backlog();
@property @safe void  backlog(int value);
@property @safe Duration  maxReconnectionInterval();
@property @safe void  maxReconnectionInterval(Duration value);
@property @safe long  maxMsgSize();
@property @safe void  maxMsgSize(long value);
@property @safe int  sendHWM();
@property @safe void  sendHWM(int value);
@property @safe int  receiveHWM();
@property @safe void  receiveHWM(int value);
@property @safe int  multicastHops();
@property @safe void  multicastHops(int value);
@property @safe Duration  receiveTimeout();
@property @safe void  receiveTimeout(Duration value);
@property @safe Duration  sendTimeout();
@property @safe void  sendTimeout(Duration value);
@property @trusted char[]  lastEndpoint();
@property @safe void  routerMandatory(bool value);
@property @safe int  tcpKeepalive();
@property @safe void  tcpKeepalive(int value);
@property @safe int  tcpKeepaliveCnt();
@property @safe void  tcpKeepaliveCnt(int value);
@property @safe int  tcpKeepaliveIdle();
@property @safe void  tcpKeepaliveIdle(int value);
@property @safe int  tcpKeepaliveIntvl();
@property @safe void  tcpKeepaliveIntvl(int value);
@property @safe bool  immediate();
@property @safe void  immediate(bool value);
@property @safe void  xpubVerbose(bool value);
@property @safe bool  ipv6();
@property @safe void  ipv6(bool value);
@property @safe Security  mechanism();
@property @safe bool  plainServer();
@property @safe void  plainServer(bool value);
@safe char[]  getPlainUsername(char[] dest);
@property @safe void  plainUsername(const(char)[] value);
@safe char[]  getPlainPassword(char[] dest);
@property @safe void  plainPassword(const(char)[] value);
@property @safe bool  curveServer();
@property @safe void  curveServer(bool value);
@property @safe ubyte[]  curvePublicKey();
@safe ubyte[]  getCurvePublicKey(ubyte[] dest);
@property @safe char[]  curvePublicKeyZ85();
@safe char[]  getCurvePublicKeyZ85(char[] dest);
@property @safe void  curvePublicKey(const(ubyte)[] value);
@property @safe void  curvePublicKeyZ85(const(char)[] value);
@property @safe ubyte[]  curveSecretKey();
@safe ubyte[]  getCurveSecretKey(ubyte[] dest);
@property @safe char[]  curveSecretKeyZ85();
@safe char[]  getCurveSecretKeyZ85(char[] dest);
@property @safe void  curveSecretKey(const(ubyte)[] value);
@property @safe void  curveSecretKeyZ85(const(char)[] value);
@property @safe ubyte[]  curveServerKey();
@safe ubyte[]  getCurveServerKey(ubyte[] dest);
@property @safe char[]  curveServerKeyZ85();
@safe char[]  getCurveServerKeyZ85(char[] dest);
@property @safe void  curveServerKey(const(ubyte)[] value);
@property @safe void  curveServerKeyZ85(const(char)[] value);
@property @safe void  probeRouter(bool value);
@property @safe void  reqCorrelate(bool value);
@property @safe void  reqRelaxed(bool value);
@property @safe void  conflate(bool value);
@property @safe char[]  zapDomain();
@safe char[]  getZapDomain(char[] dest);
@property @safe void  zapDomain(const char[] value);
@property @safe void  routerHandover(bool value);
@property @safe void  connectionRID(const ubyte[] value);
@property @safe int  typeOfService();
@property @safe void  typeOfService(int value);
@property @safe bool  gssapiPlaintext();
@property @safe void  gssapiPlaintext(bool value);
@property @safe char[]  gssapiPrincipal();
@safe char[]  getGssapiPrincipal(char[] dest);
@property @safe void  gssapiPrincipal(const char[] value);
@property @safe bool  gssapiServer();
@property @safe void  gssapiServer(bool value);
@property @safe char[]  gssapiServicePrincipal();
@safe char[]  getGssapiServicePrincipal(char[] dest);
@property @safe void  gssapiServicePrincipal(const char[] value);
@property @safe Duration  handshakeInterval();
@property @safe void  handshakeInterval(Duration value);

Miscellaneous socket options.

Each of these corresponds to an option passed to zmq_getsockopt() and zmq_setsockopt(). For example,  identity corresponds to ZMQ_IDENTITY,  receiveBufferSize corresponds to ZMQ_RCVBUF, etc.

Notes:
  • For convenience, the setter for the  identity property accepts strings. To retrieve a string with the getter, use the asString() function.
    sck.identity = "foobar";
    assert (sck.identity.asString() == "foobar");
    
  • The  linger,  receiveTimeout,  sendTimeout and  handshakeInterval properties may have the special value infiniteDuration. This is translated to an option value of -1 or 0 (depending on which property is being set) in the C API.
  • Some options have array type, and these allow the user to supply a buffer in which to store the value, to avoid a GC allocation. The return value is then a slice of this buffer. These are not marked as @property, but are prefixed with "get" (e.g.  getIdentity()). A user-supplied buffer is required for some options, namely  getPlainUsername() and  getPlainPassword(), and these do not have @property versions. getCurveXxxKey() and getCurveXxxKeyZ85() require buffers which are at least 32 and 41 bytes long, respectively.
  • The ZMQ_SUBSCRIBE and ZMQ_UNSUBSCRIBE options are treated differently from the others; see Socket.subscribe() and Socket.unsubscribe()
Throws
ZmqException if ZeroMQ reports an error.
std.conv.ConvOverflowException if a given Duration is longer than the number of milliseconds that will fit in an int (only applies to properties of core.time.Duration type).
core.exception.RangeError if the dest buffers passed to getCurveXxxKey() or getCurveXxxKeyZ85() are less than 32 or 41 bytes long, respectively.
Corresponds to:
zmq_getsockopt() and zmq_setsockopt().

@safe void  subscribe(const ubyte[] filterPrefix);
@safe void  subscribe(const char[] filterPrefix);

Establishes a message filter.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_setsockopt() with ZMQ_SUBSCRIBE.
Examples
// Create a subscriber that accepts all messages that start with
// the prefixes "foo" or "bar".
auto sck = Socket(SocketType.sub);
sck.subscribe("foo");
sck.subscribe("bar");

@safe void  unsubscribe(const ubyte[] filterPrefix);
@safe void  unsubscribe(const char[] filterPrefix);

Removes a message filter.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_setsockopt() with ZMQ_SUBSCRIBE.
Examples
// Subscribe to messages that start with "foo" or "bar".
auto sck = Socket(SocketType.sub);
sck.subscribe("foo");
sck.subscribe("bar");
// ...
// From now on, only accept messages that start with "bar"
sck.unsubscribe("foo");

@safe void  monitor(const char[] endpoint, EventType events = EventType.all);

Spawns a PAIR socket that publishes socket state changes (events) over the INPROC transport to the given endpoint.

Which event types should be published may be selected by bitwise-ORing together different EventType flags in the events parameter.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_socket_monitor()
See Also
receiveEvent(), which receives and parses event messages.
Examples
auto sck = Socket(SocketType.pub);
sck.monitor("inproc://zmqd_monitor_unittest",
            EventType.accepted | EventType.closed);

inout pure nothrow @property @safe inout(void)*  handle();

The void* pointer used by the underlying C API to refer to the socket.

If the object has not been initialized, this function returns null.


const pure nothrow @property @safe bool  initialized();

Whether this Socket object has been initialized, i.e. whether it refers to a valid ZeroMQ socket.

Examples
Socket sck;
assert (!sck.initialized);
sck = Socket(SocketType.sub);
assert (sck.initialized);
sck.close();
assert (!sck.initialized);

alias  FD = int;

The native socket file descriptor type.

This is an alias for SOCKET on Windows and int on POSIX systems.


@safe void  proxy(ref Socket frontend, ref Socket backend);
@safe void  proxy(ref Socket frontend, ref Socket backend, ref Socket capture);

Starts the built-in ZeroMQ proxy.

This function never returns normally, but it may throw an exception. This could happen if the context associated with either of the specified sockets is manually destroyed in a different thread.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_proxy()
See Also
steerableProxy()

@safe void  steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control);
@safe void  steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture);

Starts the built-in ZeroMQ proxy with control flow.

Note that the order of the two last parameters is reversed compared to zmq_proxy_steerable(). That is, the control socket always comes before the capture socket. Furthermore, unlike in ZeroMQ, control is mandatory. (Without the control socket one can simply use proxy().)

Versions:
This function is only available when using the bindings for ZeroMQ 4.0.5 or later.
Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_proxy_steerable()
See Also
proxy()

@trusted uint  poll(PollItem[] items, Duration timeout = infiniteDuration);

Input/output multiplexing.

The timeout parameter may have the special value infiniteDuration which means no timeout. This is translated to an argument value of -1 in the C API.

Returns
The number of PollItem structures with events signalled in PollItem.returnedEvents, or 0 if no events have been signalled.
Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_poll()
Examples
auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
socket1.bind("inproc://zmqd_poll_example");

import std.socket;
auto socket2 = new std.socket.Socket(
    AddressFamily.INET,
    std.socket.SocketType.DGRAM);
socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));

auto socket3 = zmqd.Socket(zmqd.SocketType.push);
socket3.connect("inproc://zmqd_poll_example");
socket3.send("test");

import core.thread: Thread;
Thread.sleep(10.msecs);

auto items = [
    PollItem(socket1, PollFlags.pollIn),
    PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
    PollItem(socket3, PollFlags.pollIn),
];

const n = poll(items, 100.msecs);
assert (n == 2);
assert (items[0].returnedEvents == PollFlags.pollIn);
assert (items[1].returnedEvents == PollFlags.pollOut);
assert (items[2].returnedEvents == 0);
socket2.close();

enum  PollFlags: int;

poll() event flags.

These are described in the zmq_poll() manual.


Corresponds to ZMQ_POLLIN


Corresponds to ZMQ_POLLOUT


Corresponds to ZMQ_POLLERR


struct  PollItem;

A structure that specifies a socket to be monitored by poll() as well as the events to poll for, and, when poll() returns, the events that occurred.

Warning:
 PollItem objects do not store std.socket.Socket references, only the corresponding native file descriptors. This means that the references have to be stored elsewhere, or the objects may be garbage collected, invalidating the sockets before or while poll() executes.
// Not OK
auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);

// OK
auto s = new std.socket.Socket(/*...*/);
auto p2 = PollItem(s, PollFlags.pollIn);
Corresponds to:
zmq_pollitem_t

nothrow @safe this(ref zmqd.Socket socket, PollFlags events);

Constructs a PollItem for monitoring a ZeroMQ socket.


@system this(std.socket.Socket socket, PollFlags events);

Constructs a PollItem for monitoring a standard socket referenced by a std.socket.Socket.


pure nothrow @safe this(FD fd, PollFlags events);

Constructs a PollItem for monitoring a standard socket referenced by a native file descriptor.


pure nothrow @property @safe void  requestedEvents(PollFlags events);
const pure nothrow @property @safe PollFlags  requestedEvents();

Requested events.

Corresponds to:
zmq_pollitem_t.events

const pure nothrow @property @safe PollFlags  returnedEvents();

Returned events.

Corresponds to:
zmq_pollitem_t.revents

struct  Frame;

An object that encapsulates a ZeroMQ message frame.

This struct is a wrapper around a zmq_msg_t object. A default-initialized  Frame is not a valid ZeroMQ message frame; it should always be explicitly initialized upon construction using Frame.opCall(). Alternatively, it may be initialized later with Frame.rebuild().

Frame msg1;                 // Invalid frame
auto msg2 = Frame();        // Empty frame
auto msg3 = Frame(1024);    // 1K frame
msg1.rebuild(2048);         // msg1 now has size 2K
msg2.rebuild(2048);         // ...and so does msg2
When a  Frame object is destroyed, zmq_msg_close() is called on the underlying zmq_msg_t.

A  Frame cannot be copied by normal assignment; use Frame.copy() for this.


static @safe Frame  opCall();

Initializes an empty ZeroMQ message frame.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_init()
Examples
auto msg = Frame();
assert(msg.size == 0);

static @safe Frame  opCall(size_t size);

Initializes a ZeroMQ message frame of the specified size.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_init_size()
Examples
auto msg = Frame(123);
assert(msg.size == 123);

static @system Frame  opCall(ubyte[] data);
static @system Frame  opCall(ubyte[] data, FreeData free, void* hint = null);

Initializes a ZeroMQ message frame from a supplied buffer.

If free is not specified, data must refer to a slice of memory which has been allocated by the garbage collector (typically using operator new). Ownership will then be transferred temporarily to ZeroMQ, and then transferred back to the GC when ZeroMQ is done using the buffer.

For memory which is not garbage collected, the argument free must be a pointer to a function that will release the memory, which will be called when ZeroMQ no longer needs the buffer. free must point to a function with the following signature:

extern(C) void f(void* d, void* h) nothrow;
When it is called, d will be equal to data.ptr, while h will be equal to hint (which is optional and null by default).

free is passed directly to the underlying ZeroMQ C function, which is why it needs to be extern(C).

Warning:
Some care must be taken when using this function, as there is no telling when, whether, or in which thread ZeroMQ relinquishes ownership of the buffer. Client code should therefore avoid retaining any references to it, including slices that contain, overlap with or are contained in data. For the "non-GC" version, client code should in general not retain slices or pointers to any memory which will be released when free is called.
See Also
Frame.FreeData
Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_init_data()
Examples
// Garbage-collected memory
auto buf = new ubyte[123];
auto msg = Frame(buf);
assert(msg.size == buf.length);
assert(msg.data.ptr == buf.ptr);
Examples
// Manually managed memory
import core.stdc.stdlib: malloc, free;
static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
auto msg = Frame(buf, &myFree);
assert(msg.size == buf.length);
assert(msg.data.ptr == buf.ptr);

alias  FreeData = extern (C) void function(void*, void*) nothrow @system;

The function pointer type for memory-freeing callback functions passed to Frame(ubyte[], FreeData, void*).


@safe void  rebuild();

Reinitializes the Frame object as an empty message.

This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame().

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_close() followed by zmq_msg_init()
Examples
auto msg = Frame(256);
assert (msg.size == 256);
msg.rebuild();
assert (msg.size == 0);

@safe void  rebuild(size_t size);

Reinitializes the Frame object to a specified size.

This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame(size).

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_close() followed by zmq_msg_init_size().
Examples
auto msg = Frame(256);
assert (msg.size == 256);
msg.rebuild(1024);
assert (msg.size == 1024);

@system void  rebuild(ubyte[] data);
@system void  rebuild(ubyte[] data, FreeData free, void* hint = null);

Reinitializes the Frame object from a supplied buffer.

This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame(data) or Frame(data, free, hint).

Some care must be taken when using these functions. Please read the Frame(ubyte[]) documentation.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_close() followed by zmq_msg_init_data().
Examples
// Garbage-collected memory
auto msg = Frame(256);
assert (msg.size == 256);
auto buf = new ubyte[123];
msg.rebuild(buf);
assert(msg.size == buf.length);
assert(msg.data.ptr == buf.ptr);
Examples
// Manually managed memory
import core.stdc.stdlib: malloc, free;
static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }

auto msg = Frame(256);
assert (msg.size == 256);
auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
msg.rebuild(buf, &myFree);
assert(msg.size == buf.length);
assert(msg.data.ptr == buf.ptr);

@safe void  close();

Releases the ZeroMQ message frame.

Note that the frame will be automatically released when the Frame object is destroyed, so it is often not necessary to call this method manually.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_close()

@safe Frame  copy();
@safe void  copyTo(ref Frame dest);

Copies frame content to another message frame.

 copy() returns a new Frame object, while  copyTo(dest) copies the contents of this Frame into dest. dest must be a valid (i.e. initialized) Frame.

Warning:
These functions may not do what you think they do. Please refer to the ZeroMQ manual for details.
Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_copy()
Examples
import std.string: representation;
auto msg1 = Frame(3);
msg1.data[] = "foo".representation;
auto msg2 = msg1.copy();
assert (msg2.data.asString() == "foo");

@safe Frame  move();
@safe void  moveTo(ref Frame dest);

Moves frame content to another message frame.

 move() returns a new Frame object, while  moveTo(dest) moves the contents of this Frame to dest. dest must be a valid (i.e. initialized) Frame.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_move()
Examples
import std.string: representation;
auto msg1 = Frame(3);
msg1.data[] = "foo".representation;
auto msg2 = msg1.move();
assert (msg1.size == 0);
assert (msg2.data.asString() == "foo");

nothrow @property @safe size_t  size();

The message frame content size in bytes.

Corresponds to:
zmq_msg_size()
Examples
auto msg = Frame(123);
assert(msg.size == 123);

nothrow @property @trusted ubyte[]  data();

Retrieves the message frame content.

Corresponds to:
zmq_msg_data()
Examples
import std.string: representation;
auto msg = Frame(3);
assert(msg.data.length == 3);
msg.data[] = "foo".representation; // Slice operator -> array copy.
assert(msg.data.asString() == "foo");

nothrow @property @safe bool  more();

Whether there are more message frames to retrieve.

Corresponds to:
zmq_msg_more()

@property @safe FD  sourceFD();

The file descriptor of the socket the message was read from.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_get() with ZMQ_SRCFD.

@property @safe bool  sharedStorage();

Whether the message MAY share underlying storage with another copy.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_get() with ZMQ_SHARED.

@trusted char[]  metadata(const char[] property);
@system const(char)[]  metadataUnsafe(const char[] property);

Gets message metadata.

 metadataUnsafe() is faster than  metadata() because it directly returns the array which comes from zmq_msg_gets(), whereas the latter returns a freshly GC-allocated copy of it. However, the array returned by  metadataUnsafe() is owned by the underlying ZeroMQ message and gets destroyed along with it, so care must be taken when using this function.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_msg_gets()

inout pure nothrow @property @safe inout(zmq_msg_t)*  handle();

A pointer to the underlying zmq_msg_t.


@trusted Context  defaultContext();

A global context which is used by default by all sockets, unless they are explicitly constructed with a different context.

The ZeroMQ Guide has the following to say about context creation:

You should create and use exactly one context in your process. […] If at runtime a process has two contexts, these are like separate ZeroMQ instances. If that's explicitly what you want, OK, but otherwise remember: Do one zmq_ctx_new() at the start of your main line code, and one zmq_ctx_destroy() at the end.
By using  defaultContext(), this is exactly what you achieve. The context is created the first time the function is called, and is automatically destroyed when the program ends.

This function is thread safe.

Throws
ZmqException if ZeroMQ reports an error.
See Also
Context

struct  Context;

An object that encapsulates a ZeroMQ context.

In most programs, it is not necessary to use this type directly, as Socket will use a default global context if not explicitly provided with one. See defaultContext() for details.

A default-initialized  Context is not a valid ZeroMQ context; it must always be explicitly initialized with Context.opCall():

Context ctx;        // Not a valid context yet
ctx = Context();    // ...but now it is.
 Context objects can be passed around by value, and two copies will refer to the same context. The underlying context is managed using reference counting, so that when the last copy of a  Context goes out of scope, the context is automatically destroyed. The reference counting is performed in a thread safe manner, so that the same context can be shared between multiple threads. (ZeroMQ guarantees the thread safety of other context operations.)

See Also
defaultContext()

static @trusted Context  opCall();

Creates a new ZeroMQ context.

Returns
A Context object that encapsulates the new context.
Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_ctx_new()
Examples
auto ctx = Context();
assert (ctx.initialized);

@safe void  detach();

Detaches from the ZeroMQ context.

If this is the last reference to the context, it will be destroyed with zmq_ctx_term().

Throws
ZmqException if ZeroMQ reports an error.
Examples
auto ctx = Context();
assert (ctx.initialized);
ctx.detach();
assert (!ctx.initialized);

@system void  terminate();

Forcefully terminates the context.

By using this function, one effectively circumvents the reference-counting mechanism for managing the context. After it returns, all other Context objects that used to refer to the same context will be in a state which is functionally equivalent to the default-initialized state (i.e., Context.initialized is false and Context.handle is null).

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_ctx_term()
Examples
auto ctx1 = Context();
auto ctx2 = ctx1;
assert (ctx1.initialized);
assert (ctx2.initialized);
assert (ctx1.handle == ctx2.handle);
ctx2.terminate();
assert (!ctx1.initialized);
assert (!ctx2.initialized);
assert (ctx1.handle == null);
assert (ctx2.handle == null);

@property @safe int  ioThreads();
@property @safe void  ioThreads(int value);

The number of I/O threads.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_ctx_get() and zmq_ctx_set() with ZMQ_IO_THREADS.
Examples
auto ctx = Context();
ctx.ioThreads = 3;
assert (ctx.ioThreads == 3);

@property @safe int  maxSockets();
@property @safe void  maxSockets(int value);

The maximum number of sockets.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_ctx_get() and zmq_ctx_set() with ZMQ_MAX_SOCKETS.
Examples
auto ctx = Context();
ctx.maxSockets = 512;
assert (ctx.maxSockets == 512);

@property @safe int  socketLimit();

The largest configurable number of sockets.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_ctx_get() with ZMQ_SOCKET_LIMIT.
Examples
auto ctx = Context();
assert (ctx.socketLimit > 0);

@property @safe bool  ipv6();
@property @safe void  ipv6(bool value);

IPv6 option.

Throws
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_ctx_get() and zmq_ctx_set() with ZMQ_IPV6.
Examples
auto ctx = Context();
ctx.ipv6 = true;
assert (ctx.ipv6 == true);

inout pure nothrow @property @trusted inout(void)*  handle();

The void* pointer used by the underlying C API to refer to the context.

If the object has not been initialized, this function returns null.


const pure nothrow @property @safe bool  initialized();

Whether this Context object has been initialized, i.e. whether it refers to a valid ZeroMQ context.

Examples
Context ctx;
assert (!ctx.initialized);
ctx = Context();
assert (ctx.initialized);
ctx.detach();
assert (!ctx.initialized);

enum  EventType: int;

Socket event types.

These are used together with Socket.monitor(), and are described in the zmq_socket_monitor() reference.


Corresponds to ZMQ_EVENT_CONNECTED.


Corresponds to ZMQ_EVENT_CONNECT_DELAYED.


Corresponds to ZMQ_EVENT_CONNECT_RETRIED.


Corresponds to ZMQ_EVENT_LISTENING.


Corresponds to ZMQ_EVENT_BIND_FAILED.


Corresponds to ZMQ_EVENT_ACCEPTED.


Corresponds to ZMQ_EVENT_ACCEPT_FAILED.


Corresponds to ZMQ_EVENT_CLOSED.


Corresponds to ZMQ_EVENT_CLOSE_FAILED.


Corresponds to ZMQ_EVENT_DISCONNECTED.


 all

Corresponds to ZMQ_EVENT_ALL.


@system Event  receiveEvent(ref Socket socket);

Receives a message on the given socket and interprets it as a socket state change event.

socket must be a PAIR socket which is connected to an endpoint created via a Socket.monitor() call.  receiveEvent() receives one message on the socket, parses its contents according to the specification in the zmq_socket_monitor() reference, and returns the event information as an Event object.

Throws
ZmqException if ZeroMQ reports an error.
InvalidEventException if the received message could not be interpreted as an event message.
See Also
Socket.monitor(), for monitoring socket state changes.

struct  Event;

Information about a socket state change.

See Also
receiveEvent()

const pure nothrow @property @safe EventType  type();

The event type.


const pure nothrow @property @safe string  address();

The peer address.


const pure nothrow @property @safe FD  fd();

The socket file descriptor.

This property function may only be called if Event.type is one of: connected, listening, accepted, closed or disonnected.

Throws
Error if the property is called for a wrong event type.

const pure nothrow @property @safe int  errno();

The errno code for the error which triggered the event.

This property function may only be called if Event.type is either bindFailed, acceptFailed or closeFailed.

Throws
Error if the property is called for a wrong event type.

const pure nothrow @property @safe Duration  interval();

The reconnect  interval.

This property function may only be called if Event.type is connectRetried.

Throws
Error if the property is called for a wrong event type.

@safe char[]  z85Encode(ubyte[] data, char[] dest);
@safe char[]  z85Encode(ubyte[] data);

Encodes a binary key as Z85 printable text.

dest must be an array whose length is at least 5*data.length/4 + 1, which will be used to store the return value plus a terminating zero byte. If dest is omitted, a new array will be created.

Returns
An array of size 5*data.length/4 which contains the Z85-encoded text, excluding the terminating zero byte. This will be a slice of dest if it is provided.
Throws
core.exception.RangeError if dest is given but is too small.
ZmqException if ZeroMQ reports an error (i.e., if data.length is not a multiple of 4).
Corresponds to:
zmq_z85_encode()

@safe ubyte[]  z85Decode(char[] text, ubyte[] dest);
@safe ubyte[]  z85Decode(char[] text);

Decodes a binary key from Z85 printable text.

dest must be an array whose length is at least 4*data.length/5, which will be used to store the return value. If dest is omitted, a new array will be created.

Note that zmq_z85_decode() expects a zero-terminated string, so a zero byte will be appended to text if it does not contain one already. However, this may trigger a (possibly unwanted) GC allocation. To avoid this, either make sure that the last character in text is '\0', or use assumeSafeAppend on the array before calling this function (provided this is safe).

Returns
An array of size 4*data.length/5 which contains the decoded data. This will be a slice of dest if it is provided.
Throws
core.exception.RangeError if dest is given but is too small.
ZmqException if ZeroMQ reports an error (i.e., if data.length is not a multiple of 5).
Corresponds to:
zmq_z85_decode()

@safe Tuple!(char[], "publicKey", char[], "secretKey")  curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null);

Generates a new Curve key pair.

To avoid a memory allocation, preallocated buffers may optionally be supplied for the two keys. Each of these must have a length of at least 41 bytes, enough for a 40-character Z85-encoded key plus a terminating zero byte. If either buffer is omitted/null, a new one will be created.

Returns
A tuple that contains the two keys. Each of these will have a length of 40 characters, and will be slices of the input buffers if such have been provided.
Throws
core.exception.RangeError if publicKeyBuf or secretKeyBuf are not null but have a length of less than 41 characters.
ZmqException if ZeroMQ reports an error.
Corresponds to:
zmq_curve_keypair()
Examples
auto server = Socket(SocketType.rep);
auto serverKeys = curveKeyPair();
server.curveServer = true;
server.curveSecretKeyZ85 = serverKeys.secretKey;
server.bind("inproc://curveKeyPair_test");

auto client = Socket(SocketType.req);
auto clientKeys = curveKeyPair();
client.curvePublicKeyZ85 = clientKeys.publicKey;
client.curveSecretKeyZ85 = clientKeys.secretKey;
client.curveServerKeyZ85 = serverKeys.publicKey;
client.connect("inproc://curveKeyPair_test");
client.send("hello");

ubyte[5] buf;
assert (server.receive(buf) == 5);
assert (buf.asString() == "hello");

pure @safe inout(char)[]  asString(inout(ubyte)[] data);

Utility function which interprets and validates a byte array as a UTF-8 string.

Most of zmqd's message API deals in ubyte[] arrays, but very often, the message data contains plain text.  asString() allows for easy and safe interpretation of raw data as characters. It checks that data is a valid UTF-8 encoded string, and returns a char[] array that refers to the same memory region.

Throws
std.utf.UTFException if data is not a valid UTF-8 string.
See Also
std.string.representation, which performs the opposite operation.
Examples
auto s1 = Socket(SocketType.pair);
s1.bind("inproc://zmqd_asString_example");
auto s2 = Socket(SocketType.pair);
s2.connect("inproc://zmqd_asString_example");

auto msg = Frame(12);
msg.data.asString()[] = "Hello World!";
s1.send(msg);

ubyte[12] buf;
s2.receive(buf);
assert(buf.asString() == "Hello World!");

class  ZmqException: object.Exception;

A class for exceptions thrown when any of the underlying ZeroMQ C functions report an error.

The exception provides a standard error message obtained with zmq_strerror(), as well as the errno code set by the ZeroMQ function which reported the error.


immutable int  errno;

The errno code that was set by the ZeroMQ function that reported the error.

Corresponds to:
zmq_errno()

class  InvalidEventException: object.Exception;

Exception thrown by receiveEvent() on failure to interpret a received message as an event description.