001 package railo.runtime.gateway; 002 003 import java.io.BufferedReader; 004 import java.io.IOException; 005 import java.io.InputStreamReader; 006 import java.io.PrintWriter; 007 import java.io.Reader; 008 import java.io.Writer; 009 import java.net.ServerSocket; 010 import java.net.Socket; 011 import java.util.ArrayList; 012 import java.util.Iterator; 013 import java.util.List; 014 import java.util.Map; 015 016 import org.opencfml.eventgateway.Gateway; 017 import org.opencfml.eventgateway.GatewayEngine; 018 import org.opencfml.eventgateway.GatewayException; 019 020 import railo.loader.engine.CFMLEngine; 021 import railo.loader.engine.CFMLEngineFactory; 022 import railo.runtime.exp.PageException; 023 import railo.runtime.type.Struct; 024 import railo.runtime.util.Cast; 025 import railo.runtime.util.Creation; 026 027 public class SocketGateway implements Gateway { 028 029 private GatewayEngine engine; 030 private int port; 031 private String welcomeMessage="Welcome to the Railo Socket Gateway"; 032 033 private String id; 034 private CFMLEngine cfmlEngine; 035 private Cast caster; 036 private Creation creator; 037 private List<SocketServerThread> sockets=new ArrayList<SocketServerThread>(); 038 private ServerSocket serverSocket; 039 protected int state=STOPPED; 040 private String cfcPath; 041 042 043 /** 044 * @see org.opencfml.eventgateway.Gateway#init(org.opencfml.eventgateway.GatewayEngine, java.lang.String, java.lang.String, java.util.Map) 045 */ 046 public void init(GatewayEngine engine, String id, String cfcPath, Map config)throws GatewayException { 047 this.engine=engine; 048 cfmlEngine=CFMLEngineFactory.getInstance(); 049 caster=cfmlEngine.getCastUtil(); 050 creator = cfmlEngine.getCreationUtil(); 051 this.cfcPath=cfcPath; 052 this.id=id; 053 054 // config 055 Object oPort=config.get("port"); 056 port=caster.toIntValue(oPort, 1225); 057 058 Object oWM=config.get("welcomeMessage"); 059 String strWM=caster.toString(oWM,"").trim(); 060 if(strWM.length()>0)welcomeMessage=strWM; 061 } 062 063 064 public void doStart() { 065 state = STARTING; 066 try { 067 createServerSocket(); 068 state = RUNNING; 069 do { 070 try { 071 SocketServerThread sst = new SocketServerThread(serverSocket.accept()); 072 sst.start(); 073 sockets.add(sst); 074 } 075 catch (Throwable t) { 076 error("Failed to listen on Socket ["+id+"] on port ["+port+"]: " + t.getMessage()); 077 } 078 } 079 while (getState()==RUNNING || getState()==STARTING); 080 081 close(serverSocket); 082 serverSocket = null; 083 } 084 catch (Throwable e) { 085 state=FAILED; 086 error("Error in Socet Gateway ["+id+"]: " + e.getMessage()); 087 e.printStackTrace(); 088 //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e); 089 } 090 } 091 092 public void doStop() { 093 state = STOPPING; 094 try{ 095 096 // close all open connections 097 Iterator<SocketServerThread> it = sockets.iterator(); 098 while (it.hasNext()) { 099 close(it.next().socket); 100 } 101 102 // close server socket 103 close(serverSocket); 104 serverSocket = null; 105 state = STOPPED; 106 } 107 catch(Throwable e){ 108 state=FAILED; 109 error("Error in Socket Gateway ["+id+"]: " + e.getMessage()); 110 e.printStackTrace(); 111 //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e); 112 } 113 } 114 115 private void createServerSocket() throws PageException, RuntimeException { 116 try { 117 serverSocket = new ServerSocket(port); 118 } 119 catch (Throwable t) { 120 error("Failed to start Socket Gateway ["+id+"] on port ["+port+"] " +t.getMessage()); 121 throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(t); 122 } 123 } 124 125 126 127 128 private void invokeListener(String line, String originatorID) { 129 130 Struct data=creator.createStruct(); 131 data.setEL(creator.createKey("message"), line); 132 Struct event=creator.createStruct(); 133 event.setEL(creator.createKey("data"), data); 134 event.setEL(creator.createKey("originatorID"), originatorID); 135 136 event.setEL(creator.createKey("cfcMethod"), "onIncomingMessage"); 137 event.setEL(creator.createKey("cfcTimeout"), new Double(10)); 138 event.setEL(creator.createKey("cfcPath"), cfcPath); 139 140 event.setEL(creator.createKey("gatewayType"), "Socket"); 141 event.setEL(creator.createKey("gatewayId"), id); 142 143 144 145 if (engine.invokeListener(this, "onIncomingMessage", event)) 146 info("Socket Gateway Listener ["+id+"] invoked."); 147 else 148 error("Failed to call Socket Gateway Listener ["+id+"]"); 149 } 150 151 152 private class SocketServerThread extends Thread { 153 private Socket socket; 154 private PrintWriter out; 155 private String _id; 156 157 public SocketServerThread(Socket socket) throws IOException { 158 this.socket = socket; 159 out = new PrintWriter(socket.getOutputStream(), true); 160 this._id=String.valueOf(hashCode()); 161 } 162 163 public void run() { 164 BufferedReader in = null; 165 try { 166 in = new BufferedReader(new InputStreamReader(socket.getInputStream())); 167 out.println(welcomeMessage); 168 out.print("> "); 169 String line; 170 while ((line = in.readLine()) != null) { 171 if (line.trim().equals("exit")) break; 172 else invokeListener(line,_id); 173 } 174 //socketRegistry.remove(this.getName()); 175 } 176 catch (Throwable t) { 177 error("Failed to read from Socket Gateway ["+id+"]: " + t.getMessage()); 178 } 179 finally{ 180 close(out); 181 out=null; 182 close(in); 183 close(socket); 184 sockets.remove(this); 185 } 186 } 187 188 public void writeOutput(String str) { 189 out.println(str); 190 out.print("> "); 191 } 192 } 193 194 195 196 197 198 199 200 201 public String sendMessage(Map _data) { 202 Struct data=caster.toStruct(_data, null, false); 203 String msg = (String) data.get("message",null); 204 String originatorID=(String) data.get("originatorID",null); 205 206 String status="OK"; 207 if (msg!=null ) { 208 209 Iterator<SocketServerThread> it = sockets.iterator(); 210 SocketServerThread sst; 211 try { 212 boolean hasSend=false; 213 while(it.hasNext()){ 214 sst=it.next(); 215 if(originatorID!=null && !sst._id.equalsIgnoreCase(originatorID)) continue; 216 sst.writeOutput(msg); 217 hasSend=true; 218 } 219 220 if(!hasSend) { 221 if(sockets.size()==0) { 222 error("There is no connection"); 223 status = "EXCEPTION"; 224 } 225 else { 226 it = sockets.iterator(); 227 StringBuilder sb=new StringBuilder(); 228 while(it.hasNext()){ 229 if(sb.length()>0) sb.append(", "); 230 sb.append(it.next()._id); 231 } 232 error("There is no connection with originatorID ["+originatorID+"], available originatorIDs are ["+sb+"]"); 233 status = "EXCEPTION"; 234 } 235 } 236 } 237 catch (Exception e) { 238 e.printStackTrace(); 239 error("Failed to send message with exception: " + e.toString()); 240 status = "EXCEPTION"; 241 } 242 } 243 return status; 244 } 245 246 247 /** 248 * @see org.opencfml.eventgateway.Gateway#doRestart() 249 */ 250 public void doRestart() { 251 doStop(); 252 doStart(); 253 } 254 255 256 257 /** 258 * @see org.opencfml.eventgateway.Gateway#getId() 259 */ 260 public String getId() { 261 return id; 262 } 263 264 /** 265 * @see org.opencfml.eventgateway.Gateway#getState() 266 */ 267 public int getState() { 268 return state; 269 } 270 271 272 273 /** 274 * @see org.opencfml.eventgateway.Gateway#getHelper() 275 */ 276 public Object getHelper() { 277 return null; 278 } 279 280 281 public void info(String msg) { 282 engine.log(this,GatewayEngine.LOGLEVEL_INFO,msg); 283 } 284 285 public void error(String msg) { 286 engine.log(this,GatewayEngine.LOGLEVEL_ERROR,msg); 287 } 288 289 290 private void close(Writer writer) { 291 if(writer==null) return; 292 try{ 293 writer.close(); 294 } 295 catch(Throwable t){} 296 } 297 private void close(Reader reader) { 298 if(reader==null) return; 299 try{ 300 reader.close(); 301 } 302 catch(Throwable t){} 303 } 304 private void close(Socket socket) { 305 if(socket==null) return; 306 try{ 307 socket.close(); 308 } 309 catch(Throwable t){} 310 } 311 private void close(ServerSocket socket) { 312 if(socket==null) return; 313 try{ 314 socket.close(); 315 } 316 catch(Throwable t){} 317 } 318 319 }