Salesforce Mutual Authentication – Part 2: Web Service Connector (WSC)

CodeIn my last blog entry I explained how to enable, configure and test Salesforce’s Mutual Authentication feature. This time, I’ll share my experience getting Mutual Authentication working with the Java client SDK for Salesforce’s SOAP and Bulk APIs: Web Service Connector, aka WSC.

StreamSets Data Collector‘s Salesforce integration accesses the SOAP and Bulk APIs via WSC, so, when I was implementing Mutual Authentication in SDC, I examined WSC to see where I could configure the client key and certificate chain. Although there is no mention of SSLContext or SSLSocketFactory in the WSC code, it is possible to set a custom TransportFactory on the WSC ConnectorConfig object. The TransportFactory is used to create a Transport, which in turn is responsible for making the HTTPS connection to Salesforce.

To enable Mutual Authentication I would need to create an SSLContext with the client key and certificate chain. This is straightforward enough:

// Make a KeyStore from the PKCS-12 file
KeyStore ks = KeyStore.getInstance("PKCS12");
try (FileInputStream fis = new FileInputStream(KEYSTORE_PATH)) {
  ks.load(fis, KEYSTORE_PASSWORD.toCharArray());
}

// Make a KeyManagerFactory from the KeyStore
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(ks, KEYSTORE_PASSWORD.toCharArray());

// Now make an SSL Context with our Key Manager and the default Trust Manager
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), null, null);

Given the SSLContext, we can create an SSLSocketFactory and set it on the HttpsURLConnection. Here’s the code we’d use if we were simply using the java.net classes directly:

URL url = new URL(someURL);
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
// Check that we did get an HttpsURLConnection before casting to it
if (conn instanceof HttpsURLConnection) {
  ((HttpsURLConnection)conn).setSSLSocketFactory(
      sslContext.getSocketFactory()
  );
}

Mutual Authentication and the Salesforce SOAP API

The default Transport implementation, JdkHttpTransport, looked like a good place to start. My first thought was to extend JdkHttpTransport, overriding the relevant methods. Unfortunately, JdkHttpTransport‘s createConnection method, which calls url.openConnection(), is static, so it’s impossible to override. The connectRaw() method also looked like a promising route, since it calls createConnection(), performs some setup on the HttpURLConnection, and then gets the OutputStream, but it’s private, and once the OutputStream has been created, it’s too late to set the SSLSocketFactory.

In my searching for an answer, I came across this comment from Salesforce Software Engineer Steven Lawrance in a Salesforce Trailblazer Community answer.

You’ll generally need to set the TransportFactory in the ConnectorConfig object that you use to create the PartnerConnection (or EnterpriseConnection, etc), though another option is to set the Transport.

It’s possible to create a Transport implementation that is based off of the com.sforce.ws.transport.JdkHttpTransport class while having the JdkHttpTransport create the connection with its static createConnection method. Your Transport implementation can then set up the SSLSocketFactory (casting the connection to HttpsURLConnection is required to do that), and your SSLSocketFactory can be created from creating an SSLContext that is initialized to include your client certificate.

I followed Steven’s advice and created ClientSSLTransport, a clone of JdkHttpTransport, and ClientSSLTransportFactory, its factory class. To minimize the amount of copied code, I changed the implementation of connectRaw() to call JdkHttpTransport.createConnection() and then set the SSLSocketFactory:

private OutputStream connectRaw(String uri, HashMap<String, String> httpHeaders, boolean enableCompression)
throws IOException {
  url = new URL(uri);

  connection = JdkHttpTransport.createConnection(config, url, 
      httpHeaders, enableCompression);
  if (connection instanceof HttpsURLConnection) {
    ((HttpsURLConnection)connection).setSSLSocketFactory(
        sslContext.getSocketFactory()
    );
  }
  connection.setRequestMethod("POST");
  connection.setDoInput(true);
  connection.setDoOutput(true);
  if (config.useChunkedPost()) {
    connection.setChunkedStreamingMode(4096);
  }

  return connection.getOutputStream();
}

With this in place, I wrote a simple test application to call an API with Mutual Authentication. As I mentioned in the previous blog post, the Salesforce login service does not support Mutual Authentication, so the inital code to authenticate is just the same as the default case:

