zmqd

A thin wrapper around the low-level C API of the ∅MQ messaging framework, for the D programming language.

Most functions in this module have a one-to-one relationship with functions in the underlying C API. Some adaptations have been made to make the API safer, easier and more pleasant to use. Most importantly, the names of functions and types in ∅MQD are very similar to those in ∅MQ, but they follow the D naming conventions. Thus, the library should feel both familiar to ∅MQ users and natural to D users. A notable deviation from the C API is that message parts are consistently called "frames". For example, zmq_msg_send() becomes zmqd.Socket.send() taking a zmqd.Frame, and so on. (Multipart messages were a late addition to ∅MQ, and the "msg" function names were well established at that point. The library's developers have admitted that this is somewhat confusing, and the newer CZMQ API consistently uses "frame" in function names.)

Due to the close correspondence with the C API, this documentation has intentionally been kept sparse. There is really no reason to repeat the contents of the ∅MQ reference manual here. Instead, the documentation for each function contains a "Corresponds to" section that links to the appropriate pages in the ∅MQ reference. Any details given in the present documentation mostly concern the D-specific adaptations that have been made.

Also note that the examples only use the INPROC and IPC transports. The reason for this is that the examples double as unit tests, and we want to avoid firewall troubles and other issues that could arise with the use of network protocols such as TCP, PGM, etc. In any case, they are only short snippets that demonstrate the syntax; for more comprehensive and realistic examples, please refer to the ∅MQ Guide. Many of the examples in the Guide have been translated to D, and can be found in the examples subdirectory of the ∅MQD source repository.

Version:

0.5.1 (compatible with ∅MQ >= 3.2.1)

Authors:

Licence:

∅MQD is released under a BSD licence (see LICENCE.txt for details).
Please refer to the ∅MQ site for details about ∅MQ licensing.

Members

nothrow @safe Tuple!(int, "major", int, "minor", int, "patch") zmqVersion();

Reports the ∅MQ library version.

Returns:

A std.typecons.Tuple with three integer fields that represent the three versioning levels: major, minor and patch.

Corresponds to:
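
As an illustration, the following sketch formats the reported version as a string. The helper name versionString is hypothetical and not part of ∅MQD; only zmqVersion() and its major, minor and patch fields are taken from the description above.

```d
import std.format : format;
import zmqd : zmqVersion;

/// Hypothetical helper: formats the ∅MQ library version as "major.minor.patch".
string versionString()
{
    const v = zmqVersion();
    return format("%d.%d.%d", v.major, v.minor, v.patch);
}
```

A caller could, for instance, log versionString() at startup, or compare zmqVersion().major against the version the program was tested with.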

enum SocketType: int;

The various socket types.

These are described in the zmq_socket() reference.

Members

req

Corresponds to ZMQ_REQ

rep

Corresponds to ZMQ_REP

dealer

Corresponds to ZMQ_DEALER

router

Corresponds to ZMQ_ROUTER

pub

Corresponds to ZMQ_PUB

sub

Corresponds to ZMQ_SUB

xpub

Corresponds to ZMQ_XPUB

xsub

Corresponds to ZMQ_XSUB

push

Corresponds to ZMQ_PUSH

pull

Corresponds to ZMQ_PULL

pair

Corresponds to ZMQ_PAIR

struct Socket;

An object that encapsulates a ∅MQ socket.

A default-initialized Socket is not a valid ∅MQ socket; it must always be explicitly initialized with a constructor (see Socket.this()):
Socket s;                     // Not a valid socket yet
s = Socket(SocketType.push);  // ...but now it is.
This struct is noncopyable, which means that a socket is always uniquely managed by a single Socket object. Functions that will inspect or use the socket, but not take ownership of it, should take a ref Socket parameter. Use std.algorithm.move to move a Socket to a different location (e.g. into a sink function that takes it by value, or into a new variable).

The socket is automatically closed when the Socket object goes out of scope.
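
The ownership rules above can be sketched as follows. The function names isPush, consume and ownershipSketch are hypothetical; the sketch assumes that std.algorithm.move resets the source to Socket.init, as it does for structs with a destructor.

```d
import std.algorithm : move;
import zmqd;

/// Inspects the socket without taking ownership of it.
bool isPush(ref Socket s)
{
    return s.type == SocketType.push;
}

/// Sink function: takes ownership, so the socket is closed when it returns.
void consume(Socket s)
{
    assert(s.initialized);
}

void ownershipSketch()
{
    auto a = Socket(SocketType.push);
    assert(isPush(a));          // passed by ref, a still owns the socket

    auto b = move(a);           // ownership moves from a to b
    assert(!a.initialized);
    assert(b.initialized);

    consume(move(b));           // ownership moves into the sink
    assert(!b.initialized);
}
```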

Members

@safe this(SocketType type);
@safe this(Context context, SocketType type);

Creates a new ∅MQ socket.

If context is not specified, the default context (as returned by defaultContext()) is used.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

With default context:
auto sck = Socket(SocketType.push);
assert (sck.initialized);

Examples:

With explicit context:
auto ctx = Context();
auto sck = Socket(ctx, SocketType.push);
assert (sck.initialized);

@safe void close();

Closes the ∅MQ socket.

Note that the socket will be closed automatically upon destruction, so it is usually not necessary to call this method manually.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto s = Socket(SocketType.pair);
assert (s.initialized);
s.close();
assert (!s.initialized);

@safe void bind(const char[] endpoint);

Starts accepting incoming connections on endpoint.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto s = Socket(SocketType.pub);
s.bind("ipc://zmqd_bind_example");

@safe void unbind(const char[] endpoint);

Stops accepting incoming connections on endpoint.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto s = Socket(SocketType.pub);
s.bind("ipc://zmqd_unbind_example");
// Do some work...
s.unbind("ipc://zmqd_unbind_example");

@safe void connect(const char[] endpoint);

Creates an outgoing connection to endpoint.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto s = Socket(SocketType.sub);
s.connect("ipc://zmqd_connect_example");

@safe void disconnect(const char[] endpoint);

Disconnects the socket from endpoint.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto s = Socket(SocketType.sub);
s.connect("ipc://zmqd_disconnect_example");
// Do some work...
s.disconnect("ipc://zmqd_disconnect_example");

@safe void send(const ubyte[] data, bool more = false);
@trusted void send(const char[] data, bool more = false);
@safe bool trySend(const ubyte[] data, bool more = false);
@trusted bool trySend(const char[] data, bool more = false);

Sends a message frame.

send blocks until the frame has been queued on the socket. trySend performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.

The more parameter specifies whether this is a multipart message and there are more frames to follow.

The char[] overload is a convenience function that simply casts the string argument to ubyte[].

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_send() (with the ZMQ_DONTWAIT flag, in the case of trySend, and with the ZMQ_SNDMORE flag if more == true).

Examples:

auto sck = Socket(SocketType.pub);
sck.send(cast(ubyte[]) [11, 226, 92]);
sck.send("Hello World!");

@safe void send(ref Frame msg, bool more = false);
@safe bool trySend(ref Frame msg, bool more = false);

Sends a message frame.

send blocks until the frame has been queued on the socket. trySend performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.

The more parameter specifies whether this is a multipart message and there are more frames to follow.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_msg_send() (with the ZMQ_DONTWAIT flag, in the case of trySend, and with the ZMQ_SNDMORE flag if more == true).

Examples:

auto sck = Socket(SocketType.pub);
auto msg = Frame(12);
msg.data.asString()[] = "Hello World!";
sck.send(msg);

@safe size_t receive(ubyte[] data);
@safe Tuple!(size_t, bool) tryReceive(ubyte[] data);

Receives a message frame.

receive blocks until the request can be satisfied, and returns the number of bytes in the frame. tryReceive performs the operation in non-blocking mode, and returns a std.typecons.Tuple which contains the size of the frame along with a bool value that signifies whether a frame was received. (If the latter is false, the former is always set to zero.)

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_recv() (with the ZMQ_DONTWAIT flag, in the case of tryReceive).

Examples:

// Sender
auto snd = Socket(SocketType.req);
snd.connect("ipc://zmqd_receive_example");
snd.send("Hello World!");

// Receiver
import std.string: representation;
auto rcv = Socket(SocketType.rep);
rcv.bind("ipc://zmqd_receive_example");
char[256] buf;
immutable len = rcv.receive(buf.representation);
assert (buf[0 .. len] == "Hello World!");
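
The non-blocking variant might be used along the following lines. This is only a sketch: the helper receiveIfAvailable is hypothetical, and the clamping with min assumes that, as with zmq_recv(), the reported size can exceed the buffer length when a frame is truncated.

```d
import std.algorithm : min;
import zmqd;

/// Hypothetical helper: tries to receive one frame without blocking.
/// Returns false if no frame was waiting; otherwise sets `received`
/// to the part of `buffer` that was filled.
bool receiveIfAvailable(ref Socket sck, ubyte[] buffer, out ubyte[] received)
{
    auto result = sck.tryReceive(buffer);
    if (!result[1]) return false;   // nothing was received
    // The reported size is the frame size, so clamp it to the buffer.
    received = buffer[0 .. min(result[0], buffer.length)];
    return true;
}
```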

@safe size_t receive(ref Frame msg);
@safe Tuple!(size_t, bool) tryReceive(ref Frame msg);

Receives a message frame.

receive blocks until the request can be satisfied, and returns the number of bytes in the frame. tryReceive performs the operation in non-blocking mode, and returns a std.typecons.Tuple which contains the size of the frame along with a bool value that signifies whether a frame was received. (If the latter is false, the former is always set to zero.)

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_msg_recv() (with the ZMQ_DONTWAIT flag, in the case of tryReceive).

Examples:

// Sender
auto snd = Socket(SocketType.req);
snd.connect("ipc://zmqd_msg_receive_example");
snd.send("Hello World!");

// Receiver
import std.string: representation;
auto rcv = Socket(SocketType.rep);
rcv.bind("ipc://zmqd_msg_receive_example");
auto msg = Frame();
rcv.receive(msg);
assert (msg.data.asString() == "Hello World!");

@property @safe SocketType type();

The socket type.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_getsockopt() with ZMQ_TYPE.

Examples:

auto sck = Socket(SocketType.xpub);
assert (sck.type == SocketType.xpub);

@property @safe bool more();

Whether there are more message frames to follow.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_getsockopt() with ZMQ_RCVMORE.
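
To illustrate how the more parameter of send() and the more property fit together, here is a minimal sketch of sending and receiving a complete multipart message. The helper names sendMultipart and receiveMultipart are hypothetical and not part of ∅MQD.

```d
import zmqd;

/// Sends `frames` as one multipart message: every frame except the
/// last is sent with more = true.
void sendMultipart(ref Socket sck, const(char[])[] frames)
{
    foreach (i, f; frames)
        sck.send(f, i + 1 < frames.length);
}

/// Receives frames until Socket.more reports that the message is complete.
string[] receiveMultipart(ref Socket sck)
{
    string[] frames;
    do
    {
        auto frame = Frame();
        sck.receive(frame);
        frames ~= frame.data.asString().idup;
    } while (sck.more);
    return frames;
}
```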

@property @safe int sendHWM();
@property @safe void sendHWM(int value);
@property @safe int receiveHWM();
@property @safe void receiveHWM(int value);
@property @safe ulong threadAffinity();
@property @safe void threadAffinity(ulong value);
@property @trusted ubyte[] identity();
@property @safe void identity(const ubyte[] value);
@property @safe void identity(const char[] value);
@property @safe int rate();
@property @safe void rate(int value);
@property @safe Duration recoveryInterval();
@property @safe void recoveryInterval(Duration value);
@property @safe int sendBufferSize();
@property @safe void sendBufferSize(int value);
@property @safe int receiveBufferSize();
@property @safe void receiveBufferSize(int value);
@property @safe Duration linger();
@property @safe void linger(Duration value);
@property @safe Duration reconnectionInterval();
@property @safe void reconnectionInterval(Duration value);
@property @safe Duration maxReconnectionInterval();
@property @safe void maxReconnectionInterval(Duration value);
@property @safe int backlog();
@property @safe void backlog(int value);
@property @safe long maxMsgSize();
@property @safe void maxMsgSize(long value);
@property @safe int multicastHops();
@property @safe void multicastHops(int value);
@property @safe Duration receiveTimeout();
@property @safe void receiveTimeout(Duration value);
@property @safe Duration sendTimeout();
@property @safe void sendTimeout(Duration value);
@property @safe bool ipv4Only();
@property @safe void ipv4Only(bool value);
@property @safe bool delayAttachOnConnect();
@property @safe void delayAttachOnConnect(bool value);
@property @safe FD fd();
@property @safe PollFlags events();
@property @trusted char[] lastEndpoint();
@property @safe void routerMandatory(bool value);
@property @safe void xpubVerbose(bool value);

Misc. socket properties.

Each of these has a one-to-one correspondence with an option passed to zmq_getsockopt() and zmq_setsockopt(). For example, identity corresponds to ZMQ_IDENTITY, receiveBufferSize corresponds to ZMQ_RCVBUF, etc.

Notes:

  • For convenience, the setter for the identity property accepts strings. To retrieve a string with the getter, use the asString() function.
    sck.identity = "foobar";
    assert (sck.identity.asString() == "foobar");
    
  • The linger, receiveTimeout and sendTimeout properties may have the special value core.time.Duration.max, which in this context specifies an infinite duration. This is translated to an option value of -1 in the C API (and it is also the default value for all of them).
  • The ZMQ_SUBSCRIBE and ZMQ_UNSUBSCRIBE options are treated differently from the others; see Socket.subscribe() and Socket.unsubscribe().

Throws:

ZmqException if ∅MQ reports an error.
std.conv.ConvOverflowException if a given Duration is longer than the number of milliseconds that will fit in an int (only applies to properties of core.time.Duration type).

Corresponds to:
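
As a sketch of how the Duration-typed and integer properties might be set, consider the following. The helper name configure and the chosen values are hypothetical; the sketch assumes that the getters report an infinite duration as Duration.max, as suggested by the notes above.

```d
import core.time : Duration, msecs;
import zmqd;

/// Hypothetical helper: applies a few example settings to a socket.
void configure(ref Socket sck)
{
    sck.linger = 500.msecs;             // wait at most 500 ms on close
    sck.receiveTimeout = Duration.max;  // infinite, i.e. -1 in the C API
    sck.sendHWM = 1000;                 // plain integer option
}
```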

@safe void subscribe(const ubyte[] filterPrefix);
@safe void subscribe(const char[] filterPrefix);

Establishes a message filter.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_setsockopt() with ZMQ_SUBSCRIBE.

Examples:

// Create a subscriber that accepts all messages that start with
// the prefixes "foo" or "bar".
auto sck = Socket(SocketType.sub);
sck.subscribe("foo");
sck.subscribe("bar");

@safe void unsubscribe(const ubyte[] filterPrefix);
@safe void unsubscribe(const char[] filterPrefix);

Removes a message filter.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_setsockopt() with ZMQ_UNSUBSCRIBE.

Examples:

// Subscribe to messages that start with "foo" or "bar".
auto sck = Socket(SocketType.sub);
sck.subscribe("foo");
sck.subscribe("bar");
// ...
// From now on, only accept messages that start with "bar"
sck.unsubscribe("foo");

@safe void monitor(const char[] endpoint, EventType events = EventType.all);

Spawns a PAIR socket that publishes socket state changes (events) over the INPROC transport to the given endpoint.

Which event types should be published may be selected by bitwise-ORing together different EventType flags in the events parameter.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

See also:

receiveEvent(), which receives and parses event messages.

Examples:

auto sck = Socket(SocketType.pub);
sck.monitor("inproc://zmqd_monitor_unittest",
            EventType.accepted | EventType.closed);

inout pure nothrow @property @safe inout(void)* handle();

The void* pointer used by the underlying C API to refer to the socket.

If the object has not been initialized, this function returns null.

const pure nothrow @property @safe bool initialized();

Whether this Socket object has been initialized, i.e. whether it refers to a valid ∅MQ socket.

Examples:

Socket sck;
assert (!sck.initialized);
sck = Socket(SocketType.sub);
assert (sck.initialized);
sck.close();
assert (!sck.initialized);

alias FD = int;

The native socket file descriptor type.

This is an alias for SOCKET on Windows and int on POSIX systems.

@safe void proxy(ref Socket frontend, ref Socket backend);
@safe void proxy(ref Socket frontend, ref Socket backend, ref Socket capture);

Starts the built-in ∅MQ proxy.

This function never returns normally, but it may throw an exception. This could happen if the context associated with either of the specified sockets is manually destroyed in a different thread.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

@trusted uint poll(PollItem[] items, Duration timeout = Duration.max);

Input/output multiplexing.

The timeout parameter may have the special value core.time.Duration.max, which in this context specifies an infinite duration. This is translated to an argument value of -1 in the C API.

Returns:

The number of PollItem structures with events signalled in PollItem.returnedEvents, or 0 if no events have been signalled.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
socket1.bind("ipc://zmqd_poll_example");

import std.socket;
auto socket2 = new std.socket.Socket(
    AddressFamily.INET,
    std.socket.SocketType.DGRAM);
socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));

auto socket3 = zmqd.Socket(zmqd.SocketType.push);
socket3.connect("ipc://zmqd_poll_example");
socket3.send("test");

import core.thread: Thread;
import core.time: msecs;
Thread.sleep(10.msecs);

auto items = [
    PollItem(socket1, PollFlags.pollIn),
    PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
    PollItem(socket3, PollFlags.pollIn),
];

const n = poll(items, 100.msecs);
assert (n == 2);
assert (items[0].returnedEvents == PollFlags.pollIn);
assert (items[1].returnedEvents == PollFlags.pollOut);
assert (items[2].returnedEvents == 0);
socket2.close();
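
For the common case of waiting on a single ∅MQ socket, poll() could be wrapped as in the sketch below. The helper name waitReadable and its default timeout are hypothetical.

```d
import core.time : Duration, msecs;
import zmqd;

/// Hypothetical helper: returns true if `sck` has a frame ready to be
/// received within `timeout`.
bool waitReadable(ref Socket sck, Duration timeout = 100.msecs)
{
    auto items = [PollItem(sck, PollFlags.pollIn)];
    const n = poll(items, timeout);
    return n > 0 && (items[0].returnedEvents & PollFlags.pollIn) != 0;
}
```

Passing Duration.max as the timeout would make the helper block until the socket becomes readable.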

enum PollFlags: int;

poll() event flags.

These are described in the zmq_poll() manual.

Members

pollIn

Corresponds to ZMQ_POLLIN

pollOut

Corresponds to ZMQ_POLLOUT

pollErr

Corresponds to ZMQ_POLLERR

struct PollItem;

A structure that specifies a socket to be monitored by poll() as well as the events to poll for, and, when poll() returns, the events that occurred.

Warning:

PollItem objects do not store std.socket.Socket references, only the corresponding native file descriptors. This means that the references have to be stored elsewhere, or the objects may be garbage collected, invalidating the sockets before or while poll() executes.
// Not OK
auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);

// OK
auto s = new std.socket.Socket(/*...*/);
auto p2 = PollItem(s, PollFlags.pollIn);

Corresponds to:

Members

nothrow @safe this(ref zmqd.Socket socket, PollFlags events);

Constructs a PollItem for monitoring a ∅MQ socket.

@system this(std.socket.Socket socket, PollFlags events);

Constructs a PollItem for monitoring a standard socket referenced by a std.socket.Socket.

pure nothrow @safe this(FD fd, PollFlags events);

Constructs a PollItem for monitoring a standard socket referenced by a native file descriptor.

pure nothrow @property @safe void requestedEvents(PollFlags events);
const pure nothrow @property @safe PollFlags requestedEvents();

Requested events.

Corresponds to:

const pure nothrow @property @safe PollFlags returnedEvents();

Returned events.

Corresponds to:

struct Frame;

An object that encapsulates a ∅MQ message frame.

This struct is a wrapper around a zmq_msg_t object. Unlike Context, it does not perform reference counting, because ∅MQ messages have a form of reference counting of their own. A Frame cannot be copied by normal assignment; use Frame.copy() for this.

A default-initialized Frame is not a valid ∅MQ message frame; it should always be explicitly initialized upon construction using Frame.opCall(). Alternatively, it may be initialized later with Frame.rebuild().
Frame msg1;                 // Invalid frame
auto msg2 = Frame();        // Empty frame
auto msg3 = Frame(1024);    // 1K frame
msg1.rebuild(2048);         // msg1 now has size 2K
msg2.rebuild(2048);         // ...and so does msg2
When a Frame goes out of scope, zmq_msg_close() is called on the underlying zmq_msg_t.

Members

static @safe Frame opCall();

Initializes an empty ∅MQ message frame.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto msg = Frame();
assert(msg.size == 0);

static @safe Frame opCall(size_t size);

Initializes a ∅MQ message frame of a specified size.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto msg = Frame(123);
assert(msg.size == 123);

static @safe Frame opCall(ubyte[] data);

Initializes a ∅MQ message frame from a supplied buffer.

Warning:

Some care must be taken when using this function, as ∅MQ expects to take full ownership of the supplied buffer. Client code should therefore avoid retaining any references to it, including slices that contain, overlap with or are contained in data. ∅MQ makes no guarantee that the buffer is not modified, and it does not specify when the buffer is released.

An additional complication is caused by the fact that most arrays in D are owned by the garbage collector. This is solved by adding the array pointer as a new garbage collector root before passing it to zmq_msg_init_data(), thus preventing the GC from collecting it. The root is then removed again in the deallocator callback function which is called by ∅MQ when it no longer requires the buffer, thus allowing the GC to collect it.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto buf = new ubyte[123];
auto msg = Frame(buf);
assert(msg.size == buf.length);
assert(msg.data.ptr == buf.ptr);

@safe void rebuild();

Reinitializes the Frame object as an empty message.

This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame().

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto msg = Frame(256);
assert (msg.size == 256);
msg.rebuild();
assert (msg.size == 0);

@safe void rebuild(size_t size);

Reinitializes the Frame object to a specified size.

This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame(size).

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto msg = Frame(256);
assert (msg.size == 256);
msg.rebuild(1024);
assert (msg.size == 1024);

@safe void rebuild(ubyte[] data);

Reinitializes the Frame object from a supplied buffer.

This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame(data).

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto msg = Frame(256);
assert (msg.size == 256);
auto buf = new ubyte[123];
msg.rebuild(buf);
assert(msg.size == buf.length);
assert(msg.data.ptr == buf.ptr);

@safe void close();

Releases the ∅MQ message frame.

Note that the frame will be automatically released when the Frame object is destroyed, so it is often not necessary to call this method manually.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

@safe Frame copy();
@safe void copyTo(ref Frame dest);

Copies frame content to another message frame.

copy() returns a new Frame object, while copyTo(dest) copies the contents of this Frame into dest. dest must be a valid (i.e. initialized) Frame.

Warning:

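These functions may not do what you think they do: ∅MQ may choose to share the underlying buffer between the two frames rather than physically copying the content, so avoid modifying the content after a frame has been copied. Please refer to the ∅MQ manual for details.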

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

import std.string: representation;
auto msg1 = Frame(3);
msg1.data[] = "foo".representation;
auto msg2 = msg1.copy();
assert (msg2.data.asString() == "foo");

@safe Frame move();
@safe void moveTo(ref Frame dest);

Moves frame content to another message frame.

move() returns a new Frame object, while moveTo(dest) moves the contents of this Frame to dest. dest must be a valid (i.e. initialized) Frame.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

import std.string: representation;
auto msg1 = Frame(3);
msg1.data[] = "foo".representation;
auto msg2 = msg1.move();
assert (msg1.size == 0);
assert (msg2.data.asString() == "foo");
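
The moveTo() variant requires an initialized destination, which the following sketch illustrates (copyTo() is used the same way). The function name moveIntoFresh is hypothetical.

```d
import std.string : representation;
import zmqd;

/// Hypothetical helper: moves the content of one frame into another
/// and returns the destination's content as a string.
string moveIntoFresh()
{
    auto src = Frame(3);
    src.data[] = "foo".representation;

    auto dest = Frame();        // dest must be initialized before moveTo()
    src.moveTo(dest);
    assert(src.size == 0);      // src is left as an empty frame

    return dest.data.asString().idup;
}
```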

nothrow @property @safe size_t size();

The message frame content size in bytes.

Corresponds to:

Examples:

auto msg = Frame(123);
assert(msg.size == 123);

nothrow @property @trusted ubyte[] data();

Retrieves the message frame content.

Corresponds to:

Examples:

import std.string: representation;
auto msg = Frame(3);
assert(msg.data.length == 3);
msg.data[] = "foo".representation; // Slice operator -> array copy.
assert(msg.data.asString() == "foo");

nothrow @property @safe bool more();

Whether there are more message frames to retrieve.

Corresponds to:

inout pure nothrow @property @safe inout(zmq_msg_t)* handle();

A pointer to the underlying zmq_msg_t.

@trusted Context defaultContext();

A global context which is used by default by all sockets, unless they are explicitly constructed with a different context.

The ∅MQ Guide has the following to say about context creation:
You should create and use exactly one context in your process. […] If at runtime a process has two contexts, these are like separate ∅MQ instances. If that's explicitly what you want, OK, but otherwise remember: Do one zmq_ctx_new() at the start of your main line code, and one zmq_ctx_destroy() at the end.
By using defaultContext(), this is exactly what you achieve. The context is created the first time the function is called, and is automatically destroyed when the program ends.

This function is thread safe.

Throws:

ZmqException if ∅MQ reports an error.

See also:

struct Context;

An object that encapsulates a ∅MQ context.

In most programs, it is not necessary to use this type directly, as Socket will use a default global context if not explicitly provided with one. See defaultContext() for details.

A default-initialized Context is not a valid ∅MQ context; it must always be explicitly initialized with Context.opCall():
Context ctx;        // Not a valid context yet
ctx = Context();    // ...but now it is.
Context objects can be passed around by value, and two copies will refer to the same context. The underlying context is managed using reference counting, so that when the last copy of a Context goes out of scope, the context is automatically destroyed. The reference counting is performed in a thread safe manner, so that the same context can be shared between multiple threads. (∅MQ guarantees the thread safety of other context operations.)
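
The sharing semantics can be sketched as follows: two sockets created from two copies of the same Context can talk over the INPROC transport, which only works within a single context. The function and endpoint names are hypothetical.

```d
import zmqd;

/// Hypothetical example: sends "ping" between two sockets that were
/// created from two copies of the same Context.
string sameContextRoundTrip()
{
    auto ctx = Context();
    auto ctxCopy = ctx;     // refers to the same underlying ∅MQ context

    auto a = Socket(ctx, SocketType.pair);
    a.bind("inproc://zmqd_context_sketch");
    auto b = Socket(ctxCopy, SocketType.pair);
    b.connect("inproc://zmqd_context_sketch");

    a.send("ping");
    ubyte[4] buf;
    const len = b.receive(buf);
    return buf[0 .. len].asString().idup;
    // On return, both sockets are closed first; the context is destroyed
    // when the last of ctx and ctxCopy goes out of scope.
}
```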

See also:

Members

static @trusted Context opCall();

Creates a new ∅MQ context.

Returns:

A Context object that encapsulates the new context.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

Examples:

auto ctx = Context();
assert (ctx.initialized);

@safe void detach();

Detaches from the ∅MQ context.

If this is the last reference to the context, it will be destroyed with zmq_ctx_destroy().

Throws:

ZmqException if ∅MQ reports an error.

Examples:

auto ctx = Context();
assert (ctx.initialized);
ctx.detach();
assert (!ctx.initialized);

@property @safe int ioThreads();
@property @safe void ioThreads(int value);

The number of I/O threads.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_ctx_get() and zmq_ctx_set() with ZMQ_IO_THREADS.

Examples:

auto ctx = Context();
ctx.ioThreads = 3;
assert (ctx.ioThreads == 3);

@property @safe int maxSockets();
@property @safe void maxSockets(int value);

The maximum number of sockets.

Throws:

ZmqException if ∅MQ reports an error.

Corresponds to:

zmq_ctx_get() and zmq_ctx_set() with ZMQ_MAX_SOCKETS.

Examples:

auto ctx = Context();
ctx.maxSockets = 512;
assert (ctx.maxSockets == 512);

inout pure nothrow @property @trusted inout(void)* handle();

The void* pointer used by the underlying C API to refer to the context.

If the object has not been initialized, this function returns null.

const pure nothrow @property @safe bool initialized();

Whether this Context object has been initialized, i.e. whether it refers to a valid ∅MQ context.

Examples:

Context ctx;
assert (!ctx.initialized);
ctx = Context();
assert (ctx.initialized);
ctx.detach();
assert (!ctx.initialized);

enum EventType: int;

Socket event types.

These are used together with Socket.monitor(), and are described in the zmq_socket_monitor() reference.

Members

connected

Corresponds to ZMQ_EVENT_CONNECTED.

connectDelayed

Corresponds to ZMQ_EVENT_CONNECT_DELAYED.

connectRetried

Corresponds to ZMQ_EVENT_CONNECT_RETRIED.

listening

Corresponds to ZMQ_EVENT_LISTENING.

bindFailed

Corresponds to ZMQ_EVENT_BIND_FAILED.

accepted

Corresponds to ZMQ_EVENT_ACCEPTED.

acceptFailed

Corresponds to ZMQ_EVENT_ACCEPT_FAILED.

closed

Corresponds to ZMQ_EVENT_CLOSED.

closeFailed

Corresponds to ZMQ_EVENT_CLOSE_FAILED.

disconnected

Corresponds to ZMQ_EVENT_DISCONNECTED.

all

Corresponds to ZMQ_EVENT_ALL.

@system Event receiveEvent(ref Socket socket);

Receives a message on the given socket and interprets it as a socket state change event.

socket must be a PAIR socket which is connected to an endpoint created via a Socket.monitor() call. receiveEvent() receives one message on the socket, parses its contents according to the specification in the zmq_socket_monitor() reference, and returns the event information as an Event object.

The function will attempt to detect whether the received message is in fact an event message, by checking that its length is equal to zmq_event_t.sizeof and that the value of the zmq_event_t.event field is valid. If this is not the case, an InvalidEventException is thrown.

Warning:

The format of event messages changed between ∅MQ 3.x and 4.x. For the time being, this implementation only supports 3.x, and the function will throw an InvalidEventException if used with a newer version of ∅MQ.

Throws:

ZmqException if ∅MQ reports an error.
InvalidEventException if the received message could not be interpreted as an event message.

See also:

Socket.monitor(), for monitoring socket state changes.
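
A minimal sketch of how Socket.monitor() and receiveEvent() might be combined is given below. The helper name firstEvent and the endpoint arguments are hypothetical. The sketch assumes ∅MQ 3.x (see the warning above), that the monitored socket uses the default context, and that binding to an IPC endpoint publishes a listening event.

```d
import zmqd;

/// Hypothetical helper: starts monitoring `monitored`, binds it to
/// `bindEndpoint` and returns the first event that is published.
Event firstEvent(ref Socket monitored,
                 const char[] monitorEndpoint,
                 const char[] bindEndpoint)
{
    // Publish only "listening" events on the INPROC monitor endpoint.
    monitored.monitor(monitorEndpoint, EventType.listening);

    // The receiving end must be a PAIR socket in the same context.
    auto listener = Socket(SocketType.pair);
    listener.connect(monitorEndpoint);

    monitored.bind(bindEndpoint);       // assumed to trigger the event
    return receiveEvent(listener);
}
```

A call such as firstEvent(sck, "inproc://my_monitor", "ipc://my_endpoint") would then be expected to return an Event whose type is EventType.listening.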

struct Event;

Information about a socket state change.

Corresponds to:

See also:

Members

const pure nothrow @property @safe EventType type();

The event type.

Corresponds to:

zmq_event_t.event

const pure nothrow @property @safe string address();

The peer address.

Corresponds to:

zmq_event_t.data.xyz.addr, where xyz is the event-specific union.

const pure nothrow @property @safe FD fd();

The socket file descriptor.

This property function may only be called if Event.type is one of: connected, listening, accepted, closed or disconnected.

Throws:

Error if the property is called for a wrong event type.

Corresponds to:

zmq_event_t.data.xyz.fd, where xyz is the event-specific union.

const pure nothrow @property @safe int errno();

The errno code for the error which triggered the event.

This property function may only be called if Event.type is either bindFailed, acceptFailed or closeFailed.

Throws:

Error if the property is called for a wrong event type.

Corresponds to:

zmq_event_t.data.xyz.err, where xyz is the event-specific union.

const pure nothrow @property @safe Duration interval();

The reconnect interval.

This property function may only be called if Event.type is connectRetried.

Throws:

Error if the property is called for a wrong event type.

Corresponds to:

zmq_event_t.data.connect_retried.interval

pure @safe inout(char)[] asString(inout(ubyte)[] data);

Utility function which interprets and validates a byte array as a UTF-8 string.

Most of ∅MQD's message API deals in ubyte[] arrays, but very often, the message data contains plain text. asString() allows for easy and safe interpretation of raw data as characters. It checks that data is a valid UTF-8 encoded string, and returns a char[] array that refers to the same memory region.

Throws:

std.utf.UTFException if data is not a valid UTF-8 string.

See also:

std.string.representation, which performs the opposite operation.

Examples:

auto s1 = Socket(SocketType.pair);
s1.bind("ipc://zmqd_asString_example");
auto s2 = Socket(SocketType.pair);
s2.connect("ipc://zmqd_asString_example");

auto msg = Frame(12);
msg.data.asString()[] = "Hello World!";
s1.send(msg);

ubyte[12] buf;
s2.receive(buf);
assert(buf.asString() == "Hello World!");

class ZmqException: object.Exception;

A class for exceptions thrown when any of the underlying ∅MQ C functions report an error.

The exception provides a standard error message obtained with zmq_strerror(), as well as the errno code set by the ∅MQ function which reported the error.

Members

immutable int errno;

The errno code that was set by the ∅MQ function that reported the error.

Corresponds to:
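
Error handling with ZmqException might look like the sketch below. The helper name connectErrno is hypothetical, as is the convention of returning 0 on success.

```d
import zmqd;

/// Hypothetical helper: tries to connect a SUB socket to `endpoint`.
/// Returns 0 on success, or the errno code reported by ∅MQ on failure.
int connectErrno(const char[] endpoint)
{
    try
    {
        auto s = Socket(SocketType.sub);
        s.connect(endpoint);
        return 0;
    }
    catch (ZmqException e)
    {
        // e.msg holds the text from zmq_strerror(); e.errno the code.
        return e.errno;
    }
}
```

For example, an endpoint string without a transport prefix is expected to be rejected by ∅MQ, so connectErrno("not-a-valid-endpoint") should return a nonzero code.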

class InvalidEventException: object.Exception;

Exception thrown by receiveEvent() on failure to interpret a received message as an event description.