Le modèle de request-reply de Xbus

Dans la version 3.1 de Xbus, nous avons introduit le support natif du modèle request-reply.

Ce tutoriel vous présente comment utiliser ce modèle et vous permet de découvrir les améliorations apportées.

Introduction

Le modèle de request-reply est un modèle très courant qui établit une conversation bidirectionnelle entre l’émetteur d’une demande et celui qui fournit une réponse :

Dans Xbus 3.0, nous avions mis en oeuvre ce modèle avec un ou deux pipeline(s). Dans les deux cas, l’application à l’origine de la demande endossait à la fois le rôle d’émetteur et de consommateur. Elle envoyait sa demande en tant qu’application émettrice et recevait la réponse en tant qu’application consommatrice.

Il y avait plusieurs lacunes, notamment les suivantes :

  • La corrélation entre une demande et sa réponse devait être effectuée par le demandeur et le fournisseur de la réponse en ajoutant des données de corrélation dans la réponse.

  • L’application à l’origine de la demande devait être à la fois émettrice et consommatrice.

  • La réception de la réponse était obligatoire pour terminer l’opération.

  • Il pouvait s’avérer difficile d’acheminer la réponse au consommateur associé au bon émetteur.

  • L’utilisation de deux pipelines (un pour la demande et un pour la réponse) impliquait aussi le fait que :

    • Le fournisseur de la réponse devait être à la fois un émetteur et un consommateur

    • Deux opérations étaient créées pour chaque aller-retour

Pipelines

Noeuds de type “requestor”

Dans Xbus 3.1, il est possible d’utiliser un nouveau type de noeud dans les pipelines : “requestor”. Un noeud de type “requestor” dispose d’une sortie : “request” et d’une entrée : “response”.

Par exemple:

- id: source
  type: requestor
  roles: [role]
  sourcematch:
    eventtypes:
    - question
  outputs: [request]
  inputs: [response]

Correspondance

La correspondance de pipelines a désormais un critère supplémentaire : la sortie de l’émetteur. Cela signifie que l’émetteur doit utiliser la bonne sortie : - “default” pour les messages à sens-unique - “request” pour les conversations bidirectionnelles de type request-reply

Pipeline

Un pipeline request-reply possède un graph avec un noeud “requestor”. Il peut ne pas y avoir de consommateur. L’entrée “response” du noeud “requestor” doit impérativement être reliée à quelque chose.

Un petit exemple d’un pipeline request-reply “hello-world” :

nodes:
- id: source
  type: requestor
  sourcematch:
    eventtypes: [evt1.request]
  outputs: [request]
  inputs: [response]
- id: server
  type: worker
  actors: [helloworld-actor]
  inputs: [default]
  outputs: [default]
edges:
- source.request -> server.request
- server.response -> source.response

Dans ce graph, un worker “helloworld-actor” traite la demande reçue sur son entrée “default” et envoie le résultat à sa sortie “default”.

Le fait que le résultat revienne à l’émetteur de la demande est totalement ignoré par le nœud “server” qui peut être n’importe quel worker.

Le comportement de l’émetteur

Comme vu ci-dessus, l’émetteur a juste à envoyer une demande sur sa sortie “request” à la place de “default” pour se comporter comme un émetteur de demande.

Lorsque la demande est envoyée, l’émetteur peut attendre la réponse et la noter comme lue (“read”) une fois reçue. Ce marquage est indépendant de la finalisation de l’opération.

Pour être alerté de la finalisation de l’opération, l’émetteur peut s’enregistrer auprès de l’API ProcessState.EnvelopeStates(). Si l’opération est réussie, le EmitterEnvelopeState reçu contiendra le premier fragment de l’enveloppe de réponse.

Lorsque l’émetteur a lu la réponse et l’a prise en considération, il peut faire appel à l’API ProcessState.AckResult() pour que le bus de données sache que la réponse a bien été reçue.

Avec go-xbus

Dans go-xbus nous fournissons des fonctionnalités de haut niveau pour exécuter tout cela.

Voici un exemple de l’envoi d’une demande et de l’attente d’une réponse :

func helloWorld(actor *xbus.Actor, message string) (string, error) {
    reply, err := actor.Request(
        context.Background(),
        envelope.MustNewEnvelope(
            envelope.MustNewTextMessage("demo.simplemessage", "world"),
        ),
    )

    if err != nil {
        return "", nil
    }

    var result string
    if err := envelope.ForEachMessage(reply, func(msg envelope.Message) error {
        s, err := envelope.ReadAllAsString(msg)
        result = s
        return err
    }); err != nil {
        return "", err
    }

    return result, nil
}

Conclusion

Nous avons vu comment définir un pipeline request-reply ainsi que la façon dont l’émetteur l’utilise. Cette nouvelle fonctionnalité ouvre de nouvelles possibilités pour rendre Xbus bien plus simple à utiliser dans de nombreuses situations.

Qu’en est-il de vous ? Avez-vous un cas d’usage dans lequel le modèle request-reply pourrait s’appliquer ? Si c’est le cas faites-le nous savoir, nous serions heureux de vous aider ! Vous pouvez aussi poster dans la mailing list publique si vous préférez.