// Login as normal to get instance URL and session token
ConnectorConfig config = new ConnectorConfig();
config.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/39.0");
config.setSslContext(sc);
config.setUsername(USERNAME);
config.setPassword(PASSWORD);

connection = Connector.newConnection(config);

// display some current settings
System.out.println("Auth EndPoint: "+config.getAuthEndpoint());
System.out.println("Service EndPoint: "+config.getServiceEndpoint());

Running this bit of code revealed that, not only does the login service not support Mutual Authentication, it returns the default service endpoint:

Auth EndPoint: https://login.salesforce.com/services/Soap/u/39.0
Service EndPoint: https://na30.salesforce.com/services/Soap/u/39.0/00D36000000psQd

Before we can call an API, then, we have to override the service endpoint, changing the port from the default 443 to 8443, as well as setting the TransportFactory:

String serviceEndpoint = config.getServiceEndpoint();
// Override service endpoint port to 8443
config.setServiceEndpoint(changePort(serviceEndpoint, 8443));

// Set custom transport factory
config.setTransportFactory(new ClientSSLTransportFactory(sslContext));

...

private static String changePort(String url, int port) throws URISyntaxException {
  URI uri = new URI(url);
  return new URI(
      uri.getScheme(), uri.getUserInfo(), uri.getHost(),
      port, uri.getPath(), uri.getQuery(), uri.getFragment()).toString();
}

With this in place, I could call a SOAP API in the normal way:

System.out.println("Querying for the 5 newest Contacts...");

// query for the 5 newest contacts
QueryResult queryResults = connection.query("SELECT Id, FirstName, LastName, Account.Name " +
    "FROM Contact WHERE AccountId != NULL ORDER BY CreatedDate DESC LIMIT 5");
if (queryResults.getSize() > 0) {
  for (SObject s: queryResults.getRecords()) {
    System.out.println("Id: " + s.getId() + " " + s.getField("FirstName") + " " +
        s.getField("LastName") + " - " + s.getChild("Account").getField("Name"));
  }
}

With output:

Querying for the 5 newest Contacts...
Id: 00336000009BusFAAS Rose Gonzalez - Edge Communications
Id: 00336000009BusGAAS Sean Forbes - Edge Communications
Id: 00336000009BusHAAS Jack Rogers - Burlington Textiles Corp of America
Id: 00336000009BusIAAS Pat Stumuller - Pyramid Construction Inc.
Id: 00336000009BusJAAS Andy Young - Dickenson plc

Success!

Mutual Authentication and the Salesforce Bulk API

Now, what about the Bulk API? Running a test app resulted in an error when I tried to create a Bulk API Job. Tracing through the WSC code revealed that when ConnectorConfig.createTransport() creates a Transport with a custom TransportFactory, it does not set the ConnectorConfig on the Transport:

public Transport createTransport() throws ConnectionException {
  if(transportFactory != null) {
    return transportFactory.createTransport();
  }

  try {
    Transport t = (Transport)getTransport().newInstance();
    t.setConfig(this);
    return t;
  } catch (InstantiationException e) {
    throw new ConnectionException("Failed to create new Transport " + getTransport());
  } catch (IllegalAccessException e) {
    throw new ConnectionException("Failed to create new Transport " + getTransport());
  }
}

ConnectorConfig.createTransport() is only used when the WSC Bulk API client is POSTing to the Bulk API, since the POST method is hardcoded into JdkHttpTransport.connectRaw() (all SOAP requests use HTTP POST). When the client wants to do a GET, it uses BulkConnection.doHttpGet(), which does not use ConnectorConfig.createTransport(), instead calling config.createConnection():

