Replicated entities

This page documents how to implement Cloudstate Replicated Entities in Java. For information on what Cloudstate Replicated Entities are, please read how to choose a state model and the general Replicated Entity information first.

A Replicated Entity can be created by annotating it with the @CrdtEntitynew tab annotation.

@CrdtEntity
public class ShoppingCartEntity {

Accessing and creating an entity’s Replicated Entity

Each Replicated Entity manages one root replicated data type. That data type will either be supplied to the entity when it is started, or, if no data type exists for the entity when it is started, it can be created by the entity using a CrdtFactorynew tab extending context.

There are multiple ways that a Replicated Entity may access its data type. It can be injected directly into its constructor or a command handler—​the value can be wrapped in an Optional to distinguish between entities that have been created and Replicated Entities that have not yet been created. If not wrapped in Optional, the Replicated Entity will be created automatically, according to its type. The data type can also be read from any CrdtContextnew tab via the statenew tab method.

An entity’s data type can be created from the entity’s constructor using the CrdtFactory methods on CrdtCreationContextnew tab, or using the same methods in a command handler using the CommandContextnew tab. Note that the data type may only be created once, and only if it hasn’t already been provided by CloudState. Any attempt to create the data type when one already exists will throw an IllegalStateException.

For most use cases, simply injecting the data type directly into the constructor, and storing in a local field, will be the most convenient and straightforward method of using a Replicated Entity. In our shopping cart example, we’re going to use an LWWRegisterMapnew tab, this shows how it may be injected:

private final LWWRegisterMap<String, Shoppingcart.LineItem> items;

public ShoppingCartEntity(LWWRegisterMap<String, Shoppingcart.LineItem> items) {
  this.items = items;
}

In addition to the Replicated Entity, the constructor may accept a CrdtCreationContextnew tab.

Handling commands

Command handlers can be declared by annotating a method with @CommandHandlernew tab. They take a context class of type CommandContextnew tab.

By default, the name of the command that the method handles will be the name of the method with the first letter capitalized. So, a method called getCart will handle a gRPC service call command named GetCart. This can be overridden by setting the name parameter on the @CommandHandler annotation.

The command handler also can take the gRPC service call input type as a parameter to receive the command message. This is optional, sometimes it’s not needed. For example, our GetCart service call doesn’t need any information from the message, since it’s just returning the current state as is. Meanwhile, the AddItem service call does need information from the message, since it needs to know the product id, description and quantity to add to the cart.

The return type of the command handler must be the output type for the gRPC service call, this will be sent as the reply.

The following shows the implementation of the GetCart command handler. This command handler is a read-only command handler, it doesn’t update the Replicated Entity, it just returns some state:

@CommandHandler
public Shoppingcart.Cart getCart() {
  return Shoppingcart.Cart.newBuilder().addAllItems(items.values()).build();
}

Updating a Replicated Entity

Due to CloudState’s take in turns approach, Replicated Entities may only be updated in command handlers and stream cancellation callbacks.

Here’s a command handler for the AddItem command that adds the item to the shopping cart:

@CommandHandler
public Empty addItem(Shoppingcart.AddLineItem item, CommandContext ctx) {
  if (item.getQuantity() <= 0) {
    ctx.fail("Cannot add a negative quantity of items.");
  }
  if (items.containsKey(item.getProductId())) {
    items.computeIfPresent(
        item.getProductId(),
        (id, old) -> old.toBuilder().setQuantity(old.getQuantity() + item.getQuantity()).build());
  } else {
    items.put(
        item.getProductId(),
        Shoppingcart.LineItem.newBuilder()
            .setProductId(item.getProductId())
            .setName(item.getName())
            .setQuantity(item.getQuantity())
            .build());
  }
  return Empty.getDefaultInstance();
}

Deleting a Replicated Entity

A Replicated Entity can be deleted by invoking CommandContext.deletenew tab. Once deleted, the entity will be shut down, and all subsequent commands for the entity will be rejected.

Caution should be taken when deleting Replicated Entities—​Cloudstate needs to maintain tombstones for each deleted Replicated Entity. Over time, if many Replicated Entities are created and deleted, this will result in not just running out of memory, but increased network usage as the tombstones still need to be gossipped through the cluster for replication.

Streamed command handlers

Replicated Entities support streaming over gRPC. Streamed commands can be used to receive and publish updates to the state. If a gRPC service call has a streamed result type, the handler for that call can accept a StreamedCommandContextnew tab, and use that to register callbacks.

We provide a full chat app example including frontend and backend. Please check Replicated Entities Streaming Example for detail.

Responding to changes

If the command handler wishes to publish changes to the stream it can register a callback with onChangenew tab, which will be invoked every time the Replicated Entity changes.

The callback is then able to return a message to be sent to the client (or empty, if it wishes to send no message in response to that particular change). The callback may not modify the CRDT itself, but it may emit effects that may modify the Replicated Entity.

If the shopping cart service had a WatchCart call, like this:

rpc WatchCart(GetShoppingCart) returns (stream Cart);

that could be implemented like this:

@CommandHandler
public Shoppingcart.Cart watchCart(StreamedCommandContext<Shoppingcart.Cart> ctx) {

  ctx.onChange(subscription -> Optional.of(getCart()));

  return getCart();
}

Ending the stream

The onChange callback can end the stream by invoking endStreamnew tab on the SubscriptionContextnew tab it is passed. If it does this, it will not receive an onCancel callback.

Responding to stream cancellation

A streamed command handler may also register an onCancelnew tab callback to be notified when the stream is cancelled. The cancellation callback handler may update the Replicated Entity. This is useful if the Replicated Entity is being used to track connections, for example, when using Votenew tab Replicated Entities to track a user’s online status.

Replicated entity data types

The Cloudstate Java support library offers Java classes for each of the data types available in Cloudstate.

Counters and flags

GCounternew tab, PNCounternew tab and Flagnew tab are available, offering operations relevant to each Replicated Entity.

Vote

Votenew tab is available for a vote Replicated Entity. The vote type allows updating the current node’s vote using the votenew tab method, the current nodes vote can be queried using the getSelfVotenew tab method.

For determining the result of a vote, getVotersnew tab and getVotesFornew tab can be used to check the total number of nodes, and the number of nodes that have voted for the condition, respectively. In addition, convenience methods are provided for common vote decision approaches, isAtLeastOnenew tab returns true if there is at least one voter for the condition, isMajoritynew tab returns true if the number of votes for is more than half the number of voters, and isUnanimousnew tab returns true if the number of votes for equals the number of voters.

Registers

LWWRegisternew tab provides the LWWRegister Replicated Entity data type. It can be interacted with using the setnew tab and getnew tab methods. If you wish to use a custom clock, you can use the setnew tab overload that allows passing a custom clock and custom clock value.

Direct mutations to LWWRegisternew tab and LWWRegisterMapnew tab values will not be replicated to other nodes, only mutations triggered through using the setnew tab and putnew tab methods will be replicated. Hence, the following update will not be replicated:

myLwwRegister.get().setSomeField("foo");

This update however will be replicated:

MyValue myValue = myLwwRegister.get();
myValue.setSomeField("foo");
myLwwRegister.set(myValue);

In general, we recommend that these values be immutable, as this will prevent accidentally mutating without realizing the update won’t be applied. If using protobufs as values, this will be straightforward, since compiled protobuf classes are immutable.

Sets and Maps

Cloudstate Java support provides GSetnew tab and ORSetnew tab that implement the java.util.Set interface, and ORMapnew tab that implements the java.util.Map. However, not all operations are implemented - GSet doesn’t support any removal operations, and ORMap does not support any operations that would replace an existing value in the map.

To insert a value into an ORMap, you should use the getOrCreatenew tab method. The passed in callback will give you a CrdtFactorynew tab that you can use to create the Replicated Entity value that you wish to use.

With all maps and sets, map keys and set values must be immutable. Cloudstate ignores the individual mutation of the key or value (not replicated to other nodes). Furthermore, their serialized form must be stable. The Cloudstate proxy uses the serialized form of the values to track changes in the set or map. If the same value serializes to two different sets of bytes on different occasions, they will be treated as different elements in the set or map.

This is particularly relevant when using protobufs. The ordering of map entries in a serialized protobuf is undefined, and very often will be different for two equal maps. Hence, maps should never be used as keys in ORMap or as values in GSet, ORSet. For the rest of the protobuf specification, while no guarantees are made on the stability by the protobuf specification itself, the Java libraries do produce stable orderings of fields and stable output of non-map values. Care should be taken when changing the protobuf structure. Many changes, that are backwards compatible from a protobuf standpoint, do not necessarily translate into stable serializations.

If using JSON serialization, it is recommended that you explicitly define the field ordering using Jackson’s @JsonPropertyOrder annotation, and as with protobufs, never use Map or Set in your JSON objects since the ordering of those is not stable.

Some wrapper classes are also provided for ORMap. These provide more convenient APIs for working with values of particular CRDT types. They are:

LWWRegisterMapnew tab

A map of LWWRegister values. This exposes the LWWRegister values as values directly in the map.

PNCounterMapnew tab

A map of PNCounter values. This exposes the current value of the PNCounters directly as values in the map, and offers incrementnew tab and decrementnew tab methods to update the values.

Registering the entity

Once you’ve created your entity, you can register it with the CloudStatenew tab server, by invoking the registerCrdtEntitynew tab method. In addition to passing your entity class and service descriptor, if you use protobuf for serialization and any protobuf message definitions are missing from your service descriptor (they are not declared directly in the file, nor as dependencies), then you’ll need to pass those protobuf descriptors as well.

public static void main(String... args) {
  new CloudState()
      .registerCrdtEntity(
          ShoppingCartEntity.class,
          Shoppingcart.getDescriptor().findServiceByName("ShoppingCartService"))
      .start();
}