funkcionálně.cz

Přední český blog o funkcionálním programování, kde se o funkcionálním programování nepíše
««« »»»

Async SQL

26. 4. 2013

JDBC blokuje.

JDBC blokuje a to představuje problém pro novou generaci async web frameworků jako např. Play! jejichž srdce je zcela asynchronní a reaktivní a bije na thread poolu stejně velkém jako je počet procesorových jader, který nikdy neblokuje aktivní vlákno. Neblokuje a ani nemůže.

Pokud z nějakého důvodu v takovémto prostředí používám JDBC, musím provádět blokující operace ve vlastních vláknech. Můžu je zabalit do blokující Future a doufat, že si s tím nějak poradí:

furure { blocking { blockingCall() } }

Nebo pro blokující operace vytvořit separátní thread pool a doufat, že bude stačit.

Ale to jsou jenom berličky. Ideální je, když mám k dispozici asynchronní driver jako třeba právě Async SQL. Ten není postaven na JDBC (viz začátek článku), jde o implementaci MySQL konektoru na zelené louce, který pod kapotou používá Akka framework a je zcela asynchronní.

Asynchronicita se projevuje tak, že výsledkem každého dotazu je Future:

val database = Database(actorSystem)

val res: Future[Data] = for {
  conn <- database.connect()
  data <- conn.executeQuery("select ...")
  _    <- conn.close()
} yield data

res onSuccess { case data         => println(data) }
res onFailure { case e: Exception => println("oh noes!") }

Jenom tohle nám ušetří spoustu trápení v asynchronním světě, ale neporadí si to s Příliš Velkými Dotazy. Takové dotazy můžou vrátit miliony řádků, mnohem víc než se mi vejde do paměti, ale já je chci hned streamovat pryč nebo je chci nějak zagregovat do řádově menší datové struktury (například zredukovat korpus textu na četnost výskytu jednotlivých slov, možná dokonce do nějaké pravděpodobnostní top-k struktury).

Když se s tím chci vypořádat ve světě JDBC, musím použít nebufferované dotazy. Ty sice nepotřebují materializovat výsledek v aplikaci, ale pořád blokují. A blokování je špatné (viz začátek článku).

Async SQL se umí i s tímhle elegantně poradit. Umí totiž reagovat na data tak, jak přicházejí po síti - tedy zcela reaktivně!

val res: Future[X] = for {
  conn <- database.connect()
  data <- conn.executeQuery("select ...", initialValue) {
    case StartRow(composite)      => combine(composite, "start-row")
    case AValue(value, composite) => combine(composite, value)
    case EndRow(composite)        => combine(composite, "end-row")
  }
  _    <- conn.close()
} yield data

Standardní API umí reagovat jenom na jednotlivé sloupce, ne na celé řádky, ale to se dá jednou nepříliš složitou funkcí napravit.

Pozn: vypadá to, že autor vydal novou verzi pro Scalu 2.10, která už nepotřebuje Akku.

@kaja47, kaja47@k47.cz, deadbeef.k47.cz, starší články