Changeset 7123


Ignore:
Timestamp:
Apr 19, 2016, 2:14:44 PM (7 years ago)
Author:
Nicklas Nordborg
Message:

References #2000: Batch API for annotation handling

Re-factored the annotation batcher to be based on the AbstractBatcher class. This helps us hook into the transaction and re-act to commit and rollbacks.

Annotation snapshots are not deleted until the batcher is closed which minimizes the problem with a parallell transaction that re-creates the snapshot from the old data.

Improved logging.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/core/net/sf/basedb/core/AnnotationBatcher.java

    r7122 r7123  
    7878*/
    7979public class AnnotationBatcher
     80  extends AbstractBatcher
    8081{
    81   private final DbControl dc;
     82 
     83  private static final org.slf4j.Logger log =
     84    org.slf4j.LoggerFactory.getLogger("net.sf.basedb.core.AnnotationBatcher");
     85  private final boolean isDebugEnabled = log.isDebugEnabled();
     86 
    8287  private final Item itemType;
    8388 
     
    113118  private final UpdateBatcher updateItemTable;
    114119
    115   // All update batchers (to make it easy to flush all at once)
    116   private final List<UpdateBatcher> allBatchers;
     120  // All batchers (to make it easy to flush/close all at once)
     121  private final List<UpdateBatcher> allUpdateBatchers;
     122  private final List<InsertBatcher> allInsertBatchers;
    117123 
    118124  private final UpdateBatcher deleteFromAnnotations;
     
    127133  private Annotatable currentItem;
    128134  private int currentAnnotationSetId;
     135  private boolean deleteSnapshotIfModifed;
    129136 
    130137  // Lists for temporary keeping track of value IDs for
     
    136143  private Map<Integer, CurrentAnnotationInfo> currentInfo;
    137144 
     145  // List of AnnotationSet IDs that should be deleted
     146  // from the snapshot manager
     147  private final List<Integer> snapshotsToDelete;
     148 
    138149  /**
    139150    Create a new batcher instance.
     
    150161    }
    151162   
    152     this.dc = dc;
    153163    this.itemType = itemType;
    154164    this.batchDate = new Date();
     
    174184    this.currentInfo = new HashMap<Integer, CurrentAnnotationInfo>();
    175185   
    176     this.allBatchers = new ArrayList<UpdateBatcher>();
     186    this.allUpdateBatchers = new ArrayList<UpdateBatcher>();
     187    this.allInsertBatchers = new ArrayList<InsertBatcher>();
     188    this.snapshotsToDelete = new ArrayList<Integer>();
    177189    try
    178190    {
     
    272284      throw new BaseException(ex);
    273285    }
    274   }
     286   
     287    setDbControl(dc);
     288  }
     289 
     290 
     291  /*
     292    From the Batcher interface
     293    --------------------------
     294  */
     295 
     296  /**
     297    Flush the batchers.
     298  */
     299  @Override
     300  public void flush()
     301  {
     302    System.out.println(new Date() + ": " + numItems + " done; flushing batchers");
     303    String currentSql = null;
     304    try
     305    {
     306      for (UpdateBatcher b : allUpdateBatchers)
     307      {
     308        currentSql = b.getSql();
     309        b.flush();
     310      }
     311    }
     312    catch (SQLException ex)
     313    {
     314      log.error(currentSql, ex);
     315      LogUtil.logSQLExceptionChain(log, ex);
     316      throw new BaseException(ex);
     317    }
     318  }
     319  // ------------------------
     320 
     321  /*
     322    From the AbstractBatcher class
     323    ------------------------------
     324  */
     325  @Override
     326  protected void onBeforeClose()
     327  {
     328    System.out.println(new Date() + "; done!! " + numItems + " items.");
     329    for (UpdateBatcher b : allUpdateBatchers)
     330    {
     331      b.close();
     332    }
     333    for (InsertBatcher b : allInsertBatchers)
     334    {
     335      b.close();
     336    }
     337    for (Integer id : snapshotsToDelete)
     338    {
     339      SnapshotManager.removeSnapshot(currentAnnotationSetId);
     340    }
     341  }
     342  // ---------------------------
    275343 
    276344  private InsertBatcher createInsertBatcher(Connection c, String sql, int... types)
     
    279347    sql = sql.replace('[', dialect.openQuote()).replace(']', dialect.closeQuote());
    280348    InsertBatcher b = new InsertBatcher(c, sql, types);
     349    allInsertBatchers.add(b);
    281350    return b;
    282351  }
     
    287356    sql = sql.replace('[', dialect.openQuote()).replace(']', dialect.closeQuote());
    288357    UpdateBatcher b = new UpdateBatcher(c, sql, types);
    289     allBatchers.add(b);
     358    allUpdateBatchers.add(b);
    290359    return b;
    291360  }
    292361 
    293   /**
    294     Flush the batchers.
    295   */
    296   private void flushBatchers()
    297   {
    298     try
    299     {
    300       for (UpdateBatcher b : allBatchers)
    301       {
    302         b.flush();
    303       }
    304     }
    305     catch (SQLException ex)
    306     {
    307       throw new BaseException(ex);
    308     }
    309   }
    310362 
    311363  /**
     
    318370  public void addAnnotationTypes(Collection<AnnotationType> types)
    319371  {
     372    if (isClosed()) throw new ConnectionClosedException();
    320373    if (currentItem != null)
    321374    {
     
    327380    {
    328381      if (currentInfo.containsKey(at.getId())) continue; // Already added
     382     
     383      log.debug("Adding annotation type: " + at);
    329384     
    330385      at.checkPermission(Permission.USE);
     
    354409  public void setCurrentItem(Annotatable item)
    355410  {
     411    if (isClosed()) throw new ConnectionClosedException();
    356412    if (currentInfo.size() == 0)
    357413    {
     
    365421    item.checkPermission(Permission.WRITE);
    366422   
    367     if (numItems > 0 && numItems % 100 == 0)
    368     {
    369       System.out.println(new Date() + ": " + numItems + " done; flushing batchers");
    370       flushBatchers();
     423    log.debug("Setting current item: " + item);
     424   
     425    if (numItems > 0 && numItems % getBatchSize() == 0)
     426    {
     427      flush();
    371428    }
    372429   
     
    375432    this.currentItem = item;
    376433    this.currentAnnotationSetId = 0;
     434    this.deleteSnapshotIfModifed = false;
    377435
    378436    // Read current values
     
    381439      currentAnnotationSetId = ((AnnotatableData)((BasicItem)item).getData()).getAnnotationSet().getId();
    382440      loadAnnotationInfo.setInteger("annotationSet", currentAnnotationSetId);
     441      deleteSnapshotIfModifed = true;
     442     
     443      if (isDebugEnabled)
     444      {
     445        log.debug("Loading annotations for current item: " + item);
     446      }
    383447     
    384448      // Maps value_id -> Annotation info
    385449      Map<Integer, CurrentAnnotationInfo> tmp = new HashMap<Integer, CurrentAnnotationInfo>();
    386450      List<Object[]> list = HibernateUtil.loadList(Object[].class, loadAnnotationInfo, null);
     451
     452      if (isDebugEnabled)
     453      {
     454        log.debug("Found " + list.size() + " annotations for current item: " + item);
     455      }
    387456     
    388457      for (Object[] data : list)
     
    394463          // This annotation type is not among the specified annotation types
    395464          continue; // with the next annotation
     465        }
     466       
     467        if (isDebugEnabled)
     468        {
     469          log.trace("Found annotation: AnnotationType.id=" + info.annotationTypeId);
    396470        }
    397471       
     
    419493        if (valueIds[typeIndex].isEmpty()) continue; // No annotation with this value type
    420494       
     495        if (isDebugEnabled)
     496        {
     497          log.debug("Loading " + t.name() + " values: " + valueIds[typeIndex]);
     498        }
     499       
    421500        org.hibernate.Query query = loadAnnotationValues[typeIndex];
    422501        query.setParameterList("listOfIds", valueIds[typeIndex], Type.INT.getTypeWrapper().getHibernateType());
    423502     
    424503        list = HibernateUtil.loadList(Object[].class, query, null);
     504        if (isDebugEnabled)
     505        {
     506          log.debug("Found " + list.size() + " values");
     507        }
    425508        for (Object[] data : list)
    426509        {
    427510          Integer valueId = (Integer)data[0];
    428511          tmp.get(valueId).values.add(data[1]);
     512          if (isDebugEnabled) log.trace(valueId + ": "+data[1]);
    429513        }
    430514       
     
    479563      throw new IllegalStateException("Not allowed to call this method with the same annotation more than once: " + annotationType);
    480564    }
     565   
     566    if (isDebugEnabled)
     567    {
     568      log.trace("Setting values for annotation type " + annotationType + ": " + values);
     569    }
     570   
    481571    info.hasBeenUsed = true;
    482572    info.itemId = currentItem.getId();
     
    489579        if (infoIsForCurrentItem)
    490580        {
     581          if (isDebugEnabled)
     582          {
     583            log.trace("Deleting " + annotationType + " from current item " + currentItem);
     584          }
     585         
    491586          // Delete the annotation
    492587          changeType = Change.DELETED;
     
    540635          if (!EqualsHelper.invariantEquals(convertedValues, info.values))
    541636          {
     637            if (isDebugEnabled)
     638            {
     639              log.trace("Updating " + annotationType + " for current item " + currentItem);
     640            }
     641           
    542642            // Update
    543643            changeType = Change.UPDATED;
     
    555655            updateAnnotations.addToBatch(info.version+1, unit != null ? unit.getId() : info.unitId, batchDate, info.annotationId, info.version);
    556656          }
     657          else
     658          {
     659            if (isDebugEnabled)
     660            {
     661              log.trace("No change " + annotationType + " for current item " + currentItem);
     662            }
     663          }
    557664        }
    558665        else
    559666        {
     667          if (isDebugEnabled)
     668          {
     669            log.trace("Creating " + annotationType + " for current item " + currentItem);
     670          }
     671         
    560672          // Create a new annotation
    561673          changeType = Change.ADDED;
     
    589701    }
    590702   
    591     if (changeType != Change.NO_CHANGE)
    592     {
    593       SnapshotManager.removeSnapshot(currentAnnotationSetId);
     703    // Store the annotation set ID so that we can tell
     704    // the SnapshotManager to clear the cache when this
     705    // batcher is closed
     706    if (changeType != Change.NO_CHANGE && deleteSnapshotIfModifed)
     707    {
     708      deleteSnapshotIfModifed = false;
     709      snapshotsToDelete.add(currentAnnotationSetId);
    594710    }
    595711   
    596712    return changeType;
    597   }
    598  
    599   public void close()
    600   {
    601     System.out.println(new Date() + "; done!! " + numItems + " items.");
    602     flushBatchers();
    603713  }
    604714 
     
    690800      throws SQLException
    691801    {
     802      log.debug(sql);
    692803      this.sql = sql;
    693804      this.statement = c.prepareStatement(sql, new String[] { "id" } );
     
    695806    }
    696807   
     808    /**
     809      The SQL statement this batcher is executing.
     810    */
     811    String getSql()
     812    {
     813      return sql;
     814    }
     815
    697816    /**
    698817      Execute an insert SQL statement with the given parameter
     
    721840      return keys.getInt(1);
    722841    }
     842   
     843    /**
     844      Close the batcher.
     845    */
     846    void close()
     847    {
     848      try
     849      {
     850        statement.close();
     851      }
     852      catch (SQLException ex)
     853      {
     854        log.error(ex.getMessage(), ex);
     855        LogUtil.logSQLExceptionChain(log, ex);
     856      }
     857    }
     858
    723859  }
    724860 
     
    741877      throws SQLException
    742878    {
     879      log.debug(sql);
    743880      this.sql = sql;
    744881      this.statement = c.prepareStatement(sql);
     
    747884   
    748885    /**
     886      The SQL statement this batcher is executing.
     887    */
     888    String getSql()
     889    {
     890      return sql;
     891    }
     892   
     893    /**
    749894      Add a new entry to the batch. The number of values must match the number of
    750895      parameter types.
     
    780925      }
    781926    }
     927
     928    /**
     929      Close the batcher.
     930    */
     931    void close()
     932    {
     933      try
     934      {
     935        statement.close();
     936      }
     937      catch (SQLException ex)
     938      {
     939        log.error(ex.getMessage(), ex);
     940        LogUtil.logSQLExceptionChain(log, ex);
     941      }
     942    }
    782943  }
    783944 
Note: See TracChangeset for help on using the changeset viewer.