19 drops

ancora imparo ← (michelangelo)

Futures in Scala and Java

| Comments

Cuando se recibe una petición, por ejemplo una llamada a un API, es frecuente que para atenderla haya que que recabar información de múltiples orígenes. Estos orígenes pueden ser directamente un repositorio de información (un acceso JDBC), o bien, un servicio interno independiente que gestiona una determinada funcionalidad (microservice). Así mismo, algunas de estas informaciones serán independientes unas de otras, pero habrá casos en las que existan dependencias entre ellas. Por otro lado, sería muy interesante que no se bloquearan innecesariamente recursos del sistema (reactive), que son siempre muy escasos.

Por lo tanto, el sistema más eficiente sería aquel:

  • que permitiera paralelizar todas las operaciones que fuera posible
  • que permitiera combinar los resultados de operaciones intermedias para obtener el resultado final
  • que evitara bloquear ningún recurso de manera innecesaria

En Scala esto es algo inherente al ADN del lenguaje, los elementos básicos del lenguaje proporcionan lo necesario, pero y en Java?

Supongamos un sistema de Gamificación en el que el API que devuelve el perfil del jugador proporciona la siguiente información:

  • Datos básicos de perfil (nickname, nivel)
  • Posición en los rankings
  • Última medalla conseguida en su nivel actual

Por ejemplo, en este caso: los datos básicos del perfil y la posición en el ranking son operaciones independientes que podrían lanzarse sobre los correspondientes servicios, y la identificación de la última medalla conseguida depende de conocer previamente el nivel último nivel del usuario.

 Scala Flavour

Scala ofrece las Futures como mecanismo para la parelización de acciones, y las operaciones flatMap y map para la combinación de las mismas dentro del propio lenguaje.

Las siguientes Futures modelizan las operaciones a realizar:

  • Recuperación de la posición en el Ranking [fBasicProfile]
  • Recuperación del perfil básico [fRanking]
  • Recuperación de la última medalla del nivel actual [fLastMedalInLevel], que depende del valor devuelto por [fBasicProfile]
Futures for Scalalink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def fBasicProfile =
  (user: String) =>
        Future {
          doAction(s"Select BasicProfile ($user)")
          "basicProfile:MoonWalker"
        }

def fRanking =
  (user: String) =>
        Future {
          doAction(s"Select Ranking Position ($user)")
          "125"
        }

def fLastMedalInLevel =
  (basicProfile: String) =>
        Future {
          doAction(s"Select LastMedalInLevel ($basicProfile)")
          if (basicProfile.endsWith("MoonWalker"))
              "MoonConquest" else "NoMedalYet"
        }

flatMap y map

El modo más básico de combinar estas Futures con las operaciones flatMap/map es el siguiente:

flatMap/map Combinationlink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def findFullProfile(user: String): Future[String] = {

  val fRankingForUser = fRanking(user)
  val fbasicProfileForUser = fBasicProfile(user)

  fRankingForUser.flatMap(
      basicProfile =>
        fbasicProfileForUser.flatMap(
            ranking =>
              fLastMedalInLevel(basicProfile).map(
                    lastMedal =>
                       s"$basicProfile;$ranking;$lastMedal"
                                                 )
                                     )
                          )
}

Simple, aunque no escalable. Si hubiera un cuarto servicio al que acceder este mecanismo se volvería inmanejable.

for comprenhensions

Para solucionar esto Scala ofrece una sintaxis alternativa, las for comprenhension, que dejan al compilador la generación de esta estructura básica. Con una for comprenhension lo anterior quedaría como:

for comprenhension Combinationlink
1
2
3
4
5
6
7
8
9
10
11
12
13
def findFullProfile(user: String): Future[String] = {

  val fRankingForUser = fRanking(user)
  val fbasicProfileForUser = fBasicProfile(user)

  for {

       ranking <- fRankingForUser
       basicProfile <- fbasicProfileForUser
       lastMedal <- fLastMedalInLevel(basicProfile)

      } yield s"$basicProfile;$ranking;$lastMedal"
}

Mucho más simple, y sobre todo escalable. La inclusión de una petición a un nuevo servicio no altera significativamente la estructura del programa.

 launching y resultados

El proceso de lanzamiento de estas peticiones tendría la forma:

launching processlink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
print("StartingMainProcessing Action")

findFullProfile("JaneNone")
      .map {
            fp => {
                print("EndingFullProfileProcessing Action")
                println(s"FullProfile: $fp")
                  }
           }

print("EndingMainProcessing Action")

// Just waiting all work to be done
waitFor(5)

Si miramos la ejecución de estos procesos obtendríamos algo similar a lo siguiente:

request API execution
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
(1- launching)
[TH]. 39 ... StartingMainProcessing Action  
[TH]. 40 ... Select BasicProfile (JaneNone)
[TH]. 32 ... Select Ranking Position (JaneNone)
[TH]. 39 ... EndingMainProcessing Action

