001    /**
002     * Implements the CFML Function arrayavg
003     */
004    package railo.runtime.functions.closure;
005    
006    import java.util.ArrayList;
007    import java.util.Enumeration;
008    import java.util.Iterator;
009    import java.util.List;
010    import java.util.Map;
011    import java.util.Map.Entry;
012    import java.util.concurrent.ExecutorService;
013    import java.util.concurrent.Executors;
014    import java.util.concurrent.Future;
015    
016    import railo.runtime.PageContext;
017    import railo.runtime.concurrency.UDFCaller;
018    import railo.runtime.exp.FunctionException;
019    import railo.runtime.exp.PageException;
020    import railo.runtime.ext.function.Function;
021    import railo.runtime.op.Caster;
022    import railo.runtime.type.Array;
023    import railo.runtime.type.Collection.Key;
024    import railo.runtime.type.Iteratorable;
025    import railo.runtime.type.UDF;
026    
027    
028    public final class Each implements Function {
029    
030            private static final long serialVersionUID = 1955185705863596525L;
031    
032            public static String call(PageContext pc , Object obj, UDF udf) throws PageException {
033                    return _call(pc, obj, udf, false,20);
034            }
035            public static String call(PageContext pc , Object obj, UDF udf, boolean parallel) throws PageException {
036                    return _call(pc, obj, udf, parallel, 20);
037            }
038            public static String call(PageContext pc , Object obj, UDF udf, boolean parallel, double maxThreads) throws PageException {
039                    return _call(pc, obj, udf, parallel, (int)maxThreads);
040            }
041            
042            private static String _call(PageContext pc , Object obj, UDF udf, boolean parallel, int maxThreads) throws PageException {
043                    ExecutorService execute=null;
044                    List<Future<String>> futures=null;
045                    if(parallel) {
046                            execute = Executors.newFixedThreadPool(maxThreads);
047                            futures=new ArrayList<Future<String>>();
048                    }
049                    // Array
050                    if(obj instanceof Array) {
051                            invoke(pc, (Array)obj, udf,execute,futures);
052                    }
053                    // other Iteratorable
054                    else if(obj instanceof Iteratorable) {
055                            invoke(pc, (Iteratorable)obj, udf,execute,futures);
056                    }
057                    // Map
058                    else if(obj instanceof Map) {
059                            Iterator it = ((Map)obj).entrySet().iterator();
060                            Entry e;
061                            while(it.hasNext()){
062                                    e = (Entry) it.next();
063                                    _call(pc,udf,new Object[]{e.getKey(),e.getValue()},execute,futures);
064                                    //udf.call(pc, new Object[]{e.getKey(),e.getValue()}, true);
065                            }
066                    }
067                    //List
068                    else if(obj instanceof List) {
069                            Iterator it = ((List)obj).iterator();
070                            while(it.hasNext()){
071                                    _call(pc,udf,new Object[]{it.next()},execute,futures);
072                                    //udf.call(pc, new Object[]{it.next()}, true);
073                            }
074                    }
075                    // Iterator
076                    else if(obj instanceof Iterator) {
077                            Iterator it = (Iterator)obj;
078                            while(it.hasNext()){
079                                    _call(pc,udf, new Object[]{it.next()},execute,futures);
080                                    //udf.call(pc, new Object[]{it.next()}, true);
081                            }
082                    }
083                    // Enumeration
084                    else if(obj instanceof Enumeration) {
085                            Enumeration e = (Enumeration)obj;
086                            while(e.hasMoreElements()){
087                                    _call(pc,udf,new Object[]{e.nextElement()},execute,futures);
088                                    //udf.call(pc, new Object[]{e.nextElement()}, true);
089                            }
090                    }
091                    else
092                            throw new FunctionException(pc, "Each", 1, "data", "cannot iterate througth this type "+Caster.toTypeName(obj.getClass()));
093                    
094                    if(parallel) afterCall(pc,futures);
095                            
096                    
097                    return null;
098            }
099            
100    
101            public static void afterCall(PageContext pc, List<Future<String>> futures) throws PageException {
102                    try{
103                            Iterator<Future<String>> it = futures.iterator();
104                            Future<String> f;
105                            while(it.hasNext()){
106                                    pc.write(it.next().get());
107                            }
108                    }
109                    catch(Exception e){
110                            throw Caster.toPageException(e);
111                    }
112            }
113            public static void invoke(PageContext pc , Array array, UDF udf,ExecutorService execute,List<Future<String>> futures) throws PageException {
114                    Iterator<Object> it = array.valueIterator();
115                    while(it.hasNext()){
116                            _call(pc,udf,new Object[]{it.next()},execute,futures);
117                            //udf.call(pc, new Object[]{it.next()}, true);
118                    }
119            }
120    
121            public static void invoke(PageContext pc , Iteratorable coll, UDF udf,ExecutorService execute,List<Future<String>> futures) throws PageException {
122                    Iterator<Entry<Key, Object>> it = coll.entryIterator();
123                    Entry<Key, Object> e;
124                    while(it.hasNext()){
125                            e = it.next();
126                            _call(pc,udf,new Object[]{e.getKey().getString(),e.getValue()},execute,futures);
127                            //udf.call(pc, new Object[]{e.getKey().getString(),e.getValue()}, true);
128                    }
129            }
130            
131            private static void _call(PageContext pc, UDF udf, Object[] args,ExecutorService es,List<Future<String>> futures) throws PageException {
132                    if(es==null) {
133                            udf.call(pc, args, true);
134                            return;
135                    }
136                    futures.add(es.submit(new UDFCaller(pc, udf, args, true)));
137            }
138    }