001/**
002 *
003 * Copyright (c) 2014, the Railo Company Ltd. All rights reserved.
004 *
005 * This library is free software; you can redistribute it and/or
006 * modify it under the terms of the GNU Lesser General Public
007 * License as published by the Free Software Foundation; either 
008 * version 2.1 of the License, or (at your option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
013 * Lesser General Public License for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public 
016 * License along with this library.  If not, see <http://www.gnu.org/licenses/>.
017 * 
018 **/
019package lucee.runtime.spooler;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Iterator;
028import java.util.LinkedList;
029import java.util.List;
030
031import lucee.commons.io.IOUtil;
032import lucee.commons.io.SystemUtil;
033import lucee.commons.io.log.Log;
034import lucee.commons.io.log.LogUtil;
035import lucee.commons.io.res.Resource;
036import lucee.commons.io.res.filter.ResourceNameFilter;
037import lucee.commons.io.res.util.ResourceUtil;
038import lucee.commons.lang.ExceptionUtil;
039import lucee.commons.lang.SerializableObject;
040import lucee.commons.lang.StringUtil;
041import lucee.runtime.config.Config;
042import lucee.runtime.engine.ThreadLocalConfig;
043import lucee.runtime.exp.DatabaseException;
044import lucee.runtime.exp.PageException;
045import lucee.runtime.op.Caster;
046import lucee.runtime.op.Duplicator;
047import lucee.runtime.type.Array;
048import lucee.runtime.type.Collection;
049import lucee.runtime.type.KeyImpl;
050import lucee.runtime.type.Query;
051import lucee.runtime.type.QueryImpl;
052import lucee.runtime.type.Struct;
053import lucee.runtime.type.dt.DateTimeImpl;
054import lucee.runtime.type.util.ArrayUtil;
055import lucee.runtime.type.util.CollectionUtil;
056import lucee.runtime.type.util.KeyConstants;
057
058public class SpoolerEngineImpl implements SpoolerEngine {
059        
060        private static final TaskFileFilter FILTER=new TaskFileFilter();
061
062        private static final Collection.Key LAST_EXECUTION = KeyImpl.intern("lastExecution");
063        private static final Collection.Key NEXT_EXECUTION = KeyImpl.intern("nextExecution");
064        
065        private static final Collection.Key CLOSED = KeyImpl.intern("closed");
066        private static final Collection.Key TRIES = KeyImpl.intern("tries");
067        private static final Collection.Key TRIES_MAX = KeyImpl.intern("triesmax");
068
069        
070        private String label;
071        
072
073        //private LinkedList<SpoolerTask> openTaskss=new LinkedList<SpoolerTask>();
074        //private LinkedList<SpoolerTask> closedTasks=new LinkedList<SpoolerTask>();
075        private SimpleThread simpleThread;
076        private SerializableObject token=new SerializableObject();
077        private SpoolerThread thread;
078        //private ExecutionPlan[] plans;
079        private Resource persisDirectory;
080        private long count=0;
081        private Log log;
082        private Config config; 
083        private int add=0;
084
085
086        private Resource closedDirectory;
087        private Resource openDirectory;
088
089        private int maxThreads;
090        
091        public SpoolerEngineImpl(Config config,Resource persisDirectory,String label, Log log, int maxThreads) {
092                this.config=config;
093                this.persisDirectory=persisDirectory;
094
095                closedDirectory = persisDirectory.getRealResource("closed");
096                openDirectory = persisDirectory.getRealResource("open");
097                //calculateSize();
098                
099
100                this.maxThreads=maxThreads;
101                this.label=label;
102                this.log=log;
103                //print.ds(persisDirectory.getAbsolutePath());
104                //load();
105                if(getOpenTaskCount()>0)start();
106        }
107
108        /*private void calculateSize() {
109                closedCount=calculateSize(closedDirectory);
110                openCount=calculateSize(openDirectory);
111        }*/
112
113        public void setMaxThreads(int maxThreads) {
114                this.maxThreads = maxThreads;
115        }
116
117        /**
118         * @return the maxThreads
119         */
120        public int getMaxThreads() {
121                return maxThreads;
122        }
123
124        private int calculateSize(Resource res) {
125                return ResourceUtil.directrySize(res,FILTER);
126        }
127
128        @Override
129        public synchronized void add(SpoolerTask task) {
130                // if there is no plan execute and forget
131                if(task.getPlans()==null) {
132                        if(task instanceof Task)
133                                start((Task)task);
134                        else {
135                                start(new TaskWrap(task));
136                                log.error("spooler", "make class "+task.getClass().getName()+" a Task class");
137                        }
138                        return;
139                }
140                
141                //openTasks.add(task);
142                add++;
143                if (task.nextExecution() == 0)
144                        task.setNextExecution(System.currentTimeMillis());
145                task.setId(createId(task));
146                store(task);
147                start();
148        }
149        
150        // add to interface
151        public void add(Task task) {
152                start(task);
153        }
154
155
156        private void start(Task task) {
157                if(task==null) return;
158                synchronized (task) {
159                        if(simpleThread==null || !simpleThread.isAlive()) {
160                                simpleThread=new SimpleThread(config,task);
161                                
162                                simpleThread.setPriority(Thread.MIN_PRIORITY);
163                                simpleThread.start();
164                        }
165                        else {
166                                simpleThread.tasks.add(task);
167                                simpleThread.interrupt();
168                        }
169                }
170        }
171
172        private void start() {
173                if(thread==null || !thread.isAlive()) {
174                        thread=new SpoolerThread(this);
175                        thread.setPriority(Thread.MIN_PRIORITY);
176                        thread.start();
177                }
178                else if(thread.sleeping) {
179                        thread.interrupt();
180                }
181        }
182
183        @Override
184        public String getLabel() {
185                return label;
186        }
187        
188        
189
190        private SpoolerTask getTaskById(Resource dir,String id) {
191                return getTask(dir.getRealResource(id+".tsk"),null);
192        }
193        
194        private SpoolerTask getTaskByName(Resource dir,String name) {
195                return getTask(dir.getRealResource(name),null);
196        }
197
198        private SpoolerTask getTask(Resource res, SpoolerTask defaultValue) {
199                InputStream is = null;
200        ObjectInputStream ois = null;
201        
202        SpoolerTask task=defaultValue;
203                try {
204                        is = res.getInputStream();
205                ois = new ObjectInputStream(is);
206                task = (SpoolerTask) ois.readObject();
207        } 
208        catch (Throwable t) {
209                        ExceptionUtil.rethrowIfNecessary(t);
210                IOUtil.closeEL(is);
211                IOUtil.closeEL(ois);
212                res.delete();
213        }
214        IOUtil.closeEL(is);
215        IOUtil.closeEL(ois);
216                return task;
217        }
218
219        private void store(SpoolerTask task) {
220                ObjectOutputStream oos=null;
221                Resource persis = getFile(task);
222                if(persis.exists()) persis.delete();
223        try {
224                oos = new ObjectOutputStream(persis.getOutputStream());
225                oos.writeObject(task);
226        } 
227        catch (IOException e) {e.printStackTrace();}
228        finally {
229                IOUtil.closeEL(oos);
230        }
231        }
232
233        private void unstore(SpoolerTask task) {
234                Resource persis = getFile(task);
235                boolean exists=persis.exists();
236                if(exists) persis.delete(); 
237        }
238        private Resource getFile(SpoolerTask task) {
239                Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open");
240                dir.mkdirs();
241                return dir.getRealResource(task.getId()+".tsk");
242        }
243        
244        private String createId(SpoolerTask task) {
245                Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open");
246                dir.mkdirs();
247                
248                String id=null;
249                do{
250                        id=StringUtil.addZeros(++count, 8);
251                }while(dir.getRealResource(id+".tsk").exists());
252                return id;
253        }
254
255        public long calculateNextExecution(SpoolerTask task) {
256                int _tries=0;
257                ExecutionPlan plan=null;
258                ExecutionPlan[] plans=task.getPlans();
259                
260                for(int i=0;i<plans.length;i++) {
261                        _tries+=plans[i].getTries();
262                        if(_tries>task.tries()) {
263                                plan=plans[i];
264                                break;
265                        }
266                }
267                if(plan==null)return -1;
268                return task.lastExecution()+(plan.getIntervall()*1000);
269        }
270        
271        public Query getOpenTasksAsQuery(int startrow, int maxrow) throws PageException {
272                return getTasksAsQuery(createQuery(),openDirectory,startrow, maxrow);
273        }
274
275        public Query getClosedTasksAsQuery(int startrow, int maxrow) throws PageException {
276                return getTasksAsQuery(createQuery(),closedDirectory,startrow, maxrow);
277        }
278
279        public Query getAllTasksAsQuery(int startrow, int maxrow) throws PageException {
280                Query query = createQuery();
281                //print.o(startrow+":"+maxrow);
282                getTasksAsQuery(query,openDirectory,startrow, maxrow);
283                int records = query.getRecordcount();
284                if(maxrow<0) maxrow=Integer.MAX_VALUE;
285                // no open tasks
286                if(records==0) {
287                        startrow-=getOpenTaskCount();
288                        if(startrow<1) startrow=1;
289                }
290                else {
291                        startrow=1;
292                        maxrow-=records;
293                }
294                if(maxrow>0)getTasksAsQuery(query,closedDirectory,startrow, maxrow);
295                return query;
296        }
297        
298        public int getOpenTaskCount() {
299                return calculateSize(openDirectory);
300        }
301        
302        public int getClosedTaskCount() {
303                return calculateSize(closedDirectory);
304        }
305        
306        
307        private Query getTasksAsQuery(Query qry,Resource dir, int startrow, int maxrow) {
308                String[] children = dir.list(FILTER);
309                if(ArrayUtil.isEmpty(children)) return qry;
310                if(children.length<maxrow)maxrow=children.length;
311                SpoolerTask task;
312                
313                int to=startrow+maxrow;
314                if(to>children.length)to=children.length;
315                if(startrow<1)startrow=1;
316                
317                for(int i=startrow-1;i<to;i++){
318                        task = getTaskByName(dir, children[i]);
319                        if(task!=null)addQueryRow(qry, task);
320                }
321                
322                return qry;
323        }
324        
325        private Query createQuery() throws DatabaseException {
326                String v="VARCHAR";
327                String d="DATE";
328                lucee.runtime.type.Query qry=new QueryImpl(
329                                new String[]{"type","name","detail","id","lastExecution","nextExecution","closed","tries","exceptions","triesmax"},
330                                new String[]{v,v,"object",v,d,d,"boolean","int","object","int"},
331                                0,"query");
332                return qry;
333        }
334        
335        private void addQueryRow(lucee.runtime.type.Query qry, SpoolerTask task) {
336        int row = qry.addRow();
337                try{
338                        qry.setAt(KeyConstants._type, row, task.getType());
339                        qry.setAt(KeyConstants._name, row, task.subject());
340                        qry.setAt(KeyConstants._detail, row, task.detail());
341                        qry.setAt(KeyConstants._id, row, task.getId());
342
343                        
344                        qry.setAt(LAST_EXECUTION, row,new DateTimeImpl(task.lastExecution(),true));
345                        qry.setAt(NEXT_EXECUTION, row,new DateTimeImpl(task.nextExecution(),true));
346                        qry.setAt(CLOSED, row,Caster.toBoolean(task.closed()));
347                        qry.setAt(TRIES, row,Caster.toDouble(task.tries()));
348                        qry.setAt(TRIES_MAX, row,Caster.toDouble(task.tries()));
349                        qry.setAt(KeyConstants._exceptions, row,translateTime(task.getExceptions()));
350                        
351                        int triesMax=0;
352                        ExecutionPlan[] plans = task.getPlans();
353                        for(int y=0;y<plans.length;y++) {
354                                triesMax+=plans[y].getTries();
355                        }
356                        qry.setAt(TRIES_MAX, row,Caster.toDouble(triesMax));
357                }
358                catch(Throwable t){
359                        ExceptionUtil.rethrowIfNecessary(t);
360                }
361        }
362        
363        private Array translateTime(Array exp) {
364                exp=(Array) Duplicator.duplicate(exp,true);
365                Iterator<Object> it = exp.valueIterator();
366                Struct sct;
367                while(it.hasNext()) {
368                        sct=(Struct) it.next();
369                        sct.setEL(KeyConstants._time,new DateTimeImpl(Caster.toLongValue(sct.get(KeyConstants._time,null),0),true));
370                }
371                return exp;
372        }
373        
374        class SimpleThread extends Thread {
375                
376                // 'tasks' needs to be synchronized because the other thread will access this list.
377                // otherwise tasks.size() will not match the actual size of the server and NPEs 
378                // and unlimited loops may result.
379                List<Task> tasks = Collections.synchronizedList(new LinkedList<Task>());
380                private Config config;
381                
382                public SimpleThread(Config config, Task task){
383                        this.config=config;
384                        tasks.add(task);
385                }
386                
387                
388                public void run() {
389                        Task task;
390                        while(tasks.size()>0){
391                                try {
392                                        task = CollectionUtil.remove(tasks,0,null);
393                                        if(task!=null)task.execute(config);
394                                }
395                                catch (Throwable t) {
396                                        ExceptionUtil.rethrowIfNecessary(t);
397                                }
398                        }
399                }
400                
401        }
402
403        class SpoolerThread extends Thread {
404
405                private SpoolerEngineImpl engine;
406                private boolean sleeping;
407                private final int maxThreads;
408
409                public SpoolerThread(SpoolerEngineImpl engine) {
410                        this.maxThreads=engine.getMaxThreads();
411                        this.engine=engine;
412                        try{
413                                this.setPriority(MIN_PRIORITY);
414                        }
415                        // can throw security exceptions
416                        catch(Throwable t){
417                                ExceptionUtil.rethrowIfNecessary(t);
418                        }
419                }
420                
421                public void run() {
422                        String[] taskNames;
423                        //SpoolerTask[] tasks;
424                        SpoolerTask task=null;
425                        long nextExection;
426                        ThreadLocalConfig.register(engine.config);
427                        //ThreadLocalPageContext.register(engine.);
428                        List<TaskThread> runningTasks=new ArrayList<TaskThread>();
429                        TaskThread tt;
430                        int adds;
431                        
432                        while(getOpenTaskCount()>0) {
433                                adds=engine.adds();
434                                taskNames = openDirectory.list(FILTER);
435                                //tasks=engine.getOpenTasks();
436                                nextExection=Long.MAX_VALUE;
437                                for(int i=0;i<taskNames.length;i++) {
438                                        task=getTaskByName(openDirectory, taskNames[i]);
439                                        if(task==null) continue;
440                                        
441                                        if(task.nextExecution()<=System.currentTimeMillis()) {
442                                                //print.o("- execute " + task.getId());
443                                                tt=new TaskThread(engine,task);
444                                                tt.start();
445                                                runningTasks.add(tt);
446                                        }
447                                        else if(task.nextExecution()<nextExection && 
448                                                        nextExection!=-1 && 
449                                                        !task.closed()) 
450                                                nextExection=task.nextExecution();
451                                        nextExection=joinTasks(runningTasks,maxThreads,nextExection);
452                                }
453                                
454                                nextExection=joinTasks(runningTasks,0,nextExection);
455                                if(adds!=engine.adds()) continue;
456                                
457                                if(nextExection==Long.MAX_VALUE)break;
458                                long sleep = nextExection-System.currentTimeMillis();
459                                
460                                //print.o("sleep:"+sleep+">"+(sleep/1000));
461                                if(sleep>0)doWait(sleep);
462                                
463                                //if(sleep<0)break;
464                        }
465                        //print.o("end:"+getOpenTaskCount());
466                }
467
468                private long joinTasks(List<TaskThread> runningTasks, int maxThreads,long nextExection) {
469                        if(runningTasks.size()>=maxThreads){
470                                Iterator<TaskThread> it = runningTasks.iterator();
471                                TaskThread tt;
472                                SpoolerTask task;
473                                while(it.hasNext()){
474                                        tt = it.next();
475                                        SystemUtil.join(tt);
476                                        task = tt.getTask();
477
478                                        if(task!=null && task.nextExecution()!=-1 && task.nextExecution()<nextExection && !task.closed()) {
479                                                nextExection=task.nextExecution();
480                                        }
481                                }
482                                runningTasks.clear();
483                        }
484                        return nextExection;
485                }
486
487                private void doWait(long sleep) {
488                        //long start=System.currentTimeMillis();
489                        try {
490                                sleeping=true;
491                                synchronized (this) {
492                                        wait(sleep);
493                                }
494                                
495                        } catch (Throwable t) {
496                                ExceptionUtil.rethrowIfNecessary(t);
497                        }
498                        finally {
499                                sleeping=false;
500                        }
501                }
502                
503        }
504        
505        
506        class TaskThread extends Thread {
507                
508                private SpoolerEngineImpl engine;
509                private SpoolerTask task;
510
511                public TaskThread(SpoolerEngineImpl engine,SpoolerTask task) {
512                        this.engine=engine;
513                        this.task=task;
514                }
515                
516                public SpoolerTask getTask() {
517                        return task;
518                }
519
520                public void run() {
521                        ThreadLocalConfig.register(engine.config);
522                        engine.execute(task);
523                        ThreadLocalConfig.release();
524                        
525                }
526        }
527        
528
529        /**
530         * remove that task from Spooler
531         * @param task
532         */
533        public void remove(SpoolerTask task) {
534                unstore(task);
535                //if(!openTasks.remove(task))closedTasks.remove(task);
536        }
537        
538        public void removeAll() {
539                ResourceUtil.removeChildrenEL(openDirectory);
540                ResourceUtil.removeChildrenEL(closedDirectory);
541                SystemUtil.sleep(100);
542                ResourceUtil.removeChildrenEL(openDirectory);
543                ResourceUtil.removeChildrenEL(closedDirectory);
544        }
545        
546        public int adds() {
547                //return openTasks.size()>0;
548                return add;
549        }    
550
551        @Override
552        public void remove(String id) {
553                SpoolerTask task = getTaskById(openDirectory,id);
554                if(task==null)task=getTaskById(closedDirectory,id);
555                if(task!=null)remove(task);
556        }
557
558        /*private SpoolerTask getTaskById(SpoolerTask[] tasks, String id) {
559                for(int i=0;i<tasks.length;i++) {
560                        if(tasks[i].getId().equals(id)) {
561                                return tasks[i];
562                        }
563                }
564                return null;
565        }*/
566
567        /**
568         * execute task by id and return eror throwd by task
569         * @param id
570         */
571        public PageException execute(String id) {
572                SpoolerTask task = getTaskById(openDirectory,id);
573                if(task==null)task=getTaskById(closedDirectory,id);
574                if(task!=null){
575                        return execute(task);
576                }
577                return null;
578        }
579        
580        public PageException execute(SpoolerTask task) {
581                //task.closed();
582                try {
583                        if(task instanceof SpoolerTaskSupport)  // FUTURE this is bullshit, call the execute method directly, but you have to rewrite them for that
584                                ((SpoolerTaskSupport)task)._execute(config);
585                        else 
586                                task.execute(config);
587                        
588                        unstore(task);
589                        log.log(Log.LEVEL_INFO,"remote-client", task.subject());
590                        task.setLastExecution(System.currentTimeMillis());
591                        task.setNextExecution(-1);
592                        
593                        task.setClosed(true);
594                        task=null;
595                } 
596                catch(Throwable t) {
597                        ExceptionUtil.rethrowIfNecessary(t);
598                        task.setLastExecution(System.currentTimeMillis());
599                        task.setNextExecution(calculateNextExecution(task));
600                        LogUtil.log(log,Log.LEVEL_ERROR,"remote-client", task.subject(),t);
601                        if(task.nextExecution()==-1) {
602                                //openTasks.remove(task);
603                                //if(!closedTasks.contains(task))closedTasks.add(task);
604                                unstore(task);
605                                task.setClosed(true);
606                                store(task);
607                                task=null;
608                        }
609                        else 
610                                store(task);
611                        
612                        return Caster.toPageException(t);
613                }
614                return null;
615        }
616
617        public void setLabel(String label) {
618                this.label = label;
619        }
620
621        public void setPersisDirectory(Resource persisDirectory) {
622                this.persisDirectory = persisDirectory;
623        }
624
625        public void setLog(Log log) {
626                this.log = log;
627        }
628
629        public void setConfig(Config config) {
630                this.config = config;
631        }
632        
633}
634        
635class TaskFileFilter implements ResourceNameFilter {
636
637        public boolean accept(Resource parent, String name) {
638                return name!=null && name.endsWith(".tsk");
639        }
640        
641}