/* Copyright (c) 2011 Danish Maritime Authority.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dk.dma.ais.store.write;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.datastax.driver.core.querybuilder.Batch;
import com.datastax.driver.core.querybuilder.QueryBuilder;

import dk.dma.commons.service.AbstractBatchedStage;
import dk.dma.commons.util.DurationFormatter;
import dk.dma.db.cassandra.CassandraConnection;

/**
 * An {@link AbstractBatchedStage} that collects messages into batches and persists them to Cassandra
 * as a single batched statement. Subclasses convert individual messages to CQL statements via
 * {@link #handleMessage(List, Object)}.
 *
 * @author Kasper Nielsen
 */
public abstract class CassandraBatchedStagedWriter<T> extends AbstractBatchedStage<T> {

    /** The logger. */
    private static final Logger LOG = LoggerFactory.getLogger(CassandraBatchedStagedWriter.class);

    /** The connection to Cassandra. */
    private final CassandraConnection connection;

    /** Registry for the metrics exposed over JMX. */
    final MetricRegistry metrics = new MetricRegistry();

    /** Counts the number of AIS messages that have been successfully persisted. */
    final Meter persistedCount = metrics.meter(MetricRegistry.name("aistore", "cassandra",
            "Number of persisted AIS messages"));

    /** Greater than 0 if the last batch was slow; we keep logging timing info for a few batches afterwards. */
    private int lastSlowBatch;

    /**
     * Creates a new batched writer and starts exposing its metrics over JMX.
     *
     * @param connection
     *            the connection to Cassandra (must not be null)
     * @param batchSize
     *            the maximum number of messages to write in a single batch
     */
    public CassandraBatchedStagedWriter(CassandraConnection connection, int batchSize) {
        // Queue at most 100 batches worth of messages, capped at 100000.
        super(Math.min(100000, batchSize * 100), batchSize);
        this.connection = requireNonNull(connection);
        // FIX: replaced the placeholder JMX domain "fooo.erer.er" with a meaningful one.
        // NOTE(review): any dashboards keyed on the old placeholder domain must be updated.
        final JmxReporter reporter = JmxReporter.forRegistry(metrics).inDomain("dk.dma.ais.store").build();
        reporter.start();
    }

    /** {@inheritDoc} */
    @Override
    protected final void handleMessages(List<T> messages) {
        long start = System.nanoTime();
        // Convert each message into the statements that should be written.
        List<RegularStatement> statements = new ArrayList<>();
        for (T t : messages) {
            try {
                handleMessage(statements, t);
            } catch (RuntimeException e) {
                LOG.warn("Failed to write message: " + t, e); // Just in case we cannot process a message
            }
        }

        // FIX: if every message failed conversion there is nothing to write;
        // QueryBuilder.batch() rejects an empty statement array, which would
        // previously fall into the generic failure path and sleep for 2 seconds.
        if (statements.isEmpty()) {
            return;
        }

        // Try writing the batch
        try {
            Batch batch = QueryBuilder.batch(statements.toArray(new RegularStatement[statements.size()]));

            long beforeSend = System.nanoTime();

            ResultSetFuture f = connection.getSession().executeAsync(batch);
            f.getUninterruptibly(); // throws QueryValidationException etc.

            long total = System.nanoTime();
            // Is this an abnormally slow batch?
            boolean isSlow = TimeUnit.MILLISECONDS.convert(total - start, TimeUnit.NANOSECONDS) > 200
                    || messages.size() >= getBatchSize();
            if (isSlow || lastSlowBatch > 0) {
                LOG.info("Total time: " + DurationFormatter.DEFAULT.formatNanos(total - start) + ", prepping="
                        + DurationFormatter.DEFAULT.formatNanos(beforeSend - start) + ", sending="
                        + DurationFormatter.DEFAULT.formatNanos(total - beforeSend) + ", size=" + messages.size());
                // Makes sure we write 10 info statements after the last slow batch we insert.
                lastSlowBatch = isSlow ? 10 : lastSlowBatch - 1;
            }
            persistedCount.mark(messages.size());
        } catch (QueryValidationException e) {
            LOG.error("Could not execute query, this is an internal error", e);
        } catch (Exception e) {
            onFailure(messages, e);
            try {
                // Back off briefly before the next batch (returns early if we are shutting down).
                sleepUntilShutdown(2, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
                // FIX: restore the interrupt status for our caller; the original called
                // Thread.interrupted(), which *clears* the flag instead of re-setting it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Converts a single message into zero or more statements to be written as part of the next batch.
     *
     * @param statements
     *            the list to add the resulting statements to
     * @param message
     *            the message to convert
     */
    protected abstract void handleMessage(List<RegularStatement> statements, T message);

    /**
     * Invoked when a batch of messages could not be persisted.
     *
     * @param messages
     *            the messages that failed
     * @param cause
     *            the cause of the failure
     */
    public abstract void onFailure(List<T> messages, Throwable cause);
}