zmqd – a thin wrapper around the low-level C API of the ZeroMQ messaging framework.
Most functions in this module have a one-to-one relationship with functions in the underlying C API. Some adaptations have been made to make the API safer, easier and more pleasant to use; most importantly:
- The ZmqException class provides a standard textual message for any error condition, but it also provides access to the errno code set by the C function that reported the error.
- Functions are marked @safe, pure and nothrow as appropriate, thus facilitating their use in high-level D code.
Message parts are called "frames" in zmqd. For example, zmq_msg_send() becomes zmqd.Frame.send() and so on. (Multipart messages were a late addition to ZeroMQ, and the "msg" function names were well established at that point. The library's developers have admitted that this is somewhat confusing, and the newer CZMQ API consistently uses "frame" in function names.)
Examples can be found in the examples subdirectory of the zmqd source repository.
Reports the ZeroMQ library version.
Returns a std.typecons.Tuple with three integer fields that represent the three versioning levels: major, minor and patch.
Corresponds to zmq_version().
Checks for a ZeroMQ capability.
Corresponds to zmq_has().
The various socket types.
These are described in the zmq_socket() reference.
Corresponds to ZMQ_REQ
Corresponds to ZMQ_REP
Corresponds to ZMQ_DEALER
Corresponds to ZMQ_ROUTER
Corresponds to ZMQ_PUB
Corresponds to ZMQ_SUB
Corresponds to ZMQ_XPUB
Corresponds to ZMQ_XSUB
Corresponds to ZMQ_PUSH
Corresponds to ZMQ_PULL
Corresponds to ZMQ_PAIR
Corresponds to ZMQ_STREAM
Security mechanisms.
A special core.time.Duration value used to signify an infinite timeout or time interval in certain contexts.
The places where this value may be used are the linger, receiveTimeout, sendTimeout and handshakeInterval properties of Socket, and the timeout parameter of poll().
Since core.time.Duration doesn't reserve such a special value, the actual value of infiniteDuration is core.time.Duration.max.
An object that encapsulates a ZeroMQ socket.
A default-initialized Socket is not a valid ZeroMQ socket; it must always be explicitly initialized with a constructor (see Socket.this()):
    Socket s;                     // Not a valid socket yet
    s = Socket(SocketType.push);  // ...but now it is.
This struct is noncopyable, which means that a socket is always uniquely managed by a single Socket object. Functions that will inspect or use the socket, but not take ownership of it, should take a ref Socket parameter. Use std.algorithm.move to move a Socket to a different location (e.g. into a sink function that takes it by value, or into a new variable).
The socket is closed automatically when the Socket object goes out of scope.
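The ownership rules above can be tried out without ZeroMQ at all. The following is a minimal sketch that uses a hypothetical `Handle` struct as a stand-in for Socket; the names `Handle`, `peek` and `consume` are illustrative assumptions and are not part of zmqd. It only demonstrates the general D mechanism (a disabled postblit plus std.algorithm.move).

```d
import std.algorithm.mutation : move;
import std.traits : isCopyable;

// Hypothetical stand-in for a uniquely owned resource such as Socket.
struct Handle
{
    int id;
    @disable this(this); // copying is rejected at compile time
}

// Borrows the handle: the caller keeps ownership.
int peek(ref Handle h)
{
    return h.id;
}

// Sink function: takes ownership of the handle by value.
int consume(Handle h)
{
    return h.id;
}

void main()
{
    static assert(!isCopyable!Handle);

    auto a = Handle(7);
    assert(peek(a) == 7);          // passed by ref, no copy needed

    auto b = move(a);              // transfer into a new variable
    assert(b.id == 7);
    assert(consume(move(b)) == 7); // transfer into a sink function
}
```

Taking the parameter by `ref` is what lets `peek` work with a noncopyable type; a by-value parameter such as the one in `consume` can only be fed an rvalue, which is what `move` produces.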
Creates a new ZeroMQ socket.
If context is not specified, the default context (as returned by defaultContext()) is used.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_socket().
    // Using the default context
    auto sck = Socket(SocketType.push);
    assert (sck.initialized);

    // Using an explicit context
    auto ctx = Context();
    auto sck = Socket(ctx, SocketType.push);
    assert (sck.initialized);
Closes the ZeroMQ socket.
Note that the socket will be closed automatically upon destruction, so it is usually not necessary to call this method manually.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_close().
    auto s = Socket(SocketType.pair);
    assert (s.initialized);
    s.close();
    assert (!s.initialized);
Starts accepting incoming connections on endpoint.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_bind().
    auto s = Socket(SocketType.pub);
    s.bind("inproc://zmqd_bind_example");
Stops accepting incoming connections on endpoint.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_unbind().
    auto s = Socket(SocketType.pub);
    s.bind("ipc://zmqd_unbind_example");
    // Do some work...
    s.unbind("ipc://zmqd_unbind_example");
Creates an outgoing connection to endpoint.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_connect().
    auto s = Socket(SocketType.sub);
    s.connect("inproc://zmqd_connect_example");
Disconnects the socket from endpoint.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_disconnect().
    auto s = Socket(SocketType.sub);
    s.connect("inproc://zmqd_disconnect_example");
    // Do some work...
    s.disconnect("inproc://zmqd_disconnect_example");
Sends a message frame.
send blocks until the frame has been queued on the socket. trySend performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.
The more parameter specifies whether this is a multipart message and there are more frames to follow.
The char[] overload is a convenience function that simply casts the string argument to ubyte[].
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_send() (with the ZMQ_DONTWAIT flag, in the case of trySend, and with the ZMQ_SNDMORE flag if more == true).
    auto sck = Socket(SocketType.pub);
    sck.send(cast(ubyte[]) [11, 226, 92]);
    sck.send("Hello World!");
Sends a message frame.
send blocks until the frame has been queued on the socket. trySend performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.
The more parameter specifies whether this is a multipart message and there are more frames to follow.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_send() (with the ZMQ_DONTWAIT flag, in the case of trySend, and with the ZMQ_SNDMORE flag if more == true).
    auto sck = Socket(SocketType.pub);
    auto msg = Frame(12);
    msg.data.asString()[] = "Hello World!";
    sck.send(msg);
Sends a constant-memory message frame.
sendConst blocks until the frame has been queued on the socket. trySendConst performs the operation in non-blocking mode, and returns a bool value that signifies whether the frame was queued on the socket.
The more parameter specifies whether this is a multipart message and there are more frames to follow.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_send_const() (with the ZMQ_DONTWAIT flag, in the case of trySendConst, and with the ZMQ_SNDMORE flag if more == true).
    static immutable arr = cast(ubyte[]) [11, 226, 92];
    auto sck = Socket(SocketType.pub);
    sck.sendConst(arr);
    sck.sendConst("Hello World!");
Receives a message frame.
receive blocks until the request can be satisfied, and returns the number of bytes in the frame. tryReceive performs the operation in non-blocking mode, and returns a std.typecons.Tuple which contains the size of the frame along with a bool value that signifies whether a frame was received. (If the latter is false, the former is always set to zero.)
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_recv() (with the ZMQ_DONTWAIT flag, in the case of tryReceive).
    // Sender
    auto snd = Socket(SocketType.req);
    snd.connect("inproc://zmqd_receive_example");
    snd.send("Hello World!");

    // Receiver
    import std.string: representation;
    auto rcv = Socket(SocketType.rep);
    rcv.bind("inproc://zmqd_receive_example");
    char[256] buf;
    immutable len = rcv.receive(buf.representation);
    assert (buf[0 .. len] == "Hello World!");
Receives a message frame.
receive blocks until the request can be satisfied, and returns the number of bytes in the frame. tryReceive performs the operation in non-blocking mode, and returns a std.typecons.Tuple which contains the size of the frame along with a bool value that signifies whether a frame was received. (If the latter is false, the former is always set to zero.)
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_recv() (with the ZMQ_DONTWAIT flag, in the case of tryReceive).
    // Sender
    auto snd = Socket(SocketType.req);
    snd.connect("inproc://zmqd_msg_receive_example");
    snd.send("Hello World!");

    // Receiver
    auto rcv = Socket(SocketType.rep);
    rcv.bind("inproc://zmqd_msg_receive_example");
    auto msg = Frame();
    rcv.receive(msg);
    assert (msg.data.asString() == "Hello World!");
Whether there are more message frames to follow.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_getsockopt() with ZMQ_RCVMORE.
Miscellaneous socket options.
Each of these corresponds to an option passed to zmq_getsockopt() and zmq_setsockopt(). For example, identity corresponds to ZMQ_IDENTITY, receiveBufferSize corresponds to ZMQ_RCVBUF, etc.
The setter for the identity property accepts strings. To retrieve a string with the getter, use the asString() function.
    sck.identity = "foobar";
    assert (sck.identity.asString() == "foobar");
The linger, receiveTimeout, sendTimeout and handshakeInterval properties may have the special value infiniteDuration. This is translated to an option value of -1 or 0 (depending on which property is being set) in the C API.
Some getters are not marked @property, but are prefixed with "get" (e.g. getIdentity()). A user-supplied buffer is required for some options, namely getPlainUsername() and getPlainPassword(), and these do not have @property versions. getCurveXxxKey() and getCurveXxxKeyZ85() require buffers which are at least 32 and 41 bytes long, respectively.
The ZMQ_SUBSCRIBE and ZMQ_UNSUBSCRIBE options are treated differently from the others; see Socket.subscribe() and Socket.unsubscribe().
Throws ZmqException if ZeroMQ reports an error.
Throws std.conv.ConvOverflowException if a given Duration is longer than the number of milliseconds that will fit in an int (only applies to properties of core.time.Duration type).
Throws core.exception.RangeError if the dest buffers passed to getCurveXxxKey() or getCurveXxxKeyZ85() are less than 32 or 41 bytes long, respectively.
Corresponds to zmq_getsockopt() and zmq_setsockopt().
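The Duration handling described for the timeout-like options can be illustrated in isolation. The sketch below is not zmqd's implementation; `toOptionMillis` and `infinite` are hypothetical names, and the only assumptions taken from the text are that infiniteDuration equals core.time.Duration.max, that it maps to -1 or 0 depending on the option, and that other values are converted to an int number of milliseconds.

```d
import core.time : Duration, days, msecs;
import std.conv : ConvOverflowException, to;
import std.exception : assertThrown;

// Stand-in for infiniteDuration, which the text says is Duration.max.
enum infinite = Duration.max;

// Converts a Duration to the int millisecond value a C option expects.
// infiniteValue is the option-specific marker for "no limit" (-1 or 0).
int toOptionMillis(Duration d, int infiniteValue = -1)
{
    if (d == infinite)
        return infiniteValue;
    // to!int throws ConvOverflowException if the value does not fit.
    return to!int(d.total!"msecs");
}

void main()
{
    assert(toOptionMillis(250.msecs) == 250);
    assert(toOptionMillis(infinite) == -1);
    assert(toOptionMillis(infinite, 0) == 0);
    // 30 days is more milliseconds than an int can hold.
    assertThrown!ConvOverflowException(toOptionMillis(30.days));
}
```

An int holds a little under 25 days' worth of milliseconds, which is why a finite but very long Duration triggers the overflow exception rather than being silently truncated.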
Establishes a message filter.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_setsockopt() with ZMQ_SUBSCRIBE.
    // Create a subscriber that accepts all messages that start with
    // the prefixes "foo" or "bar".
    auto sck = Socket(SocketType.sub);
    sck.subscribe("foo");
    sck.subscribe("bar");
Removes a message filter.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_setsockopt() with ZMQ_UNSUBSCRIBE.
    // Subscribe to messages that start with "foo" or "bar".
    auto sck = Socket(SocketType.sub);
    sck.subscribe("foo");
    sck.subscribe("bar");
    // ...
    // From now on, only accept messages that start with "bar"
    sck.unsubscribe("foo");
Spawns a PAIR socket that publishes socket state changes (events) over the INPROC transport to the given endpoint.
Which event types should be published may be selected by bitwise-ORing together different EventType flags in the events parameter.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_socket_monitor().
See also receiveEvent(), which receives and parses event messages.
    auto sck = Socket(SocketType.pub);
    sck.monitor("inproc://zmqd_monitor_unittest",
                EventType.accepted | EventType.closed);
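Combining event flags with bitwise OR, as the events parameter expects, works like any other bit mask. The following sketch uses a hypothetical `Ev` enum rather than zmqd's EventType; its three values mirror the ZMQ_EVENT_CONNECTED, ZMQ_EVENT_ACCEPTED and ZMQ_EVENT_CLOSED constants from zmq.h, and the helper `selects` is purely illustrative.

```d
// Hypothetical flag enum; the values mirror the corresponding
// ZMQ_EVENT_* constants in zmq.h.
enum Ev : uint
{
    connected = 0x0001,
    accepted  = 0x0020,
    closed    = 0x0080,
}

// Reports whether a mask selects the given event type.
bool selects(uint mask, Ev e)
{
    return (mask & e) != 0;
}

void main()
{
    // Select two event types by ORing their flags together.
    uint mask = Ev.accepted | Ev.closed;
    assert(mask == 0x00A0);
    assert(selects(mask, Ev.accepted));
    assert(selects(mask, Ev.closed));
    assert(!selects(mask, Ev.connected));
}
```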
The void* pointer used by the underlying C API to refer to the socket.
If the object has not been initialized, this function returns null.
Whether this Socket object has been initialized, i.e. whether it refers to a valid ZeroMQ socket.
    Socket sck;
    assert (!sck.initialized);
    sck = Socket(SocketType.sub);
    assert (sck.initialized);
    sck.close();
    assert (!sck.initialized);
The native socket file descriptor type.
This is an alias for SOCKET on Windows and int on POSIX systems.
Starts the built-in ZeroMQ proxy.
This function never returns normally, but it may throw an exception. This could happen if the context associated with either of the specified sockets is manually destroyed in a different thread.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_proxy().
See also steerableProxy().
Starts the built-in ZeroMQ proxy with control flow.
Note that the order of the two last parameters is reversed compared to zmq_proxy_steerable(). That is, the control socket always comes before the capture socket. Furthermore, unlike in ZeroMQ, control is mandatory. (Without the control socket one can simply use proxy().)
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_proxy_steerable().
See also proxy().
Input/output multiplexing.
The timeout parameter may have the special value infiniteDuration, which means no timeout. This is translated to an argument value of -1 in the C API.
Returns the number of PollItem structures with events signalled in PollItem.returnedEvents, or 0 if no events have been signalled.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_poll().
    auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
    socket1.bind("inproc://zmqd_poll_example");

    import std.socket;
    auto socket2 = new std.socket.Socket(
        AddressFamily.INET,
        std.socket.SocketType.DGRAM);
    socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));

    auto socket3 = zmqd.Socket(zmqd.SocketType.push);
    socket3.connect("inproc://zmqd_poll_example");
    socket3.send("test");

    import core.thread: Thread;
    import core.time: msecs;
    Thread.sleep(10.msecs);

    auto items = [
        PollItem(socket1, PollFlags.pollIn),
        PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
        PollItem(socket3, PollFlags.pollIn),
    ];

    const n = poll(items, 100.msecs);
    assert (n == 2);
    assert (items[0].returnedEvents == PollFlags.pollIn);
    assert (items[1].returnedEvents == PollFlags.pollOut);
    assert (items[2].returnedEvents == 0);
    socket2.close();
poll() event flags.
These are described in the zmq_poll() manual.
A structure that specifies a socket to be monitored by poll() as well as the events to poll for, and, when poll() returns, the events that occurred.
PollItem objects do not store std.socket.Socket references, only the corresponding native file descriptors. This means that the references have to be stored elsewhere, or the objects may be garbage collected, invalidating the sockets before or while poll() executes.
    // Not OK
    auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);

    // OK
    auto s = new std.socket.Socket(/*...*/);
    auto p2 = PollItem(s, PollFlags.pollIn);
Corresponds to zmq_pollitem_t.
Constructs a PollItem for monitoring a ZeroMQ socket.
Constructs a PollItem for monitoring a standard socket referenced by a std.socket.Socket.
Constructs a PollItem for monitoring a standard socket referenced by a native file descriptor.
Requested events.
Corresponds to zmq_pollitem_t.events.
Returned events.
Corresponds to zmq_pollitem_t.revents.
An object that encapsulates a ZeroMQ message frame.
This struct is a wrapper around a zmq_msg_t object.
A default-initialized Frame is not a valid ZeroMQ message frame; it should always be explicitly initialized upon construction using Frame.opCall(). Alternatively, it may be initialized later with Frame.rebuild().
    Frame msg1;               // Invalid frame
    auto msg2 = Frame();      // Empty frame
    auto msg3 = Frame(1024);  // 1K frame
    msg1.rebuild(2048);       // msg1 now has size 2K
    msg2.rebuild(2048);       // ...and so does msg2
When a Frame object is destroyed, zmq_msg_close() is called on the underlying zmq_msg_t.
A Frame cannot be copied by normal assignment; use Frame.copy() for this.
Initializes an empty ZeroMQ message frame.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_init().
    auto msg = Frame();
    assert(msg.size == 0);
Initializes a ZeroMQ message frame of the specified size.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_init_size().
    auto msg = Frame(123);
    assert(msg.size == 123);
Initializes a ZeroMQ message frame from a supplied buffer.
If free is not specified, data must refer to a slice of memory which has been allocated by the garbage collector (typically using operator new). Ownership will then be transferred temporarily to ZeroMQ, and then transferred back to the GC when ZeroMQ is done using the buffer.
For memory which is not garbage collected, the argument free must be a pointer to a function that will release the memory, which will be called when ZeroMQ no longer needs the buffer. free must point to a function with the following signature:
    extern(C) void f(void* d, void* h) nothrow;
When it is called, d will be equal to data.ptr, while h will be equal to hint (which is optional and null by default).
free is passed directly to the underlying ZeroMQ C function, which is why it needs to be extern(C).
Care must be taken with any slices that refer to data. For the "non-GC" version, client code should in general not retain slices or pointers to any memory which will be released when free is called.
See also Frame.FreeData.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_init_data().
    // Garbage-collected memory
    auto buf = new ubyte[123];
    auto msg = Frame(buf);
    assert(msg.size == buf.length);
    assert(msg.data.ptr == buf.ptr);

    // Manually managed memory
    import core.stdc.stdlib: malloc, free;
    static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
    auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
    auto msg = Frame(buf, &myFree);
    assert(msg.size == buf.length);
    assert(msg.data.ptr == buf.ptr);
The function pointer type for memory-freeing callback functions passed to Frame(ubyte[], FreeData, void*).
Reinitializes the Frame object as an empty message.
This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame().
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_close() followed by zmq_msg_init().
    auto msg = Frame(256);
    assert (msg.size == 256);
    msg.rebuild();
    assert (msg.size == 0);
Reinitializes the Frame object to a specified size.
This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame(size).
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_close() followed by zmq_msg_init_size().
    auto msg = Frame(256);
    assert (msg.size == 256);
    msg.rebuild(1024);
    assert (msg.size == 1024);
Reinitializes the Frame object from a supplied buffer.
This function will first call Frame.close() to release the resources associated with the message frame, and then it will initialize it anew, exactly as if it were constructed with Frame(data) or Frame(data, free, hint).
Some care must be taken when using these functions. Please read the Frame(ubyte[]) documentation.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_close() followed by zmq_msg_init_data().
    // Garbage-collected memory
    auto msg = Frame(256);
    assert (msg.size == 256);
    auto buf = new ubyte[123];
    msg.rebuild(buf);
    assert(msg.size == buf.length);
    assert(msg.data.ptr == buf.ptr);

    // Manually managed memory
    import core.stdc.stdlib: malloc, free;
    static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
    auto msg = Frame(256);
    assert (msg.size == 256);
    auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
    msg.rebuild(buf, &myFree);
    assert(msg.size == buf.length);
    assert(msg.data.ptr == buf.ptr);
Releases the ZeroMQ message frame.
Note that the frame will be automatically released when the Frame object is destroyed, so it is often not necessary to call this method manually.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_close().
Copies frame content to another message frame.
copy() returns a new Frame object, while copyTo(dest) copies the contents of this Frame into dest. dest must be a valid (i.e. initialized) Frame.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_copy().
    import std.string: representation;
    auto msg1 = Frame(3);
    msg1.data[] = "foo".representation;
    auto msg2 = msg1.copy();
    assert (msg2.data.asString() == "foo");
Moves frame content to another message frame.
move() returns a new Frame object, while moveTo(dest) moves the contents of this Frame to dest. dest must be a valid (i.e. initialized) Frame.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_move().
    import std.string: representation;
    auto msg1 = Frame(3);
    msg1.data[] = "foo".representation;
    auto msg2 = msg1.move();
    assert (msg1.size == 0);
    assert (msg2.data.asString() == "foo");
The message frame content size in bytes.
Corresponds to zmq_msg_size().
    auto msg = Frame(123);
    assert(msg.size == 123);
Retrieves the message frame content.
Corresponds to zmq_msg_data().
    import std.string: representation;
    auto msg = Frame(3);
    assert(msg.data.length == 3);
    msg.data[] = "foo".representation; // Slice operator -> array copy.
    assert(msg.data.asString() == "foo");
Whether there are more message frames to retrieve.
Corresponds to zmq_msg_more().
The file descriptor of the socket the message was read from.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_get() with ZMQ_SRCFD.
Whether the message MAY share underlying storage with another copy.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_get() with ZMQ_SHARED.
Gets message metadata.
metadataUnsafe() is faster than metadata() because it directly returns the array which comes from zmq_msg_gets(), whereas the latter returns a freshly GC-allocated copy of it. However, the array returned by metadataUnsafe() is owned by the underlying ZeroMQ message and gets destroyed along with it, so care must be taken when using this function.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_msg_gets().
A pointer to the underlying zmq_msg_t.
A global context which is used by default by all sockets, unless they are explicitly constructed with a different context.
The ZeroMQ Guide has the following to say about context creation:
"You should create and use exactly one context in your process. […] If at runtime a process has two contexts, these are like separate ZeroMQ instances. If that's explicitly what you want, OK, but otherwise remember: Do one zmq_ctx_new() at the start of your main line code, and one zmq_ctx_destroy() at the end."
By using defaultContext(), this is exactly what you achieve. The context is created the first time the function is called, and is automatically destroyed when the program ends.
Throws ZmqException if ZeroMQ reports an error.
See also Context.
An object that encapsulates a ZeroMQ context.
In most programs, it is not necessary to use this type directly, as Socket will use a default global context if not explicitly provided with one. See defaultContext() for details.
A default-initialized Context is not a valid ZeroMQ context; it must always be explicitly initialized with Context.opCall():
    Context ctx;        // Not a valid context yet
    ctx = Context();    // ...but now it is.
Context objects can be passed around by value, and two copies will refer to the same context. The underlying context is managed using reference counting, so that when the last copy of a Context goes out of scope, the context is automatically destroyed. The reference counting is performed in a thread safe manner, so that the same context can be shared between multiple threads. (ZeroMQ guarantees the thread safety of other context operations.)
See also defaultContext().
Creates a new ZeroMQ context.
Returns a Context object that encapsulates the new context.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_ctx_new().
    auto ctx = Context();
    assert (ctx.initialized);
Detaches from the ZeroMQ context.
If this is the last reference to the context, it will be destroyed with zmq_ctx_term().
Throws ZmqException if ZeroMQ reports an error.
    auto ctx = Context();
    assert (ctx.initialized);
    ctx.detach();
    assert (!ctx.initialized);
Forcefully terminates the context.
By using this function, one effectively circumvents the reference-counting mechanism for managing the context. After it returns, all other Context objects that used to refer to the same context will be in a state which is functionally equivalent to the default-initialized state (i.e., Context.initialized is false and Context.handle is null).
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_ctx_term().
    auto ctx1 = Context();
    auto ctx2 = ctx1;
    assert (ctx1.initialized);
    assert (ctx2.initialized);
    assert (ctx1.handle == ctx2.handle);
    ctx2.terminate();
    assert (!ctx1.initialized);
    assert (!ctx2.initialized);
    assert (ctx1.handle == null);
    assert (ctx2.handle == null);
The number of I/O threads.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_ctx_get() and zmq_ctx_set() with ZMQ_IO_THREADS.
    auto ctx = Context();
    ctx.ioThreads = 3;
    assert (ctx.ioThreads == 3);
The maximum number of sockets.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_ctx_get() and zmq_ctx_set() with ZMQ_MAX_SOCKETS.
    auto ctx = Context();
    ctx.maxSockets = 512;
    assert (ctx.maxSockets == 512);
The largest configurable number of sockets.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_ctx_get() with ZMQ_SOCKET_LIMIT.
    auto ctx = Context();
    assert (ctx.socketLimit > 0);
IPv6 option.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_ctx_get() and zmq_ctx_set() with ZMQ_IPV6.
    auto ctx = Context();
    ctx.ipv6 = true;
    assert (ctx.ipv6 == true);
The void* pointer used by the underlying C API to refer to the context.
If the object has not been initialized, this function returns null.
Whether this Context object has been initialized, i.e. whether it refers to a valid ZeroMQ context.
    Context ctx;
    assert (!ctx.initialized);
    ctx = Context();
    assert (ctx.initialized);
    ctx.detach();
    assert (!ctx.initialized);
Socket event types.
These are used together with Socket.monitor(), and are described in the zmq_socket_monitor() reference.
Corresponds to ZMQ_EVENT_CONNECTED.
Corresponds to ZMQ_EVENT_CONNECT_DELAYED.
Corresponds to ZMQ_EVENT_CONNECT_RETRIED.
Corresponds to ZMQ_EVENT_LISTENING.
Corresponds to ZMQ_EVENT_BIND_FAILED.
Corresponds to ZMQ_EVENT_ACCEPTED.
Corresponds to ZMQ_EVENT_ACCEPT_FAILED.
Corresponds to ZMQ_EVENT_CLOSED.
Corresponds to ZMQ_EVENT_CLOSE_FAILED.
Corresponds to ZMQ_EVENT_DISCONNECTED.
Corresponds to ZMQ_EVENT_ALL.
Receives a message on the given socket and interprets it as a socket state change event.
socket must be a PAIR socket which is connected to an endpoint created via a Socket.monitor() call. receiveEvent() receives one message on the socket, parses its contents according to the specification in the zmq_socket_monitor() reference, and returns the event information as an Event object.
Throws ZmqException if ZeroMQ reports an error.
Throws InvalidEventException if the received message could not be interpreted as an event message.
See also Socket.monitor(), for monitoring socket state changes.
Information about a socket state change.
See also receiveEvent().
The event type.
The peer address.
The socket file descriptor.
This property function may only be called if Event.type is one of: connected, listening, accepted, closed or disconnected.
Throws Error if the property is called for a wrong event type.
The errno code for the error which triggered the event.
This property function may only be called if Event.type is either bindFailed, acceptFailed or closeFailed.
Throws Error if the property is called for a wrong event type.
The reconnect interval.
This property function may only be called if Event.type is connectRetried.
Throws Error if the property is called for a wrong event type.
Encodes a binary key as Z85 printable text.
dest must be an array whose length is at least 5*data.length/4 + 1, which will be used to store the return value plus a terminating zero byte. If dest is omitted, a new array will be created.
Returns an array of size 5*data.length/4 which contains the Z85-encoded text, excluding the terminating zero byte. This will be a slice of dest if it is provided.
Throws core.exception.RangeError if dest is given but is too small.
Throws ZmqException if ZeroMQ reports an error (i.e., if data.length is not a multiple of 4).
Corresponds to zmq_z85_encode().
Decodes a binary key from Z85 printable text.
dest must be an array whose length is at least 4*text.length/5, which will be used to store the return value. If dest is omitted, a new array will be created.
Note that zmq_z85_decode() expects a zero-terminated string, so a zero byte will be appended to text if it does not contain one already. However, this may trigger a (possibly unwanted) GC allocation. To avoid this, either make sure that the last character in text is '\0', or use assumeSafeAppend on the array before calling this function (provided this is safe).
Returns an array of size 4*text.length/5 which contains the decoded data. This will be a slice of dest if it is provided.
Throws core.exception.RangeError if dest is given but is too small.
Throws ZmqException if ZeroMQ reports an error (i.e., if text.length is not a multiple of 5).
Corresponds to zmq_z85_decode().
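The buffer-size rules for Z85 encoding and decoding are plain arithmetic, so they can be checked without calling ZeroMQ. The helpers below are hypothetical (they are not zmqd functions) and only restate the formulas given above: four binary bytes become five text characters, and the encoder needs one extra byte for the terminating zero.

```d
// Number of Z85 characters produced for binaryLength bytes,
// excluding the terminating zero byte.
size_t z85EncodedLength(size_t binaryLength)
{
    assert(binaryLength % 4 == 0, "input length must be a multiple of 4");
    return 5 * binaryLength / 4;
}

// Minimum length of a dest buffer for encoding (text plus terminator).
size_t z85EncodeBufferLength(size_t binaryLength)
{
    return z85EncodedLength(binaryLength) + 1;
}

// Number of binary bytes produced for textLength Z85 characters.
size_t z85DecodedLength(size_t textLength)
{
    assert(textLength % 5 == 0, "text length must be a multiple of 5");
    return 4 * textLength / 5;
}

void main()
{
    // A 32-byte Curve key encodes to 40 characters, 41 with terminator.
    assert(z85EncodedLength(32) == 40);
    assert(z85EncodeBufferLength(32) == 41);
    assert(z85DecodedLength(40) == 32);
    assert(z85EncodedLength(8) == 10);
}
```

These numbers line up with the 32-byte and 41-byte buffer requirements quoted for the Curve key getters.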
Generates a new Curve key pair.
To avoid a memory allocation, preallocated buffers may optionally be supplied for the two keys. Each of these must have a length of at least 41 bytes, enough for a 40-character Z85-encoded key plus a terminating zero byte. If either buffer is omitted/null, a new one will be created.
Throws core.exception.RangeError if publicKeyBuf or secretKeyBuf are not null but have a length of less than 41 characters.
Throws ZmqException if ZeroMQ reports an error.
Corresponds to zmq_curve_keypair().
    auto server = Socket(SocketType.rep);
    auto serverKeys = curveKeyPair();
    server.curveServer = true;
    server.curveSecretKeyZ85 = serverKeys.secretKey;
    server.bind("inproc://curveKeyPair_test");

    auto client = Socket(SocketType.req);
    auto clientKeys = curveKeyPair();
    client.curvePublicKeyZ85 = clientKeys.publicKey;
    client.curveSecretKeyZ85 = clientKeys.secretKey;
    client.curveServerKeyZ85 = serverKeys.publicKey;
    client.connect("inproc://curveKeyPair_test");
    client.send("hello");

    ubyte[5] buf;
    assert (server.receive(buf) == 5);
    assert (buf.asString() == "hello");
Utility function which interprets and validates a byte array as a UTF-8 string.
Most of zmqd's message API deals in ubyte[] arrays, but very often, the message data contains plain text. asString() allows for easy and safe interpretation of raw data as characters. It checks that data is a valid UTF-8 encoded string, and returns a char[] array that refers to the same memory region.
Throws std.utf.UTFException if data is not a valid UTF-8 string.
See also std.string.representation, which performs the opposite operation.
    auto s1 = Socket(SocketType.pair);
    s1.bind("inproc://zmqd_asString_example");
    auto s2 = Socket(SocketType.pair);
    s2.connect("inproc://zmqd_asString_example");

    auto msg = Frame(12);
    msg.data.asString()[] = "Hello World!";
    s1.send(msg);

    ubyte[12] buf;
    s2.receive(buf);
    assert(buf.asString() == "Hello World!");
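The validate-then-reinterpret behaviour described for asString() can be approximated with Phobos alone. The function `bytesAsText` below is a hypothetical stand-in written for illustration, not zmqd's actual implementation; it assumes only what the text states, namely that the result aliases the input memory and that invalid UTF-8 raises std.utf.UTFException.

```d
import std.exception : assertThrown;
import std.string : representation;
import std.utf : UTFException, validate;

// Hypothetical stand-in for asString(): views bytes as text after
// checking that they form valid UTF-8.
char[] bytesAsText(ubyte[] data)
{
    auto text = cast(char[]) data; // reinterpret the same memory, no copy
    validate(text);                // throws UTFException on malformed input
    return text;
}

void main()
{
    ubyte[] raw = [72, 105];       // the bytes of "Hi"
    auto text = bytesAsText(raw);
    assert(text == "Hi");
    // The returned array refers to the same memory region as the input.
    assert(cast(void*) text.ptr is cast(void*) raw.ptr);

    // std.string.representation performs the opposite conversion.
    assert("Hi".representation == raw);

    ubyte[] bad = [0xFF, 0xFE];    // not valid UTF-8
    assertThrown!UTFException(bytesAsText(bad));
}
```

Because the result aliases the input, writing through the returned char[] modifies the original bytes, which is what makes the `msg.data.asString()[] = "Hello World!"` idiom in the example above work.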
A class for exceptions thrown when any of the underlying ZeroMQ C functions report an error.
The exception provides a standard error message obtained with zmq_strerror(), as well as the errno code set by the ZeroMQ function which reported the error.
The errno code that was set by the ZeroMQ function that reported the error.
Corresponds to zmq_errno().
Exception thrown by receiveEvent() on failure to interpret a received message as an event description.