001    package railo.runtime.gateway;
002    
003    import java.io.BufferedReader;
004    import java.io.IOException;
005    import java.io.InputStreamReader;
006    import java.io.PrintWriter;
007    import java.io.Reader;
008    import java.io.Writer;
009    import java.net.ServerSocket;
010    import java.net.Socket;
011    import java.util.ArrayList;
012    import java.util.Iterator;
013    import java.util.List;
014    import java.util.Map;
015    
016    import railo.runtime.gateway.GatewayPro;
017    import railo.runtime.gateway.GatewayEnginePro;
018    import railo.runtime.gateway.GatewayException;
019    
020    import railo.loader.engine.CFMLEngine;
021    import railo.loader.engine.CFMLEngineFactory;
022    import railo.runtime.exp.PageException;
023    import railo.runtime.type.Struct;
024    import railo.runtime.util.Cast;
025    import railo.runtime.util.Creation;
026    
027    public class SocketGateway implements GatewayPro {
028    
029            private GatewayEnginePro engine;
030            private int port;
031            private String welcomeMessage="Welcome to the Railo Socket Gateway";
032        
033            private String id;
034            private CFMLEngine cfmlEngine;
035            private Cast caster;
036            private Creation creator;
037            private List<SocketServerThread> sockets=new ArrayList<SocketServerThread>();
038            private ServerSocket serverSocket;
039            protected int state=STOPPED;
040            private String cfcPath;
041    
042    
043            @Override
044            public void init(GatewayEnginePro engine, String id, String cfcPath, Map config)throws GatewayException {
045                    this.engine=engine;
046                    cfmlEngine=CFMLEngineFactory.getInstance();
047                    caster=cfmlEngine.getCastUtil();
048                    creator = cfmlEngine.getCreationUtil();
049                    this.cfcPath=cfcPath;
050                    this.id=id;
051                    
052                    // config
053                    Object oPort=config.get("port");
054                    port=caster.toIntValue(oPort, 1225);
055    
056                    Object oWM=config.get("welcomeMessage");
057                    String strWM=caster.toString(oWM,"").trim();
058                    if(strWM.length()>0)welcomeMessage=strWM;
059            }
060            
061            
062            public void doStart() {
063                    state = STARTING;
064                    try     {
065                            createServerSocket();
066                            state = RUNNING;
067                            do {
068                            try {
069                                    SocketServerThread sst = new SocketServerThread(serverSocket.accept());
070                        sst.start();
071                        sockets.add(sst);
072                    }
073                    catch (Throwable t) {
074                        error("Failed to listen on Socket ["+id+"] on port ["+port+"]: " + t.getMessage());
075                    }
076                }
077                            while (getState()==RUNNING || getState()==STARTING);
078                            
079                close(serverSocket);
080                serverSocket = null;
081            }
082            catch (Throwable e) {
083                    state=FAILED;
084                        error("Error in Socet Gateway ["+id+"]: " + e.getMessage());
085                e.printStackTrace();
086                //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e);
087            }
088        }
089            
090            public void doStop()    {
091                    state = STOPPING;
092                    try{
093                            
094                            // close all open connections
095                            Iterator<SocketServerThread> it = sockets.iterator();
096                    while (it.hasNext()) {
097                        close(it.next().socket);
098                    }
099                    
100                    // close server socket
101                    close(serverSocket);
102                    serverSocket = null;
103                        state = STOPPED;
104                    }
105                    catch(Throwable e){
106                            state=FAILED;
107                            error("Error in Socket Gateway ["+id+"]: " + e.getMessage());
108                e.printStackTrace();
109                //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e);
110                    }
111        }
112    
113            private void createServerSocket() throws PageException, RuntimeException {
114                    try     {
115                serverSocket = new ServerSocket(port);
116            }
117            catch (Throwable t) {
118                error("Failed to start Socket Gateway ["+id+"] on port ["+port+"] " +t.getMessage());
119                throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(t);
120            }
121            }
122             
123             
124             
125    
126             private void invokeListener(String line, String originatorID) {
127                    
128                    Struct data=creator.createStruct();
129            data.setEL(creator.createKey("message"), line);
130            Struct event=creator.createStruct();
131            event.setEL(creator.createKey("data"), data);
132            event.setEL(creator.createKey("originatorID"), originatorID);
133            
134            event.setEL(creator.createKey("cfcMethod"), "onIncomingMessage");
135            event.setEL(creator.createKey("cfcTimeout"), new Double(10));
136            event.setEL(creator.createKey("cfcPath"), cfcPath);
137    
138            event.setEL(creator.createKey("gatewayType"), "Socket");
139            event.setEL(creator.createKey("gatewayId"), id);
140            
141            
142                                
143            if (engine.invokeListener(this, "onIncomingMessage", event))    
144                info("Socket Gateway Listener ["+id+"] invoked.");
145            else
146                    error("Failed to call Socket Gateway Listener ["+id+"]");           
147             }
148    
149             
150             private class SocketServerThread extends Thread        {
151                    private Socket socket;
152                    private PrintWriter out;
153                            private String _id;
154                            
155                    public SocketServerThread(Socket socket) throws IOException     {
156                        this.socket = socket;
157                    out = new PrintWriter(socket.getOutputStream(), true);
158                    this._id=String.valueOf(hashCode());
159                    }
160    
161                    public void run()       {
162                            BufferedReader in = null;
163                        try {
164                            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
165                            out.println(welcomeMessage);
166                            out.print("> ");
167                            String line;
168                            while ((line = in.readLine()) != null) {
169                                    if (line.trim().equals("exit")) break;
170                                invokeListener(line,_id);
171                            }
172                            //socketRegistry.remove(this.getName());
173                        }
174                        catch (Throwable t) {
175                            error("Failed to read from Socket Gateway ["+id+"]: " + t.getMessage());
176                        }
177                        finally{
178                            close(out);
179                            out=null;
180                            close(in);
181                            close(socket);
182                            sockets.remove(this);
183                        }
184                    }
185    
186                            public void writeOutput(String str)     {
187                                    out.println(str);
188                        out.print("> ");
189                    }
190                }
191    
192            
193            
194            
195            
196    
197    
198    
199            public String sendMessage(Map _data) {
200                    Struct data=caster.toStruct(_data, null, false);
201                    String msg = (String) data.get("message",null);
202                    String originatorID=(String) data.get("originatorID",null);
203                    
204                    String status="OK";
205            if (msg!=null ) {
206                
207                    Iterator<SocketServerThread> it = sockets.iterator();
208                    SocketServerThread sst;
209                    try     {
210                            boolean hasSend=false;
211                            while(it.hasNext()){
212                                    sst=it.next();
213                                    if(originatorID!=null && !sst._id.equalsIgnoreCase(originatorID)) continue;
214                                    sst.writeOutput(msg);
215                                    hasSend=true;
216                            }
217                            
218                            if(!hasSend) {
219                                    if(sockets.size()==0) {
220                                            error("There is no connection");
221                                    status = "EXCEPTION";
222                                    }
223                                    else {
224                                            it = sockets.iterator();
225                                            StringBuilder sb=new StringBuilder();
226                                            while(it.hasNext()){
227                                                    if(sb.length()>0) sb.append(", ");
228                                                    sb.append(it.next()._id);
229                                            }
230                                            error("There is no connection with originatorID ["+originatorID+"], available originatorIDs are ["+sb+"]");
231                                            status = "EXCEPTION";
232                                    }
233                            }
234                }
235                catch (Exception e) {
236                    e.printStackTrace();
237                    error("Failed to send message with exception: " + e.toString());
238                    status = "EXCEPTION";
239                }
240            }
241            return status;
242        }
243            
244            
245            @Override
246            public void doRestart() {
247                    doStop();
248                    doStart();
249            }
250            
251            
252    
253            @Override
254            public String getId() {
255                    return id;
256            }
257            
258             @Override
259            public int getState() {
260                    return state;
261             }
262            
263            
264    
265        @Override
266        public Object getHelper() {
267            return null;
268        }
269    
270    
271            public void info(String msg) {
272                    engine.log(this,GatewayEnginePro.LOGLEVEL_INFO,msg);
273            }
274            
275            public void error(String msg) {
276                    engine.log(this,GatewayEnginePro.LOGLEVEL_ERROR,msg);
277            }
278                
279    
280        private void close(Writer writer) {
281                    if(writer==null) return;
282                    try{
283                            writer.close();
284                    }
285                    catch(Throwable t){}
286            }
287        private void close(Reader reader) {
288                    if(reader==null) return;
289                    try{
290                            reader.close();
291                    }
292                    catch(Throwable t){}
293            }
294        private void close(Socket socket) {
295                    if(socket==null) return;
296                    try{
297                            socket.close();
298                    }
299                    catch(Throwable t){}
300            }
301        private void close(ServerSocket socket) {
302                    if(socket==null) return;
303                    try{
304                            socket.close();
305                    }
306                    catch(Throwable t){}
307            }
308    
309    }