
In a previous post we introduced Slick, an FRM (Functional Relational Mapper). Unlike an ORM, the hallmark of FRM is that its functional syntax supports flexible query composition (Query Composition) and therefore extensive code reuse; but that same trait limits how readily the wider programming community accepts FRM, and has kept it from becoming a popular way of programming databases. So, speaking from a niche-community standpoint, we have to explore how to improve the current state of Slick: by integrating it with some stream library, we hope to restore, on top of Slick FRM, the familiar Recordset-style database cursor operations, thereby lowering the functional-programming bar that FRM database programming demands and attracting more programmers to FRM. As it happens, in this discussion we would like to present some real-world use cases of integrating Akka-Stream with external systems, and loading Slick database data into Akka-Stream to form a streaming dataset seems a rather good idea. Slick and Akka-Stream can be said to be a natural match: they are products of the same company, and both support the Reactive Streams specification. In a Reactive system, integrated components are connected to each other through the common Publisher interface, and Slick provides a Database.stream function to build such a Publisher:


 /** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified
      * `DBIOAction` and return the result directly as a stream without buffering everything first.
      * This method is only supported for streaming actions.
      *
      * The Publisher itself is just a stub that holds a reference to the action and this Database.
      * The action does not actually start to run until the call to `onSubscribe` returns, after
      * which the Subscriber is responsible for reading the full response or cancelling the
      * Subscription. The created Publisher can be reused to serve a multiple Subscribers,
      * each time triggering a new execution of the action.
      *
      * For the purpose of combinators such as `cleanup` which can run after a stream has been
      * produced, cancellation of a stream by the Subscriber is not considered an error. For
      * example, there is no way for the Subscriber to cause a rollback when streaming the
      * results of `someQuery.result.transactionally`.
      *
      * When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row
      * is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers
      * from within `onNext`. If streaming is interrupted due to back-pressure signaling, the next
      * row will be prefetched (in order to buffer the next result page from the server when a page
      * boundary has been reached). */
    final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] = streamInternal(a, false)

This DatabasePublisher[T] is itself a Publisher[T]:


/** A Reactive Streams `Publisher` for database Actions. */
abstract class DatabasePublisher[T] extends Publisher[T] { self =>
...
}
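To see what the Publisher/Subscriber handshake looks like without any Slick machinery, here is a small sketch using the JDK's own java.util.concurrent.Flow API (JDK 9+). SubmissionPublisher merely stands in for DatabasePublisher here; the subscriber requests elements one at a time, which is exactly the demand signalling Reactive Streams is built on:

```scala
import java.util.concurrent.{CountDownLatch, Flow, SubmissionPublisher}

// SubmissionPublisher is the JDK's stock Reactive-Streams-style publisher;
// it stands in here for Slick's DatabasePublisher.
val publisher = new SubmissionPublisher[Int]()
val received  = scala.collection.mutable.ArrayBuffer[Int]()
val done      = new CountDownLatch(1)

publisher.subscribe(new Flow.Subscriber[Int] {
  private var sub: Flow.Subscription = _
  def onSubscribe(s: Flow.Subscription): Unit = { sub = s; sub.request(1) }
  def onNext(item: Int): Unit = {                 // demand one element at a time
    received.synchronized { received += item }
    sub.request(1)
  }
  def onError(t: Throwable): Unit = done.countDown()
  def onComplete(): Unit = done.countDown()
})

(1 to 3).foreach(publisher.submit)                // submit blocks when the buffer is full
publisher.close()                                 // triggers onComplete downstream
done.await()
val items = received.synchronized(received.toList)
```

Nothing is pushed until a Subscriber subscribes and signals demand, which is the same contract DatabasePublisher honours.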

Akka-Stream can then use Source.fromPublisher(publisher) to build an Akka Source component:


  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

In theory, Source.fromPublisher(db.stream(query)) is all it takes to build a Reactive-Stream Source. Below we set up an example to demonstrate. First, the Slick boilerplate code:


  val aqmraw = Models.AQMRawQuery
  val db = Database.forConfig("h2db")
  // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
  // type alias
  type RowType = (String,String,String,String)
  // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
  // strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: RowType): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)
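For readers without the author's Models/FDAROW setup, the shape of this boilerplate can be reproduced with plain collections (the names below are made up for illustration): a tuple "row" type plus an implicit conversion into a strongly typed case class:

```scala
import scala.language.implicitConversions

// Stand-ins for the real Slick resultset: a row is a 4-tuple of Strings,
// and an implicit conversion lifts it into a strongly typed case class.
type Row = (String, String, String, String)
case class Typed(year: String, state: String, county: String, value: String)
implicit def toTyped(row: Row): Typed = Typed(row._1, row._2, row._3, row._4)

val fakeResult: List[Row] = List(("1999", "Alabama", "Elmore", "5"))
val typed: List[Typed]    = fakeResult.map(r => r: Typed)  // conversion applied here
```

Declaring the conversion implicit, as in the original, lets the compiler remind us wherever a typed row is expected but a raw tuple is supplied.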

What we actually need is aqmQuery; we use it to construct the DatabasePublisher:


  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
  // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

With dbPublisher in hand, Source.fromPublisher gives us the source. Now let's try running this Akka-Stream:

有了dbPublisher就能够用Source.fromPublisher函数营造source了。未来我们试着运算这些Akka-Stream:

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  source.take(6).map{row => toTypedRow(row)}.runWith(
    Sink.foreach(qmr => {
      println(s"State: ${qmr.state}")
      println(s"County: ${qmr.county}")
      println(s"Year: ${qmr.year}")
      println(s"Value: ${qmr.value}")
      println("-------------")
    }))

  scala.io.StdIn.readLine()
  actorSys.terminate()

Here is the output:


State: Alabama
County: Elmore
Year: 1999
Value: 5
-------------
State: Alabama
County: Jefferson
Year: 1999
Value: 39
-------------
State: Alabama
County: Lawrence
Year: 1999
Value: 28
-------------
State: Alabama
County: Madison
Year: 1999
Value: 31
-------------
State: Alabama
County: Mobile
Year: 1999
Value: 32
-------------
State: Alabama
County: Montgomery
Year: 1999
Value: 15
-------------

This shows we have successfully connected Slick to Akka-Stream.


Now we have a Reactive stream source, which is an akka-stream. How do we hook it up to scalaz-stream-fs2 downstream? We know that akka-stream is a Reactive stream, whereas scalaz-stream-fs2 is a pure pull-model stream; that is, the Reactive stream source above must passively wait for the downstream scalaz-stream-fs2 to read its data. According to the Reactive Streams specification, the downstream must use backpressure signals to tell the upstream whether it may send data, which means we need scalaz-stream-fs2 to generate that backpressure. The scalaz-stream-fs2 async package provides a Queue structure:


/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be 'semantically' blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_], A] { self =>
  /**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a`  has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

  /**
   * Enqueues each element of the input stream to this `Queue` by
   * calling `enqueue1` on each element.
   */
  def enqueue: Sink[F, A] = _.evalMap(enqueue1)
  /** Dequeues one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]
  /** Repeatedly calls `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
...
}
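Before wiring up the real thing, the "semantically blocking" behaviour described in that doc-comment can be mimicked with a plain JDK blocking queue of capacity 1 (a rough analogy only, not the fs2 implementation): the producer's put cannot complete until the consumer has made room with take:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Capacity 1: each put() blocks until the consumer thread take()s,
// which is the hand-shake that translates into backpressure.
val q        = new ArrayBlockingQueue[Int](1)
val dequeued = new java.util.concurrent.CopyOnWriteArrayList[Int]()

val producer = new Thread(() => (1 to 3).foreach(q.put(_)))                  // may block
val consumer = new Thread(() => (1 to 3).foreach(_ => dequeued.add(q.take())))
producer.start(); consumer.start()
producer.join(); consumer.join()
val seen = (0 until dequeued.size).map(dequeued.get(_)).toList
```

The producer can never run more than one element ahead of the consumer, so the consumer's pace dictates the producer's pace.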

This structure supports multi-threaded operation, i.e. enqueue and dequeue can run in different threads. Notably, enqueue blocks and can only continue after a dequeue has completed, so dequeue becomes an effective means of balancing backpressure. Concretely: the upstream enqueues one element in one thread, then waits for the downstream to complete its dequeue in another thread, and only after this round-trip does it enqueue the next element. Enqueue is how akka-stream sends data to scalaz-stream-fs2, and it can be implemented as an akka-stream Sink component:


 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
  val in = Inlet[T]("inport")
  val shape = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = {
        pull(in)          //initiate stream elements movement
        super.preStart()
      }

      override def onPush(): Unit = {
        q.enqueue1(Some(grab(in))).unsafeRun()
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        q.enqueue1(None).unsafeRun()
        println("the end of stream !")
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        q.enqueue1(None).unsafeRun()
        completeStage()
      }

      setHandler(in,this)

    }
}

The akka-stream GraphStage above describes the enqueue action for each upstream element. We can use scalaz-stream-fs2's flatMap to sequence the enqueue and dequeue running in two different threads:


   val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
     .flatMap { q =>
       Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture  //enqueue Task(new thread)
       pipe.unNoneTerminate(q.dequeue)      //dequeue in current thread
     }
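pipe.unNoneTerminate is what turns the Option wrapping into an end-of-stream signal: Some(row) passes through, and the first None terminates the stream. The same idea can be shown on a plain iterator:

```scala
// Some values flow through; the first None marks end-of-stream,
// mirroring what pipe.unNoneTerminate does for the fs2 queue.
val wrapped = Iterator(Some(1), Some(2), None, Some(3))
val unNoned = wrapped.takeWhile(_.isDefined).flatten.toList
```

This is why FS2Gate enqueues Some(element) on each push and a final None on upstream finish or failure: the None is the in-band termination marker.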

This function returns an fs2.Stream[Task,RowType], which is only a description of a computation; we must run it to actually execute it:


  fs2Stream.map{row => toTypedRow(row)}
      .map(qmr => {
      println(s"State: ${qmr.state}")
      println(s"County: ${qmr.county}")
      println(s"Year: ${qmr.year}")
      println(s"Value: ${qmr.value}")
      println("-------------")
    }).run.unsafeRun
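The point that an fs2 Stream (and the Task it runs in) is only a description of a computation can be sketched with a toy wrapper (hypothetical, and far simpler than the real Task):

```scala
// A value of Delay[A] merely describes a computation; nothing executes
// until unsafeRun() is called -- the same discipline as fs2's Task.
final case class Delay[A](run: () => A) {
  def map[B](f: A => B): Delay[B] = Delay(() => f(run()))
  def unsafeRun(): A = run()
}

var executed = false
val program: Delay[Int] = Delay(() => { executed = true; 42 }).map(_ + 1)
val stillPure = !executed        // true: building the description ran nothing
val result    = program.unsafeRun()
```

Composing with map only builds a bigger description; the side effect fires exactly once, at unsafeRun.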

Test runs show that we have successfully implemented data streaming for scalaz-stream-fs2.


Below is the full source code for this demonstration:


 

import slick.jdbc.H2Profile.api._
import com.bayakala.funda._
import api._

import scala.language.implicitConversions
import scala.concurrent.duration._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import slick.basic.DatabasePublisher
import akka._
import fs2._
import akka.stream.stage.{GraphStage, GraphStageLogic}


 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
  val in = Inlet[T]("inport")
  val shape = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = {
        pull(in)          //initiate stream elements movement
        super.preStart()
      }

      override def onPush(): Unit = {
        q.enqueue1(Some(grab(in))).unsafeRun()
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        q.enqueue1(None).unsafeRun()
        println("end of stream !!!!!!!")
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        q.enqueue1(None).unsafeRun()
        completeStage()
      }

      setHandler(in,this)

    }
}

object AkkaStreamSource extends App {

  val aqmraw = Models.AQMRawQuery
  val db = Database.forConfig("h2db")
  // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
  // type alias
  type RowType = (String,String,String,String)
  // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
  // strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: RowType): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)
  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
  // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  /*
  source.take(10).map{row => toTypedRow(row)}.runWith(
    Sink.foreach(qmr => {
      println(s"State: ${qmr.state}")
      println(s"County: ${qmr.county}")
      println(s"Year: ${qmr.year}")
      println(s"Value: ${qmr.value}")
      println("-------------")
    })) */

   val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
     .flatMap { q =>
       Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture  //enqueue Task(new thread)
       pipe.unNoneTerminate(q.dequeue)      //dequeue in current thread
     }

  fs2Stream.map{row => toTypedRow(row)}
      .map(qmr => {
      println(s"State: ${qmr.state}")
      println(s"County: ${qmr.county}")
      println(s"Year: ${qmr.year}")
      println(s"Value: ${qmr.value}")
      println("-------------")
    }).run.unsafeRun

  scala.io.StdIn.readLine()
  actorSys.terminate()

}