001 package railo.runtime.gateway; 002 003 import java.io.BufferedReader; 004 import java.io.IOException; 005 import java.io.InputStreamReader; 006 import java.io.PrintWriter; 007 import java.io.Reader; 008 import java.io.Writer; 009 import java.net.ServerSocket; 010 import java.net.Socket; 011 import java.util.ArrayList; 012 import java.util.Iterator; 013 import java.util.List; 014 import java.util.Map; 015 016 import railo.runtime.gateway.GatewayPro; 017 import railo.runtime.gateway.GatewayEnginePro; 018 import railo.runtime.gateway.GatewayException; 019 020 import railo.loader.engine.CFMLEngine; 021 import railo.loader.engine.CFMLEngineFactory; 022 import railo.runtime.exp.PageException; 023 import railo.runtime.type.Struct; 024 import railo.runtime.util.Cast; 025 import railo.runtime.util.Creation; 026 027 public class SocketGateway implements GatewayPro { 028 029 private GatewayEnginePro engine; 030 private int port; 031 private String welcomeMessage="Welcome to the Railo Socket Gateway"; 032 033 private String id; 034 private CFMLEngine cfmlEngine; 035 private Cast caster; 036 private Creation creator; 037 private List<SocketServerThread> sockets=new ArrayList<SocketServerThread>(); 038 private ServerSocket serverSocket; 039 protected int state=STOPPED; 040 private String cfcPath; 041 042 043 @Override 044 public void init(GatewayEnginePro engine, String id, String cfcPath, Map config)throws GatewayException { 045 this.engine=engine; 046 cfmlEngine=CFMLEngineFactory.getInstance(); 047 caster=cfmlEngine.getCastUtil(); 048 creator = cfmlEngine.getCreationUtil(); 049 this.cfcPath=cfcPath; 050 this.id=id; 051 052 // config 053 Object oPort=config.get("port"); 054 port=caster.toIntValue(oPort, 1225); 055 056 Object oWM=config.get("welcomeMessage"); 057 String strWM=caster.toString(oWM,"").trim(); 058 if(strWM.length()>0)welcomeMessage=strWM; 059 } 060 061 062 public void doStart() { 063 state = STARTING; 064 try { 065 createServerSocket(); 066 state = RUNNING; 067 do { 068 try { 069 SocketServerThread sst = new SocketServerThread(serverSocket.accept()); 070 sst.start(); 071 sockets.add(sst); 072 } 073 catch (Throwable t) { 074 error("Failed to listen on Socket ["+id+"] on port ["+port+"]: " + t.getMessage()); 075 } 076 } 077 while (getState()==RUNNING || getState()==STARTING); 078 079 close(serverSocket); 080 serverSocket = null; 081 } 082 catch (Throwable e) { 083 state=FAILED; 084 error("Error in Socet Gateway ["+id+"]: " + e.getMessage()); 085 e.printStackTrace(); 086 //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e); 087 } 088 } 089 090 public void doStop() { 091 state = STOPPING; 092 try{ 093 094 // close all open connections 095 Iterator<SocketServerThread> it = sockets.iterator(); 096 while (it.hasNext()) { 097 close(it.next().socket); 098 } 099 100 // close server socket 101 close(serverSocket); 102 serverSocket = null; 103 state = STOPPED; 104 } 105 catch(Throwable e){ 106 state=FAILED; 107 error("Error in Socket Gateway ["+id+"]: " + e.getMessage()); 108 e.printStackTrace(); 109 //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e); 110 } 111 } 112 113 private void createServerSocket() throws PageException, RuntimeException { 114 try { 115 serverSocket = new ServerSocket(port); 116 } 117 catch (Throwable t) { 118 error("Failed to start Socket Gateway ["+id+"] on port ["+port+"] " +t.getMessage()); 119 throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(t); 120 } 121 } 122 123 124 125 126 private void invokeListener(String line, String originatorID) { 127 128 Struct data=creator.createStruct(); 129 data.setEL(creator.createKey("message"), line); 130 Struct event=creator.createStruct(); 131 event.setEL(creator.createKey("data"), data); 132 event.setEL(creator.createKey("originatorID"), originatorID); 133 134 event.setEL(creator.createKey("cfcMethod"), "onIncomingMessage"); 135 event.setEL(creator.createKey("cfcTimeout"), new Double(10)); 136 event.setEL(creator.createKey("cfcPath"), cfcPath); 137 138 event.setEL(creator.createKey("gatewayType"), "Socket"); 139 event.setEL(creator.createKey("gatewayId"), id); 140 141 142 143 if (engine.invokeListener(this, "onIncomingMessage", event)) 144 info("Socket Gateway Listener ["+id+"] invoked."); 145 else 146 error("Failed to call Socket Gateway Listener ["+id+"]"); 147 } 148 149 150 private class SocketServerThread extends Thread { 151 private Socket socket; 152 private PrintWriter out; 153 private String _id; 154 155 public SocketServerThread(Socket socket) throws IOException { 156 this.socket = socket; 157 out = new PrintWriter(socket.getOutputStream(), true); 158 this._id=String.valueOf(hashCode()); 159 } 160 161 public void run() { 162 BufferedReader in = null; 163 try { 164 in = new BufferedReader(new InputStreamReader(socket.getInputStream())); 165 out.println(welcomeMessage); 166 out.print("> "); 167 String line; 168 while ((line = in.readLine()) != null) { 169 if (line.trim().equals("exit")) break; 170 invokeListener(line,_id); 171 } 172 //socketRegistry.remove(this.getName()); 173 } 174 catch (Throwable t) { 175 error("Failed to read from Socket Gateway ["+id+"]: " + t.getMessage()); 176 } 177 finally{ 178 close(out); 179 out=null; 180 close(in); 181 close(socket); 182 sockets.remove(this); 183 } 184 } 185 186 public void writeOutput(String str) { 187 out.println(str); 188 out.print("> "); 189 } 190 } 191 192 193 194 195 196 197 198 199 public String sendMessage(Map _data) { 200 Struct data=caster.toStruct(_data, null, false); 201 String msg = (String) data.get("message",null); 202 String originatorID=(String) data.get("originatorID",null); 203 204 String status="OK"; 205 if (msg!=null ) { 206 207 Iterator<SocketServerThread> it = sockets.iterator(); 208 SocketServerThread sst; 209 try { 210 boolean hasSend=false; 211 while(it.hasNext()){ 212 sst=it.next(); 213 if(originatorID!=null && !sst._id.equalsIgnoreCase(originatorID)) continue; 214 sst.writeOutput(msg); 215 hasSend=true; 216 } 217 218 if(!hasSend) { 219 if(sockets.size()==0) { 220 error("There is no connection"); 221 status = "EXCEPTION"; 222 } 223 else { 224 it = sockets.iterator(); 225 StringBuilder sb=new StringBuilder(); 226 while(it.hasNext()){ 227 if(sb.length()>0) sb.append(", "); 228 sb.append(it.next()._id); 229 } 230 error("There is no connection with originatorID ["+originatorID+"], available originatorIDs are ["+sb+"]"); 231 status = "EXCEPTION"; 232 } 233 } 234 } 235 catch (Exception e) { 236 e.printStackTrace(); 237 error("Failed to send message with exception: " + e.toString()); 238 status = "EXCEPTION"; 239 } 240 } 241 return status; 242 } 243 244 245 @Override 246 public void doRestart() { 247 doStop(); 248 doStart(); 249 } 250 251 252 253 @Override 254 public String getId() { 255 return id; 256 } 257 258 @Override 259 public int getState() { 260 return state; 261 } 262 263 264 265 @Override 266 public Object getHelper() { 267 return null; 268 } 269 270 271 public void info(String msg) { 272 engine.log(this,GatewayEnginePro.LOGLEVEL_INFO,msg); 273 } 274 275 public void error(String msg) { 276 engine.log(this,GatewayEnginePro.LOGLEVEL_ERROR,msg); 277 } 278 279 280 private void close(Writer writer) { 281 if(writer==null) return; 282 try{ 283 writer.close(); 284 } 285 catch(Throwable t){} 286 } 287 private void close(Reader reader) { 288 if(reader==null) return; 289 try{ 290 reader.close(); 291 } 292 catch(Throwable t){} 293 } 294 private void close(Socket socket) { 295 if(socket==null) return; 296 try{ 297 socket.close(); 298 } 299 catch(Throwable t){} 300 } 301 private void close(ServerSocket socket) { 302 if(socket==null) return; 303 try{ 304 socket.close(); 305 } 306 catch(Throwable t){} 307 } 308 309 }