On running parallel processes (2015 February 19)

At present (Feb 2015), Vicare supports concurrency, which is different from parallelism. This section is about what’s currently in the master branch, not the latest Vicare release.

I have recently integrated the library (vicare language-extensions coroutines) in the boot image: the code is small and does something that I find useful. Coroutines implemented on top of continuations provide concurrency.
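
To make the idea of concurrency concrete, here is a minimal sketch of two coroutines taking turns. It only uses coroutine, yield and finish-coroutines, the same bindings used later in this section; the names steps and make-worker are made up for this illustration. The exact interleaving depends on the scheduler, so I only claim that both workers run to completion.

```scheme
#!vicare
(import (vicare))

;;Log of the steps performed, most recent first.  Hypothetical name,
;;used only for this demo.
(define steps '())

;;Build a thunk that records three steps, giving up control after
;;each one.
(define (make-worker name)
  (lambda ()
    (do ((i 0 (+ i 1)))
        ((= i 3))
      (set! steps (cons (cons name i) steps))
      (yield))))

(coroutine (make-worker 'one))
(coroutine (make-worker 'two))

;;Keep running the suspended coroutines until all of them are done.
(finish-coroutines)

(write (reverse steps))
(newline)
```

Only one worker runs at any given time: this is concurrency, not parallelism.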

Vicare does not implement native multithreading, as in using the pthreads library. It should be possible to implement it, but interfacing threads with the garbage collector is not trivial and would take significant work. I would love to have native multithreading in Vicare: it would mean supporting parallelism.

The only way to have some actual parallelism is to fork child processes; this is not suitable for every task, because forking a vicare process costs resources.

Some example libraries

Vicare now comes with some libraries that run “well known” external programs:

(vicare posix sendmail)
(vicare posix mailx)
(vicare posix curl)
(vicare posix wget)
(vicare posix find)

they are really minimal: they just show how to run an external program by calling one of the fork variants then execvp from (vicare posix); their source code can be taken as starting point for more flexible interfaces. The following demo program sends an email message with sendmail and prints its output:

#!r6rs
(import (vicare)
  (only (vicare posix sendmail)
        sendmail))

(define (send-it message)
  (define rv
    (sendmail (string->ascii message)))
  (display rv)
  (newline)
  (flush-output-port (current-output-port)))

(send-it "From: <marco@localhost>
To: <marco@localhost>
Subject: demo 01
MIME-Version: 1.0
Content-Type: text/plain

This is demo 01.
--\x20;
Marco Maggi
")

The api of these libraries is “blocking”: we run a process and wait for its termination with waitpid from (vicare posix). These libraries do not run a parallel process: they run a synchronous process.
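
As a rough sketch of what such a blocking interface looks like, the following program forks, replaces the child with an external program and waits for it. The function name run-and-wait is hypothetical; I am assuming that execvp accepts a program name and a list of strings as command line arguments, and that fork returns the values returned by the parent function. This is not the actual source of the libraries listed above.

```scheme
#!vicare
(import (vicare)
  (prefix (vicare posix) px.))

;;Run PROGRAM with the command line ARGV (a list of strings whose
;;first item is, by convention, the program name) and block until it
;;terminates.  Return the exit status of the program.
(define (run-and-wait program argv)
  (px.fork
    (lambda (child-pid)
      ;;Here we are in the parent: block until the child terminates.
      (let ((status (px.waitpid child-pid 0)))
        (if (px.WIFEXITED status)
            (px.WEXITSTATUS status)
          (error 'run-and-wait
            "child process exited abnormally"
            status))))
    (lambda ()
      ;;Here we are in the child: replace this process with the
      ;;external program.  Assumed signature: (execvp filename argv).
      (guard (E (else
                 (print-condition E)
                 (exit 127)))
        (px.execvp program argv))
      ;;We get here only if execvp returns, which means it failed.
      (exit 127))))

(display (run-and-wait "true" '("true")))
(newline)
```

The call to run-and-wait does not return until the external program has terminated, which is exactly what makes it synchronous.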

Forking the current process

To run a parallel process we first have to fork the current process into parent and child processes; if we actually want another vicare process we are fine with the child, otherwise we have to replace the child process with a process running an external program.

On posix platforms, right after a fork call: the parent and child will share the standard input/output file descriptors 0, 1, 2. A very common task when forking a process is to replace the standard file descriptors in the child process with other file descriptors that are connected to pipes, so that the parent and the child can communicate through them.

With the raw fork function from (vicare posix) we can do this with:

(import (vicare)
  (prefix (vicare posix) px.))

(let-values
    (((child-stdin          parent->child-stdin) (px.pipe))
     ((parent<-child-stdout child-stdout)        (px.pipe))
     ((parent<-child-stderr child-stderr)        (px.pipe)))
  (px.fork
    (lambda (child-pid)
      ;;Here we are in the parent.
      (px.close child-stdin)
      (px.close child-stdout)
      (px.close child-stderr)
      (with-unwind-protection
          (lambda (why)
            (px.close parent->child-stdin)
            (px.close parent<-child-stdout)
            (px.close parent<-child-stderr))
        (lambda ()
          (do-that-thing))))
    (lambda ()
      ;;Here we are in the child.
      (guard (E (else
                 (print-condition E)
                 (exit 1)))
        (px.close parent->child-stdin)
        (px.close parent<-child-stdout)
        (px.close parent<-child-stderr)
        (begin        ;setup stdin
          (px.close 0)
          (px.dup2  child-stdin 0)
          (px.close child-stdin))
        (begin        ;setup stdout
          (px.close 1)
          (px.dup2  child-stdout 1)
          (px.close child-stdout))
        (begin        ;setup stderr
          (px.close 2)
          (px.dup2  child-stderr 2)
          (px.close child-stderr))
        (do-this-thing)))))

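with this code the parent process can write to the child’s standard input through parent->child-stdin and read the child’s standard output and standard error through parent<-child-stdout and parent<-child-stderr; the child process can read from file descriptor 0 and write to file descriptors 1 and 2, which are now connected to the pipes.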

The library (vicare posix) has some functions to automate this process and wrap the raw file descriptors with proper Scheme input/output port objects. We need to remember that:

Let’s see some usage examples for the fork variants:

fork-with-fds

The following example shows how to fork a process, set up the file descriptors and wait for the child to terminate. The function fork-with-fds takes care of setting up the file descriptors in both the parent and child.

#!vicare
(import (vicare)
  (prefix (vicare posix) px.))

(define (parent-proc child-pid parent->child-stdin parent<-child-stdout parent<-child-stderr)
  (unwind-protect
      (let ((bufout (make-bytevector 4 0))
            (buferr (make-bytevector 4 0)))
        (px.write parent->child-stdin '#ve(ascii "ciao\n"))
        (let ((status (px.waitpid child-pid 0)))
          (if (and (px.WIFEXITED status)
                   (zero? (px.WEXITSTATUS status)))
              (begin
                (px.read parent<-child-stdout bufout)
                (px.read parent<-child-stderr buferr)
                (values bufout buferr))
            (error #f
              "child process exited abnormally"
              status))))
    (px.close parent->child-stdin)
    (px.close parent<-child-stdout)
    (px.close parent<-child-stderr)))

(define (child-thunk)
  (guard (E (else
             (print-condition E)
             (exit 1)))
    (let ((buf (make-bytevector 5 0)))
      (px.read 0 buf)
      (assert (bytevector=? buf '#ve(ascii "ciao\n")))
      (px.write 1 '#ve(ascii "out\n"))
      (px.write 2 '#ve(ascii "err\n"))
      (exit 0))))

(px.fork-with-fds parent-proc child-thunk)
⇒ #ve(ascii "out\n") #ve(ascii "err\n")

fork-with-binary-ports

The following example shows how to fork a process, set up the file descriptors, wrap them into binary ports and wait for the child to terminate. The function fork-with-binary-ports takes care of setting up the file descriptors in both the parent and child and wrapping them into binary ports.

#!vicare
(import (vicare)
  (prefix (vicare posix) px.))

(define (parent-proc child-pid child-stdin child-stdout child-stderr)
  (unwind-protect
      (let ((bufout (make-bytevector 4 0))
            (buferr (make-bytevector 4 0)))
        (put-bytevector child-stdin '#ve(ascii "ciao\n"))
        (flush-output-port child-stdin)
        (let ((status (px.waitpid child-pid 0)))
          (if (and (px.WIFEXITED status)
                   (zero? (px.WEXITSTATUS status)))
              (begin
                (get-bytevector-n! child-stdout bufout 0 4)
                (get-bytevector-n! child-stderr buferr 0 4)
                (values bufout buferr))
            (error #f
              "child process exited abnormally"
              status))))
    (close-output-port child-stdin)
    (close-input-port  child-stdout)
    (close-input-port  child-stderr)))

(define (child-thunk)
  (guard (E (else
             (print-condition E)
             (exit 1)))
    (define-constant stdin-port
      (standard-input-port))
    (define-constant stdout-port
      (standard-output-port))
    (define-constant stderr-port
      (standard-error-port))
    (let ((buf (make-bytevector 5 0)))
      (get-bytevector-n! stdin-port buf 0 5)
      (assert (bytevector=? buf '#ve(ascii "ciao\n")))
      (put-bytevector stdout-port '#ve(ascii "out\n"))
      (put-bytevector stderr-port '#ve(ascii "err\n"))
      (flush-output-port stdout-port)
      (flush-output-port stderr-port)
      (exit 0))))

(px.fork-with-binary-ports parent-proc child-thunk)
⇒ #ve(ascii "out\n") #ve(ascii "err\n")

fork-with-textual-ports

The following example shows how to fork a process, set up the file descriptors, wrap them into textual ports and wait for the child to terminate. The function fork-with-textual-ports takes care of setting up the file descriptors in both the parent and child and wrapping them into textual ports.

#!vicare
(import (vicare)
  (prefix (vicare posix) px.))

(define (parent-proc child-pid child-stdin child-stdout child-stderr)
  (unwind-protect
      (let ((bufout (make-string 4 #\x00))
            (buferr (make-string 4 #\x00)))
        (put-string child-stdin "ciao\n")
        (flush-output-port child-stdin)
        (let ((status (px.waitpid child-pid 0)))
          (if (and (px.WIFEXITED status)
                   (zero? (px.WEXITSTATUS status)))
              (begin
                (get-string-n! child-stdout bufout 0 4)
                (get-string-n! child-stderr buferr 0 4)
                (values bufout buferr))
            (error #f
              "child process exited abnormally"
              status))))
    (close-output-port child-stdin)
    (close-input-port  child-stdout)
    (close-input-port  child-stderr)))

(define (child-thunk)
  (guard (E (else
             (print-condition E)
             (exit 1)))
    (let ((buf (make-string 5 #\x00)))
      (get-string-n! (console-input-port) buf 0 5)
      (assert (string=? buf "ciao\n"))
      (put-string (console-output-port) "out\n")
      (put-string (console-error-port) "err\n")
      (flush-output-port (console-output-port))
      (flush-output-port (console-error-port))
      (exit 0))))

(px.fork-with-textual-ports parent-proc child-thunk)
⇒ "out\n" "err\n"

Managing fork and port sharing

On posix platforms, right after a fork call: the parent and child will share all the input/output file descriptors that are open at the time of the fork call. This is problematic.

Let’s see what happens with the following program in which we open a file, fork the process then close the file in both the parent and child:

#!vicare
(import (vicare)
  (prefix (vicare posix) px.))

(define (parent-proc child-pid)
  (px.waitpid child-pid 0)
  (close-output-port port)
  (exit 0))

(define (child-thunk)
  (close-output-port port)
  (exit 0))

(define port
  (parametrise ((output-file-buffer-size 8))
    (open-file-output-port "demo.txt"
                           (file-options no-fail)
                           (buffer-mode block)
                           (native-transcoder))))

(put-string port "0123456789ABCDEF")

(px.fork parent-proc child-thunk)

the output port has a buffer of 8 characters and the string written to it is 16 characters long; the call to put-string immediately flushes the first 8 characters and leaves the rest in the output buffer. The fork call causes the output buffer and its contents to be duplicated, so, when we close the port, the two buffers are flushed to the same file. The resulting file contents are:

0123456789ABCDEF89ABCDEF

which is not what we want.

posix has a partial solution for this problem: configuring file descriptors as close–on–exec. Vicare extends the idea to ports, which can also be configured as close–on–exec; this too is only a partial solution. Notice that it is close–on–exec, not close–on–fork. So we can try to address this problem by:

  1. Registering ports to be in close–on–exec mode.
  2. Flushing the ports in close–on–exec mode before calling fork.
  3. Closing the ports in close–on–exec mode in the child process, to free the allocated file descriptors.

Here is the example above modified to do so:

#!vicare
(import (vicare)
  (prefix (vicare posix) px.))

(define (parent-proc child-pid)
  (px.waitpid child-pid 0)
  (close-output-port port)
  (exit 0))

(define (child-thunk)
  (px.close-ports-in-close-on-exec-mode)
  (exit 0))

(define port
  (parametrise ((output-file-buffer-size 8))
    (open-file-output-port "demo.txt"
                           (file-options no-fail)
                           (buffer-mode block)
                           (native-transcoder))))

(px.port-set-close-on-exec-mode! port)

(put-string port "0123456789ABCDEF")

(px.flush-ports-in-close-on-exec-mode)
(px.fork parent-proc child-thunk)

Vicare also attempts to address the problem of “error while flushing a port” (see On errors and close–on–exec ports (2015 February 14)).

Asynchronously waiting for child process termination

We are almost there: all that is left to do is to understand how not to block while waiting for the child process termination. (vicare posix) provides the waitpid function; the development code in the master branch has been updated to allow proper interfacing with the underlying waitpid() posix function when the option WNOHANG is used.

What we need to do is call waitpid as follows:

(waitpid child-pid WNOHANG)

and check the return value, which is special when WNOHANG is used: if the return value is #f, the child process is still running; if the return value is non–#f, then it is a fixnum representing the child process termination status.

Here is an example of waiting asynchronously:

#!vicare
(import (vicare)
  (prefix (vicare posix) px.)
  (prefix (vicare platform constants) const.))

(define (parent-proc child-pid)
  (define (print obj)
    (display obj)
    (flush-output-port))
  (let loop ((status (px.waitpid child-pid const.WNOHANG)))
    (cond ((not status)
           ;;Child still running.
           (print "not done yet\n")
           (px.nanosleep 0 #e1e8)
           (loop (px.waitpid child-pid const.WNOHANG)))
          ((px.WIFEXITED status)
           ;;Child exited.
           (print "done\n"))
          (else
           (error #f "child process exited abnormally" status)))))

(define (child-thunk)
  (guard (E (else
             (print-condition E)
             (exit 1)))
    (px.nanosleep 1 0)
    (exit 0)))

(px.fork parent-proc child-thunk)
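
The polling loop in this example can be factored out into a small helper, so that the caller decides what to do while the child is still running. This is only a sketch: wait-for-child, idle-thunk and polls are hypothetical names, not part of (vicare posix); the calls to waitpid, nanosleep and fork are used as in the example above.

```scheme
#!vicare
(import (vicare)
  (prefix (vicare posix) px.)
  (prefix (vicare platform constants) const.))

;;Poll CHILD-PID without blocking.  Call IDLE-THUNK every time the
;;child is found still running; return the termination status when
;;the child is done.
(define (wait-for-child child-pid idle-thunk)
  (let loop ((status (px.waitpid child-pid const.WNOHANG)))
    (cond ((not status)
           (idle-thunk)
           (loop (px.waitpid child-pid const.WNOHANG)))
          (else status))))

;;Number of times the child was found still running.
(define polls 0)

(px.fork
  (lambda (child-pid)
    ;;Here we are in the parent: sleep 10 ms between polls.
    (let ((status (wait-for-child child-pid
                    (lambda ()
                      (set! polls (+ 1 polls))
                      (px.nanosleep 0 #e1e7)))))
      (printf "exit status ~a after ~a polls\n"
        (px.WEXITSTATUS status) polls)))
  (lambda ()
    ;;Here we are in the child: pretend to work for 0.2 seconds.
    (px.nanosleep 0 #e2e8)
    (exit 0)))
```

Passing the idle action as a thunk keeps the helper independent from how we wait: here it sleeps, but inside a coroutine it could as well give up control to other coroutines.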

Running a parallel process

At last! Let’s put it all together. While we are at it: we fork the process inside a coroutine, so that we can (yield) control while waiting for the child process to terminate. The following demo program starts a vicare child process and reads from it some Scheme objects through a textual port.

#!vicare
(import (vicare)
  (prefix (vicare posix) px.)
  (prefix (vicare platform constants) const.))

(define (parent-proc child-pid child-stdin child-stdout child-stderr)
  (with-unwind-protection
      (lambda (why)
        (close-output-port child-stdin)
        (close-input-port  child-stdout)
        (close-input-port  child-stderr))
    (lambda ()
      (let loop ((status (px.waitpid child-pid const.WNOHANG)))
        (cond ((not status)
               ;;Child still running.
               (yield)
               (loop (px.waitpid child-pid const.WNOHANG)))
              ((px.WIFEXITED status)
               ;;Child exited.
               (values status (read-all child-stdout) (get-string-all child-stderr)))
              (else
               (error #f "child process exited abnormally" status)))))))

(define (child-thunk)
  (guard (E (else
             (print-condition E)
             (exit 1)))
    (px.close-ports-in-close-on-exec-mode)
    (write '(1 2 3) (console-output-port))
    (write '(4 5 6) (console-output-port))
    (flush-output-port (console-output-port))
    (put-string (console-error-port) "done\n")
    (flush-output-port (console-error-port))
    (exit 0)))

(define (read-all port)
  (let ((obj (read port)))
    (if (eof-object? obj)
        '()
      (cons obj (read-all port)))))

(coroutine
    (lambda ()
      (px.flush-ports-in-close-on-exec-mode)
      (receive (status out err)
          (px.fork-with-textual-ports parent-proc child-thunk)
        (printf "out: ~s\n" out)
        (printf "err: ~a\n" err)
        (flush-output-port (current-output-port)))))

(finish-coroutines)

Conclusions

There are multiple things to do; there are decisions to be taken.

The existence of these possibilities is the reason why, at present, there is no encapsulating api that implements starting a parallel process with a one–liner. Maybe, in some future, a pattern will emerge.