воскресенье, 21 октября 2018 г.

Clojure interceptors

1. deps.edn
;; Dependency map for the interceptor experiments in this post.
;; NOTE(review): the second namespace below (test01.ret) requires reitit and
;; muuntaja, which are not declared in :deps here — presumably they have to be
;; added before that namespace can load; confirm.
{:paths       ["src/clj" "src/cljs" "test" "resources" "target"]
 ;; NOTE(review): "resources" is already listed in :paths above, and :extra-paths
 ;; is usually an alias-level key — confirm the packaging tool reads it here.
 :extra-paths ["resources" "resources/public"]              ;; fix cambada packaging
 :deps        {org.clojure/clojure          {:mvn/version "1.10.0-RC1"}
               org.clojure/core.async       {:mvn/version "0.4.474"}
               org.clojure/spec.alpha       {:mvn/version "0.2.176"}
               org.clojure/core.specs.alpha {:mvn/version "0.2.44"}
               http-kit/http-kit            {:mvn/version "2.3.0"}
               metosin/sieppari             {:mvn/version "0.0.0-alpha5"}
               fipp                         {:mvn/version "0.6.13"}}}

2. source code

(ns test01.core  (:gen-class)
  (:require [org.httpkit.server :as kit]
            [org.httpkit.client :as kit-client]
            [sieppari.core :as sieppari]
            [sieppari.context]
            [clojure.core.async :as a :refer [go chan <! <!! >! >!! timeout close! put! take! alts! alts!!]]))


(defn prn-tap
  "Tap handler: prints the tapped value `v` followed by a newline."
  [v]
  (println v))

;; Register the printer so that values sent with `tap>` get printed.
(add-tap prn-tap)

