(IO : IO_Type) (I : It_type.IT with type 'a It_IO.m = 'a IO.m and type It_IO.input_channel = IO.input_channel) : sig val post_form : request -> (char, (request * ((string * string) list))) I.iteratee val it_post_vars : (char, (string * string) list) I.iteratee val run : listen_addr -> (request -> (segpath * Amall_http.Make(IO)(I).service_desc)) -> IO. server end = struct module H = Amall_http.Make(IO)(I) let it_post_vars = H.it_post_vars open I.Ops let (runA : (('el, 'a) I.iteratee) IO.m -> 'a It_Types.res) = fun i -> IO.runIO (i >>% I.run) let rec printexc e = match e with | I.Iteratees_err_msg e -> printexc e | _ -> Printexc.to_string e let rec dump_chars_chunks title = let rec step s = match s with | I.EOF oe -> let err = (match oe with | None -> "eof" | Some e -> printexc e) in (IO.printf "dump_chars_chunks: %s: EOF: %s.\n" title err) >>% (fun () -> I.ie_doneM () s) | I.Chunk c -> (IO.printf "dump_chars_chunks: %s: Chunk: %S\n" title (I.Subarray.to_string c)) >>% (fun () -> I.ie_contM step) in I.ie_cont step let (post_form : request -> (char, (request * ((string * string) list))) I.iteratee) = fun request -> (I.printf "request headers was read ok, request_uri: %S, uri: %S.\n\ headers:\n%s\n===\n%!" (Uri.dump_uri request.rq_request_uri__) (* Uri.dump_uri request.rq_uri *) "" ((String.concat "\n") & (List.map (fun (k, v) -> sprintf "%S = %S" k v) request.rq_headers.rq_all))) >>= (fun () -> (* dump_chars_chunks "header" *) H.request_with_post_vars request) (* value _ () = let fn = "post-req" in match runA & I.enum_file fn & ((H.it_http & post_form) >>= fun r -> dump_chars_chunks "after body" >>= fun () -> I.return r ) with [ `Ok _ -> () | `Error e -> eprintf "exception: %s\n%!" & printexc e ] ; *) let () = ignore & I.limit (**********************************************************) let can't_switch_ws_resp = { rs_status_code = 400; rs_reason_phrase = "Bad request"; rs_headers = { rs_all = []; }; rs_body = Body_string "Can't switch to Websockets after HTTP requests"; } let ws_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" open Am_Common let (continue_ws_resp : string -> response) = let accept handshake = let open Cryptokit in transform_string (Base64.encode_compact_pad ()) (hash_string (Hash.sha1 ()) (handshake ^ ws_guid)) and other_headers = [ ("Upgrade", "websocket"); ("Connection", "Upgrade") ] in fun handshake -> { rs_status_code = 101; rs_reason_phrase = "Switching Protocols"; rs_body = No_body; rs_headers = { rs_all = ("Sec-WebSocket-Accept", (accept handshake)) :: other_headers; }; } (* TODO: переписать с прямым доступом к массиву. *) let it_uint16_as_64_netord = I.head >>= (fun b0 -> I.head >>= (fun b1 -> I.return & (Int64.of_int (((Char.code b0) lsl 8) lor (Char.code b1))))) (* TODO: переписать с прямым доступом к массиву. *) let ws_it_close_code ~masking_key = I.head >>= (fun b0 -> I.head >>= (fun b1 -> I.return ((((Char.code b0) lxor (Char.code masking_key.[0])) lsl 8) lor ((Char.code b1) lxor (Char.code masking_key.[1]))))) exception Ws_error of string let ws_error msg = I.throw_err (Ws_error msg) exception Ws_service_error of exn (* TODO: переписать с прямым доступом к массиву и _прямо_. *) let ws_len64 = let h = I.head >>= (fun c -> I.return & (Int64.of_int (Char.code c))) in h >>= (fun i0 -> let open Int64 in let ( lor ) = Int64.logor and ( land ) = Int64.logand and ( lsl ) = Int64.shift_left in if (i0 land 0x80L) == 0x80L then ws_error "ws_len64: 'the most significant bit MUST be 0' [rfc6455]" else h >>= (fun i1 -> h >>= (fun i2 -> h >>= (fun i3 -> h >>= (fun i4 -> h >>= (fun i5 -> h >>= (fun i6 -> h >>= (fun i7 -> I.return (((((((i7 lor (i6 lsl 8)) lor (i5 lsl 16)) lor (i4 lsl 24)) lor (i3 lsl 32)) lor (i2 lsl 40)) lor (i1 lsl 48)) lor (i0 lsl 56)))))))))) let ws_it_masking_key () = I.head >>= (fun a0 -> I.head >>= (fun a1 -> I.head >>= (fun a2 -> I.head >>= (fun a3 -> let module String = Bytes in let k = String.make 4 '\x00' in (k.[0] <- a0; k.[1] <- a1; k.[2] <- a2; k.[3] <- a3; I.return k))))) let ws_it_frame_header = let no_masking_key = Bytes.make 4 '\x00' in I.head >>= (fun b0 -> I.head >>= (fun b1 -> let fin = ((Char.code b0) land 0x80) == 0x80 in let opcode_int = (Char.code b0) land 0x0F in let mask = ((Char.code b1) land 0x80) == 0x80 in let len1 = (Char.code b1) land 0x7F in (if len1 = 126 then it_uint16_as_64_netord else if len1 = 127 then ws_len64 else I.return (Int64.of_int len1)) >>= (fun len -> (if mask then ws_it_masking_key () else I.return no_masking_key) >>= (fun masking_key -> (if opcode_int >= 0x8 then if not fin then ws_error "All control frames MUST NOT be fragmented." else if len > 125L then ws_error "All control frames MUST have a payload length \ of 125 bytes or less." else (match opcode_int with | 0x8 -> if len = 0L then I.return & (`Close None) else if len = 1L then ws_error "Close: 'If there is a body, the first two bytes of \ the body MUST be a 2-byte unsigned integer', \ but body has only 1 byte." else (ws_it_close_code ~masking_key) >>= (fun status_code -> let () = dbg "ws_it_frame_header: Close: status_code = %i" status_code in let () = let m0 = masking_key.[0] and m1 = masking_key.[1] and m2 = masking_key.[2] and m3 = masking_key.[3] in (masking_key.[0] <- m2; masking_key.[1] <- m3; masking_key.[2] <- m0; masking_key.[3] <- m1) in I.return & (`Close (Some status_code))) | 0x9 -> I.return `Ping | 0xA -> I.return `Pong | x -> I.return & (`Ctrl x)) else I.return (match opcode_int with | 0x0 -> `Cont | 0x1 -> `Text | 0x2 -> `Binary | x -> `Nonctrl x)) >>= (fun opcode -> let len = match opcode with | `Close (Some _) -> Int64.sub len 2L | _ -> len in let open H in I.return & { fin = fin; opcode = opcode; len = len; masking_key = masking_key; }))))) type ws_it_feed_resp = [ | `Frame_going of (((char, unit) I.iteratee) * [ | `Finished | `Continues ]) | `Closed of int ] type ws_it_loop_data_state = [ | `Frame_going of (char, unit) I.iteratee | `Frame_done ] type ws_it_loop_resp = [ | ws_it_feed_resp | `Closed of int | (* status code *) `Ping of (((char, string) I.iteratee) * (* body *) ws_it_loop_data_state) | `Proto_error of (int * string) | (* (code, reason) *) `Frame_done ] let enee_unmask ~masking_key ~growing_array : (Byte.t, Byte.t, 'a) I.enumeratee = fun it -> let mlen = String.length masking_key in if mlen = 0 then I.throw_err & (Invalid_argument "enum_unmask: masking_key is empty") else (let rec unmask ~mi it = match it with | I.IE_done _ | I.IE_cont ((Some _), _) -> I.return it | I.IE_cont (None, k) -> I.ie_cont (step ~mi it k) and step ~mi it k s = match s with | I.EOF _ -> I.ie_doneM it s | I.Chunk c -> let open I.S.C in let clen = c.len in let tarr = Array.GrowingArray.request_array clen growing_array in let carr = c.arr and cofs = c.ofs in let cmaxi = cofs + clen in let rec loop ~mi ~ci ~ti = if ci = cmaxi then mi else (tarr.(ti) <- Char.chr ((Char.code carr.(ci)) lxor (Char.code masking_key.[mi])); loop ~mi: ((mi + 1) mod mlen) ~ci: (ci + 1) ~ti: (ti + 1)) in let mi = loop ~mi ~ci: cofs ~ti: 0 in let new_chunk = mk ~arr: tarr ~ofs: 0 ~len: clen in (k (I.Chunk new_chunk)) >>% (fun (it, _s) -> IO.return ((unmask ~mi it), I.Sl.empty)) in unmask ~mi: 0 it) let wrap_feeding it_f = I.catch it_f (fun e -> match e with | Ws_service_error e -> I.throw_err e | e -> I.throw_err & (Ws_service_error e)) let feed_going_it ~growing_array ~fh it = let len_int = Int64.to_int fh.len in let () = assert (fh.len = (Int64.of_int len_int)) in (wrap_feeding (fun () -> I.take_exact len_int (I.joinI & (enee_unmask ~growing_array ~masking_key: fh.masking_key it)))) >>= (fun (ex, it) -> match ex with | `Done -> I.return & (`Frame_going (it, (if fh.fin then `Finished else `Continues))) | `Eof ((has_read, opt_err_msg)) -> let () = dbg "ws: it_loop: feed_going_it: got %i bytes \ instead of %i because of %s; closing." (len_int - has_read) len_int (match opt_err_msg with | None -> "normal EOF" | Some e -> sprintf "error %s" (Printexc.to_string e)) in (wrap_feeding (fun () -> I.feed_it it (I.EOF (Some (H.Ws.Close_received 1006))))) >>= (fun _it -> I.return & (`Closed 1006))) let do_websocket outsock (sl_l : (char I.sl) Lazy.t) enum_part (ws_worker : H.websocket_service_func_worker) : unit IO.m = let worker_got_close = ref false and growing_array = Array.GrowingArray.make 100 '\x00' in let rec it_loop (data_state : ws_it_loop_data_state) : (char, ws_it_loop_resp) I.iteratee = let () = dbg "ws: it_loop" in (I.ie_cont & (fun s -> match s with | I.EOF _ -> let () = dbg "it_loop: stream: EOF" in I.ie_doneM () s | I.Chunk c -> let str = I.S.to_string c in let () = dbg "it_loop: stream: Chunk \"%s\"" (Bytes.hexdump ~style: `Line str) in I.ie_doneM () s)) >>= (fun () -> let () = dbg "ws: it_loop: after dump" in I.peek >>= (fun p -> let () = dbg "ws: it_loop: peek = %s" (match p with | None -> "None" | Some c -> sprintf "Some %C" c) in ws_it_frame_header >>= (fun fh -> let () = dbg "ws: it_loop: fh" in let rec ret_closed ~opc ~code = let () = worker_got_close := true in (wrap_feeding (fun () -> feed_new_worker ~opc)) >>= (fun () -> I.return & (`Closed code)) and feed : data_state: ws_it_loop_data_state -> opc: _ -> (char, ws_it_feed_resp) I.iteratee = fun ~data_state ~opc -> let it = match data_state with | `Frame_done -> wrap_feeding (fun () -> ws_worker opc) | `Frame_going it -> it in feed_going_it ~growing_array ~fh it and feed_new_worker ~opc : (char, unit) I.iteratee = (feed ~data_state: `Frame_done ~opc) >>= (fun fr -> match fr with | `Frame_going (it, _) -> (wrap_feeding (fun () -> I.feed_it it (I.EOF None))) >>= (fun _it -> I.return ()) | `Closed _code -> I.return ()) and ret_data_state () = I.return & (match data_state with | (`Frame_done as x) -> x | `Frame_going it -> `Frame_going (it, `Continues)) in match fh.opcode with | (`Close opt_code as opc) -> let code = (match opt_code with | Some x -> x | None -> 1005) in (match data_state with | `Frame_done -> ret_closed ~opc ~code | `Frame_going it -> (wrap_feeding (fun () -> I.feed_it it (I.EOF (Some (H.Ws.Close_received code))))) >>= (fun _it -> (* итерат, получив EOF, должен перейти в "готовое состояние", а оно неинтересно. *) ret_closed ~opc ~code)) | `Ping -> (feed_going_it ~growing_array ~fh I. gather_to_string) >>= (fun fr -> match fr with | (`Closed _ as c) -> I.return c | `Frame_going (gath_it, _cont_fin) -> I.return & (`Ping (gath_it, data_state))) | `Pong -> ret_data_state () | (`Ctrl _ as opc) -> (feed_new_worker ~opc) >>= (fun () -> ret_data_state ()) | (`Text | `Binary | `Nonctrl _ as opc) -> let () = dbg "ws: it_loop: text/binary/nonctrl" in (match data_state with | `Frame_going it -> let msg = "data frame without previous fin" in (wrap_feeding (fun () -> I.feed_it it (I.EOF (Some (Ws_error msg))))) >>= (* ошибка, так как при фрейме с fin=true состояние станет _done, а при исходном _going должен быть fin=true перед другими фреймами. *) (fun _it -> I.return & (`Proto_error (1002, msg))) | `Frame_done -> (feed ~data_state ~opc :> (char, ws_it_loop_resp) I. iteratee)) | `Cont -> (match data_state with | `Frame_done -> I.return & (`Proto_error (1003, "continuation frame without \ previous beginning frame")) | (* продолжение без предыдущего начала *) `Frame_going it -> (feed_going_it it ~growing_array ~fh :> (char, ws_it_loop_resp) I.iteratee))))) and io_loop (enum_part : (char, ws_it_loop_resp) I.enumpart) sl_l data_state = let () = dbg "ws: io_loop" in if H.Ws.is_close_sent outsock then IO.return () else (let sl = Lazy.force sl_l in (enum_part sl (it_loop data_state)) >>% (fun (loop_it, sl_l, enp_opt) -> let () = dbg "ws: io_loop: enum_part returned" in (let proc_error e = let () = dbg "ws: io_loop: error: %s" (Printexc.to_string & (let rec loop e = match e with | Ws_service_error e | I.Iteratees_err_msg e -> loop e | _ -> e in loop e)) in let () = dbg "ws: io_loop: proc_error: 1: is_close_sent = %b" (H.Ws.is_close_sent outsock) in (if H.Ws.is_close_sent outsock then IO.return () else (match e with | Ws_service_error e -> let msg = Printexc.to_string e in let msg = if (String.length msg) > 125 then (String.sub msg 0 122) ^ "..." else msg in let () = dbg "ws: io_loop: service error: %S" msg in H.Ws.close outsock (Some ((1011, msg))) | _ -> H.Ws.close outsock None)) >>% (fun () -> let () = dbg "ws: it_loop: proc_error: 2: is_close_sent = %b" (H.Ws.is_close_sent outsock) in IO.return & (`Closed 1005)) in IO.catch (fun () -> I.run loop_it) (fun e -> proc_error e)) >>% (fun loop_resp -> let () = dbg "io_loop: analyzing loop_resp" in (match loop_resp with | `Frame_going (it, `Finished) -> (I.run it) >>% (fun () -> let () = dbg "ws: io_loop: finished service iteratee" in IO.return `Frame_done) | `Frame_going (it, `Continues) -> IO.return & (`Frame_going it) | `Frame_done -> IO.return `Frame_done | `Closed code -> (if not !worker_got_close then (let () = worker_got_close := true in I.run (ws_worker (`Close (Some code)))) else IO.return ()) >>% (fun () -> if H.Ws.is_close_sent outsock then IO.return `Frame_done else (H.Ws.close outsock (Some ((code, "")))) >>% (fun () -> IO.return `Frame_done)) | (* will exit after entering io_loop *) `Ping (it_body, old_state) -> (I.run it_body) >>% (fun body -> (H.Ws.send outsock `Pong body) >>% (fun () -> IO.return old_state)) | `Proto_error ((code : int), (msg : string)) -> let () = worker_got_close := true in (I.run (ws_worker (`Close (Some code)))) >>% (fun () -> if H.Ws.is_close_sent outsock then IO.return `Frame_done else (H.Ws.close outsock (Some ((code, msg)))) >>% (fun () -> IO.return `Frame_done))) >>% (fun data_state -> match enp_opt with | I.EP_None -> let () = dbg "ws: io_loop: ep_none" in (match data_state with | `Frame_done -> IO.return () | `Frame_going it -> I.run & (I.feed_it it (I.EOF (Some (H.Ws.Close_received 1006))))) >>% (fun () -> let () = dbg "ws: io_loop: ep_none: worker_got_close = %b" !worker_got_close in if not !worker_got_close then (let () = worker_got_close := true in I.run (ws_worker (`Close (Some 1006)))) else IO.return ()) | (* just exit *) I.EP_Some enum_part -> let () = dbg "ws: io_loop: ep_some" in io_loop enum_part.I.enumpart_poly sl_l data_state)))) in io_loop enum_part sl_l `Frame_done (**********************************************************) let dbg fmt = Printf.ksprintf (Printf.printf "hs: %s\n%!") fmt let http_server_func (userfunc : request -> (segpath * H.service_desc)) (inch, outch) : unit = IO.run_and_ignore_result (let string_of_exn e = let rec loop e = match e with | I.Iteratees_err_msg e -> "it/" ^ (loop e) | Ws_service_error e -> "wssrv/" ^ (loop e) | _ -> Printexc.to_string e in loop e in try let closed = ref false in let do_close () = let () = dbg "http_server_func: close 0" in if !closed then IO.return () else IO.catch (fun () -> let () = closed := true in let () = dbg "http_server_func: close 1" in (IO.close_in inch) >>% (fun () -> let () = dbg "http_server_func: close 2" in (IO.close_out outch) >>% (fun () -> let () = dbg "http_server_func: close 3" in IO.return ()))) (fun e -> let () = dbg "http_server_func: close error: %s" (Printexc.to_string e) in IO.return ()) in IO.catch (fun () -> let it_req = H.it_http userfunc in let rec loop can_switch_to_ws (it, sl_l, opt_enp) = let () = dbg "hs: entered loop" in match opt_enp with | I.EP_None -> do_close () | I.EP_Some cont -> let () = dbg "got cont" in (match it with | I.IE_cont ((Some e), _) -> let () = dbg "cont: error %S" (Printexc.to_string e) in do_close () | I.IE_cont (None, _) -> let () = dbg "cont: cont" in failwith "cont-cont1111" | I.IE_done http_or_ws_resp -> let () = dbg "hsf: got req/resp" in (match http_or_ws_resp with | (request, `Http it) -> (I.run it) >>% (fun resp -> let is_head = request.rq_method = `HEAD in (H.output_response ~is_head outch resp) >>% (fun () -> let need_to_close = (List.mem `Close request.rq_headers. connection) || (List.exists (fun (hk, hv) -> (hk = "Connection") && (hv = "close")) resp.rs_headers. rs_all) in if need_to_close then do_close () else (IO.flush outch) >>% (fun () -> let sl = Lazy.force sl_l in (cont.I. enumpart_poly sl it_req) >>% (loop false)))) | (request, `Ws (handshake, ws_service, segpath)) -> if can_switch_to_ws then (H.output_response ~is_head: false outch (continue_ws_resp handshake)) >>% (fun () -> let ws_out_socket = H.Ws.of_out_channel outch in let worker = Partapp3.apply ws_service segpath request ws_out_socket in (IO.catch (fun () -> (do_websocket ws_out_socket sl_l cont.I. enumpart_poly worker) >>% (fun () -> IO.return & (`Ok ()))) (fun e -> IO.return & (`Error e))) >>% (fun res -> match res with | `Ok () -> IO.return () | `Error e -> IO.error e)) else (H.output_response ~is_head: false outch can't_switch_ws_resp) >>% (fun () -> do_close ()))) in (I.enumpart_readchars ~buffer_size: 4096 ~read_func: IO.read_into inch I.Sl.empty it_req) >>% (loop true)) (fun e -> ((Printf.eprintf "amall http: exception: %s\n%!") & (string_of_exn e); do_close ())) with | e -> ((Printf.eprintf "amall http: uncaught exception: %s\n%!") & (string_of_exn e); IO.return ())) let run_addr listen_addr (userfunc : request -> (segpath * H.service_desc)) = IO.establish_server listen_addr (http_server_func userfunc) let run listen_addr userfunc = let a = match listen_addr with | `Inet_any port -> Unix.ADDR_INET ((Unix.inet_addr_any, port)) | `Inet_loopback port -> Unix.ADDR_INET ((Unix.inet_addr_loopback, port)) | `Inet_str (str, port) -> Unix.ADDR_INET (((Unix.inet_addr_of_string str), port)) | `Inet_addr (a, p) -> Unix.ADDR_INET ((a, p)) in run_addr a userfunc end