1 /**********************************************************\ 2 | | 3 | hprose | 4 | | 5 | Official WebSite: http://www.hprose.com/ | 6 | http://www.hprose.org/ | 7 | | 8 \**********************************************************/ 9 10 /**********************************************************\ 11 * * 12 * hprose/rpc/tcpservice.d * 13 * * 14 * hprose tcp service library for D. * 15 * * 16 * LastModified: Aug 3, 2016 * 17 * Author: Ma Bingyao <andot@hprose.com> * 18 * * 19 \**********************************************************/ 20 21 module hprose.rpc.tcpservice; 22 23 import hprose.rpc.service; 24 import hprose.rpc.common; 25 import hprose.rpc.tcpcontext; 26 import std.conv; 27 import std.file; 28 import std.stdio; 29 import std.traits; 30 import std.typecons; 31 import std.variant; 32 import vibe.core.net; 33 import vibe.stream.operations; 34 35 class TcpService: Service { 36 37 void handler(TCPConnection conn) { 38 while (conn.connected) { 39 try { 40 TcpContext context = new TcpContext(conn); 41 int headerLength = 4; 42 int dataLength = -1; 43 ubyte[4] id; 44 ubyte[4] dataLen; 45 conn.read(dataLen); 46 dataLength = 47 cast(int)(dataLen[0] & 0x7f) << 24 | 48 cast(int)(dataLen[1]) << 16 | 49 cast(int)(dataLen[2]) << 8 | 50 cast(int)(dataLen[3]); 51 if ((dataLen[0] & 0x80) != 0) { 52 headerLength = 8; 53 conn.read(id); 54 } 55 ubyte[] data = new ubyte[dataLength]; 56 conn.read(data); 57 data = handle(data, context); 58 dataLength = cast(int)data.length; 59 if (headerLength == 8) { 60 dataLen[0] = cast(ubyte)((dataLength >> 24) & 0x7f | 0x80); 61 dataLen[1] = cast(ubyte)((dataLength >> 16) & 0xff); 62 dataLen[2] = cast(ubyte)((dataLength >> 8) & 0xff); 63 dataLen[3] = cast(ubyte)((dataLength) & 0xff); 64 conn.write(dataLen); 65 conn.write(id); 66 } 67 else { 68 dataLen[0] = cast(ubyte)((dataLength >> 24) & 0x7f); 69 dataLen[1] = cast(ubyte)((dataLength >> 16) & 0xff); 70 dataLen[2] = cast(ubyte)((dataLength >> 8) & 0xff); 71 dataLen[3] = cast(ubyte)((dataLength) & 0xff); 72 conn.write(dataLen); 73 } 74 conn.write(data); 75 } 76 catch(Exception e) { 77 break; 78 } 79 } 80 } 81 } 82 83 class TcpServer: TcpService { 84 TCPListener start(ushort port, string address = "0.0.0.0") { 85 return listenTCP(port, &handler, address, TCPListenOptions.distribute); 86 } 87 } 88 89 unittest { 90 // import hprose.rpc.tcpclient; 91 import hprose.rpc.context; 92 import hprose.rpc.filter; 93 import std.datetime; 94 95 string hello(string name) { 96 return "hello " ~ name ~ "!"; 97 } 98 99 string goodbye(string name) { 100 return "goodbye " ~ name ~ "!"; 101 } 102 103 Variant missfunc(string name, Variant[] args) { 104 if (name == "mul") { 105 return args[0] * args[1]; 106 } 107 else if (name == "div") { 108 return args[0] / args[1]; 109 } 110 else { 111 return Variant(null); 112 } 113 } 114 115 int inc(ref int n, Context context) { 116 n++; 117 return n; 118 } 119 120 class BaseTest { 121 int add(int a, int b) { 122 return a + b; 123 } 124 int sub(int a, int b) { 125 return a - b; 126 } 127 } 128 class Test: BaseTest { 129 int sum(int[] nums) { 130 int sum = 0; 131 foreach (x; nums) { 132 sum += x; 133 } 134 return sum; 135 } 136 static string[] test() { 137 return ["Tom", "Jerry"]; 138 } 139 static Variant[string] test2() { 140 return ["name": Variant("张三"), "age": Variant(18)]; 141 } 142 } 143 144 Test test = new Test(); 145 146 // Server 147 TcpServer server = new TcpServer(); 148 server.add!("hello")(&hello); 149 server.add!(["goodbye", "inc"])(&goodbye, &inc); 150 server.add!(["add", "sub", "sum"])(test); 151 server.add!("test", Test)(); // add Test.test method to the server 152 server.add!(Test, "test")(); // add all static methods on Test with prefix "test" to the server 153 server.addMissingFunction(&missfunc); 154 server.use(delegate Variant(string name, ref Variant[] args, Context context, NextInvokeHandler next) { 155 writeln(name); 156 writeln(args); 157 Variant result = next(name, args, context); 158 writeln(result); 159 return result; 160 }, delegate Variant(string name, ref Variant[] args, Context context, NextInvokeHandler next) { 161 writeln(Clock.currStdTime()); 162 Variant result = next(name, args, context); 163 writeln(Clock.currStdTime()); 164 return result; 165 }); 166 server.use!"beforeFilter"(delegate ubyte[](ubyte[] request, Context context, NextFilterHandler next) { 167 writeln("beforeFilter"); 168 writeln(cast(string)request); 169 ubyte[] response = next(request, context); 170 writeln("beforeFilter"); 171 writeln(cast(string)response); 172 writeln(); 173 return response; 174 }); 175 server.use!"afterFilter"(delegate ubyte[](ubyte[] request, Context context, NextFilterHandler next) { 176 writeln("afterFilter"); 177 writeln(cast(string)request); 178 ubyte[] response = next(request, context); 179 writeln("afterFilter"); 180 writeln(cast(string)response); 181 return response; 182 }); 183 server.start(1234); 184 185 /* 186 // Client 187 interface Hello { 188 @Simple() string hello(string name); 189 string goodbye(string name); 190 int add(int a, int b); 191 int sub(int a, int b = 3); 192 int mul(int a, int b); 193 int div(int a, int b); 194 int sum(int[] nums...); 195 int inc(ref int n); 196 string[] test(); 197 Variant[string] test2(); 198 } 199 200 auto client = new TcpClient("tcp://127.0.0.1:1234/"); 201 Hello proxy = client.useService!Hello(); 202 203 Hello proxy2 = client.useService!(Hello, "test")(); 204 205 // client.filters ~= new class Filter { 206 // override ubyte[] inputFilter(ubyte[] data, Context context) { 207 // writeln(cast(string)data); 208 // return data; 209 // } 210 // 211 // override ubyte[] outputFilter(ubyte[] data, Context context) { 212 // writeln(cast(string)data); 213 // return data; 214 // }; 215 // }; 216 217 assert(proxy.hello("world") == "hello world!"); 218 assert(proxy.goodbye("world") == "goodbye world!"); 219 assert(proxy.add(1, 2) == 3); 220 assert(proxy.sub(1, 2) == -1); 221 assert(proxy.mul(1, 2) == 2); 222 assert(proxy.div(2, 2) == 1); 223 assert(proxy.sum(1, 2, 3) == 6); 224 assert(proxy.test() == ["Tom", "Jerry"]); 225 int n = 0; 226 assert(proxy.inc(n) == 1); 227 assert(proxy.inc(n) == 2); 228 assert(proxy.inc(n) == 3); 229 assert(n == 3); 230 assert(proxy2.test() == ["Tom", "Jerry"]); 231 assert(proxy2.test2() == ["name": Variant("张三"), "age": Variant(18)]); 232 233 */ 234 }