ERights Home elang / concurrency 
Back to: Introducing Remote Objects On to: Epimenides in E

Concurrency Races
by Mark Miller
with contributions by Terry Stanley


Given a list of promises, race returns a promise for the first of these to be resolved. The race is won by whoever first resolves the returned promise, since promises are resolved only once.

? pragma.syntax("0.9")
# E sample
def race(promises) :rcvr {
    def [result, resolver] := Ref.promise()
    for prom in promises {
        when (prom) -> {
            resolver.resolveRace(prom)
        } catch problem {
            resolver.smash(problem)
        }
    }
    return result
}

In this test, c is the winner of racing a and b:

? def [a,ar] := Ref.promise()
# value: [<Promise>, <Resolver>]

? def [b,br] := Ref.promise()
# value: [<Promise>, <Resolver>]

? def c := race([a, b])
# value: <Promise>

? c
# value: <Promise>

? ar.resolve(3)
? c
# value: 3

? a
# value: 3

? br.resolve(4)
? b
# value: 4

? c
# value: 3

(*** "once" no longer belongs in this chapter.) A "once" of a function is a use-once version of that function. Ie, "once(func)" returns a object that will forward no more than one "run" message to "func". The two argument form "once(verb, target)" is a generalization which will forward no more than one "verb" message to the target.

For the Miranda Methods, the result of the once (the forwarder below) must make good decisions about whether to override them and possibly forward them, or not override them and let them default to methods on the forwarder. For non-Miranda methods other than the suppressed method, they are simply forwarded.

# E sample
def once {
    to run(verb, target) :any {
        var used := false
        def forwarder {
            # use (don't forward) the Miranda __order/3, though it shouldn't matter

            # use the Miranda __optSealedDispatch/1 to protect the target

            # forward __getAllegedType/0, though one could argue that, once
            # used up, the type shouldn't include the supressed verb
            to __getAllegedType() :any {
                return target.__getAllegedType()
            }

            # forward __respondsTo, but suppress verb.  One could argue that __respondsTo
            # and __getAllegedType() should be consistent with each other.
            to __respondsTo(verb2, arity) :boolean {
                return verb != verb2 && target.__respondsTo(verb2, arity)
            }

            # forward __printOn/1
            to __printOn(out) :void { target.__printOn(out) }

            # forward __reactToLostClient
            to __reactToLostClient(problem) :void {
                target.__reactToLostClient(problem)
            }

            # use the Miranda __whenMoreResolved/1 to protect the target

            # use the Miranda __whenBroken/1 to protect the target

            # use the Miranda __yourself/0 to protect the target

            # handle all other messages
            match [verb2, args] {
                if (verb == verb2) {
                    if (used) {
                        throw("used up")
                    }
                    used := true
                }
                E.call(target, verb2, args)
            }
        }
        return forwarder
    }

    # default to suppressing "run"
    to run(target) :any { return once("run", target) }
}

(This utility is available as <import:org.erights.e.facet.once>.)

printOnce is a println that gets used up:

? def printOnce := once(println)
# value: <println>

? printOnce(3)
# stdout: 3
#

? printOnce(4)
# problem: used up

I've occasionally been asked "E can fork off multiple activities using '<-', but how can it do a join?" In other words, how can an activity be delayed until several others have completed? The best answer is use a joining abstraction like asynchAnd below, and do a when-catch on the result.

Given a list of promises for booleans, asynchAnd returns a promise for their conjunction. When all resolve to true, the answer resolves to true. When any resolve to false or broken, the answer resolves to false or likewise broken (without waiting for further answers).

# E sample
def asynchAnd(bools :List[vow[boolean]]) :vow[boolean] {
    def [result, resolver] := Ref.promise()
    var countDown := bools.size()
    for bool in bools {
        when (bool) -> {
            if (bool) {
                if ((countDown -= 1) <= 0) {
                    resolver.resolve(true)
                } else {
                    resolver.gettingCloser()
                }
            } else {
                resolver.resolveRace(false)
            }
        } catch problem {
            resolver.smash(problem)
        }
    }
    return result
}

(This utility is available as <import:org.erights.e.examples.concurrency.asyncAnd>.)

z is the asynchronous conjunction of x and y:

? def [x,xr] := Ref.promise()
# value: [<Promise>, <Resolver>]

? def [y,yr] := Ref.promise()
# value: [<Promise>, <Resolver>]

? def z := asynchAnd([x,y])
# value: <Promise>

? z
# value: <Promise>

? xr.resolve(true)
? x
# value: true

? z
# value: <Promise>

? yr.resolve(false)
? y
# value: false

? interp.waitAtTop(z)

? z
# value: false

"timeBomb(5000)" returns a promise that will become broken in 5000 milliseconds, ie, 5 seconds.

? def timeBomb(millis) :any {
>     return timer.whenPast(timer.now() + millis, fn{
>         Ref.broken("time's up")
>     })
> }
# value: <timeBomb>

Put on your safety goggles:

? def bomb := timeBomb(5000)
# value: <Promise>

? bomb
# value: <Promise>

Wait 6 seconds or so

? interp.waitAtTop(bomb) # automated wait until bomb is resolved

? bomb
# value: <ref broken by problem: time's up>

By combining timeBomb with race or a joining construct (like asynchAnd), we have timeouts! For example, Alice can ask Bob for an integer, but be sure to have a resolved answer within around 5 seconds even if Bob is wedged:

 def answer := race(bob <- gimmeInteger(), timeBomb(5000))
 
Unless stated otherwise, all text on this page which is either unattributed or by Mark S. Miller is hereby placed in the public domain.
ERights Home elang / concurrency 
Back to: Introducing Remote Objects On to: Epimenides in E
Download    FAQ    API    Mail Archive    Donate

report bug (including invalid html)

Golden Key Campaign Blue Ribbon Campaign