private InputStream doHttpGet(URL url) throws IOException, AsyncApiException {
  HttpURLConnection connection = config.createConnection(url, null);
  connection.setRequestProperty(SESSION_ID, config.getSessionId());
  ...

The problem here is that config.createConnection() ultimately just calls url.openConnection() directly, bypassing any custom Transport:

public HttpURLConnection createConnection(URL url,
HashMap<String, String> httpHeaders, boolean enableCompression) throws IOException {

  if (isTraceMessage()) {
    getTraceStream().println( "WSC: Creating a new connection to " + url + " Proxy = " +
        getProxy() + " username " + getProxyUsername());
  }

  HttpURLConnection connection = (HttpURLConnection) url.openConnection(getProxy());
  connection.addRequestProperty("User-Agent", VersionInfo.info());
  ...

Luckily, config.createConnection() is public, so my solution to these problems was to subclass ConnectorConfig as MutualAuthConnectorConfig, providing an SSLContext in its constructor, and overriding createConnection():

public class MutualAuthConnectorConfig extends ConnectorConfig {
  private final SSLContext sc;

  public MutualAuthConnectorConfig(SSLContext sc) {
    this.sc = sc;
  }

  @Override
  public HttpURLConnection createConnection(URL url, HashMap<String, String> httpHeaders, 
      boolean enableCompression) throws IOException {
    HttpURLConnection connection = super.createConnection(url, httpHeaders, enableCompression);
    if (connection instanceof HttpsURLConnection) {
      ((HttpsURLConnection)connection).setSSLSocketFactory(sc.getSocketFactory());
    }
    return connection;
  }
}

If you look at ClientSSLTransport and ClientSSLTransportFactory, you’ll notice that the factory has a two-argument constructor that allows us to pass the ConnectorConfig. This ensures that the Transport can get the configuration it needs, despite the fact that ConnectorConfig.createTransport() neglects to set the config.

Now, when creating a BulkConnection from a Partner API ConnectorConfig, I use my subclassed ConnectorConfig class AND set the TransportFactory on it, so that the SSLSocketFactory is set for both GET and POST:

  ConnectorConfig bulkConfig = new MutualAuthConnectorConfig(sslContext);
  bulkConfig.setTransportFactory(new ClientSSLTransportFactory(sslContext, bulkConfig));
  bulkConfig.setSessionId(partnerConfig.getSessionId()); 

  // The endpoint for the Bulk API service is the same as for the normal 
  // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber' 
  String soapEndpoint = partnerConfig.getServiceEndpoint(); 
  String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/")) 
      + "async/" + conf.apiVersion; 

  // Remember to swap the port for Mutual Authentication! 
  bulkConfig.setRestEndpoint(changePort(restEndpoint, 8443));

Running my simple sample app showed that I was able to successfully retrieve data via the Bulk API:

Querying for the 5 newest Contacts via the Bulk API...
Created job: 7503600000KbCyMAAV
Batch state is: Queued
Sleeping for a second...
Sleeping for a second...
Sleeping for a second...
Batch state is: Completed
Result header:[Id, FirstName, LastName, Account.Name]
Id: 00336000009BusFAAS Rose Gonzalez - Edge Communications
Id: 00336000009BusGAAS Sean Forbes - Edge Communications
Id: 00336000009BusHAAS Jack Rogers - Burlington Textiles Corp of America
Id: 00336000009BusIAAS Pat Stumuller - Pyramid Construction Inc.
Id: 00336000009BusJAAS Andy Young - Dickenson plc

You can grab my sample app and all of the above mentioned files here.

Proposed WSC Changes

With the above changes I was able to call both the SOAP and Bulk APIs and include the WSC JAR files unchanged. I filed issue #213 on WSC, and then fixed the problems in the WSC directly (pull request) by adding an SSLContext member variable and its getter/setter to ConnectorConfig and having JdkHttpTransport.connectRaw() and BulkConnection.doHttpGet() set the SSLSocketFactory on the HttpsURLConnection immediately after it’s created. I’ll update this blog entry if and when my pull request is accepted.

Conclusion

The first blog entry in this series explained how to enable, configure and test Salesforce Mutual Authentication. This time, I showed how to work around the shortcomings in the Salesforce Web Service Connector (WSC) to allow it to work with Mutual Authentication.

In part 3, the final installment in this series, I show you how to use Mutual Authentication with common HTTP clients to access Salesforce API endpoints directly.

Uploading data to the Salesforce Wave Analytics Cloud

bi_phoneOverDesktopAs you might know from my last post, I moved from Salesforce to StreamSets a couple of weeks ago. It didn’t take long before I was signing up for a fresh Developer Edition org, though! I’m creating a StreamSets destination to allow me to write data to Wave Analytics datasets, and it’s fair to say that the documentation is… sparse. Working from the Wave Analytics External Data API Developer Guide and Wave Analytics External Data Format Reference (why are these separate docs???), and my understanding of how Salesforce works, I was able to put together a working sample Java app that creates a dataset from CSV data.

Here’s the code – I explain a few idiosyncrasies below, and reveal the easiest way to get this working with Wave.

[java]
package wsc;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import com.sforce.soap.partner.Connector;
import com.sforce.soap.partner.Error;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.QueryResult;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

public class Main {

// Describes the data we’ll be uploading
static String metadataJson =
"{\n" +
" \"fileFormat\": {\n" +
" \"charsetName\": \"UTF-8\",\n" +
" \"fieldsDelimitedBy\": \",\",\n" +
" \"fieldsEnclosedBy\": \"\\\"\",\n" +
" \"numberOfLinesToIgnore\": 1\n" +
" },\n" +
" \"objects\": [\n" +
" {\n" +
" \"connector\": \"AcmeCSVConnector\",\n" +
" \"description\": \"\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"description\": \"\",\n" +
" \"fullyQualifiedName\": \"SalesData.Name\",\n" +
" \"isMultiValue\": false,\n" +
" \"isSystemField\": false,\n" +
" \"isUniqueId\": false,\n" +
" \"label\": \"Account Name\",\n" +
" \"name\": \"Name\",\n" +
" \"type\": \"Text\"\n" +
" },\n" +
" {\n" +
" \"defaultValue\": \"0\",\n" +
" \"description\": \"\",\n" +
" \"format\": \"$#,##0.00\",\n" +
" \"fullyQualifiedName\": \"SalesData.Amount\",\n" +
" \"isSystemField\": false,\n" +
" \"isUniqueId\": false,\n" +
" \"label\": \"Opportunity Amount\",\n" +
" \"name\": \"Amount\",\n" +
" \"precision\": 10,\n" +
" \"scale\": 2,\n" +
" \"type\": \"Numeric\"\n" +
" },\n" +
" {\n" +
" \"description\": \"\",\n" +
" \"fiscalMonthOffset\": 0,\n" +
" \"format\": \"MM/dd/yyyy\",\n" +
" \"fullyQualifiedName\": \"SalesData.CloseDate\",\n" +
" \"isSystemField\": false,\n" +
" \"isUniqueId\": false,\n" +
" \"label\": \"Opportunity Close Date\",\n" +
" \"name\": \"CloseDate\",\n" +
" \"type\": \"Date\"\n" +
" }\n" +
" ],\n" +
" \"fullyQualifiedName\": \"SalesData\",\n" +
" \"label\": \"Sales Data\",\n" +
" \"name\": \"SalesData\"\n" +
" }\n" +
" ]\n" +
"}";

// This is the data we’ll be uploading
static String data =
"Name,Amount,CloseDate\n" +
"opportunityA,100.99,6/30/2014\n" +
"opportunityB,99.01,1/31/2012\n";

// This will be the name of the data set in Wave
// Must be unique across the organization
static String datasetName = "tester";

// Change these as appropriate
static final String USERNAME = "user@example.com";
static final String PASSWORD = "p455w0rd";

// Status values indicating that the job is done
static final List&lt;String&gt; DONE = (List&lt;String&gt;)Arrays.asList(
"Completed",
"CompletedWithWarnings",
"Failed",
"NotProcessed"
);

public static void main(String[] args) {
PartnerConnection connection;

ConnectorConfig config = new ConnectorConfig();

config.setUsername(USERNAME);
config.setPassword(PASSWORD);

try {

connection = Connector.newConnection(config);

System.out.println("Successfully authenticated as "+config.getUsername());

// Wave time!

// First, we create an InsightsExternalData job
SObject sobj = new SObject();
sobj.setType("InsightsExternalData");
sobj.setField("Format","Csv");
sobj.setField("EdgemartAlias", datasetName);
sobj.setField("MetadataJson", metadataJson.getBytes(StandardCharsets.UTF_8));
sobj.setField("Operation","Overwrite");
sobj.setField("Action","None");

String parentID = null;
SaveResult[] results = connection.create(new SObject[] { sobj });
for(SaveResult sv:results) {
if(sv.isSuccess()) {
parentID = sv.getId();
System.out.println("Success creating InsightsExternalData: "+parentID);
} else {
for (Error e : sv.getErrors()) {
System.out.println("Error: " + e.getMessage());
}
System.exit(1);
}
}

// Now upload some actual data. You can do this as many times as necessary,
// subject to the Wave External Data API Limits
sobj = new SObject();
sobj.setType("InsightsExternalDataPart");
sobj.setField("DataFile", data.getBytes(StandardCharsets.UTF_8));
sobj.setField("InsightsExternalDataId", parentID);
sobj.setField("PartNumber", 1);

results = connection.create(new SObject[] { sobj });
for(SaveResult sv:results) {
if(sv.isSuccess()) {
String rowId = sv.getId();
System.out.println("Success creating InsightsExternalDataPart: "+rowId);
} else {
for (Error e : sv.getErrors()) {
System.out.println("Error: " + e.getMessage());
}
System.exit(1);
}
}

// Instruct Wave to start processing the data
sobj = new SObject();
sobj.setType("InsightsExternalData");
sobj.setField("Action","Process");
sobj.setId(parentID);
results = connection.update(new SObject[] { sobj });
for(SaveResult sv:results) {
if(sv.isSuccess()) {
String rowId = sv.getId();
System.out.println("Success updating InsightsExternalData: "+rowId);
} else {
for (Error e : sv.getErrors()) {
System.out.println("Error: " + e.getMessage());
}
System.exit(1);
}
}

// Periodically check whether the job is done
boolean done = false;
int sleepTime = 1000;
while (!done) {
try {
Thread.sleep(sleepTime);
sleepTime *= 2;
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
QueryResult queryResults = connection.query(
"SELECT Status FROM InsightsExternalData WHERE Id = ‘" + parentID + "’"
);
if (queryResults.getSize() &gt; 0) {
for (SObject s: queryResults.getRecords()) {
String status = (String)s.getField("Status");
System.out.println(s.getField("Status"));
if (DONE.contains(status)) {
done = true;
String statusMessage = (String)s.getField("StatusMessage");
if (statusMessage != null) {
System.out.println(statusMessage);
}
}
}
} else {
System.out.println("Can’t find InsightsExternalData with Id " + parentID);
}
}
} catch (ConnectionException e1) {
e1.printStackTrace();
}
}
}
[/java]

  • Lines 7-14 – I’m using the WSC with the SOAP Partner API, just because I’m working in Java, and that was what was used in the bits of sample code included in the docs.
  • Lines 19-72 – this is the metadata that describes the CSV you’re uploading. This is optional, but recommended.
  • Lines 75-78 – CSV is the only format currently supported, though the docs reserve a binary format for Salesforce use.
  • Line 82 – the dataset name must be unique across your org.
  • Lines 85-86 – change these to your login credentials.
  • Line 117 – the API wants base64-encoded data, so you’d likely try encoding the data yourself and passing the resulting string here, resulting in an error message. Instead you have to pass the raw bytes of the unencoded string and let the WSC library sort it out.
  • Lines 137-154 – you can repeat this block in a loop as many times as necessary.

You will need the WSC jar, and the SOAP Partner API jar – follow Jeff Douglas‘ excellent article Introduction to the Force.com Web Services Connector for details on setting this up – use the ‘uber’ JAR as it contains all the required dependencies. The sample above used Jeff’s Partner API sample as a starting point – thanks, Jeff!

The fastest way to get started with Wave is, of course, Salesforce Trailhead. Follow the Wave Analytics Basics module and you’ll end up with a Wave-enabled Developer Edition all ready to go.

Once you have your Wave DE org, and the sample app, you should be able to run it and see something like:

Successfully authenticated as wave@patorg.com
Success creating InsightsExternalData: 06V360000008RIlEAM
Success creating InsightsExternalDataPart: 06W36000000PDXFEA4
Success updating InsightsExternalData: 06V360000008RIlEAM
InProgress
InProgress
Completed

If you go look in the Wave Analytics app, you should see the ‘tester’ dataset:

WaveAnalytics

Click on ‘tester’ and you’ll see the ‘big blue line’:

TesterDataset

Now you can drill into the data (all 2 rows of it!) by account name, close date etc.

You could, of course, extend the above code to accept a CSV filename and dataset name on the command line, and create all sorts of interesting extensions. Follow the StreamSets blog to learn where I plan to go with this!