(2- parallel execution)
[TH]. 32 ... Select Ranking Position (JaneNone) - Step[1]
[TH]. 40 ... Select BasicProfile (JaneNone) - Step[1]
[TH]. 32 ... Select Ranking Position (JaneNone) - Step[2]
[TH]. 32 ... Select Ranking Position (JaneNone) - Step[3]
[TH]. 40 ... Select BasicProfile (JaneNone) - Step[2]
[TH]. 32 ... Select Ranking Position (JaneNone) - Step[4]
[TH]. 32 ... Select Ranking Position (JaneNone) - Step[5]
[TH]. 40 ... Select BasicProfile (JaneNone) - Step[3]
[TH]. 40 ... Select BasicProfile (JaneNone) - Step[4]
[TH]. 40 ... Select BasicProfile (JaneNone) - Step[5]

(3- combination)
[TH]. 32 ... Select Last...Level (basicP...Walker)
[TH]. 32 ... Select Last...Level (basicP...Walker) - Step[1]
[TH]. 32 ... Select Last...Level (basicP...Walker) - Step[2]
[TH]. 32 ... Select Last...Level (basicP...Walker) - Step[3]
[TH]. 32 ... Select Last...Level (basicP...Walker) - Step[4]
[TH]. 32 ... Select Last...Level (basicP...Walker) - Step[5]

(4- results available)
[TH]. 32 ... EndingFullProfileProcessing Action
FullProfile: basicProfile:MoonWalker;125;MoonConquest
  • (1 - launching)

    • el proceso de la petición se inicia en el Thread-39
    • se lanzan las futures [fBasicProfileForUser] que comienza a ejecutarse en el Thread-40 y [fRankingForUser] que comienza a ejecutarse en el Thread-32. Ambas de forma paralela.
    • el proceso de lanzamiento concluye sin efectuar ningún bloqueo
  • (2 - parallel execution)

    • ambas futures continuan ejecutándose hasta que hay un resultado disponible
  • (3 - combination)

    • el resultado de [fBasicProfileForUser] está disponible y puede utilizarse para solicitar el cálculo de [fLastMedalInLevel]
  • (4 - results available)

    • los resultados de las tres están disponibles y es posible retornar la respuesta

Por lo tanto, se solicitan las informaciones de forma paralela, no hay bloqueos y se pueden combinar los resultados intermedios.

La operación waitFor es simplemente a efectos del ejemplo. En un caso real estaríamos utilizando un framework adecuado, como por ejemplo Play.

async block

Uno de los aspectos que más persigue Scala es en ofrecer mecanismos que permitan simplificar la paralelización de operaciones, y en este sentido, en la próxima versión se incluirá una simplicación nueva: los bloques async.

Utilizando bloques async quedaría de la forma:

async Blocklink
1
2
3
4
5
6
7
8
9
val fRankingForUser = fRanking(user)
val fbasicProfileForUser = fBasicProfile(user)

async {
   val ranking = await(fRankingForUser)
   val basicProfile = await(fbasicProfileForUser)
   val lastMedal = await(fLastMedalInLevel(basicProfile))
   s"$basicProfile;$ranking;$lastMedal"
}

que es una construcción más natural que la for comprenhension para los procedentes de lenguajes imperativos.

Java Flavour

En Java las cosas no son exactamente iguales. Hasta Java 7 no existía de forma nativa nada similar a lo ofrecido por Scala. Existían las Futures pero la extración del resultado siempre implicaba una operación bloqueante: get.

futures Java 7link
1
2
3
4
5
6
7
8
9
// BLOCKING Operation
String ranking = fRanking.get();

// BLOCKING Operation
String basicProfile = fBasicProfile.get();
Future<String> fLastMedalInLevel = lastMedalInLevel.apply(basicProfile);

// BLOCKING Operation
String lastMedal = fLastMedalInLevel.get();

Java 8, ofrece un nuevo tipo de Future: CompletableFuture. CompletableFuture es un Monad (al igual que Optional y Stream), y ofrece las mismas operaciones básicas que ofrecía Scala:

  • flatMap -> thenCompose/thenComposeAsync
  • map -> thenApply/thenApplyAsync

CompletableFuture tiene un comportamiento no bloqueante y ofrece alrededor de 50 métodos diferentes.

En el caso de Java, las Futures que utilizaríamos serían las siguientes:

Futures for Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CompletableFuture<String> fRanking =
  CompletableFuture.supplyAsync(() -> {
      Misc.doAction("Select Ranking " + user);
      return "125";
    });

CompletableFuture<String> fBasicProfile =
  CompletableFuture.supplyAsync(() -> {
      Misc.doAction("Select BasicProfile " + user);
      return "BasicProfile:MoonWalker";
    });

Function<String, CompletableFuture<String>> lastMedalInLevel =
    (String basicProfile) ->
            CompletableFuture.supplyAsync(() -> {
                Misc.doAction("Select LastMedalInLevel");
                return
                    basicProfile.endsWith("MoonWalker")
                    ? "MoonConquest"
                    : "NoMedalYet";
            });

