June 21, 2009

Use IBM WebSphere eXtreme Scale to get the most out of your SOA investment!

Many customers have now used SOA best practices to rebuild their infrastructure. But it's surprising how many of them are leaving a lot of potential savings on the table. A SOA style architecture with an ESB provides a great infrastructure to quickly achieve sizable savings and further demonstrate the value of SOA to the business.

I think a lot of customers think of extreme performance when they think about products like IBM WebSphere eXtreme Scale. Millions of transactions per second. Hundreds of servers and so on. "We're not Facebook or Google, we don't need this". They may wrongly conclude it's not for them. The reality is that very few customers have those kinds of requirements. There are many, many more customers with conventional applications doing a couple of hundred transactions/sec or less who could readily use WebSphere eXtreme Scale to save a lot of money for little investment, especially when they have already made the investment to go SOA.

WebSphere eXtreme Scale can be readily used as a very advanced caching technology with immense capacity and very low response times. We regularly talk with customers whose backend systems are under pressure. These backend systems range from simple databases to SAP systems and mainframes running DB2, CICS, IMS or other valuable enterprise applications. Today's economic climate means belts are tightening, so there is little money available to expand the capacity of these backend systems even as additional load is known to be coming.

At the same time, even where business is flat, budgets are flat at best. Many customers report having to run with flat or smaller budgets for backend systems even while adding more demands on them in the form of additional applications leveraging the backend systems.

Most customers estimate that their backend systems handle many redundant requests. One customer with an ESB fronted mainframe estimated that 25% of all service requests are duplicates of requests issued within the previous 24 hours; another indicated 50%. Some use cases are as high as 80%. Adding a large cache to such an ESB can capitalize on this and effectively reduce the load on those backend systems by that amount. This results in better response times and cost savings in terms of load on the backend systems. At a 25% hit rate the backend sees only 75% of the requests, so it can absorb roughly a third more incoming traffic without a capacity increase.

An ESB cache is an extremely cost effective way to introduce WebSphere eXtreme Scale into an organization. It doesn't change the front end applications invoking services on the bus NOR does it change the backend system. It just means adding a 'mediation' to the ESB which checks the cache first before hitting the backend. WebSphere eXtreme Scale can handle evicting the data.
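To make the 'check the cache first' idea concrete, here is a minimal sketch of such a mediation in plain Java. Everything in it is an assumption made for illustration: the CachingMediation class, the String request/response types and the time-to-live handling are hypothetical, and the ConcurrentHashMap merely stands in for a grid-backed Map. It is not the WebSphere eXtreme Scale or ESB API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical mediation: look in the cache first, call the backend only on a miss.
// The ConcurrentHashMap is a stand-in for a shared, grid-backed Map.
class CachingMediation {
    private static final class Entry {
        final String response;
        final long expiresAtMillis;

        Entry(String response, long expiresAtMillis) {
            this.response = response;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final Function<String, String> backend;
    private final long ttlMillis;
    private final AtomicInteger backendCalls = new AtomicInteger();

    CachingMediation(Function<String, String> backend, long ttlMillis) {
        this.backend = backend;
        this.ttlMillis = ttlMillis;
    }

    String invoke(String request) {
        long now = System.currentTimeMillis();
        Entry cached = cache.get(request);
        if (cached != null && cached.expiresAtMillis > now) {
            return cached.response; // duplicate request: the backend is not touched
        }
        backendCalls.incrementAndGet();
        String response = backend.apply(request);
        cache.put(request, new Entry(response, now + ttlMillis));
        return response;
    }

    // Number of requests that actually reached the backend.
    int backendCalls() {
        return backendCalls.get();
    }
}
```

Constructed with a 24 hour time-to-live, a second invoke of the same request within that window returns the cached response and leaves backendCalls() unchanged. In a real deployment the eviction would be left to the grid rather than checked by hand as it is here.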

Could you do this with a normal, non-distributed cache? I don't think so. Getting that 25% saving takes 24 hours of cached data. That can be a lot of data, more than will fit in a single JVM. You also need a shared cache so that no matter which server received the initial service request, ALL the servers benefit from the cache as soon as a single server caches something. This is crucial to getting the most out of this pattern. A conventional in-process cache CANNOT do this. You need a network-attached distributed cache. WebSphere eXtreme Scale makes 10 GB, 100 GB or even terabyte-sized caches possible with very little work. This kind of use case is usually very straightforward to implement and can usually be readily integrated into a SOA application.

So, what are you waiting for? You had the foresight to envision a SOA architecture and you invested the money to deploy it, exposing backends as services, deploying an ESB and so on. Make sure you get the most out of that investment and deploy WebSphere eXtreme Scale to get that kind of saving too. This is low hanging fruit. We had a recent customer achieve 500K USD a MONTH in savings after deploying IBM WebSphere eXtreme Scale to take load off a mainframe.

June 18, 2009

Conversational state in IBM WebSphere eXtreme Scale: PER_CONTAINER grids

We added this type of grid in V6.1.0.3. Prior to this we had 'normal' grids, which have a fixed number of partitions, with the key of a Map hashed to one of those partitions. We place the partitions on the set of online container JVMs and automatically scale them out or in as JVMs are added and removed. This grid works very well for key based grids, where the application uses a key object to locate data in the grid.


PER_CONTAINER grids are different. You specify that the grid uses PER_CONTAINER with the placementStrategy attribute in the deployment XML file. Here, you specify how many partitions you want per container JVM that you start. Remember, normally you specify the total number of partitions in the grid, not the number per container. If the number of partitions is 5 then when you start a container JVM, WXS will create 5 new anonymous partition primaries on that JVM and will create any replicas on the other container JVMs already running. Here is a sequence:

  • Start a JVM, it gets 5 primaries (P0-P4).
    JVM#1 -> P0,P1,P2,P3,P4
  • Start another JVM, it gets 5 primaries (P5-P9). Replicas for P0-P4 are placed on it and replicas for P5-P9 go on JVM#1.
    JVM#1 -> P0,P1,P2,P3,P4,R5,R6,R7,R8,R9
    JVM#2 -> P5,P6,P7,P8,P9,R0,R1,R2,R3,R4
  • Start another JVM, it gets 5 primaries (P10-P14). The replicas for P10-P14 are placed on JVM#1 and JVM#2, and some of the replicas for P0-P9 are moved to JVM#3 to balance the load.
    JVM#1 -> P0,P1,P2,P3,P4,R7,R8,R9,R10,R11,R12
    JVM#2 -> P5,P6,P7,P8,P9,R2,R3,R4,R13,R14
    JVM#3 -> P10,P11,P12,P13,P14,R5,R6,R0,R1
     
This continues as more JVMs are started. The grid will always create 5 new partition primaries on each new JVM, place replicas for those on the existing JVMs, and then balance the replicas out so the new JVM gets its share. But WXS will NEVER move a primary in PER_CONTAINER mode. It only moves replicas for balancing.

The partition numbers are arbitrary and have nothing to do with keys. You cannot do key based routing with this type of grid. If a JVM is stopped then the partition ids which were created for that JVM are no longer in use. This means there is a gap in the partition ids. There would no longer be partitions 5 to 9 if JVM#2 was to die in the above example. There would only be 0-4 and 10-14. This is why key based hashing doesn't work here.
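The id behaviour described above can be modelled in a few lines of plain Java. This is only a toy sketch under stated assumptions: PerContainerModel is a hypothetical class, it tracks home primaries only and ignores replicas and failover, and it says nothing about how WXS implements placement internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of PER_CONTAINER partition ids; replicas are ignored entirely.
class PerContainerModel {
    private final int partitionsPerContainer;
    private int nextPartitionId = 0;
    private final Map<String, List<Integer>> primariesByJvm = new LinkedHashMap<>();

    PerContainerModel(int partitionsPerContainer) {
        this.partitionsPerContainer = partitionsPerContainer;
    }

    // Starting a container creates brand new primaries; ids are never reused.
    List<Integer> startJvm(String jvmName) {
        List<Integer> created = new ArrayList<>();
        for (int i = 0; i < partitionsPerContainer; i++) {
            created.add(nextPartitionId++);
        }
        primariesByJvm.put(jvmName, created);
        return created;
    }

    // Stopping a container retires the ids it created, which leaves a gap.
    void stopJvm(String jvmName) {
        primariesByJvm.remove(jvmName);
    }

    // Partition ids that are still in use, in creation order.
    List<Integer> livePartitionIds() {
        List<Integer> ids = new ArrayList<>();
        for (List<Integer> primaries : primariesByJvm.values()) {
            ids.addAll(primaries);
        }
        return ids;
    }
}
```

Starting JVM#1, JVM#2 and JVM#3 with five partitions each and then stopping JVM#2 leaves ids 0-4 and 10-14 in this model. Hashing a key modulo the partition count could land on a missing id, which is the reason key based routing is ruled out.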

So, what's it good for? It's good for things like HTTP Session replication or application session state. Here, an HTTP router assigns an HTTP Session to a servlet container. The servlet container needs to create an HTTP Session for it and will choose one of the five local partition primaries for the session. The 'id' of the partition chosen is then stored in the cookie. The servlet container now has local access to the session state, which means zero latency access to the data for this request as long as session affinity is maintained. Any changes to the partition are replicated by WXS.

Why not just have one partition per JVM? Five, or more likely ten, is better; think about what happens if a JVM fails. We want the burden of the JVM failing to be spread across the cluster 'evenly'. If there was one partition, then when the JVM fails one JVM has to pick up the entire load: the one where the replica was. This is bad because the load there may double. If there are 5 partitions per JVM then 5 JVMs pick up the load of the failed JVM, which lowers the impact on each JVM by 80%. By using multiple partitions per JVM, we are lowering the impact on the replica substantially. Another way to look at it is that if the load on a JVM suddenly spiked then the replication load of that JVM is spread over 5 JVMs, which is better than one.

The next issue, which I hope some of you have noticed, is that every time a JVM starts we make five more partition primaries and five more replicas. It would seem that over time we just keep making partitions and they never go away. This isn't how it works. When a JVM starts, it gets five partition primaries. These are 'home' primaries. They exist on the container which created them. If the JVM fails then the replicas become primaries and WXS will create 5 more replicas to maintain high availability (unless auto repair is turned off!). The new primaries are in a different JVM than the one that created the partitions. They are foreign primaries. The application should never place new state or sessions in a foreign primary. They should be allowed to 'drain' as sessions get evicted and so on. Eventually, the foreign primary has no entries and WXS automatically deletes it and its associated replicas. The foreign primary's purpose is to keep existing sessions available; it's not for new sessions.

How does a client interact with such a 'key' less grid then? The client just begins a transaction and then stores data in the grid. The keys are meaningless. The client can ask the Session for a SessionHandle object. This is a serializable handle which allows the client to get back to the same partition later. WXS picks a partition for the client from the list of home partition primaries. It will never return a foreign primary partition. This SessionHandle could be serialized in an HTTP cookie or similar device and then, on receiving it again later, converted back into a SessionHandle and provided to the WXS API to obtain a Session bound to the same partition again. You cannot use Agents to interact with a PER_CONTAINER grid for now.
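The cookie round trip for a serializable handle might look something like the sketch below. The names are assumptions: PartitionHandle is a hypothetical stand-in for the real SessionHandle (so the snippet runs without the product jar) and HandleCookieCodec is a made-up helper. Only standard Java serialization and Base64 are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical stand-in for a serializable partition handle; not the WXS SessionHandle class.
final class PartitionHandle implements Serializable {
    private static final long serialVersionUID = 1L;
    final int partitionId;

    PartitionHandle(int partitionId) {
        this.partitionId = partitionId;
    }
}

// Hypothetical helper that turns any serializable handle into a cookie-safe string and back.
final class HandleCookieCodec {
    private HandleCookieCodec() {
    }

    static String encode(Serializable handle) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // URL-safe alphabet without padding keeps '+', '/' and '=' out of the cookie value.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    static Object decode(String cookieValue) {
        byte[] raw = Base64.getUrlDecoder().decode(cookieValue);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown handle class in cookie", e);
        }
    }
}
```

A servlet would call encode when it first creates the session and decode on each later request before asking the grid for a Session bound to that partition. Deserializing a value that came from a browser is risky, so a real application should sign or otherwise validate the cookie first.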

This is different from a normal FIXED_PARTITIONS or hash grid because the client stores data in a place in the grid, gets a handle to it and uses the handle to access it again next time. There is no application supplied key here.

Obviously, WXS is not making a new partition for each 'session'. Therefore, the keys used to store data in the partition should be unique within that partition. Maybe the client generates a unique SessionID and then uses that as the key to find information in Maps in that partition. Multiple client sessions will be assigned to the same partition, so the application needs to make sure each session uses different keys to store its data in that partition.

I used 5 partitions as an example here, but the numberOfPartitions attribute in the deployment XML file is what specifies this; with PER_CONTAINER it just means the number of partitions per container rather than per grid. The number of replicas is specified in the normal way.

This also works with multiple data centers. If possible, WXS will return a SessionHandle to a partition whose primary is located within the same zone/data center as that client. The client can specify the zone as a parameter to the JVM or using an API. The client zone id can be set using serverproperties or clientproperties.

The PER_CONTAINER style of grid suits applications which store conversational type state rather than database oriented data. The key to access it is some kind of conversation id and is not related to a specific database record or something like that. It provides higher performance (because the partition primaries can be collocated with the servlets, for example) and easier configuration (no need to worry about how many partitions are needed for how many JVMs), and it works very well in multiple data center scenarios (zone based routing). It provides a very useful extra tool in the bag for effectively using WebSphere eXtreme Scale.

June 10, 2009

Single Partition and every-partition transactions

I've been getting asked about this a lot lately so I figured I'd just blog about it. Products like WebSphere eXtreme Scale work by taking a dataset, partitioning it using a key and then assigning those partitions to a number of JVMs. Each partition usually has a primary and a replica. These 'shards' are assigned to JVMs. A transactional application typically interacts with the data on a single partition at a time. This means the transaction is executed in a single JVM. A server box will be able to do M of those transactions per second and it scales because N boxes do MN (M multiplied by N) transactions per second. Increase N and you get more transactions per second. Availability is very good because a transaction only depends on 1 of the N servers that are currently online. Any of the other (N-1) servers can go down or fail with no impact on the transaction. So, single partition transactions can scale indefinitely from a throughput point of view, offer very consistent response times and are very available because they only touch a small part of the grid at once.


All-partition transactions are different. A simple example might be that we are storing bank accounts in a grid. The account key is the bank account number. The value is an account object with the user's online username and their password, address, portal profile, bank account information etc. Almost all access to the account is by account number. Now, let's look at the login process for the bank's portal. The user doesn't log in with their account number, they log in with the username. We have not partitioned on user name; we partitioned on account number and did so for good reason, as every other transaction type is keyed on account number.

So, given we can't easily look up a record using the user name, what can we do? Option 1: let's do a parallel search across all partitions to find account objects whose user name attribute is 'billy'. We can use a MapGridAgent in WebSphere eXtreme Scale to do this. The agent code will be executed in parallel across all partitions. It will run a query within each partition to find any accounts in that partition with a username of 'billy'. One account object should match across the whole grid and the client which called the agent should receive the account number as the result. Problem solved!
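The shape of that every-partition search can be sketched in plain Java. This is not the MapGridAgent API; it is a hypothetical ParallelUsernameSearch class where each Map stands in for one partition (account number as key, username as value) and a thread pool stands in for the grid running the search on every partition at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java model of an every-partition search; each Map stands in for one partition.
class ParallelUsernameSearch {
    // partitions: one map per partition, account number -> username
    static Optional<String> findAccount(List<Map<String, String>> partitions, String username) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<Optional<String>>> pending = new ArrayList<>();
            for (Map<String, String> partition : partitions) {
                pending.add(pool.submit(() -> searchPartition(partition, username)));
            }
            Optional<String> match = Optional.empty();
            for (Future<Optional<String>> result : pending) {
                // The caller blocks on every partition, so it always waits for the slowest one.
                Optional<String> found = result.get();
                if (found.isPresent()) {
                    match = found;
                }
            }
            return match;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Linear scan of one partition for the given username.
    private static Optional<String> searchPartition(Map<String, String> partition, String username) {
        for (Map.Entry<String, String> entry : partition.entrySet()) {
            if (entry.getValue().equals(username)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }
}
```

Note that the loop over the futures cannot return until every partition has answered, even once a match is in hand.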

Not so fast. Let's examine this parallel search. How long does it take to run? The client instructs each partition to execute the search code. These searches run in parallel and the client blocks until they all return. So, the client basically waits for the slowest 'partition' or server to return before it continues. How many of these lookup transactions can the grid perform per second? As many as the slowest box can do. If the number of accounts was to double, we could double the size of the grid. This lets us store twice as many accounts but what about the effect on our parallel search? It's true we are searching twice as fast as before (double the CPUs) but there is also twice as much data to search through, so we are probably achieving the same response time as before. What about throughput? It's still the same. We can only do as many transactions per second as the slowest machine. Our throughput hasn't changed even though we doubled the size of the grid. Now, we can search twice as many records with the same response time as before, but throughput wise, nothing changed. The grid is scaling in terms of account capacity and records searched/second but the throughput number is not scaling at all.

Availability is also impacted when compared with single partition transactions. The single partition transactions only used a single partition/server. The every partition transaction needs the whole grid to be up to complete. The failure of a single box will delay the transaction from completing. Now, products like WebSphere eXtreme Scale will very quickly recover from a failure (typically sub second) but on a large enough grid you'll see response time glitches where maybe a second or so is added if the admins are cycling through servers doing maintenance or something like that. This delay is very unlikely to happen in the single partition transaction case. You'd have a 1/N chance of it happening. Much better than the 100% chance with an every partition transaction.

This lack of throughput scalability for every partition transactions is a problem, as login is an operation whose throughput needs to go up as the web site becomes more popular. So, it looks like using parallel search for operations which need to scale from a throughput point of view is a bad idea. What else can we do?

We could partition using user name instead of account but now we have the search problem for all the account number based transactions which are the bulk of all transactions and besides, users like being able to change the user name which would be a nightmare if everything was based on usernames.

We could cache the results of looking up usernames with parallel searches. The cache would be a Map whose key was username and the value was account number. A Loader attached to the Map would do a parallel search with a MapGridAgent if its Loader#get method was called on a cache miss. The problem here is that when we warm up the cache, we'll be getting a lot of cache misses and a lot of parallel searches. Not good either.

Or, we could maintain a persistent reverse index. This index is a Map which has the user name for the key and the account id for the value. The Map is backed by a database table or other long term persistence mechanism. Now, when a user logs in, we simply do a Map.get("billy") and receive the account id with a single partition transaction and the throughput of those does scale with grid size. We have to maintain this reverse index so that if the user changes their username then we need to make sure the reverse index is updated and so on.

Login now is a matter of looking up the user name in the reverse index map (revMap.get("billy") returning 1234) and then retrieving the account object using a second get to check the password (accMap.get("1234") returning the account object with the password). This is a much better solution than a parallel search. This is a query cache. Effectively, we are caching the results of the parallel search using a persistent map. We have converted a parallel transaction to a single partition transaction and as a result, our login operation is now throughput scalable.
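Here is a rough sketch of the two-get login and of the index maintenance on a username change. The ReverseIndexLogin class and its method names are hypothetical, two HashMaps stand in for the grid Maps (revMap and accMap as named above), and the password is compared in plain text purely to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Two in-memory maps standing in for the grid Maps; not the WXS API.
class ReverseIndexLogin {
    static final class Account {
        final String accountNumber;
        String username;
        final String password; // illustration only; a real system would store a hash

        Account(String accountNumber, String username, String password) {
            this.accountNumber = accountNumber;
            this.username = username;
            this.password = password;
        }
    }

    private final Map<String, String> revMap = new HashMap<>();  // username -> account number
    private final Map<String, Account> accMap = new HashMap<>(); // account number -> account

    void addAccount(String accountNumber, String username, String password) {
        accMap.put(accountNumber, new Account(accountNumber, username, password));
        revMap.put(username, accountNumber);
    }

    // Two keyed gets, each of which would be a single partition transaction on a grid.
    boolean login(String username, String password) {
        String accountNumber = revMap.get(username);
        if (accountNumber == null) {
            return false;
        }
        Account account = accMap.get(accountNumber);
        return account != null && account.password.equals(password);
    }

    // The reverse index has to follow every username change.
    void changeUsername(String accountNumber, String newUsername) {
        Account account = accMap.get(accountNumber);
        if (account == null) {
            throw new IllegalArgumentException("Unknown account " + accountNumber);
        }
        if (revMap.containsKey(newUsername)) {
            throw new IllegalArgumentException("Username already taken: " + newUsername);
        }
        revMap.remove(account.username);
        account.username = newUsername;
        revMap.put(newUsername, accountNumber);
    }
}
```

On a real grid the index entry and the account will usually live in different partitions, so the two updates in changeUsername are not a single partition transaction and the application has to decide how to order them and recover from a failure in between.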

Multi-partition transactions can be great for searching large amounts of data in parallel. The search speed/second does increase with the grid size. Larger grids can store larger amounts of data but the throughput typically stays the same as the grid grows (assuming the data size grows linearly with grid size). This means using parallel operations for something whose throughput will grow as your application scales up is a mistake as the throughput of the grid has nothing to do with the grid size, it's limited to the throughput of the slowest box. 

You need to convert that parallel search operation to a single partition get if you want the system to scale from a throughput point of view. Caching the parallel searches OR using a reverse index (effectively this is a disk persistent query cache) is the normal way to handle this conversion.

How can you make an every partition operation scale from a throughput point of view then if you can't use reverse indexes? Use multiple grids which are all the same and round robin the requests over them. Each grid will be able to do M transactions per second and N grids give you MN per second. If you need throughput scalable every partition transactions then this is probably the only way to make it scale. Ever wonder why Google needs millions of servers...
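Round robining requests over identical grids could be as simple as the following sketch. RoundRobinGrids is a hypothetical helper, and each grid is modelled as a plain Function from query to result rather than a real grid client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Spreads every-partition queries over N identical grids; each grid is modelled as a Function.
class RoundRobinGrids<Q, R> {
    private final List<Function<Q, R>> grids;
    private final AtomicLong next = new AtomicLong();

    RoundRobinGrids(List<Function<Q, R>> grids) {
        if (grids.isEmpty()) {
            throw new IllegalArgumentException("At least one grid is required");
        }
        this.grids = new ArrayList<>(grids);
    }

    // Each call goes to the next grid in turn, so N grids share the load evenly.
    R query(Q query) {
        int index = (int) (next.getAndIncrement() % grids.size());
        return grids.get(index).apply(query);
    }
}
```

The price of this design is that every grid holds a full copy of the data, so the hardware bill grows with the number of copies as well as with the data size.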

This article is really about transactions that involve every partition, like a search. Some transactions may use two partitions, for example, or some small number of partitions relative to the total, but that's for another blog entry...

June 08, 2009

Instrumenting a Loader with BTrace

This is a script for measuring and timing the get/batchUpdate and commit methods on a Loader implementation and its associated TransactionCallback. The timing code is copied and pasted into each return handler because btrace doesn't allow a script to factor code into helper methods and invoke them.


BTrace experience continued

I'm trying to write a btrace script for measuring Loader performance. It should time and count calls to the Loader.get and Loader.batchUpdate methods. This is pretty easy to do but it points out some limitations.


    @OnMethod(clazz = "purequery.loader.GenericPQLoader", method = "get", location = @Location(Kind.RETURN))
    public static void onGetReturn()
    {
        if (executingVerb != null) {
            AggregationKey key = newAggregationKey(executingVerb);
            // Elapsed microseconds. Divide before narrowing so the long difference cannot overflow the int.
            int duration = (int) ((timeNanos() - timeStampNanos) / 1000);
            BTraceUtils.print("Duration of "); BTraceUtils.print(executingVerb); BTraceUtils.print(" "); BTraceUtils.println(duration);
            // Feed the same sample into every aggregation, keyed by verb.
            addToAggregation(histogram, key, duration);
            addToAggregation(average, key, duration);
            addToAggregation(max, key, duration);
            addToAggregation(min, key, duration);
            addToAggregation(sum, key, duration);
            addToAggregation(count, key, duration);
            addToAggregation(globalCount, duration);
            // Clear the marker so the next call starts a fresh measurement.
            executingVerb = null;
        }
    }

This code basically adds a measurement for the verb held in executingVerb to the stats. It needs to be called when we leave the Loader.get or Loader.batchUpdate methods. executingVerb indicates whether it's a get or a batchUpdate. My first attempt at this put most of that method in a separate static method called by the onGetReturn and onBatchUpdateReturn methods. This was rejected by the btracec compiler because you can only call BTraceUtils methods in the script. This restriction will quickly make scripts very big and hard to maintain as you end up copying and pasting the same code in a bunch of places.

Running btrace on multiple JVMs on a single box

It looks like the btrace java agent opens a socket on port 2020. Running two JVMs causes a port conflict and the agent doesn't initialize in the second or third JVMs. You can specify a port on the second JVM to avoid these conflicts using something like this:


-javaagent:/Users/bnewport/Development/btrace-bin/build/btrace-agent.jar=script=AgentServerTimer.class,port=2021

Just make sure each JVM uses a different port. Another issue is the agent writes the script logs to a file named after the agent. If two JVMs use the same script then you get both processes trying to open the same file and that won't work. For now, I'm not hitting this as I use one script for the client JVM (measuring calls to invoke agents) and another with a different name on the server JVM to measure the times to invoke the actual agents.

Client script: Download AgentTimer

This lets me see the time on the client versus the actual time to run the agent server side. But, it will only work on a single client/single server JVM on a single box for now. You can work around this by copying the scripts around or using multiple boxes but maybe they should include the pid or port number in the file name.


June 06, 2009

btrace for performance tuning

Been playing with btrace this weekend. Very cool utility for adding performance monitoring to methods using weaving and aspects. It's pretty easy to use. You write a script which is a Java class which specifies the classes and methods you want watched. There are restrictions on what you can do in the methods called when a method is called or returns but it's flexible enough.


I'm attaching a script that times all Agent calls made from a client. It prints them out by Agent class every 20 seconds. The script needs to be compiled with the btracec utility like this:

./../btrace-bin/bin/btracec -cp ../lib/objectgrid.jar AgentTimer.java
 
This creates an AgentTimer.class file which is the compiled form of the script.

Now, run the client JVM but add this to the command line:

-javaagent:/Users/bnewport/Development/btrace-bin/build/btrace-agent.jar=script=AgentTimer.class

When you run the client, an AgentTimer.class.btrace file is created in the same directory as the AgentTimer.class file. This is where all the output goes.

The output looks like this:

Count
  redis.agent.list.LTrim                                            3
  redis.agent.Set                                                   3
  redis.agent.Incr                                                  3
  redis.agent.list.Push                                             6
  redis.agent.set.SetCard                                          14
  redis.agent.set.SetMembers                                       17
  redis.agent.list.LRange                                          18
Min
  redis.agent.list.LRange                                        4743
  redis.agent.set.SetCard                                        6221
  redis.agent.list.LTrim                                         6562
  redis.agent.set.SetMembers                                     6582
  redis.agent.list.Push                                          7549
  redis.agent.Incr                                               9445
  redis.agent.Set                                               11536
Max
  redis.agent.list.LTrim                                        10037
  redis.agent.list.Push                                         12034
  redis.agent.set.SetCard                                       16798
  redis.agent.Set                                               25215
  redis.agent.set.SetMembers                                    28472
  redis.agent.Incr                                              33212

It times each CallMapAgent call and plots all the results in a histogram. Very handy. I also print out the duration of each call.

I'll do more playing around with this. I was unable to get it to attach scripts dynamically to a running JVM. It only works for me when I use the agent approach to specify a script.

May 14, 2009

Running ILOG JRules in WebSphere eXtreme Scale

Quite a few customers have been looking for a fault tolerant rules engine. I've been working on this with Victor Moore, a Distinguished Engineer working in Telco, and we now have something which looks good. Fault tolerant here means that a specific rules engine needs to run in exactly one Java Virtual Machine and fail over to another if its current JVM fails or stops. The working memory for that rules engine needs to be stored in replicated memory so that if the JVM hosting it fails then there is no lost data.


Clients wanting to access the rules engine need a way to add or remove items from the working memory of the rules engine and also interrogate the working memory looking for results if needed. Of course, the rules engine can execute Java code as a result of rules firing within the current JVM. WebSphere eXtreme Scale handles routing these client requests in a transparent fashion to the JVM currently hosting the rules engine.

The prototype (for Telco and Chemical industries but usable in any industry) allows the customer to define a set of rules engines. The prototype then runs these rules engines in as many JVMs as are required. WebSphere eXtreme Scale manages the rules engines' availability and ensures each rules engine is running in exactly one of the JVMs. We wrote some code to back the working memory of the rules engine with a memory replicated Map. If a JVM fails then we fail over the rules engines that were in that JVM to the JVMs with the replicas of the working set. WebSphere eXtreme Scale then creates additional replicas automatically to restore full fault tolerance.

WebSphere eXtreme Scale makes sure the rules engines are spread evenly across the set of JVMs and moves them around to maintain this balance when additional JVMs are added or JVMs fail. This is elastic scaling. The prototype could easily be extended to allow rules engines to be added or removed while it's all running.

If anyone is interested in seeing the code then email me or Vic. This will likely end up as an article/blog entry with the sample code over the next few weeks. The prototype basically gives you an XTP business rules grid.

April 22, 2009

Episode #8, WXS impact sessions, Reverse indexes, scaling expensive queries

It's posted on YouTube and on the other blog.

April 14, 2009

Logging failed write behind updates in IBM WebSphere eXtreme Scale

Write behind is a technology to write updated records asynchronously to a backend such as a database. The application can 'commit' changes to replicated memory only and then, periodically, all updated records are written to the database in a large batch. This eliminates the database from the response time for updates, which improves performance, and drastically lowers the load on the database by combining lots of small transactions into a few big ones.


Clearly, once the application updates the data, the writes to the database must succeed. Sometimes there are problems: unforeseen interactions with the data or just application bugs. When the write behind logic cannot update a record, it inserts an entry in a special Map associated with the map holding the failed write behind update. The application needs to watch this map and handle these errors. Usually, the application will just log them to disk; it may also try to reconcile the problem. It all depends on what's possible in the application.
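To illustrate the mechanics only, here is a toy write behind buffer in plain Java. WriteBehindBuffer and its methods are hypothetical and are not how WXS implements the feature: updates coalesce in memory, flush pushes them to a caller supplied writer one record at a time, and any record the writer rejects lands in a separate failed-update map that a watcher has to drain.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy write behind buffer with a failed-update map; an illustration, not the WXS implementation.
class WriteBehindBuffer {
    private final Map<String, String> pending = new LinkedHashMap<>();
    private final Map<String, String> failedUpdates = new LinkedHashMap<>(); // key -> error message
    private final BiConsumer<String, String> databaseWriter;

    WriteBehindBuffer(BiConsumer<String, String> databaseWriter) {
        this.databaseWriter = databaseWriter;
    }

    // Application 'commit': memory only. Repeated updates to one key coalesce into one write.
    synchronized void put(String key, String value) {
        pending.put(key, value);
    }

    // Periodic flush: push every pending record to the database, remembering the failures.
    synchronized int flush() {
        int written = 0;
        for (Map.Entry<String, String> update : pending.entrySet()) {
            try {
                databaseWriter.accept(update.getKey(), update.getValue());
                written++;
            } catch (RuntimeException e) {
                failedUpdates.put(update.getKey(), String.valueOf(e.getMessage()));
            }
        }
        pending.clear();
        return written;
    }

    // Watcher side: remove and return up to 'limit' failures so the map cannot grow forever.
    synchronized Map<String, String> drainFailures(int limit) {
        Map<String, String> drained = new LinkedHashMap<>();
        Iterator<Map.Entry<String, String>> it = failedUpdates.entrySet().iterator();
        while (it.hasNext() && drained.size() < limit) {
            Map.Entry<String, String> failure = it.next();
            drained.put(failure.getKey(), failure.getValue());
            it.remove();
        }
        return drained;
    }
}
```

A watcher thread would call drainFailures on a timer and log whatever comes back. If nothing ever drains the failure map it only grows.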

I'm attaching sample source code showing how to write such a watcher. The watcher should be added to your objectgrid.xml file like this:

        <objectGrid name="Grid">
            <bean id="TransactionCallback" className="purequery.loader.PQTxCallback">
                <property name="connecturi" type="java.lang.String" value="jdbc:db2://172.16.129.133:50000/SAMPLE"/>
                <property name="username" type="java.lang.String" value="db2admin"/>
                <property name="password" type="java.lang.String" value="db2admin"/>
            </bean>
            <bean id="ObjectGridEventListener" className="utils.WriteBehindDumper" />
            <!-- backingMap definitions omitted -->
        </objectGrid>


You can see the ObjectGridEventListener bean that's been added; this is the write behind watcher, or dumper as I've called it here.

It basically iterates over the Maps for all primary shards in a JVM looking for ones with write behind enabled. If it finds one then it tries to log up to 100 bad updates. It keeps watching a primary shard until it's moved to a different JVM. All applications using write behind MUST use a watcher similar to this one; otherwise the JVMs will run out of memory because this error map is never evicted.