澳门永利备用网址而Akka-http的SSE应该比较相符大家的渴求,而Akka-http的SSE应该相比符合大家的要求

 
 因为我明白Akka-http的重大目的不是为了有关Web-Server的编程,而是想已毕一套系统合而为一的api,所以也急需考虑由服务端主动向客户端发送指令的使用场景。比如一个零售店管理平台的服务端在做到了一点数据更新后要求文告各零售门市客户端下载最新数据。固然Akka-http也提供对websocket商谈的匡助,但websocket的互联网连接是双向恒久的,适合频仍的问答交互式服务端与客户端的互换,音信结构也比较零碎。而大家面临的大概是批次型的汪洋数据库数据交流,只须要不难的劳务端单向音信就行了,所以websocket不太合适,而Akka-http的SSE应该相比适合大家的需求。SSE形式的基本原理是服务端统一集中表露讯息,各客户端持久订阅服务端揭橥的音信并从音信的情节中筛选出属于自个儿应当履行的命令,然后开展对应的处理。客户端接收SSE是在一个独门的线程里持续开展的,不会潜移默化客户端当前的演算流程。当收到有效的新闻后就会调用一个工作职能函数作为后台异步运算义务。

 
 因为自己询问Akka-http的关键目标不是为了有关Web-Server的编程,而是想已毕一套系统合两为一的api,所以也必要考虑由服务端主动向客户端发送指令的行使场景。比如一个零售店管理平台的服务端在成就了好几数据更新后需求通知各零售门市客户端下载最新数据。即使Akka-http也提供对websocket协和的帮忙,但websocket的网络连接是双向恒久的,适合频仍的问答交互式服务端与客户端的互换,新闻结构也相比较零碎。而大家面临的只怕是批次型的豁达数据库数据沟通,只须求不难的服务端单向音讯就行了,所以websocket不太适宜,而Akka-http的SSE应该相比符合大家的须要。SSE形式的基本原理是服务端统一集中公布新闻,各客户端持久订阅服务端发表的新闻并从音信的内容中筛选出属于自身应有实施的授命,然后进行相应的拍卖。客户端接收SSE是在一个单独的线程里穿梭进行的,不会影响客户端当前的演算流程。当接过有效的音讯后就会调用一个业务功能函数作为后台异步运算义务。

服务端的SSE公布是以Source[ServerSentEvent,NotUsed]来落到实处的。ServerSent伊芙nt类型定义如下:

服务端的SSE发布是以Source[ServerSentEvent,NotUsed]来贯彻的。ServerSent伊夫nt类型定义如下:

/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}
/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}

其一类型的参数代表事件消息的数据结构。用户可以根据实际要求丰富利用那几个数据结构来传递信息。服务端是经过complete以SeverSent伊夫nt类为成分的Source来进展SSE的,如下:

以此类型的参数代表事件消息的数据结构。用户可以依照实际必要丰裕利用这一个数据结构来传递消息。服务端是透过complete以SeverSent伊夫nt类为因素的Source来进行SSE的,如下:

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }

如上代码代表劳务端定时运算processToServerSent伊芙nt再次回到ServerSent伊芙nt类型结果后公布给持有订阅的客户端。大家用一个函数processToServerSent伊夫nt模拟重复运算的事情成效:

以上代码代表劳务端定时运算processToServerSent伊芙nt再次来到ServerSent伊芙nt类型结果后公告给所有订阅的客户端。我们用一个函数processToServerSent伊夫nt模拟重复运算的事体功用:

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }

以此函数模拟发布事件数量是某种业务运算结果,在那边表示客户端必要下载文件名称。大家用客户端request来模拟设定那么些文件名称:

其一函数模拟公布事件数量是某种业务运算结果,在此间代表客户端需求下载文件名称。我们用客户端request来效仿设定这一个文件名称:

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }
  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

客户端订阅SSE的不二法门如下:

客户端订阅SSE的方法如下:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))
    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }
  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

下面是客户端程序的测试运算步骤:

上面是客户端程序的测试运算步骤:

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }
    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }

运算结果:

运算结果:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0
do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0

上面是本次切磋的示范源代码:

下边是此次啄磨的示范源代码:

服务端:

服务端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}

客户端:

客户端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}

 

 

 我的博客即将联合至腾讯云+社区。邀我们一同入驻http://cloud.tencent.com/developer/support-plan

 我的博客即将联合至腾讯云+社区。邀大家一同入驻http://cloud.tencent.com/developer/support-plan