Abstracting Over State

In the previous post we have extended the state we are threading through our row parsers, i.e. the remainder of the row to be parsed, with the current location (row/column index) within the CSV file. We’ll take a closer look at the generic underlying pattern now.

To take a more abstract view, given a state type S and a result type R, the following type represents a generic stateful computation.

type FState[S, T] = S => (S, T)

Note: By convention, the state value is the first component in the return tuple, wheras in our row parser it used to be the second.

A first observation is that this type gives rise to the usual suspects in our abstraction tool set: We can have functor, applicative and monad on top.

def map[S, A, B](sa: FState[S, A])(f: A => B): FState[S, B] =
  sa(_) match { case (s, a) => s -> f(a) }
  
def ap[S, A, B](sf: FState[S, A => B])(sa: FState[S, A]): FState[S, B] =
  s =>
    val (s1, f) = sf(s)
    val (s2, a) = sa(s1)
    s2 -> f(a)
    
def flatMap[S, A, B](sa: FState[S, A])(f: A => FState[S, B]): FState[S, B] =
  s =>
    val (s1, a) = sa(s)
    f(a)(s1)

As noted in step 02, Cats already provides a State data type with the corresponding type class bindings.

So we can thread a State through a computation chain just like we do with Either right now. This sounds promising, but there’s a problem…

Transformation

We don’t want to replace Either with State, we want to have both - ideally we want a way of merging Either and State so we can handle it as one single, opaque abstraction when threading through the computation chain, while having access to the specific features of each at the “leaves” of the computation where we need to advance state and/or produce success/error results.

While it is not possible to merge arbitrary pairs of Monad instances (“monads don’t compose”), often one can provide a way for one specific Monad instance with arbitrary other instances. This can be implemented as a monad transformer. Fortunately, Cats provides a transformer for State as StateT.

Let’s transform our code accordingly.

State Error

One can view our combined Either/State type as encapsulating two different side effects: Failure mode and state mutation. Let’s create an alias and modify our parser definition.

type CSVParseEff[T] = StateT[CSVResult, ParserState, T]

trait RowParser[T]:
  def parse(): CSVParseEff[T]

First we need to fix our “core” parsers, string and end. string needs to modify the state, so we wrap our existing Either handling in a StateT#apply().

val string: RowParser[String] =
  () =>
    StateT {
      case ParserState(ParserPos(r, c), _, h :: t) => 
        (ParserState(ParserPos(r, c + 1), c, t), h).pure
      case ParserState(p, _, Nil) => RowExhaustionFailure(p).asLeft
    }

end only needs to read the state, so we’ll use StateT#inspectF() and wrap the Either handling similarly.

val end: RowParser[Unit] =
  () =>
    StateT.inspectF {
      case s@ParserState(_, _, Nil) => ().pure
      case ParserState(pos, _, _ :: _) =>
        ColumnParseFailure(
          new IllegalStateException("trailing data"),
          pos
        ).asLeft
    }

#emap() becomes simpler, and we can reuse it again for #guardMap(). For the latter, we wrap the existing Either handling in #inspectF() again.

def emap[B](f: A => CSVParseEff[B]): RowParser[B] =
  () => p.parse() >>= f
def guardMap[B](f: A => B): RowParser[B] =
  emap {
    a => StateT.inspectF { s =>
      Either
        .catchNonFatal(f(a))
        .leftMap(ColumnParseFailure(_, ParserPos(s.pos.rowIdx, s.prevColIdx)))
    }
  }

Chaining State

What remains is fixing the computation chaining code. The Applicative is straightforward and becomes simpler:

def pure[A](a: A): RowParser[A] = () => a.pure
def ap[A, B](ff: RowParser[A => B])(fa: RowParser[A]): RowParser[B] =
  () =>
    for {
      f <- ff.parse()
      a <- fa.parse()
    } yield f(a)

Same for the derivation step.

def deriveRowParser(f: A => B): RowParser[C] =
  () =>
    for {
      a <- summon[RowParser[A]].parse()
      c <- summon[RowParserDerivable[B, C]].deriveRowParser(f(a)).parse()
    } yield c

To wrap up, we need to initialize the state for each row computation with StateT#runA().

private def parseRow[T](p: RowParser[T])(row: Row, rowIdx: Int): CSVResult[T] =
  p.parse().runA(ParserState(ParserPos(rowIdx, 0), 0, row))

Note: RowParser#parse() now explicitly gives us a “computation plan” (a program?!) that is executed with the input provided via the #runA() invocation.

Let’s check if everything still works.

Success:

sbt:nanocsv> runMain de.sangamon.nanocsv.step07.main data/users.csv
1,Torsten Test,1970-01-01
2,Andrea Anders,2000-02-20
User(1,Torsten Test,1970-01-01)
User(2,Andrea Anders,2000-02-20)

Parse failure:

sbt:nanocsv> runMain de.sangamon.nanocsv.step07.main data/users.csv
1,Torsten Test,1970-01-01
2,Andrea Anders,2000-02-20x
ERROR: ColumnParseFailure(java.time.format.DateTimeParseException:
  Text '2000-02-20x' could not be parsed, unparsed text found at index 10,
  ParserPos(1,2))

Exhaustion:

sbt:nanocsv> runMain de.sangamon.nanocsv.step07.main data/users.csv
1,Torsten Test,1970-01-01
2,Andrea Anders
ERROR: RowExhaustionFailure(ParserPos(1,2))

Trailing data:

sbt:nanocsv> runMain de.sangamon.nanocsv.step07.main data/users.csv
1,Torsten Test,1970-01-01
2,Andrea Anders,2000-02-20,x
ERROR: ColumnParseFailure(java.lang.IllegalStateException:
  trailing data,ParserPos(1,3))

I/O failure:

sbt:nanocsv> runMain de.sangamon.nanocsv.step07.main data/xyz.csv
[info] running de.sangamon.nanocsv.step07.main data/xyz.csv
ERROR: CSVIOFailure(data/xyz.csv,java.io.FileNotFoundException:
  data/xyz.csv (No such file or directory))

The full code for this post can be found in package de.sangamon.nanocsv.step07. In the next post we will finally integrate our I/O handling with the Cats ecosystem.