Replicated Entities Streaming Example

Replicated Entities support streaming over gRPC. This page documents how to implement a stream handler in Java using Replicated Entities.

Sample Chat App

We provide a simple chat application that demonstrates how to use streaming. When deployed to Akka Serverless, the Cloudstate chat service streams users' online status for the UI to display. The code is in the following two GitHub repositories:

Code Explanation

To implement streaming in the Chat app:

  • Define the server-streaming gRPC call

In the Chat app, a client sends a request to the gRPC server and receives a stream of messages back. In the proto file, for example, we define a server-streaming RPC, Monitor, which returns a stream of OnlineStatus messages.

service Presence {
    // Connect the given user. They will stay connected as long as the stream stays open.
    rpc Connect(User) returns (stream Empty);
    // Monitor the online status of the given user.
    rpc Monitor(User) returns (stream OnlineStatus);
}
The full source code is in presence.proto.
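To make the shape of a server-streaming call concrete, here is a minimal in-memory sketch in plain Java. It does not use gRPC or the Akka Serverless SDK: `ServerStreamSketch`, `monitor`, and `observe` are hypothetical names, and the list of booleans is an assumed stand-in for the OnlineStatus messages the server would push. It only illustrates that one request yields zero or more messages followed by an end-of-stream signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory model of a server-streaming call; no gRPC involved. */
public class ServerStreamSketch {

  /**
   * Models rpc Monitor(User) returns (stream OnlineStatus): one request in,
   * zero or more messages out, then an end-of-stream signal.
   */
  public static void monitor(String userName, List<Boolean> statuses,
                             Consumer<Boolean> onMessage, Runnable onEnd) {
    for (boolean online : statuses) {
      onMessage.accept(online); // each element stands for one streamed OnlineStatus
    }
    onEnd.run(); // the server closes the stream
  }

  /** Collects everything a client would observe for a single request. */
  public static List<String> observe(String userName, List<Boolean> statuses) {
    List<String> events = new ArrayList<>();
    monitor(userName, statuses,
        online -> events.add(userName + " online=" + online),
        () -> events.add("end"));
    return events;
  }

  public static void main(String[] args) {
    // prints [alice online=true, alice online=false, end]
    System.out.println(observe("alice", List.of(true, false)));
  }
}
```

In the real service the messages arrive over time as the entity state changes, rather than from a precomputed list.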
  • Implement the server-streaming gRPC call in Java

The following shows the implementation of the command handler Monitor(User) in Java.

  /**
   * User presence monitoring call.
   *
   * This is a streamed call. We add an onChange callback, so that whenever the CRDT
   * changes, if the online status has changed since the last message we pushed, we push
   * it.
   */
  @CommandHandler OnlineStatus monitor(User user, StreamedCommandContext<OnlineStatus> ctx) {
    // Note we store the online status in an array for each call to monitor
    // even though it is a single boolean, as this lets us capture the object
    // in onChange callback's environment below.
    boolean onlineStatus[] = { presence.isAtLeastOne() };

    if(ctx.isStreamed()) {
      ctx.onChange(subCtx ->
       {
          boolean previousOnlineStatus = onlineStatus[0];
          boolean newOnlineStatus = presence.isAtLeastOne();
          onlineStatus[0] = newOnlineStatus;

          if(newOnlineStatus != previousOnlineStatus) {
            logger.debug("monitor: " + user.getName() + " return {" + newOnlineStatus + "}");
            return Optional.of(OnlineStatus.newBuilder().setOnline(newOnlineStatus).build());
          } else {
            logger.debug("monitor: " + user.getName() + " status unchanged");
            return Optional.empty();
          }
       });
    }

    logger.debug("monitor: " + user.getName() + " return {" + onlineStatus[0] + "}");
    return OnlineStatus.newBuilder().setOnline(onlineStatus[0]).build();
  }
The full source code is in PresenceEntity.java.
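The change-detection logic inside the callback can be exercised on its own. The following is a minimal sketch in plain Java, not the Akka Serverless API: `ChangeFilter`, its `onChange` method, and `simulate` are hypothetical names. `onChange` stands in for the callback registered with `ctx.onChange`, and the `BooleanSupplier` is an assumed substitute for `presence.isAtLeastOne()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the callback registered via ctx.onChange; not part of any SDK. */
public class ChangeFilter {
  // Single-element array so the remembered value can be mutated, as in monitor().
  private final boolean[] lastPushed;
  private final BooleanSupplier currentStatus;

  public ChangeFilter(BooleanSupplier currentStatus) {
    this.currentStatus = currentStatus;
    // The initial value corresponds to the first reply returned by monitor().
    this.lastPushed = new boolean[] { currentStatus.getAsBoolean() };
  }

  /** Returns the new status only if it differs from the last value recorded. */
  public Optional<Boolean> onChange() {
    boolean previous = lastPushed[0];
    boolean current = currentStatus.getAsBoolean();
    lastPushed[0] = current;
    return current != previous ? Optional.of(current) : Optional.empty();
  }

  /** Feeds a sequence of states through the filter and collects what would be streamed. */
  public static List<Boolean> simulate(boolean initial, boolean... updates) {
    boolean[] state = { initial };
    ChangeFilter filter = new ChangeFilter(() -> state[0]);
    List<Boolean> pushed = new ArrayList<>();
    for (boolean update : updates) {
      state[0] = update;
      filter.onChange().ifPresent(pushed::add);
    }
    return pushed;
  }

  public static void main(String[] args) {
    // prints [true, false]
    System.out.println(simulate(false, false, true, true, false));
  }
}
```

Of the four state updates in `main`, only the two that flip the status produce a message, which mirrors how the handler returns `Optional.empty()` when nothing has changed.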
  • Write the streaming handler for the UI

The following shows how the UI reads the response stream. It listens for the "status" and "end" events. (NOTE: The "end" event fires after the stream ends.)

monitorPresence = (user: User) =>{
    console.log("monitorPresence")
    const userpb = new UserPesence();
    userpb.setName(user.name);
    const presenceStream = this.presenceClient.monitor(userpb);
    if(this.presenceStreams[user.name])delete this.presenceStreams[user.name];
    this.presenceStreams[user.name] = presenceStream;
    console.log("monitor presence of user: ", user);
    presenceStream.on("status", (status) =>{
        console.log("status for: " + user.name, status);
        if(status.code == 0){   // connection ok
            //this.store.userStore.users[user.name].online = false;
        }
    });
    presenceStream.on("end", () =>{
        console.log("stream end for user", user);
        this.store.userStore.users[user.name].online = false;
    });
}
The full source code is in Apt.ts.