y utilizando las operaciones thenComposeAsync y thenApplyAsync obtendríamos:

flatMap/map Combinationlink
1
2
3
4
5
6
7
8
9
10
11
fRanking
  .thenComposeAsync(
      ranking -> fBasicProfile
          .thenComposeAsync(
              basicProfile ->
                    lastMedalInLevel.apply(basicProfile)
                        .thenApplyAsync(
                            lm -> String.format("%s;%s;%s", basicProfile, ranking, lm)
                                       )
                          )
                  );

que es extremadamente parecido a lo que obteníamos en el caso de Scala. Al igual que ocurría en Scala, esta solución es poco escalable, ya que si hubiera que introducir una nueva Future, el código comenzaría a estar excesivamente anidado.

Scala solucionaba esto con las for comprenhension y los bloques async, y en el caso de Java existe la operación allOf, que toma una lista de Futures y paraleliza su ejecución. Utilizando allOf obtendríamos:

allOf Combinationlink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
CompletableFuture.allOf(fBasicProfile, fRanking)
  .thenComposeAsync(
     na -> {
        // All futures ARE completed at this time
        String basicProfile = fBasicProfile.join();
        String ranking = fRanking.join();

        return lastMedalInLevel.apply(basicProfile)
                  .thenApplyAsync(
                      lm ->
                          String.format("%s;%s;%s", basicProfile, ranking, lm)
                                 );
            }
                  );

Más manejable, aunque algo menos que en el caso de Scala.

En ambos casos el proceso de lanzamiento de las operaciones sería de la forma:

launching process
1
2
3
4
5
6
7
8
9
10
11
12
Misc.print("StartingMainProcessing Action");

findFullProfile("JaneNone")
        .thenAcceptAsync(fp -> {
            Misc.print("EndingFullProfileProcessing Action");
            Misc.print(String.format("FullProfile: %s", fp));
        });

Misc.print("End of Main Processing");

// Just waiting all work to be done
Misc.waitFor(5);

Muy similar al caso de Scala (thenAcceptAsync es simplemente un método de terminación, que en Java es necesario para extraer los valores).

La ejecución de estos procesos ofrece un patrón muy similar al anteriormente visto:

request API execution
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
(1- launching)
[TH]. 336 ... StartingMainProcessing Action
[TH]. 329 ... Select Ranking JaneNone
[TH]. 329 ... Select Ranking JaneNone - Step[1]
[TH]. 337 ... Select BasicProfile JaneNone
[TH]. 337 ... Select BasicProfile JaneNone - Step[1]
[TH]. 336 ... End of Main Processing

(2- parallel execution)
[TH]. 329 ... Select Ranking JaneNone - Step[2]
[TH]. 329 ... Select Ranking JaneNone - Step[3]
[TH]. 337 ... Select BasicProfile JaneNone - Step[2]
[TH]. 337 ... Select BasicProfile JaneNone - Step[3]
[TH]. 329 ... Select Ranking JaneNone - Step[4]
[TH]. 337 ... Select BasicProfile JaneNone - Step[4]
[TH]. 329 ... Select Ranking JaneNone - Step[5]
[TH]. 337 ... Select BasicProfile JaneNone - Step[5]

(3- Combination)
[TH]. 329 ... Select LastMedalInLevel
[TH]. 329 ... Select LastMedalInLevel - Step[1]
[TH]. 329 ... Select LastMedalInLevel - Step[2]
[TH]. 329 ... Select LastMedalInLevel - Step[3]
[TH]. 329 ... Select LastMedalInLevel - Step[4]
[TH]. 329 ... Select LastMedalInLevel - Step[5]

(4- results available)
[TH]. 337 ... EndingFullProfileProcessing Action
[TH]. 337 ... FullProfile: BasicProfile:MoonWalker;125;MoonConquest

resumiendo

CompletableFuture es uno de los elementos introducidos en Java 8 que actualizan la plataforma y la acercan a las necesidades actuales de las aplicaciones. No es tan potente y flexible como son las soluciones que ofrece Scala, pero es una gran mejora.

 Nota: Java 7 y 6

Si bien en versiones anteriores a Java 8 no existía en el lenguaje ningún elemento que permitiera realizar este tipo de operaciones, si hay librerías como por ejemplo RxJava que permiten realizar un aproximación similar. En RxJava quedaría de la forma:

using RxJavalink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
Observable<String> oBasicProfile = Observable.from(fBasicProfile);
Observable<String> oRanking = Observable.from(fRanking);
Observable<String> oLastMedal =
      oBasicProfile
           .flatMap(
                bp ->
                   Observable.from(lastMedalInLevel.apply(bp)));

return
    Observable
      .zip(
        oBasicProfile,
        oRanking,
        oLastMedal,
        (basicProfile, ranking, lastMedal)
            -> String.format("%s;%s;%s", basicProfile, ranking, lastMedal))
      .subscribeOn(Schedulers.computation());
...

También con las operaciones paralelizadas, combinadas y no bloqueantes.

Comments