open Amall_types
open Am_Ops
open Printf
open Amall_http
open Cd_All
type port = int
type addr_string = string
type listen_addr =
[
| `Inet_any of port
| `Inet_loopback of port
| `Inet_str of (addr_string * port)
| `Inet_addr of (Unix.inet_addr * port)
]
module Http_server
(IO : IO_Type)
(I : It_type.IT with type 'a It_IO.m = 'a IO.m
and type It_IO.input_channel = IO.input_channel) :
sig
val post_form :
request -> (char, (request * ((string * string) list))) I.iteratee
val it_post_vars : (char, (string * string) list) I.iteratee
val run :
listen_addr ->
(request -> (segpath * Amall_http.Make(IO)(I).service_desc)) -> IO.
server
end =
struct
module H = Amall_http.Make(IO)(I)
let it_post_vars = H.it_post_vars
open I.Ops
let (runA : (('el, 'a) I.iteratee) IO.m -> 'a It_Types.res) =
fun i -> IO.runIO (i >>% I.run)
let rec printexc e =
match e with
| I.Iteratees_err_msg e -> printexc e
| _ -> Printexc.to_string e
let rec dump_chars_chunks title =
let rec step s =
match s with
| I.EOF oe ->
let err = (match oe with | None -> "eof" | Some e -> printexc e)
in
(IO.printf "dump_chars_chunks: %s: EOF: %s.\n" title err) >>%
(fun () -> I.ie_doneM () s)
| I.Chunk c ->
(IO.printf "dump_chars_chunks: %s: Chunk: %S\n" title
(I.Subarray.to_string c))
>>% (fun () -> I.ie_contM step)
in I.ie_cont step
let (post_form :
request -> (char, (request * ((string * string) list))) I.iteratee)
=
fun request ->
(I.printf
"request headers was read ok, request_uri: %S, uri: %S.\n\
headers:\n%s\n===\n%!"
(Uri.dump_uri request.rq_request_uri__)
(* Uri.dump_uri request.rq_uri *) ""
((String.concat "\n") &
(List.map (fun (k, v) -> sprintf "%S = %S" k v)
request.rq_headers.rq_all)))
>>=
(fun () -> (*
dump_chars_chunks "header"
*)
H.request_with_post_vars request)
(*
value _ () =
let fn = "post-req" in
match
runA & I.enum_file fn &
((H.it_http & post_form)
>>= fun r -> dump_chars_chunks "after body" >>= fun () ->
I.return r
)
with
[ `Ok _ -> ()
| `Error e -> eprintf "exception: %s\n%!" & printexc e
]
;
*)
let () = ignore & I.limit
(**********************************************************)
let can't_switch_ws_resp =
{
rs_status_code = 400;
rs_reason_phrase = "Bad request";
rs_headers = { rs_all = []; };
rs_body =
Body_string "Can't switch to Websockets after HTTP requests";
}
let ws_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
open Am_Common
let (continue_ws_resp : string -> response) =
let accept handshake = let open Cryptokit
in
transform_string (Base64.encode_compact_pad ())
(hash_string (Hash.sha1 ()) (handshake ^ ws_guid))
and other_headers =
[ ("Upgrade", "websocket"); ("Connection", "Upgrade") ]
in
fun handshake ->
{
rs_status_code = 101;
rs_reason_phrase = "Switching Protocols";
rs_body = No_body;
rs_headers =
{
rs_all =
("Sec-WebSocket-Accept", (accept handshake)) ::
other_headers;
};
}
(* TODO: переписать с прямым доступом к массиву. *)
let it_uint16_as_64_netord =
I.head >>=
(fun b0 ->
I.head >>=
(fun b1 ->
I.return &
(Int64.of_int (((Char.code b0) lsl 8) lor (Char.code b1)))))
(* TODO: переписать с прямым доступом к массиву. *)
let ws_it_close_code ~masking_key =
I.head >>=
(fun b0 ->
I.head >>=
(fun b1 ->
I.return
((((Char.code b0) lxor (Char.code masking_key.[0])) lsl 8)
lor ((Char.code b1) lxor (Char.code masking_key.[1])))))
exception Ws_error of string
let ws_error msg = I.throw_err (Ws_error msg)
exception Ws_service_error of exn
(* TODO: переписать с прямым доступом к массиву и _прямо_. *)
let ws_len64 =
let h = I.head >>= (fun c -> I.return & (Int64.of_int (Char.code c)))
in
h >>=
(fun i0 -> let open Int64
in
let ( lor ) = Int64.logor
and ( land ) = Int64.logand
and ( lsl ) = Int64.shift_left
in
if (i0 land 0x80L) == 0x80L
then
ws_error
"ws_len64: 'the most significant bit MUST be 0' [rfc6455]"
else
h >>=
(fun i1 ->
h >>=
(fun i2 ->
h >>=
(fun i3 ->
h >>=
(fun i4 ->
h >>=
(fun i5 ->
h >>=
(fun i6 ->
h >>=
(fun i7 ->
I.return
(((((((i7 lor
(i6 lsl 8))
lor
(i5 lsl 16))
lor
(i4 lsl 24))
lor (i3 lsl 32))
lor (i2 lsl 40))
lor (i1 lsl 48))
lor (i0 lsl 56))))))))))
let ws_it_masking_key () =
I.head >>=
(fun a0 ->
I.head >>=
(fun a1 ->
I.head >>=
(fun a2 ->
I.head >>=
(fun a3 -> let module String = Bytes
in
let k = String.make 4 '\x00'
in
(k.[0] <- a0;
k.[1] <- a1;
k.[2] <- a2;
k.[3] <- a3;
I.return k)))))
let ws_it_frame_header =
let no_masking_key = Bytes.make 4 '\x00'
in
I.head >>=
(fun b0 ->
I.head >>=
(fun b1 ->
let fin = ((Char.code b0) land 0x80) == 0x80 in
let opcode_int = (Char.code b0) land 0x0F in
let mask = ((Char.code b1) land 0x80) == 0x80 in
let len1 = (Char.code b1) land 0x7F
in
(if len1 = 126
then it_uint16_as_64_netord
else
if len1 = 127
then ws_len64
else I.return (Int64.of_int len1))
>>=
(fun len ->
(if mask
then ws_it_masking_key ()
else I.return no_masking_key) >>=
(fun masking_key ->
(if opcode_int >= 0x8
then
if not fin
then
ws_error
"All control frames MUST NOT be fragmented."
else
if len > 125L
then
ws_error
"All control frames MUST have a payload length \
of 125 bytes or less."
else
(match opcode_int with
| 0x8 ->
if len = 0L
then I.return & (`Close None)
else
if len = 1L
then
ws_error
"Close: 'If there is a body, the first two bytes of \
the body MUST be a 2-byte unsigned integer', \
but body has only 1 byte."
else
(ws_it_close_code ~masking_key)
>>=
(fun status_code ->
let () =
dbg
"ws_it_frame_header: Close: status_code = %i"
status_code in
let () =
let m0 = masking_key.[0]
and m1 = masking_key.[1]
and m2 = masking_key.[2]
and m3 = masking_key.[3]
in
(masking_key.[0] <- m2;
masking_key.[1] <- m3;
masking_key.[2] <- m0;
masking_key.[3] <- m1)
in
I.return &
(`Close
(Some status_code)))
| 0x9 -> I.return `Ping
| 0xA -> I.return `Pong
| x -> I.return & (`Ctrl x))
else
I.return
(match opcode_int with
| 0x0 -> `Cont
| 0x1 -> `Text
| 0x2 -> `Binary
| x -> `Nonctrl x))
>>=
(fun opcode ->
let len =
match opcode with
| `Close (Some _) -> Int64.sub len 2L
| _ -> len
in
let open H
in
I.return &
{
fin = fin;
opcode = opcode;
len = len;
masking_key = masking_key;
})))))
type ws_it_feed_resp =
[
| `Frame_going of
(((char, unit) I.iteratee) *
[ | `Finished | `Continues ])
| `Closed of int
]
type ws_it_loop_data_state =
[ | `Frame_going of (char, unit) I.iteratee | `Frame_done
]
type ws_it_loop_resp =
[
| ws_it_feed_resp
| `Closed of int
| (* status code *)
`Ping of
(((char, string) I.iteratee) * (* body *)
ws_it_loop_data_state)
| `Proto_error of (int * string)
| (* (code, reason) *)
`Frame_done
]
let enee_unmask ~masking_key ~growing_array :
(Byte.t, Byte.t, 'a) I.enumeratee =
fun it ->
let mlen = String.length masking_key
in
if mlen = 0
then
I.throw_err &
(Invalid_argument "enum_unmask: masking_key is empty")
else
(let rec unmask ~mi it =
match it with
| I.IE_done _ | I.IE_cont ((Some _), _) -> I.return it
| I.IE_cont (None, k) -> I.ie_cont (step ~mi it k)
and step ~mi it k s =
match s with
| I.EOF _ -> I.ie_doneM it s
| I.Chunk c -> let open I.S.C
in
let clen = c.len in
let tarr =
Array.GrowingArray.request_array clen growing_array in
let carr = c.arr and cofs = c.ofs in
let cmaxi = cofs + clen in
let rec loop ~mi ~ci ~ti =
if ci = cmaxi
then mi
else
(tarr.(ti) <-
Char.chr
((Char.code carr.(ci)) lxor
(Char.code masking_key.[mi]));
loop ~mi: ((mi + 1) mod mlen) ~ci: (ci + 1)
~ti: (ti + 1)) in
let mi = loop ~mi ~ci: cofs ~ti: 0 in
let new_chunk = mk ~arr: tarr ~ofs: 0 ~len: clen
in
(k (I.Chunk new_chunk)) >>%
(fun (it, _s) ->
IO.return ((unmask ~mi it), I.Sl.empty))
in unmask ~mi: 0 it)
let wrap_feeding it_f =
I.catch it_f
(fun e ->
match e with
| Ws_service_error e -> I.throw_err e
| e -> I.throw_err & (Ws_service_error e))
let feed_going_it ~growing_array ~fh it =
let len_int = Int64.to_int fh.len in
let () = assert (fh.len = (Int64.of_int len_int))
in
(wrap_feeding
(fun () ->
I.take_exact len_int
(I.joinI &
(enee_unmask ~growing_array ~masking_key: fh.masking_key
it))))
>>=
(fun (ex, it) ->
match ex with
| `Done ->
I.return &
(`Frame_going (it,
(if fh.fin then `Finished else `Continues)))
| `Eof ((has_read, opt_err_msg)) ->
let () =
dbg
"ws: it_loop: feed_going_it: got %i bytes \
instead of %i because of %s; closing."
(len_int - has_read) len_int
(match opt_err_msg with
| None -> "normal EOF"
| Some e -> sprintf "error %s" (Printexc.to_string e))
in
(wrap_feeding
(fun () ->
I.feed_it it
(I.EOF (Some (H.Ws.Close_received 1006)))))
>>= (fun _it -> I.return & (`Closed 1006)))
let do_websocket outsock (sl_l : (char I.sl) Lazy.t) enum_part
(ws_worker : H.websocket_service_func_worker) : unit IO.m =
let worker_got_close = ref false
and growing_array = Array.GrowingArray.make 100 '\x00' in
let rec it_loop (data_state : ws_it_loop_data_state) :
(char, ws_it_loop_resp) I.iteratee =
let () = dbg "ws: it_loop"
in
(I.ie_cont &
(fun s ->
match s with
| I.EOF _ ->
let () = dbg "it_loop: stream: EOF" in I.ie_doneM () s
| I.Chunk c ->
let str = I.S.to_string c in
let () =
dbg "it_loop: stream: Chunk \"%s\""
(Bytes.hexdump ~style: `Line str)
in I.ie_doneM () s))
>>=
(fun () ->
let () = dbg "ws: it_loop: after dump"
in
I.peek >>=
(fun p ->
let () =
dbg "ws: it_loop: peek = %s"
(match p with
| None -> "None"
| Some c -> sprintf "Some %C" c)
in
ws_it_frame_header >>=
(fun fh ->
let () = dbg "ws: it_loop: fh" in
let rec ret_closed ~opc ~code =
let () = worker_got_close := true
in
(wrap_feeding
(fun () -> feed_new_worker ~opc))
>>= (fun () -> I.return & (`Closed code))
and feed :
data_state: ws_it_loop_data_state ->
opc: _ -> (char, ws_it_feed_resp) I.iteratee =
fun ~data_state ~opc ->
let it =
match data_state with
| `Frame_done ->
wrap_feeding (fun () -> ws_worker opc)
| `Frame_going it -> it
in feed_going_it ~growing_array ~fh it
and feed_new_worker ~opc :
(char, unit) I.iteratee =
(feed ~data_state: `Frame_done ~opc) >>=
(fun fr ->
match fr with
| `Frame_going (it, _) ->
(wrap_feeding
(fun () ->
I.feed_it it (I.EOF None)))
>>= (fun _it -> I.return ())
| `Closed _code -> I.return ())
and ret_data_state () =
I.return &
(match data_state with
| (`Frame_done as x) -> x
| `Frame_going it ->
`Frame_going (it, `Continues))
in
match fh.opcode with
| (`Close opt_code as opc) ->
let code =
(match opt_code with
| Some x -> x
| None -> 1005)
in
(match data_state with
| `Frame_done -> ret_closed ~opc ~code
| `Frame_going it ->
(wrap_feeding
(fun () ->
I.feed_it it
(I.EOF
(Some
(H.Ws.Close_received
code)))))
>>=
(fun _it ->
(* итерат, получив EOF, должен перейти в
"готовое состояние", а оно неинтересно. *)
ret_closed ~opc ~code))
| `Ping ->
(feed_going_it ~growing_array ~fh I.
gather_to_string)
>>=
(fun fr ->
match fr with
| (`Closed _ as c) -> I.return c
| `Frame_going (gath_it, _cont_fin)
->
I.return &
(`Ping (gath_it, data_state)))
| `Pong -> ret_data_state ()
| (`Ctrl _ as opc) ->
(feed_new_worker ~opc) >>=
(fun () -> ret_data_state ())
| (`Text | `Binary | `Nonctrl _ as opc) ->
let () =
dbg "ws: it_loop: text/binary/nonctrl"
in
(match data_state with
| `Frame_going it ->
let msg =
"data frame without previous fin"
in
(wrap_feeding
(fun () ->
I.feed_it it
(I.EOF
(Some (Ws_error msg)))))
>>=
(* ошибка, так как при фрейме с fin=true состояние станет
_done, а при исходном _going должен быть fin=true перед
другими фреймами. *)
(fun _it ->
I.return &
(`Proto_error (1002, msg)))
| `Frame_done ->
(feed ~data_state ~opc :>
(char, ws_it_loop_resp) I.
iteratee))
| `Cont ->
(match data_state with
| `Frame_done ->
I.return &
(`Proto_error (1003,
"continuation frame without \
previous beginning frame"))
| (* продолжение без предыдущего начала *)
`Frame_going it ->
(feed_going_it it ~growing_array ~fh :>
(char, ws_it_loop_resp) I.iteratee)))))
and
io_loop (enum_part : (char, ws_it_loop_resp) I.enumpart) sl_l
data_state =
let () = dbg "ws: io_loop"
in
if H.Ws.is_close_sent outsock
then IO.return ()
else
(let sl = Lazy.force sl_l
in
(enum_part sl (it_loop data_state)) >>%
(fun (loop_it, sl_l, enp_opt) ->
let () = dbg "ws: io_loop: enum_part returned"
in
(let proc_error e =
let () =
dbg "ws: io_loop: error: %s"
(Printexc.to_string &
(let rec loop e =
match e with
| Ws_service_error e |
I.Iteratees_err_msg e -> loop e
| _ -> e
in loop e)) in
let () =
dbg
"ws: io_loop: proc_error: 1: is_close_sent = %b"
(H.Ws.is_close_sent outsock)
in
(if H.Ws.is_close_sent outsock
then IO.return ()
else
(match e with
| Ws_service_error e ->
let msg = Printexc.to_string e in
let msg =
if (String.length msg) > 125
then (String.sub msg 0 122) ^ "..."
else msg in
let () =
dbg "ws: io_loop: service error: %S" msg
in H.Ws.close outsock (Some ((1011, msg)))
| _ -> H.Ws.close outsock None))
>>%
(fun () ->
let () =
dbg
"ws: it_loop: proc_error: 2: is_close_sent = %b"
(H.Ws.is_close_sent outsock)
in IO.return & (`Closed 1005))
in
IO.catch (fun () -> I.run loop_it)
(fun e -> proc_error e))
>>%
(fun loop_resp ->
let () = dbg "io_loop: analyzing loop_resp"
in
(match loop_resp with
| `Frame_going (it, `Finished) ->
(I.run it) >>%
(fun () ->
let () =
dbg
"ws: io_loop: finished service iteratee"
in IO.return `Frame_done)
| `Frame_going (it, `Continues) ->
IO.return & (`Frame_going it)
| `Frame_done -> IO.return `Frame_done
| `Closed code ->
(if not !worker_got_close
then
(let () = worker_got_close := true
in
I.run
(ws_worker (`Close (Some code))))
else IO.return ()) >>%
(fun () ->
if H.Ws.is_close_sent outsock
then IO.return `Frame_done
else
(H.Ws.close outsock
(Some ((code, ""))))
>>%
(fun () -> IO.return `Frame_done))
| (* will exit after entering io_loop *)
`Ping (it_body, old_state) ->
(I.run it_body) >>%
(fun body ->
(H.Ws.send outsock `Pong body) >>%
(fun () -> IO.return old_state))
| `Proto_error ((code : int), (msg : string))
->
let () = worker_got_close := true
in
(I.run (ws_worker (`Close (Some code))))
>>%
(fun () ->
if H.Ws.is_close_sent outsock
then IO.return `Frame_done
else
(H.Ws.close outsock
(Some ((code, msg))))
>>%
(fun () -> IO.return `Frame_done)))
>>%
(fun data_state ->
match enp_opt with
| I.EP_None ->
let () = dbg "ws: io_loop: ep_none"
in
(match data_state with
| `Frame_done -> IO.return ()
| `Frame_going it ->
I.run &
(I.feed_it it
(I.EOF
(Some
(H.Ws.Close_received
1006)))))
>>%
(fun () ->
let () =
dbg
"ws: io_loop: ep_none: worker_got_close = %b"
!worker_got_close
in
if not !worker_got_close
then
(let () =
worker_got_close := true
in
I.run
(ws_worker
(`Close (Some 1006))))
else IO.return ())
| (* just exit *) I.EP_Some enum_part ->
let () = dbg "ws: io_loop: ep_some"
in
io_loop enum_part.I.enumpart_poly
sl_l data_state))))
in io_loop enum_part sl_l `Frame_done
(**********************************************************)
let dbg fmt = Printf.ksprintf (Printf.printf "hs: %s\n%!") fmt
let http_server_func (userfunc : request -> (segpath * H.service_desc))
(inch, outch) : unit =
IO.run_and_ignore_result
(let string_of_exn e =
let rec loop e =
match e with
| I.Iteratees_err_msg e -> "it/" ^ (loop e)
| Ws_service_error e -> "wssrv/" ^ (loop e)
| _ -> Printexc.to_string e
in loop e
in
try
let closed = ref false in
let do_close () =
let () = dbg "http_server_func: close 0"
in
if !closed
then IO.return ()
else
IO.catch
(fun () ->
let () = closed := true in
let () = dbg "http_server_func: close 1"
in
(IO.close_in inch) >>%
(fun () ->
let () = dbg "http_server_func: close 2"
in
(IO.close_out outch) >>%
(fun () ->
let () =
dbg "http_server_func: close 3"
in IO.return ())))
(fun e ->
let () =
dbg "http_server_func: close error: %s"
(Printexc.to_string e)
in IO.return ())
in
IO.catch
(fun () ->
let it_req = H.it_http userfunc in
let rec loop can_switch_to_ws (it, sl_l, opt_enp) =
let () = dbg "hs: entered loop"
in
match opt_enp with
| I.EP_None -> do_close ()
| I.EP_Some cont ->
let () = dbg "got cont"
in
(match it with
| I.IE_cont ((Some e), _) ->
let () =
dbg "cont: error %S"
(Printexc.to_string e)
in do_close ()
| I.IE_cont (None, _) ->
let () = dbg "cont: cont"
in failwith "cont-cont1111"
| I.IE_done http_or_ws_resp ->
let () = dbg "hsf: got req/resp"
in
(match http_or_ws_resp with
| (request, `Http it) ->
(I.run it) >>%
(fun resp ->
let is_head =
request.rq_method = `HEAD
in
(H.output_response ~is_head
outch resp)
>>%
(fun () ->
let need_to_close
=
(List.mem `Close
request.rq_headers.
connection)
||
(List.exists
(fun (hk, hv) ->
(hk =
"Connection")
&&
(hv =
"close"))
resp.rs_headers.
rs_all)
in
if need_to_close
then do_close ()
else
(IO.flush outch)
>>%
(fun () ->
let sl
=
Lazy.force
sl_l
in
(cont.I.
enumpart_poly
sl it_req)
>>%
(loop
false))))
| (request,
`Ws (handshake, ws_service, segpath))
->
if can_switch_to_ws
then
(H.output_response
~is_head: false outch
(continue_ws_resp handshake))
>>%
(fun () ->
let ws_out_socket =
H.Ws.of_out_channel outch in
let worker =
Partapp3.apply ws_service
segpath request
ws_out_socket
in
(IO.catch
(fun () ->
(do_websocket
ws_out_socket
sl_l
cont.I.
enumpart_poly
worker)
>>%
(fun () ->
IO.return &
(`Ok ())))
(fun e ->
IO.return &
(`Error e)))
>>%
(fun res ->
match res with
| `Ok () ->
IO.return ()
| `Error e ->
IO.error e))
else
(H.output_response
~is_head: false outch
can't_switch_ws_resp)
>>% (fun () -> do_close ())))
in
(I.enumpart_readchars ~buffer_size: 4096
~read_func: IO.read_into inch I.Sl.empty it_req)
>>% (loop true))
(fun e ->
((Printf.eprintf "amall http: exception: %s\n%!") &
(string_of_exn e);
do_close ()))
with
| e ->
((Printf.eprintf "amall http: uncaught exception: %s\n%!") &
(string_of_exn e);
IO.return ()))
let run_addr listen_addr
(userfunc : request -> (segpath * H.service_desc)) =
IO.establish_server listen_addr (http_server_func userfunc)
let run listen_addr userfunc =
let a =
match listen_addr with
| `Inet_any port -> Unix.ADDR_INET ((Unix.inet_addr_any, port))
| `Inet_loopback port ->
Unix.ADDR_INET ((Unix.inet_addr_loopback, port))
| `Inet_str (str, port) ->
Unix.ADDR_INET (((Unix.inet_addr_of_string str), port))
| `Inet_addr (a, p) -> Unix.ADDR_INET ((a, p))
in run_addr a userfunc
end