CHAPTER 22
Julian provides a small but fairly comprehensive foundation class library that ships with the engine core. In most cases you are encouraged to use these APIs, collectively called JuFC (Julian Foundation Classes). If they fall short of your needs, you can turn to platform (Java) classes through platform mapping. Nonetheless, the built-in classes are usually faster and are tested as part of the engine delivery.
Console provides the most basic yet most frequently used API: easy access to the standard I/O of the current process. Simply call Console.println(), which converts any value into a string. If the value is an Object, its toString() is called.
import System;
class Car {
private string make;
private string model;
public Car(string make, string model){
this.make = make;
this.model = model;
}
public string toString(){
return make + " " + model;
}
}
Console.println(new Car("Ford", "Mustang")); // prints "Ford Mustang"
The general-purpose containers are among the most used JuFC classes, and some of them have language-level support.
See this chapter for the details.
The File API provides access to the file system (FS). A node on the FS is represented by an Item, which has two implementations, File and Directory, each allowing more specific operations.
The operations on Directory are straightforward and usually synchronous. They include listing the items under a directory, creating a directory, and deleting one.
The more I/O-bound operations can be found on File. The recommended way to start I/O on a File is to call getReadStream() or getWriteStream(). While both return a System.IO.FileStream, the resulting streams have different capabilities: getReadStream() returns a stream that can only be read from, while getWriteStream() returns one that can only be written to. Having obtained the stream, you can work with Stream's API, which typically involves buffer-backed operations that read or write multiple bytes at a time. Note that the stream also implements AsyncStream, so depending on the circumstances you may want to switch the logic to non-blocking mode.
As a brief illustration, the following snippets show the basic usage of these APIs.
Basic file operation:
import System.IO;
File file = new File("/path/to/my/file");
string fname = file.getName();
string contents = file.readAllText();
string p = file.getPath();
bool e = file.exists();
Asynchronously read a file:
import System.IO;
import System.Concurrency;
File file = new File(path);
AsyncStream astream = (AsyncStream)file.getReadStream();
byte[] buffer = new byte[1024];
Promise p = astream.readAllAsync(buffer, read => {
// This callback is invoked multiple times, each time with the buffer re-populated up to index read - 1
}).then(read => {
// This will be called only once, with read = total bytes read
});
p.getResult(true);
The Process API provides a general abstraction over OS processes. You can use it to create, terminate, and communicate with subprocesses.
Operations such as starting, killing, or waiting on a process are easy to comprehend. I/O with subprocesses is a little more involved, primarily for two reasons. First, the I/O direction of a subprocess is the inverse of our normal perspective, which can be very confusing for new programmers. Second, there are three ways to conduct I/O redirection, and they can be mixed to some extent.
To make this easier to understand, let's start with the basic scenario: I want to create a subprocess so that I can communicate with it directly through the standard file descriptors (0/1/2). In other words, I would like to read from its standard output/error and write to its standard input.
import System;
import System.IO;
import System.Concurrency;
ProcessConfig pc = new ProcessConfig();
Process p = new Process("some-exe", new String[]{"arg1", "arg2"}, pc);
p.start();
Stream input = p.getReadStream();
Thread t = Thread.create(() => {
byte[] buffer = new byte[128];
int total = 0, read = 0;
while ((read = input.read(buffer, 0, 128)) != -1){
// do something with input
}
});
t.start();
t.join();
By calling getReadStream(), we obtain the standard output of the subprocess, which, from the parent process's angle, is an input stream: we can only call read() on it, never write(). In fact, the method is named getReadStream() precisely for this reason. In this example, a thread is started to keep reading the subprocess's output until EOF is hit. The subprocess sends EOF when it finishes.
The communication mechanism behind getReadStream() and getWriteStream() is an OS pipe. Programming against the streams returned by these two methods is very common in real-world applications, so this is exactly what you get with the default ProcessConfig.
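The same inversion of direction shows up in the platform's own process API. The sketch below is a rough Java analogue, not Julian code: java.lang.Process.getInputStream() returns the child's standard output, connected through a pipe by default. The class name PipeDirectionDemo is hypothetical, and the child command (the running JVM's own launcher with -version) is only an assumption chosen so the example does not depend on any other executable.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;

public class PipeDirectionDemo {
    // Launches "<java.home>/bin/java -version" and returns everything the child printed.
    public static String readChildOutput() {
        String javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        ProcessBuilder pb = new ProcessBuilder(javaBin, "-version");
        // "java -version" prints to stderr; merge it into stdout so one pipe carries both.
        pb.redirectErrorStream(true);
        StringBuilder out = new StringBuilder();
        try {
            Process child = pb.start();
            // getInputStream() is the child's standard OUTPUT: an input from the parent's side.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(child.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                // readLine() returns null once the child closes its end of the pipe (EOF).
                while ((line = reader.readLine()) != null) {
                    out.append(line).append('\n');
                }
            }
            child.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(readChildOutput());
    }
}
```

As in the Julian example, the parent only ever reads from this stream, and the loop ends when the child exits and the pipe reports EOF.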
Now consider another scenario: I don't want to communicate programmatically with the subprocess's standard I/O, but I do want to configure a redirection so that when the subprocess reads from standard input, it actually reads from the redirected source rather than the default (usually the console window). To achieve this, we can set the redirection on ProcessConfig.
Stream fstream = new File(path).getReadStream();
ProcessConfig pc = new ProcessConfig();
pc.setInputStream(fstream);
Process p = new Process(
"some-exe",
new String[]{"arg1", "arg2"},
pc);
p.start();
int res = p.wait();
In this example, the program creates a subprocess with the executable "some-exe". It provides a ProcessConfig with the input stream set to a file stream. The implication is that whenever some-exe tries to read from standard input, it actually reads from the file. The "Input" in setInputStream() is from the subprocess's perspective. Similar redirection can be done for standard output and standard error.
Once you set a standard stream through ProcessConfig, no corresponding OS pipe is created and you forfeit the chance to interact on that channel. For example, if you call setInputStream() to set an input stream for the subprocess, calling getWriteStream() will return null: you cannot write into it from the program, since another stream has already taken that job.
In a third scenario, you don't want programmatic communication and you don't care about redirection. You simply want the subprocess to use the same standard I/O as your current process. This is easily achieved by calling ProcessConfig.setInheritedIO. Now, if your input is the console window, so is the spawned process's.
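For readers who drop down to the platform (Java) classes, the same three choices exist on java.lang.ProcessBuilder. The sketch below is a Java analogue rather than part of JuFC. The class name RedirectModes is hypothetical, "some-exe" is the placeholder executable used earlier in this chapter, and no process is actually started: the code only inspects how standard input would be wired.

```java
import java.io.File;

public class RedirectModes {
    // Scenario 1: nothing configured, so the parent talks to the child through a pipe.
    public static ProcessBuilder.Redirect.Type defaultMode() {
        return new ProcessBuilder("some-exe").redirectInput().type();
    }

    // Scenario 2: the child's standard input is redirected to read from a file.
    public static ProcessBuilder.Redirect.Type fileMode(File source) {
        return new ProcessBuilder("some-exe").redirectInput(source).redirectInput().type();
    }

    // Scenario 3: the child shares the standard I/O of the current process.
    public static ProcessBuilder.Redirect.Type inheritedMode() {
        return new ProcessBuilder("some-exe").inheritIO().redirectInput().type();
    }

    public static void main(String[] args) {
        System.out.println(defaultMode());
        System.out.println(fileMode(new File("input.txt"))); // hypothetical file name
        System.out.println(inheritedMode());
    }
}
```

The three methods report the redirect types PIPE, READ, and INHERIT respectively, which correspond to the pipe, redirection, and inherited-I/O scenarios described above.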
Julian provides both synchronous and asynchronous interfaces for socket programming. Nonetheless, for a couple of reasons it's generally recommended to use the asynchronous API whenever possible.
At the core of the Julian engine lies a special thread pool used exclusively for callbacks after IO operations. Threads in this pool are called IO-continuation threads (or just "IO threads" for short). They are allocated on demand but capped at a very small count. Among asynchronous programming models there are two extremes: JavaScript essentially has a single thread in the pool, while in .NET the pool may manage a large number of IO threads. Julian takes a middle ground, but users should keep a mindset close to that of a JS programmer: never wait, on an IO thread, for the completion of other IO operations. Since the synchronous API is implemented as a wrapper around the asynchronous one, performing synchronous operations on an IO-continuation thread is equally likely to result in a deadlock. This property will become clearer after a few examples.
First, let's start with basic socket programming in Julian. To perform any kind of operation over TCP/IP you need to provide one or more host addresses. Julian supports both IP V4/V6 notation and DNS-based host names as host addresses. To help create addresses, you can use the helper class NetAddress to resolve a host name or a raw address in IP notation, but this class is not required by the other socket APIs, which almost always take a plain string as the address input.
NetAddress addr = NetAddress.resolve("10.20.30.40");
string ip = addr.getAddress(); // same as provided: 10.20.30.40
addr = NetAddress.resolve("www.google.com"); // Resolve an IP from DNS name
ip = addr.getAddress(); // some IP V4/6 address based on local network configuration and geolocation of the host.
Once we have addresses, we can either create a server socket waiting for connection, or create a regular socket to connect to a remote destination.
Let's suppose there is a service running on a host with the host name "myservice.com" and listening on port 13579. In this example we create a Socket to connect to that service and perform some reading.
import System.Network;
import System.IO;
import System.Concurrency;
Socket sock = new Socket();
string dest = NetAddress.resolve("myservice.com").getAddress();
sock.connect(dest, 13579);
string accStr = "";
var rs = sock.getReadStream();
byte[] buffer = new byte[30];
Promise prom = rs.readToEndAsync(buffer, (count, handle) => {
if (count > 0) {
string s = String.fromBytes(buffer, "ascii", 0, count);
accStr += s;
if (accStr.contains("[DONE]")) {
sock.close(); // Closing socket will shut down streams from this socket and settle the promise
}
}
if (count < 0) {
sock.close();
}
});
prom.getResult(true);
The first thing to notice is that once the connection is established we obtain a stream to perform IO operations via the socket. A socket offers two streams: an input stream from which the program may read the data sent by the other side, and an output stream to which the program may write data. In this example we only use the input stream. In a later section we will see a full-duplex example.
Since the service ends message transmission with the string "[DONE]", the client uses that string as the signal to disconnect. This, however, is quite a naive approach in practice. Instead of scanning through what we read, a client (or server) should fully implement whatever the application protocol requires, and must not rely on the behavior of the transport layer to convey any semantics. For example, a client must be able to read the incoming data and parse it into messages in compliance with the protocol, even if a message arrives in pieces spread across several callback invocations.
Another interesting aspect of this example is how it gracefully tears down the connection. The async operations are associated with a promise handler (passed to the callback as handle), which can be used to manipulate the state of the promise. To reduce the unnecessary complexity that might arise from checking state on different components, the engine ties this handler to the connection itself. Should the connection fail or terminate, the promise is settled automatically as well.
In the example below, a server socket waits for the next connection in an infinite loop. accept() is a blocking operation that does not return until the next connection comes in. This means we keep one thread occupied for the purpose of listening. Once the connection is established, we call readToEndAsync() to handle the incoming traffic on that socket. It is very important to realize that the callback passed to this method is called on some arbitrary IO thread, a fact from which we can reason about the workflow in a concurrent context.
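To make the point about messages arriving in pieces concrete, here is a minimal sketch of a framer that buffers incoming chunks and only hands back complete messages. It is written in Java as an illustration, not taken from JuFC: the class name DelimitedFramer and its feed() method are hypothetical, and it assumes a simple protocol in which each message ends with a fixed delimiter and the text is ASCII, as in the example above.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimitedFramer {
    private final String delimiter;
    // Holds bytes (already decoded to text) that do not yet form a complete message.
    private final StringBuilder pending = new StringBuilder();

    public DelimitedFramer(String delimiter) {
        if (delimiter == null || delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must be non-empty");
        }
        this.delimiter = delimiter;
    }

    // Accepts the next chunk and returns every message completed by it, in order.
    public List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> messages = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(delimiter)) >= 0) {
            messages.add(pending.substring(0, end));
            // Drop the message and its delimiter; keep whatever follows for the next call.
            pending.delete(0, end + delimiter.length());
        }
        return messages;
    }
}
```

Feeding "hel", then "lo[DO", then "NE]bye[DONE]x" to a framer built with the delimiter "[DONE]" yields nothing for the first two calls and the messages "hello" and "bye" for the third, with "x" kept for later. A read callback would call feed() once per invocation and act only on the messages it returns.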
import System.Network;
import System.IO;
import System.Concurrency;
ServerSocket ss = new ServerSocket();
string host = NetAddress.getLocalAddress().getName();
ss.bind(host, 24680);
while(true){
Socket sock = ss.accept();
AsyncStream stream = sock.getReadStream();
byte[] buffer = new byte[128];
int total = 0;
string accStr = "";
string threadName = "";
Promise prom = stream.readToEndAsync(buffer, (count, handle) => {
// Everything below happens on IO thread
if (count > 0) {
string s = String.fromBytes(buffer, "ascii", 0, count);
accStr += s;
total += count;
if (total >= 20) {
sock.close(); // Closing socket will shut down streams from this socket and settle the promises
}
}
if (count < 0) {
handle.resolve(total);
}
// Everything above happens on IO thread
});
}
Initially, everything happens on the original thread. As soon as readToEndAsync() is called, a callback is registered in an internal queue to be picked up by some IO thread, and the method returns immediately. When data flows into the network device (NIC), the OS copies it from the device buffer to the kernel buffer, then notifies the engine that data has become available. One of Julian's IO threads then picks up this work and invokes the registered callback on the data, which has been copied from the OS buffer to the user's buffer.
Since System.Network.SocketStream also implements System.IO.Stream, it's natural to assume that a synchronous API is also viable for networking. While this is true, users are in general discouraged from using it.
The sync API is actually built as a wrapper around the async API. Let's first see an example of a server socket used with the sync API.
import System.Network;
import System.IO;
import System.Concurrency;
ServerSocket ss = new ServerSocket();
string host = NetAddress.getLocalAddress().getName();
ss.bind(host, 33333);
Socket sock = ss.accept();
Stream stream = sock.getReadStream();
byte[] buffer = new byte[5];
int read = 0;
string accStr = "";
while((read = stream.read(buffer, 0, 5)) != -1) {
string s = String.fromBytes(buffer, "ascii", 0, read);
accStr += s;
}
sock.close();
The logic should be straightforward to understand, since every line is now executed in textual order. What is not explicitly pointed out, though, is the asynchronous nature hidden under the synchronous facade. In fact, when we call stream.read(buffer, 0, 5), we are queuing an async call that simply copies the input back to the given buffer, and then waiting for its completion. The waiting completely blocks the current thread.
By now it should be clear why we don't recommend the sync API in general: there is a significant risk of deadlock. In the above example we don't see the issue because the waiting occurs on the original thread, which is a normal working thread (technically it's the main thread, which is special in some ways, but that's irrelevant to our discussion here). Now suppose we make this call on an IO thread. We would queue up a callback asynchronously and wait for its completion. The problem is that we cannot control which IO queue the new callback joins. If it ends up on the very same queue the current logic is running from, we will be waiting for a piece of work that cannot complete before the current one does. That is a classic deadlock.
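The hazard can be reproduced outside Julian with a few lines of Java. The sketch below is an analogy under stated assumptions, not the engine's implementation: a single-thread executor stands in for one IO queue, readAsync() queues the copy on it, and read() is the blocking wrapper that waits for that copy. All names (SyncOverAsyncDemo, TIMED_OUT, the fake three-byte payload) are hypothetical, and a timeout is used so that the would-be deadlock shows up as a return value instead of hanging the program.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncOverAsyncDemo {
    public static final int TIMED_OUT = -2;

    // Stand-in for a single IO queue: one thread, tasks run strictly one after another.
    private final ExecutorService ioPool = Executors.newSingleThreadExecutor();

    // "Async" read: the copy into the caller's buffer is queued on the IO pool.
    public CompletableFuture<Integer> readAsync(byte[] buffer) {
        return CompletableFuture.supplyAsync(() -> {
            byte[] data = {1, 2, 3}; // fake payload; buffer must hold at least 3 bytes
            System.arraycopy(data, 0, buffer, 0, data.length);
            return data.length;
        }, ioPool);
    }

    // "Sync" read: queue the async call, then block until it completes or the timeout expires.
    public int read(byte[] buffer, long timeoutMillis) {
        try {
            return readAsync(buffer).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return TIMED_OUT; // without the timeout this wait would never end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Calls the sync read from inside a task that is itself running on the IO pool.
    public int readFromIoThread(byte[] buffer, long timeoutMillis) {
        return CompletableFuture.supplyAsync(() -> read(buffer, timeoutMillis), ioPool).join();
    }

    public void shutdown() {
        ioPool.shutdownNow();
    }

    public static void main(String[] args) {
        SyncOverAsyncDemo demo = new SyncOverAsyncDemo();
        try {
            byte[] buffer = new byte[8];
            System.out.println(demo.read(buffer, 1000));            // from the main thread
            System.out.println(demo.readFromIoThread(buffer, 200)); // from the IO thread
        } finally {
            demo.shutdown();
        }
    }
}
```

Called from the main thread, read() returns the byte count because the pool thread is free to run the copy. Called from the pool thread, the copy is queued behind the very task that is waiting for it, so the wait can only end by timing out.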
It might be tempting to perform some synchronous operations in full-duplex communication. For example, inexperienced network programmers tend to make a write based on what was read, then continue reading something else. Try to avoid such practices in Julian. Instead, always use a callback to handle both reading and writing.
import System.Network;
import System.IO;
import System.Concurrency;
class Message {
... ...
int fillResponse(byte[] buffer){
... // Fill a response message into buffer, returning the total bytes of the message
}
}
class MessageParser {
Message getMessage(string input){
... // Tracks all input and build up message incrementally
}
}
ServerSocket ss = new ServerSocket();
string host = NetAddress.getLocalAddress().getName();
ss.bind(host, 33333);
Socket sock = ss.accept();
MessageParser parser = new MessageParser();
var rs = sock.getReadStream();
var ws = sock.getWriteStream();
byte[] rbuffer = new byte[30];
byte[] wbuffer = new byte[30];
Promise prom = rs.readToEndAsync(rbuffer, (count, handle) => {
if (count > 0) {
string s = String.fromBytes(rbuffer, "ascii", 0, count);
Message msg = parser.getMessage(s);
switch (msg.getKind()) {
case MessageKind.COMM:
int len = 0;
if ((len = msg.fillResponse(wbuffer)) > 0) {
ws.writeAsync(wbuffer, 0, len);
}
break;
case MessageKind.DISCONN:
sock.close();
break;
}
}
});
prom.getResult(true);
If you want to follow up on the read/write operation, feel free to chain the promise with other callbacks. See Asynchronous Programming for more details on how to use the Promise API.