Xbus request-reply pattern

In Xbus 3.1 we introduced native support for the Request-Reply pattern.

In this post we will learn how to start using this native pattern, and see how it improves things compared to what we used to do in prior versions.

Introduction

Request-Reply is a very common pattern that allows a two-way conversation between a requestor and a replier.

In Xbus 3.0, we implemented this pattern with one or two pipelines. In both cases, the requestor was an emitter/consumer pair: the emitter sent the request, and the consumer received the reply.

There were several shortcomings, including the following:

  • The correlation between a request and its reply had to be done by the requestor and the replier by adding correlation data in the reply.

  • The requestor had to be both an emitter and a consumer.

  • The reply consumption was mandatory to complete the process.

  • Routing the reply back to the consumer associated with the right emitter could become pretty difficult.

  • Using 2 pipelines (one for the request, one for the reply) also implied:

    • The replier had to be both an emitter and a consumer

    • 2 processes were created for each roundtrip
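To make the first shortcoming concrete, here is a minimal Go sketch of the bookkeeping a requestor had to do by hand when replies arrived on a separate consumer. It does not use any Xbus API: the `correlator` type and its `register` and `deliver` methods are hypothetical names chosen for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// correlator is a hypothetical helper (not part of Xbus): it remembers which
// requests are still waiting for a reply, keyed by a correlation ID.
type correlator struct {
	mu      sync.Mutex
	nextID  int
	pending map[string]chan string
}

func newCorrelator() *correlator {
	return &correlator{pending: make(map[string]chan string)}
}

// register is called on the emitter side before sending a request. It returns
// the correlation ID to embed in the request and a channel for the reply.
func (c *correlator) register() (string, <-chan string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nextID++
	id := fmt.Sprintf("req-%d", c.nextID)
	ch := make(chan string, 1)
	c.pending[id] = ch
	return id, ch
}

// deliver is called on the consumer side when a reply arrives. It reports
// false when the correlation ID is unknown or was already answered.
func (c *correlator) deliver(id, reply string) bool {
	c.mu.Lock()
	ch, ok := c.pending[id]
	delete(c.pending, id)
	c.mu.Unlock()
	if !ok {
		return false
	}
	ch <- reply
	return true
}

func main() {
	c := newCorrelator()
	id, replyCh := c.register()

	// Stand-in for the replier: it must copy the correlation ID into its reply.
	go c.deliver(id, "hello world")

	fmt.Println(<-replyCh)
}
```

With the native pattern described in the rest of this post, none of this bookkeeping lives in the requestor or the replier.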

Pipelines

Requestor nodes

In Xbus 3.1 you can use a new kind of node in the pipelines: “requestor”. A “requestor” node has one output: “request”, and one input: “response”.

For example:

- id: source
  type: requestor
  roles: [role]
  sourcematch:
    eventtypes:
    - question
  outputs: [request]
  inputs: [response]

Matching

The pipeline matching now has an extra criterion: the emitter output. This means the emitter must use the right output:

  • “default” if sending a one-way message

  • “request” if starting a request-reply conversation
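The matching rule can be pictured as a simple predicate. The Go sketch below is only an illustration of the idea, not the actual Xbus implementation: the `sourceMatch` type, its fields and the `matches` method are hypothetical, and real pipeline matching may involve criteria that are not modelled here.

```go
package main

import "fmt"

// sourceMatch is a hypothetical, simplified view of a source node's matching
// rules: the event types it accepts and the emitter output it expects.
type sourceMatch struct {
	eventTypes []string
	output     string // "default" for one-way, "request" for request-reply
}

// matches reports whether an envelope emitted with the given event type on the
// given emitter output would select this source node.
func (m sourceMatch) matches(eventType, output string) bool {
	if m.output != output {
		return false
	}
	for _, t := range m.eventTypes {
		if t == eventType {
			return true
		}
	}
	return false
}

func main() {
	requestor := sourceMatch{eventTypes: []string{"question"}, output: "request"}

	fmt.Println(requestor.matches("question", "request")) // true
	fmt.Println(requestor.matches("question", "default")) // false: wrong output
}
```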

Pipeline

A request-reply pipeline has a graph with a “requestor” node. It may have no consumer, but the “response” input of the “requestor” node must be linked to the output of another node.

A short example of a “hello-world” request-reply pipeline:

nodes:
- id: source
  type: requestor
  sourcematch:
    eventtypes: [evt1.request]
  outputs: [request]
  inputs: [response]
- id: server
  type: worker
  actors: [helloworld-actor]
  inputs: [default]
  outputs: [default]
edges:
- source.request -> server.default
- server.default -> source.response

In this graph, a “helloworld-actor” worker processes the request received on its “default” input and sends the result to its “default” output.

The “server” node does not need to know that its result goes back to the requestor, so it can be any worker.
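The requirement that the “response” input be linked can be expressed as a small check over the edge list. The following Go sketch is an illustration under assumptions, not Xbus code: the `responseIsLinked` function is a hypothetical name, and it only understands the `node.output -> node.input` edge notation used in the example above.

```go
package main

import (
	"fmt"
	"strings"
)

// responseIsLinked reports whether any edge ends at the "response" input of
// the given requestor node. Edges use the "node.output -> node.input" notation.
func responseIsLinked(requestorID string, edges []string) bool {
	target := requestorID + ".response"
	for _, e := range edges {
		parts := strings.Split(e, "->")
		if len(parts) != 2 {
			continue // not a well-formed edge; ignored in this sketch
		}
		if strings.TrimSpace(parts[1]) == target {
			return true
		}
	}
	return false
}

func main() {
	edges := []string{
		"source.request -> server.default",
		"server.default -> source.response",
	}
	fmt.Println(responseIsLinked("source", edges))     // true
	fmt.Println(responseIsLinked("source", edges[:1])) // false: no reply path
}
```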

Emitter behavior

As we saw above, to act as a requestor the emitter has only one thing to do: send the request on its “request” output instead of “default”.

Once the request is emitted, the emitter can wait for the reply, and mark it as “read” when it’s done. This marking is independent of the process completion.

To be notified of the process completion, the emitter can subscribe to the ProcessState.EnvelopeStates() API. If the process is successful, the received EmitterEnvelopeState will contain the first fragment of the reply envelope.

Once the emitter has read the reply and taken it into consideration, it may call the ProcessState.AckResult() API so the bus knows that the reply was properly fetched.

With go-xbus

In go-xbus we provide higher-level functions to do all this.

Here is an example of sending a request and waiting for the response:

func helloWorld(actor *xbus.Actor, message string) (string, error) {
    reply, err := actor.Request(
        context.Background(),
        envelope.MustNewEnvelope(
            envelope.MustNewTextMessage("demo.simplemessage", message),
        ),
    )

    if err != nil {
        // Propagate the error instead of silently returning an empty reply.
        return "", err
    }

    var result string
    if err := envelope.ForEachMessage(reply, func(msg envelope.Message) error {
        s, err := envelope.ReadAllAsString(msg)
        result = s
        return err
    }); err != nil {
        return "", err
    }

    return result, nil
}
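Since `actor.Request` takes a `context.Context`, a caller will usually want to bound how long it waits for the reply. The sketch below shows that pattern using only the standard `context` package. It assumes, without confirmation from the go-xbus documentation, that `Request` returns once its context is done. The `requester` interface, the `slowReplier` type and the `requestWithTimeout` function are hypothetical stand-ins so the example runs without a bus.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// requester is a hypothetical stand-in for the part of the actor used here.
type requester interface {
	Request(ctx context.Context, payload string) (string, error)
}

// slowReplier simulates a replier that answers after a fixed delay.
type slowReplier struct{ delay time.Duration }

func (s slowReplier) Request(ctx context.Context, payload string) (string, error) {
	select {
	case <-time.After(s.delay):
		return "hello " + payload, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// requestWithTimeout gives up if no reply arrives within timeoutMs milliseconds.
func requestWithTimeout(r requester, payload string, timeoutMs int) (string, error) {
	ctx, cancel := context.WithTimeout(
		context.Background(),
		time.Duration(timeoutMs)*time.Millisecond,
	)
	defer cancel()
	return r.Request(ctx, payload)
}

func main() {
	// The reply arrives well within the deadline.
	reply, err := requestWithTimeout(slowReplier{delay: 10 * time.Millisecond}, "world", 1000)
	fmt.Println(reply, err)

	// The replier is too slow, so the deadline expires first.
	_, err = requestWithTimeout(slowReplier{delay: time.Second}, "world", 20)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```

In the `helloWorld` function above, the same idea would amount to passing a context built with `context.WithTimeout` instead of `context.Background()`.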

Conclusion

We have seen how to define a request-reply pipeline and how the emitter uses it.

This new feature opens up new possibilities that will make Xbus much easier to use in many situations.

What about you? Do you have a use case where request-reply would apply? If so, let us know; we would be happy to help! You can also post to the public user mailing list if you prefer.