CHAPTER 20
Many computer programs inevitably perform frequent, heavy operations on peripheral (I/O) devices. Compared with CPU instructions, I/O operations are extremely slow. Because it involves mechanical movement of the disk head and spindle, a simple addressing (seek) operation on a hard disk drive can take a few milliseconds, during which a modern CPU could have executed several million instructions. Network operations take even longer, and their latency is less predictable. From the operating system's perspective, the wait on the CPU side shows up as a blocked thread, which holds precious system resources while sitting idle.
The remedy for this quintessential problem is asynchronous programming, or async programming for short, a non-blocking approach to handling I/O operations. In async programming, the caller of a long-running operation doesn't wait for it to finish; it simply proceeds to do other work. When the operation is done, a callback is invoked to handle the resulting data. This callback is usually invoked on a new thread that can run alongside the original thread that initiated the I/O operation.
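As a rough illustration of the idea (not Julian code), the sketch below uses Python's standard concurrent.futures module. The slow_read function is a hypothetical stand-in for a slow I/O call; the caller submits it, registers a callback, and carries on without waiting.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def slow_read():
    """Hypothetical stand-in for a slow I/O operation."""
    time.sleep(0.1)
    return b"payload"


def run_demo():
    events = []
    done = threading.Event()

    def on_complete(future):
        # Invoked once the operation has finished, normally on the worker thread.
        events.append(("callback", future.result()))
        done.set()

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_read)        # returns immediately
        future.add_done_callback(on_complete)  # register the completion callback
        events.append(("caller continues", None))  # the caller is not blocked
        done.wait(timeout=5)                   # only so the demo can report the order
    return events
```

Calling run_demo() records the caller's own step first and the callback's step second, which is the ordering the paragraph above describes.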
Most modern programming languages offer an async API, some of them even at the language level (C#, Python, JavaScript). As of 0.1.34, Julian offers a system API for async programming, but doesn't yet provide any language-level support for it. At the core of this API is the class Promise. To start a promise, call Promise.start() with a Function object, usually a lambda.
Promise promise = Promise.start(()=> {
    return 5;
});
int result = promise.getResult(true);
The lambda or Function passed to start() represents the potentially long-running task, although the one used in this example finishes almost immediately. In return, the caller gets a Promise instance, which can be asked for the result at any time. However, if the task is still running when we call getResult(), the call blocks until the task is done. Eventually the task will either finish or fault on an exception (unless a bug in the provided function makes it run forever). Indeed, what a Promise promises to its users is exactly its eventual completion. In the terminology of the Promise API, completing without error is called resolved, faulting with an error is called rejected, and both are considered settled.
getResult() takes an argument that specifies the behavior for a faulted promise. If the promise faults, the original exception is preserved, and the caller may opt either to get it as the return value (passing false) or to have it rethrown (passing true).
Promise promise = Promise.start(()=> {
    throw new Exception();
});
var ex = promise.getResult(false); // Get exception
try {
    promise.getResult(true);
} catch (Exception e) { // named e to avoid clashing with ex above
    // Catch exception
}
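For comparison only, Python's concurrent.futures offers the same two choices for a failed task, which may help make the distinction concrete. This is an analogue in another language, not Julian's API; failing_task is a made-up example.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    """Hypothetical task that always faults."""
    raise ValueError("failed")


def inspect_failure():
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(failing_task)
        # Option 1: obtain the preserved exception as a value
        # (blocks until the task has settled).
        captured = future.exception()
        # Option 2: ask for the result, which re-raises the preserved exception.
        try:
            future.result()
            rethrown = None
        except ValueError as err:
            rethrown = err
    return captured, rethrown
```

Both paths hand back the exception raised inside the task; the only difference is whether the caller receives it as a value or has to catch it.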
A promise can be chained through then(), which takes up to two callbacks. The first is called upon resolution, the second upon rejection. then() returns another promise, which eventually settles after whichever of the two callbacks is invoked has finished. This promise can continue the chain via then() again. The programming pattern enabled by this chain has the benefit of laying out time-sequenced operations in exactly the same order in the source, a remarkable leap from heavily nested callback hell.
Promise p0 = Promise.start(()=> {
    return 5;
}).then(d => {
    return d + 1;
}).then(d => {
    return d * 2;
});
int res = p0.getResult(true); // 12
If both callbacks are provided to then(), only one will be called. To provide only an error callback, use error(). While the success callback doesn't change the state of the promise, the error callback has the effect of resetting it to resolved. So use caution when dealing with error callbacks.
Promise p0 = Promise.start(()=> {
    throw new Exception("failed");
}).then(d => {
    throw new Exception("skipped!"); // skipped since the promise is rejected
}).then(d => {
    throw new Exception("skipped!"); // still skipped since the promise continues in rejected state
}).error(ex => {
    return ex.getMessage().toUpper(); // called because we are in rejected state. However, since we return normally, this resets the state of current promise to resolved
}).error(ex => {
    throw new Exception("skipped!"); // skipped since the promise is resolved
});
string msg = p0.getResult(true); // FAILED
Note that if the error callback throws instead of returning normally, the promise remains in the rejected state.
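The state rules above can be modeled in a few lines. The sketch below is a toy, written in Python, that runs callbacks synchronously and only mimics the resolve/reject propagation described in this section; MiniPromise and its methods are assumptions made for illustration and do not reflect how Julian implements Promise.

```python
class MiniPromise:
    """Toy, synchronous model of the chaining rules (not Julian's Promise)."""

    def __init__(self, value=None, exc=None):
        self._value = value
        self._exc = exc

    @classmethod
    def start(cls, fn, *args):
        """Run fn right away: resolved on normal return, rejected on raise."""
        try:
            return cls(value=fn(*args))
        except Exception as exc:
            return cls(exc=exc)

    def then(self, on_success=None, on_error=None):
        """Invoke at most one callback, chosen by the current state."""
        if self._exc is None:
            if on_success is None:
                return self  # resolved, nothing to run: pass through
            return MiniPromise.start(on_success, self._value)
        if on_error is None:
            return self  # rejected, no error callback: stay rejected
        # A normal return from on_error gives a resolved promise;
        # a raise inside it gives a rejected one.
        return MiniPromise.start(on_error, self._exc)

    def error(self, on_error):
        return self.then(None, on_error)

    def get_result(self, rethrow=True):
        if self._exc is not None:
            if rethrow:
                raise self._exc
            return self._exc
        return self._value


def fail():
    raise RuntimeError("failed")


def never_called(_):
    raise AssertionError("should have been skipped")


def rethrow_again(exc):
    raise RuntimeError("still failing")


chained = MiniPromise.start(lambda: 5).then(lambda d: d + 1).then(lambda d: d * 2).get_result()

recovered = (
    MiniPromise.start(fail)
    .then(never_called)                    # skipped: the promise is rejected
    .error(lambda exc: str(exc).upper())   # runs and returns normally: resolved
    .error(never_called)                   # skipped: the promise is resolved
    .get_result()
)

still_rejected = MiniPromise.start(fail).error(rethrow_again).get_result(rethrow=False)
```

Here chained follows the success path only, recovered shows an error callback turning a rejection back into a resolved value, and still_rejected shows that a throwing error callback leaves the chain rejected.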
The callback functions provided to then() and error() are invoked dynamically. The first consequence is that if you don't return anything, null is substituted as the return value. The second is that the function actually takes two arguments. The first is the resolved data or the raised exception, depending on which callback is invoked. The second is a promise handle, which allows the programmer to manipulate the state of the promise explicitly.
var p0 = Promise.start((res, handle)=> {
    return 10;
}).then((res, handle)=> {
    handle.reject("failed!"); // reject the promise explicitly.
}).error(ex=> {
    return ex.getMessage(); // error callback is invoked.
});
When a promise is rejected with a string, a System.Concurrency.PromiseRejectedException is raised. Rejecting with an exception sets that exception as the cause. Once a promise has been resolved through the handle, it cannot then be explicitly rejected through the same handle. However, a subsequent unhandled exception within the same callback may still overwrite the state.
Sometimes we need a synthetic promise whose state we have absolute control over. Instead of letting a function determine its fate, the user would like to control its result from the side. System.Concurrency.DeferredPromise is what we need in this scenario.
In this example, we create a deferred promise and let another thread try to get its result. However, since this promise has no function tied to it, nothing is run, and it will never reach a settled state on its own.
DeferredPromise dp = new DeferredPromise();
// In some other thread...
var r1 = dp.getResult(true);
To unblock the waiting thread, at some point we can resolve or reject this promise from a different thread.
PromiseHandle h = dp.getHandle();
h.resolve(7); // this will immediately unblock the other thread waiting in getResult()
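The mechanism can be sketched with a toy class in Python, built on threading.Event. Deferred here is a hypothetical stand-in, not Julian's DeferredPromise; the rule that the first settlement wins is an assumption of this sketch.

```python
import threading


class Deferred:
    """Toy promise that is settled from the outside (not Julian's class)."""

    def __init__(self):
        self._settled = threading.Event()
        self._lock = threading.Lock()
        self._value = None
        self._exc = None

    def resolve(self, value):
        self._settle(value, None)

    def reject(self, exc):
        self._settle(None, exc)

    def _settle(self, value, exc):
        with self._lock:
            if self._settled.is_set():
                return  # assumption of this sketch: the first settlement wins
            self._value, self._exc = value, exc
            self._settled.set()  # wakes up every thread blocked in get_result()

    def get_result(self, timeout=None):
        if not self._settled.wait(timeout):
            raise TimeoutError("promise not settled yet")
        if self._exc is not None:
            raise self._exc
        return self._value


def demo():
    deferred = Deferred()
    received = []
    waiter = threading.Thread(
        target=lambda: received.append(deferred.get_result(timeout=5))
    )
    waiter.start()        # the waiter asks for a result that nobody has produced yet
    deferred.resolve(7)   # settling from this thread releases the waiter
    waiter.join(timeout=5)
    return received
```

The waiting thread has no task to wait on; it is released only because some other thread decides the outcome.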
So far we have walked through the major features of the Promise API, but you may have noticed that all the code used inside the callback functions is rather trivial and, more importantly, not related to I/O at all. Julian's Promise API is a general programming framework that is not tied to I/O operations; you can do basically whatever you want in those callbacks. To put it to real use, though, look at the I/O classes that expose asynchronous interfaces, which serve as the starting point for a promise chain.
In this example, we open an AsyncStream from the file instance and call its readAsync() method, which returns a Promise. From there, we can chain callbacks. The read operation is implemented with the asynchronous I/O provided by the OS, which frees thread resources between invocation and completion.
import System.IO;
import System.Concurrency;
File file = new File(path);
AsyncStream astream = (AsyncStream)file.getReadStream();
byte[] buffer = new byte[128];
int i, j, k;
Promise p = astream.readAsync(buffer, 0, (read, handle)=>{
    i = read;
}).then(read => {
    j = read;
    return 100;
});
k = p.getResult(true);
In another example, let's see the proper way to program with TCP sockets in Julian, using the asynchronous API:
import System.IO;
import System.Network;
void process(Socket sock){
    AsyncStream rstream = sock.getReadStream();
    MessageBuilder builder = new MessageBuilder(); // MessageBuilder not implemented here
    byte[] buffer = new byte[_bsize];
    rstream.readToEndAsync(buffer, (count, handle) => {
        if (count > 0) {
            builder.add(buffer, count);
            string msg = builder.buildMessage(); // This returns something only if all the constituent bytes have been received
            if(msg != null){
                ... // Process the message
                handle.resolve(msg); // settle the initial promise through its handle
            }
        } else if (count < 0) {
            handle.reject("Unexpected shutdown.");
        }
    }).then((msg, handler) => {
        byte[] resp = getResponse(msg); // getResponse() not implemented here
        AsyncStream wstream = sock.getWriteStream();
        return wstream.writeAsync(resp, 0, resp.length);
    }).then((count, handler) => {
        sock.close();
    });
}
ServerSocket ss = new ServerSocket();
ss.bind("127.0.0.1", 0); // bind to loopback at arbitrary port
while(true) {
    Socket sock = ss.accept();
    process(sock);
}
In this server example, the main thread runs an infinite loop that accepts new connections. Once a connection is established, it hands the socket to process(), which calls readToEndAsync to process the incoming messages. The application layer, which is omitted, builds a message out of the incoming byte stream and settles the initial promise once the message has been received completely. It then chains to another promise, where it sends a response back, and in a third promise closes the socket, concluding the exchange of messages. (And yes, you may implement MessageBuilder and getResponse() to make it an HTTP server.)
The highlight of this piece of code is its use of the asynchronous API. Most importantly, process() is not a blocking method. It queues the subsequent read operation in the async processing queue. When the data is ready in the system buffer, the callback registered with readToEndAsync is invoked on one of the so-called IO continuation threads. Julian maintains only a handful of such threads, so make sure not to perform blocking operations on them. Whenever an IO operation is needed, call an asynchronous method and return the promise immediately, as shown by the call to writeAsync in the second promise.
To end this chapter, I would like to help dispel some confusion about async programming, in particular about how it differs from traditional concurrent programming. The short answer is that the two are intimately related but also sufficiently distinct from each other.
By definition, async programming is not necessarily concurrent, although in Julian it usually is. The concurrency arises because Julian's Promise runs the callback functions on a separate thread. Setting this factor aside, the process defined by a promise chain is actually quite linear and entirely predictable in terms of its time order.
Concurrent programming mainly concerns parallelized computation, while async programming is primarily focused on efficient use of time around I/O operations. The main consequence of parallelization is the need to control concurrent access to memory, and Julian's Thread and Lock API largely deals with this intricacy. Async programming merely spreads the logic over several sections, which are always called in serialized order. However, since a callback is invoked on an arbitrary thread, you will likely still need synchronization over sensitive memory areas.
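To make the last point concrete, here is a small Python sketch (an analogue, not Julian code) in which completion callbacks may run on different worker threads and therefore guard a shared total with a lock. count_bytes is a hypothetical helper.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def count_bytes(chunks, workers=4):
    """Sum the sizes of all chunks, accumulating inside completion callbacks."""
    total = 0
    lock = threading.Lock()

    def on_done(future):
        nonlocal total
        # Callbacks may run on different worker threads, so guard the shared total.
        with lock:
            total += future.result()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(len, chunk) for chunk in chunks]
        for future in futures:
            future.add_done_callback(on_done)
    # Leaving the with-block waits for the workers, so every callback has run.
    return total
```

Each callback is short and runs in sequence with respect to its own task, yet the shared variable still needs protection because different callbacks can overlap in time.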
In summary, consider using the Thread API in most cases, but when dealing with I/O, try going asynchronous, starting from the system API's async endpoints. Once a promise is established, the code that follows is contagiously affected, meaning you basically have to use callbacks all the way to the end. In most cases this should not be an issue, and in the worst case you can always demand the result from the promise to end a chain.