001    package railo.runtime.gateway;
002    
003    import java.io.BufferedReader;
004    import java.io.IOException;
005    import java.io.InputStreamReader;
006    import java.io.PrintWriter;
007    import java.io.Reader;
008    import java.io.Writer;
009    import java.net.ServerSocket;
010    import java.net.Socket;
011    import java.util.ArrayList;
012    import java.util.Iterator;
013    import java.util.List;
014    import java.util.Map;
015    
016    import org.opencfml.eventgateway.Gateway;
017    import org.opencfml.eventgateway.GatewayEngine;
018    import org.opencfml.eventgateway.GatewayException;
019    
020    import railo.loader.engine.CFMLEngine;
021    import railo.loader.engine.CFMLEngineFactory;
022    import railo.runtime.exp.PageException;
023    import railo.runtime.type.Struct;
024    import railo.runtime.util.Cast;
025    import railo.runtime.util.Creation;
026    
027    public class SocketGateway implements Gateway {
028    
029            private GatewayEngine engine;
030            private int port;
031            private String welcomeMessage="Welcome to the Railo Socket Gateway";
032        
033            private String id;
034            private CFMLEngine cfmlEngine;
035            private Cast caster;
036            private Creation creator;
037            private List<SocketServerThread> sockets=new ArrayList<SocketServerThread>();
038            private ServerSocket serverSocket;
039            protected int state=STOPPED;
040            private String cfcPath;
041    
042    
043            /**
044             * @see org.opencfml.eventgateway.Gateway#init(org.opencfml.eventgateway.GatewayEngine, java.lang.String, java.lang.String, java.util.Map)
045             */
046            public void init(GatewayEngine engine, String id, String cfcPath, Map config)throws GatewayException {
047                    this.engine=engine;
048                    cfmlEngine=CFMLEngineFactory.getInstance();
049                    caster=cfmlEngine.getCastUtil();
050                    creator = cfmlEngine.getCreationUtil();
051                    this.cfcPath=cfcPath;
052                    this.id=id;
053                    
054                    // config
055                    Object oPort=config.get("port");
056                    port=caster.toIntValue(oPort, 1225);
057    
058                    Object oWM=config.get("welcomeMessage");
059                    String strWM=caster.toString(oWM,"").trim();
060                    if(strWM.length()>0)welcomeMessage=strWM;
061            }
062            
063            
064            public void doStart() {
065                    state = STARTING;
066                    try     {
067                            createServerSocket();
068                            state = RUNNING;
069                            do {
070                            try {
071                                    SocketServerThread sst = new SocketServerThread(serverSocket.accept());
072                        sst.start();
073                        sockets.add(sst);
074                    }
075                    catch (Throwable t) {
076                        error("Failed to listen on Socket ["+id+"] on port ["+port+"]: " + t.getMessage());
077                    }
078                }
079                            while (getState()==RUNNING || getState()==STARTING);
080                            
081                close(serverSocket);
082                serverSocket = null;
083            }
084            catch (Throwable e) {
085                    state=FAILED;
086                        error("Error in Socet Gateway ["+id+"]: " + e.getMessage());
087                e.printStackTrace();
088                //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e);
089            }
090        }
091            
092            public void doStop()    {
093                    state = STOPPING;
094                    try{
095                            
096                            // close all open connections
097                            Iterator<SocketServerThread> it = sockets.iterator();
098                    while (it.hasNext()) {
099                        close(it.next().socket);
100                    }
101                    
102                    // close server socket
103                    close(serverSocket);
104                    serverSocket = null;
105                        state = STOPPED;
106                    }
107                    catch(Throwable e){
108                            state=FAILED;
109                            error("Error in Socket Gateway ["+id+"]: " + e.getMessage());
110                e.printStackTrace();
111                //throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e);
112                    }
113        }
114    
115            private void createServerSocket() throws PageException, RuntimeException {
116                    try     {
117                serverSocket = new ServerSocket(port);
118            }
119            catch (Throwable t) {
120                error("Failed to start Socket Gateway ["+id+"] on port ["+port+"] " +t.getMessage());
121                throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(t);
122            }
123            }
124             
125             
126             
127    
128             private void invokeListener(String line, String originatorID) {
129                    
130                    Struct data=creator.createStruct();
131            data.setEL(creator.createKey("message"), line);
132            Struct event=creator.createStruct();
133            event.setEL(creator.createKey("data"), data);
134            event.setEL(creator.createKey("originatorID"), originatorID);
135            
136            event.setEL(creator.createKey("cfcMethod"), "onIncomingMessage");
137            event.setEL(creator.createKey("cfcTimeout"), new Double(10));
138            event.setEL(creator.createKey("cfcPath"), cfcPath);
139    
140            event.setEL(creator.createKey("gatewayType"), "Socket");
141            event.setEL(creator.createKey("gatewayId"), id);
142            
143            
144                                
145            if (engine.invokeListener(this, "onIncomingMessage", event))    
146                info("Socket Gateway Listener ["+id+"] invoked.");
147            else
148                    error("Failed to call Socket Gateway Listener ["+id+"]");           
149             }
150    
151             
152             private class SocketServerThread extends Thread        {
153                    private Socket socket;
154                    private PrintWriter out;
155                            private String _id;
156                            
157                    public SocketServerThread(Socket socket) throws IOException     {
158                        this.socket = socket;
159                    out = new PrintWriter(socket.getOutputStream(), true);
160                    this._id=String.valueOf(hashCode());
161                    }
162    
163                    public void run()       {
164                            BufferedReader in = null;
165                        try {
166                            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
167                            out.println(welcomeMessage);
168                            out.print("> ");
169                            String line;
170                            while ((line = in.readLine()) != null) {
171                                    if (line.trim().equals("exit")) break;
172                                else invokeListener(line,_id);
173                            }
174                            //socketRegistry.remove(this.getName());
175                        }
176                        catch (Throwable t) {
177                            error("Failed to read from Socket Gateway ["+id+"]: " + t.getMessage());
178                        }
179                        finally{
180                            close(out);
181                            out=null;
182                            close(in);
183                            close(socket);
184                            sockets.remove(this);
185                        }
186                    }
187    
188                            public void writeOutput(String str)     {
189                                    out.println(str);
190                        out.print("> ");
191                    }
192                }
193    
194            
195            
196            
197            
198    
199    
200    
201            public String sendMessage(Map _data) {
202                    Struct data=caster.toStruct(_data, null, false);
203                    String msg = (String) data.get("message",null);
204                    String originatorID=(String) data.get("originatorID",null);
205                    
206                    String status="OK";
207            if (msg!=null ) {
208                
209                    Iterator<SocketServerThread> it = sockets.iterator();
210                    SocketServerThread sst;
211                    try     {
212                            boolean hasSend=false;
213                            while(it.hasNext()){
214                                    sst=it.next();
215                                    if(originatorID!=null && !sst._id.equalsIgnoreCase(originatorID)) continue;
216                                    sst.writeOutput(msg);
217                                    hasSend=true;
218                            }
219                            
220                            if(!hasSend) {
221                                    if(sockets.size()==0) {
222                                            error("There is no connection");
223                                    status = "EXCEPTION";
224                                    }
225                                    else {
226                                            it = sockets.iterator();
227                                            StringBuilder sb=new StringBuilder();
228                                            while(it.hasNext()){
229                                                    if(sb.length()>0) sb.append(", ");
230                                                    sb.append(it.next()._id);
231                                            }
232                                            error("There is no connection with originatorID ["+originatorID+"], available originatorIDs are ["+sb+"]");
233                                            status = "EXCEPTION";
234                                    }
235                            }
236                }
237                catch (Exception e) {
238                    e.printStackTrace();
239                    error("Failed to send message with exception: " + e.toString());
240                    status = "EXCEPTION";
241                }
242            }
243            return status;
244        }
245            
246            
247            /**
248             * @see org.opencfml.eventgateway.Gateway#doRestart()
249             */
250            public void doRestart() {
251                    doStop();
252                    doStart();
253            }
254            
255            
256    
257            /**
258             * @see org.opencfml.eventgateway.Gateway#getId()
259             */
260            public String getId() {
261                    return id;
262            }
263            
264             /**
265             * @see org.opencfml.eventgateway.Gateway#getState()
266             */
267            public int getState() {
268                    return state;
269             }
270            
271            
272    
273        /**
274         * @see org.opencfml.eventgateway.Gateway#getHelper()
275         */
276        public Object getHelper() {
277            return null;
278        }
279    
280    
281            public void info(String msg) {
282                    engine.log(this,GatewayEngine.LOGLEVEL_INFO,msg);
283            }
284            
285            public void error(String msg) {
286                    engine.log(this,GatewayEngine.LOGLEVEL_ERROR,msg);
287            }
288                
289    
290        private void close(Writer writer) {
291                    if(writer==null) return;
292                    try{
293                            writer.close();
294                    }
295                    catch(Throwable t){}
296            }
297        private void close(Reader reader) {
298                    if(reader==null) return;
299                    try{
300                            reader.close();
301                    }
302                    catch(Throwable t){}
303            }
304        private void close(Socket socket) {
305                    if(socket==null) return;
306                    try{
307                            socket.close();
308                    }
309                    catch(Throwable t){}
310            }
311        private void close(ServerSocket socket) {
312                    if(socket==null) return;
313                    try{
314                            socket.close();
315                    }
316                    catch(Throwable t){}
317            }
318    
319    }