;;(require '[fipp.edn :refer (pprint) :rename {pprint fipp}])
(def int-1
  "Synchronous pass-through interceptor: taps a marker keyword on :enter and
  on :leave and returns the context unchanged in both stages."
  (letfn [(announce [marker ctx]
            ;; report which stage ran, then hand the context back untouched
            (tap> marker)
            ctx)]
    {:name  :int-1
     :enter #(announce :int-1 %)
     :leave #(announce :int-1-leave %)}))


(def int-2
  "Asynchronous interceptor: :enter returns a core.async channel (the result of
  a `go` block) that yields the unchanged context after a 1000 ms timeout."
  {:name  :int-2
   :enter (fn [ctx]
            (a/go
              ;; park for one second to simulate slow asynchronous work
              (a/<! (a/timeout 1000))
              (tap> :int-2)
              ctx))})

(def int-3
  "Interceptor whose :enter calls sieppari.context/terminate with a fixed
  200 text/plain response (presumably short-circuiting the rest of the chain)."
  (let [canned-response {:status  200
                         :headers {"Content-Type" "text/plain"}
                         :body    "yo! error!"}]
    {:name  :int-3
     :enter (fn [ctx]
              (sieppari.context/terminate ctx canned-response))}))

(def int-4
  "Interceptor that demonstrates error handling.

  :enter always throws (division by zero).
  :error reads the exception stored under :error in the context, taps its
  message, clears :error and installs a 200 text/plain :response whose body
  carries the message."
  {:name  :int-4
   :enter (fn [ctx]
            ;; deliberately throws ArithmeticException to trigger the :error stage
            (/ 1 0)
            ctx)
   :error (fn [ctx]
            (let [message (.getMessage ^Throwable (:error ctx))]
              (tap> ["got error:" message])
              ;;(fipp ctx)
              (assoc ctx
                     :error nil
                     :response {:status  200
                                :headers {"Content-Type" "text/plain"}
                                :body    ["got error: " message]})))})

(defn handler
  "Terminal handler: answers 200 text/plain with the value found under
  :remote-addr in `request` as the body."
  [request]
  (let [client-address (:remote-addr request)]
    {:status  200
     :headers {"Content-Type" "text/plain"}
     :body    client-address}))

(def pipeline
  "Interceptor chain passed to sieppari/execute: int-1, int-2, then `handler`.
  int-3 and int-4 are disabled with the #_ reader discard."
  [int-1
   int-2
   #_int-3
   #_int-4
   handler])

(defn exec-request
  "Runs `pipeline` against `request` inside an `a/thread` block and returns the
  channel produced by `a/thread`, which carries the result of sieppari/execute."
  [request]
  (a/thread
    (let [worker-name (.getName (Thread/currentThread))]
      ;; report which thread is executing the chain
      (tap> ["exec  " worker-name])
      (sieppari/execute pipeline request))))


(defn drain
  "Takes values from channel `c` and discards them until a take yields nil.
  Returns the channel of the go block doing the work."
  [c]
  (a/go-loop []
    (when (some? (a/<! c))
      (recur))))

(defn async-kit-app
  "http-kit handler: starts `exec-request` for `request` and, once its channel
  delivers a value, sends that value on the http-kit channel. Taps the current
  thread name before and after scheduling the work."
  [request]
  (tap> ["async " (.getName (Thread/currentThread))])
  (kit/with-channel request channel
    ;; the callback fires when exec-request's channel yields its result
    (a/take! (exec-request request)
             (fn [response]
               (kit/send! channel response)))
    (tap> ["async2 " (.getName (Thread/currentThread))])))

(def params
  "Options map handed to kit/run-server by start-kit."
  {:host "0.0.0.0"
   :port 8080})

(defn start-kit
  "Starts an http-kit server for `handler` using the namespace-level `params`
  and returns whatever kit/run-server returns."
  [handler]
  (kit/run-server handler params))


(defn -main
  "Program entry point: starts the server with async-kit-app, then prints the
  server parameters. Command-line `args` are ignored."
  [& args]
  (start-kit async-kit-app)
  (println "started: " params))

(defn handle-response
  "Prints the value stored under :body in the response map `resp`."
  [resp]
  (-> resp :body println))

;; REPL scratchpad; nothing inside `comment` is evaluated when the file loads.
;; Evaluate the forms one by one: start the server, fire requests, stop it.
(comment  (def stop-fn (start-kit async-kit-app))
  ;; Issues 1000 GET requests, each with handle-response as the callback.
  ;; NOTE(review): the calls take a callback, so `time` presumably measures
  ;; only how long it takes to issue them, not to complete them — confirm.
  (time (dotimes [_ 1000]
     (kit-client/get "http://localhost:8080" {:timeout 20000} handle-response)))
  ;; Single blocking request.
  (slurp "http://localhost:8080")
  ;; Calls the value returned by start-kit to shut the server down.
  (stop-fn))


(ns test01.ret  (:gen-class)
  (:require [org.httpkit.server :as kit]
            [org.httpkit.client :as kit-client]
            [sieppari.core :as sieppari]
            [sieppari.context]
            [muuntaja.interceptor]
            [clojure.core.async :as a :refer [go chan <! <!! >! >!! timeout close! put! take! alts! alts!!]]
            [reitit.ring :as ring]
            [reitit.http :as http]
            [reitit.interceptor.sieppari]))



(defn interceptor
  "Builds an interceptor that records its passage and hands the updated
  context to `f`.

  :enter appends {:enter x} to [:request :via] (creating a vector if absent).
  :leave conj-es {:leave x} onto [:response :body].
  In both stages the result is whatever `f` returns for the updated context."
  [f x]
  (letfn [(on-enter [ctx]
            (-> ctx
                (update-in [:request :via] (fnil conj []) {:enter x})
                f))
          (on-leave [ctx]
            (-> ctx
                (update-in [:response :body] conj {:leave x})
                f))]
    {:enter on-enter
     :leave on-leave}))

(defn interceptor2
  "Builds a synchronous interceptor labelled `this-name`.

  :enter appends {:enter this-name} to [:request :via] (creating a vector if
  absent); :leave conj-es {:leave this-name} onto [:response :body]."
  [this-name]
  (let [enter-mark {:enter this-name}
        leave-mark {:leave this-name}]
    {:enter (fn [ctx]
              (update-in ctx [:request :via] (fnil conj []) enter-mark))
     :leave (fn [ctx]
              (update-in ctx [:response :body] conj leave-mark))}))

(defn handler
  "Returns a request handler that builds a 200 response whose :body is the
  request's :via collection with :handler conj-ed on, and passes that
  response through `f`."
  [f]
  (fn [{:keys [via]}]
    (let [body     (conj via :handler)
          response {:status 200
                    :body   body}]
      (f response))))


;; Result wrappers passed as `f` to `interceptor` and `handler`:
;;   <async> wraps the value in a core.async go block (returns a channel),
;;   <sync>  returns the value unchanged.
;; NOTE(review): both symbol names were missing from the published listing
;; (anything written as <...> looks to have been stripped). They are restored
;; here, but which wrapper belongs at :get and at the handler is an
;; assumption — confirm against the original code.
(def <async> #(a/go %))
(def <sync> identity)

;; Single "/" route: interceptors on the route itself plus interceptors and a
;; handler specific to GET.
(def routes
  (http/router
    [["/" {:interceptors [(interceptor <async> :async) (interceptor2 :root-2)]
           :get          {:interceptors [(interceptor <sync> :get) (interceptor2 :get-2)]
                          :handler      (handler <sync>)}}]]))

;; Ring handler built from `routes`, with reitit's default handler as the
;; fallback, sieppari as the interceptor executor, and muuntaja's format
;; interceptor supplied in the top-level :interceptors option.
(def app
  (let [fallback (ring/create-default-handler)
        options  {:executor     reitit.interceptor.sieppari/executor
                  :interceptors [(muuntaja.interceptor/format-interceptor)]}]
    (http/ring-handler routes fallback options)))


(def params
  "Options map passed to start-server in -main."
  {:host "0.0.0.0"
   :port 8080})

(defn start-server
  "Starts an http-kit server for `handler` with the options map `params`
  (this argument shadows the namespace-level `params`) and returns whatever
  kit/run-server returns."
  [handler params]
  (kit/run-server handler params))


(defn -main
  "Program entry point: starts the server for `app` with `params`, then prints
  the parameters. Command-line `args` are ignored."
  [& args]
  (start-server app params)
  (println "server is started: " params))



;; ----------- client and repl zone ---------------------------------------

(defn handle-response
  "Callback for the HTTP client: prints the value stored under :body in the
  response map `resp`."
  [resp]
  (println (:body resp)))

;; REPL scratchpad; nothing inside `comment` is evaluated when the file loads.
;; Evaluate the forms one by one: start the server, fire requests, stop it.
(comment  (def stop-fn (start-server app params))
  ;; Issues 10 GET requests, each with handle-response as the callback.
  ;; NOTE(review): the calls take a callback, so `time` presumably measures
  ;; only how long it takes to issue them, not to complete them — confirm.
  (time (dotimes [_ 10]
          (kit-client/get "http://localhost:8080" {:timeout 20000} handle-response)))
  ;; Single blocking request, timed end to end.
  (time (slurp "http://localhost:8080"))
  ;; Calls the value returned by start-server to shut the server down.
  (stop-fn))