Uploading data to the Salesforce Wave Analytics Cloud

5 minute read

As you might know from my last post, I moved from Salesforce to StreamSets a couple of weeks ago. It didn’t take long before I was signing up for a fresh Developer Edition org, though! I’m creating a StreamSets destination to allow me to write data to Wave Analytics datasets, and it’s fair to say that the documentation is… sparse. Working from the Wave Analytics External Data API Developer Guide and Wave Analytics External Data Format Reference (why are these separate docs???), and my understanding of how Salesforce works, I was able to put together a working sample Java app that creates a dataset from CSV data. Here’s the code - I explain a few idiosyncrasies below, and reveal the easiest way to get this working with Wave.

package wsc;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import com.sforce.soap.partner.Connector;
import com.sforce.soap.partner.Error;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.QueryResult;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

public class Main {
    // Describes the data we'll be uploading
    static String metadataJson = "{\n" + 
        " \"fileFormat\": {\n" + 
            " \"charsetName\": \"UTF-8\",\n" + 
            " \"fieldsDelimitedBy\": \",\",\n" + 
            " \"fieldsEnclosedBy\": \"\\\"\",\n" + 
            " \"numberOfLinesToIgnore\": 1\n" + 
        " },\n" + 
        " \"objects\": [\n" + 
            " {\n" + 
                " \"connector\": \"AcmeCSVConnector\",\n" + 
                " \"description\": \"\",\n" + 
                " \"fields\": [\n" + 
                    " {\n" + 
                        " \"description\": \"\",\n" + 
                        " \"fullyQualifiedName\": \"SalesData.Name\",\n" + 
                        " \"isMultiValue\": false,\n" + 
                        " \"isSystemField\": false,\n" + 
                        " \"isUniqueId\": false,\n" + 
                        " \"label\": \"Account Name\",\n" + 
                        " \"name\": \"Name\",\n" + 
                        " \"type\": \"Text\"\n" + 
                    " },\n" + 
                    " {\n" + 
                        " \"defaultValue\": \"0\",\n" + 
                        " \"description\": \"\",\n" + 
                        " \"format\": \"$#,##0.00\",\n" + 
                        " \"fullyQualifiedName\": \"SalesData.Amount\",\n" + 
                        " \"isSystemField\": false,\n" + 
                        " \"isUniqueId\": false,\n" + 
                        " \"label\": \"Opportunity Amount\",\n" + 
                        " \"name\": \"Amount\",\n" + 
                        " \"precision\": 10,\n" + 
                        " \"scale\": 2,\n" + 
                        " \"type\": \"Numeric\"\n" + 
                    " },\n" + 
                    " {\n" + 
                        " \"description\": \"\",\n" + 
                        " \"fiscalMonthOffset\": 0,\n" + 
                        " \"format\": \"MM/dd/yyyy\",\n" + 
                        " \"fullyQualifiedName\": \"SalesData.CloseDate\",\n" + 
                        " \"isSystemField\": false,\n" + 
                        " \"isUniqueId\": false,\n" + 
                        " \"label\": \"Opportunity Close Date\",\n" + 
                        " \"name\": \"CloseDate\",\n" + 
                        " \"type\": \"Date\"\n" + 
                    " }\n" + 
                " ],\n" + 
                " \"fullyQualifiedName\": \"SalesData\",\n" + 
                " \"label\": \"Sales Data\",\n" + 
                " \"name\": \"SalesData\"\n" + 
            " }\n" + 
        " ]\n" + 
    "}"; 

    // This is the data we'll be uploading
    static String data = "Name,Amount,CloseDate\n" + 
        "opportunityA,100.99,6/30/2014\n" + 
        "opportunityB,99.01,1/31/2012\n";

    // This will be the name of the data set in Wave
    // Must be unique across the organization
    static String datasetName = "tester";

    // Change these as appropriate
    static final String USERNAME = "[email protected]";
    static final String PASSWORD = "p455w0rd";

    // Status values indicating that the job is done
    static final List<String> DONE = Arrays.asList( "Completed", "CompletedWithWarnings", "Failed", "NotProcessed" );

    public static void main(String[] args) {
        PartnerConnection connection;

        ConnectorConfig config = new ConnectorConfig();
        config.setUsername(USERNAME);
        config.setPassword(PASSWORD);

        try {
            connection = Connector.newConnection(config);
            System.out.println("Successfully authenticated as "+config.getUsername());

            // Wave time!

            // First, we create an InsightsExternalData job
            SObject sobj = new SObject();
            sobj.setType("InsightsExternalData");
            sobj.setField("Format","Csv");
            sobj.setField("EdgemartAlias", datasetName);
            sobj.setField("MetadataJson", metadataJson.getBytes(StandardCharsets.UTF_8));
            sobj.setField("Operation","Overwrite");
            sobj.setField("Action","None");

            String parentID = null;
            SaveResult[] results = connection.create(new SObject[] { sobj });

            for(SaveResult sv:results) {
                if(sv.isSuccess()) {
                    parentID = sv.getId();
                    System.out.println("Success creating InsightsExternalData: "+parentID); 
                } else {
                    for (Error e : sv.getErrors()) {
                        System.out.println("Error: " + e.getMessage());
                    }
                    System.exit(1);
                }
            }

            // Now upload some actual data. You can do this as many times as necessary,
            // subject to the Wave External Data API Limits
            sobj = new SObject();
            sobj.setType("InsightsExternalDataPart");
            sobj.setField("DataFile", data.getBytes(StandardCharsets.UTF_8));
            sobj.setField("InsightsExternalDataId", parentID);
            sobj.setField("PartNumber", 1);

            results = connection.create(new SObject[] { sobj });

            for(SaveResult sv:results) {
                if(sv.isSuccess()) {
                    String rowId = sv.getId();
                    System.out.println("Success creating InsightsExternalDataPart: "+rowId);
                } else {
                    for (Error e : sv.getErrors()) {
                        System.out.println("Error: " + e.getMessage());
                    }
                    System.exit(1);
                }
            }

            // Instruct Wave to start processing the data
            sobj = new SObject();
            sobj.setType("InsightsExternalData");
            sobj.setField("Action","Process");
            sobj.setId(parentID);

            results = connection.update(new SObject[] { sobj });

            for(SaveResult sv:results) {
                if(sv.isSuccess()) {
                    String rowId = sv.getId();
                    System.out.println("Success updating InsightsExternalData: "+rowId);
                } else {
                    for (Error e : sv.getErrors()) {
                        System.out.println("Error: " + e.getMessage());
                    }
                    System.exit(1);
                }
            }

            // Periodically check whether the job is done
            boolean done = false;
            int sleepTime = 1000;
            while (!done) {
                try {
                    Thread.sleep(sleepTime);
                    sleepTime *= 2;
                } catch(InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }

                QueryResult queryResults = connection.query( "SELECT Status FROM InsightsExternalData WHERE Id = '" + parentID + "'" );

                if (queryResults.getSize() > 0) {
                    for (SObject s: queryResults.getRecords()) {
                        String status = (String)s.getField("Status");
                        System.out.println(s.getField("Status"));
                        if (DONE.contains(status)) {
                            done = true;
                            String statusMessage = (String)s.getField("StatusMessage");
                            if (statusMessage != null) {
                                System.out.println(statusMessage);
                            }
                        }
                    }
                } else {
                    System.out.println("Can't find InsightsExternalData with Id " + parentID);
                }
            }
        } catch (ConnectionException e1) {
            e1.printStackTrace();
        }
    }
}
  • The imports - I’m using the WSC with the SOAP Partner API, just because I’m working in Java, and that’s what the sample code snippets in the docs used.
  • metadataJson - the metadata that describes the CSV you’re uploading. It’s optional, but recommended.
  • data - CSV is the only format currently supported, though the docs reserve a binary format for Salesforce use.
  • datasetName - the dataset name must be unique across your org.
  • USERNAME and PASSWORD - change these to your login credentials.
  • The DataFile field - the API wants base64-encoded data, so you might be tempted to encode the data yourself and pass the resulting string here; that just gets you an error message. Instead, pass the raw bytes of the unencoded string and let the WSC library handle the encoding.
  • The InsightsExternalDataPart block - you can repeat this block in a loop, incrementing PartNumber each time, as many times as necessary, subject to the Wave External Data API limits; see the sketch below.
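
To make that last point concrete, here’s a minimal sketch of a multi-part upload loop. The 10 MB part size is my assumption - check the External Data API limits doc for the actual ceiling - and since the parts are reassembled in PartNumber order before parsing, splitting on an arbitrary byte boundary should be fine:

    // Sketch: upload a large CSV as multiple InsightsExternalDataPart rows.
    // MAX_PART_BYTES is an assumption - check the External Data API limits
    // doc for the real maximum part size.
    static void uploadParts(PartnerConnection connection, String parentID, byte[] csvBytes)
            throws ConnectionException {
        final int MAX_PART_BYTES = 10 * 1024 * 1024;
        int partNumber = 1;
        for (int offset = 0; offset < csvBytes.length; offset += MAX_PART_BYTES) {
            int end = Math.min(offset + MAX_PART_BYTES, csvBytes.length);
            byte[] chunk = Arrays.copyOfRange(csvBytes, offset, end);

            SObject part = new SObject();
            part.setType("InsightsExternalDataPart");
            part.setField("DataFile", chunk); // raw bytes - WSC base64-encodes them
            part.setField("InsightsExternalDataId", parentID);
            part.setField("PartNumber", partNumber++);

            SaveResult[] results = connection.create(new SObject[] { part });
            for (SaveResult sr : results) {
                if (!sr.isSuccess()) {
                    throw new RuntimeException(sr.getErrors()[0].getMessage());
                }
            }
        }
    }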

You will need the WSC jar, and the SOAP Partner API jar - follow Jeff Douglas’ excellent article Introduction to the Force.com Web Services Connector for details on setting this up - use the ‘uber’ JAR, as it contains all the required dependencies. The code above used Jeff’s Partner API example as a starting point - thanks, Jeff! The fastest way to get started with Wave is, of course, Salesforce Trailhead. Follow the Wave Analytics Basics module and you’ll end up with a Wave-enabled Developer Edition all ready to go. Once you have your Wave DE org and the sample app, you should be able to run it and see something like:

Successfully authenticated as [email protected]
Success creating InsightsExternalData: 06V360000008RIlEAM
Success creating InsightsExternalDataPart: 06W36000000PDXFEA4
Success updating InsightsExternalData: 06V360000008RIlEAM
InProgress
InProgress
Completed
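
One configuration note: Connector.newConnection() logs in via the endpoint compiled into the Partner API stub. If you need to point the sample somewhere else (a sandbox, say), ConnectorConfig lets you set the endpoint explicitly - the host and API version below are assumptions, so substitute your own:

    ConnectorConfig config = new ConnectorConfig();
    config.setUsername(USERNAME);
    config.setPassword(PASSWORD);
    // Hypothetical endpoint - substitute your instance and API version
    config.setAuthEndpoint("https://test.salesforce.com/services/Soap/u/36.0");
    PartnerConnection connection = Connector.newConnection(config);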

If you go look in the Wave Analytics app, you should see the ‘tester’ dataset. Click on ‘tester’ and you’ll see the ‘big blue line’. Now you can drill into the data (all 2 rows of it!) by account name, close date and so on. You could, of course, extend the above code to accept a CSV filename and dataset name on the command line (see the sketch below), and create all sorts of interesting extensions. Follow the StreamSets blog to learn where I plan to go with this!
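
As a taste of that, here’s a minimal sketch of a main() that takes both from the command line; uploadCsv is a hypothetical helper wrapping the create/upload/process/poll steps shown above:

    // Sketch: read the dataset name and CSV path from the command line.
    // uploadCsv is a hypothetical helper wrapping the steps shown above.
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: java wsc.Main <datasetName> <csvFile>");
            System.exit(1);
        }
        String datasetName = args[0];
        byte[] csvBytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(args[1]));
        uploadCsv(datasetName, csvBytes);
    }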

Comments

Terence Wilson

Hi Pat, there is an Analytics Cloud Data Integration Best Practice guide by ASG Salesforce, available on the Customer Success site, which has fully worked-out examples using both WSC and REST API approaches to load large data files with compression and data chunking. I concur that the official product documents don’t make it obvious how to do this, hence my reason for coding the examples. Hope you find them useful as well.

SS

No error that I can see - it just kind of hangs after that line. Is it something related to the jar? At line 122 the connection is calling the create method, which goes into the force-wsc jar.

SS

Hi all, wanted to ask one more thing: is there any way to generate the metadata JSON automatically from a CSV or database, for loading the data into Wave Analytics?

I’m facing dynamic changes in the database, and every time the DB changes I need to update the metadata JSON manually. If there is a way to generate it automatically, that would be helpful.

geoff rothman

SS: google “github datasetutils”. That’s a reference implementation of the API, and you can add a command-line switch to automatically generate the metadata JSON along with the upload.
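
For anyone who wants to roll their own rather than use datasetutils, a minimal sketch of the idea: derive the metadata from the CSV header row, defaulting every column to Text. The helper name is mine, and real column names would need escaping:

    // Sketch: generate minimal metadata JSON from a CSV header line,
    // treating every column as Text. Hypothetical helper - column names
    // containing quotes or commas would need proper escaping.
    static String generateMetadataJson(String objectName, String headerLine) {
        StringBuilder fields = new StringBuilder();
        String[] columns = headerLine.split(",");
        for (int i = 0; i < columns.length; i++) {
            String col = columns[i].trim();
            if (i > 0) fields.append(",\n");
            fields.append("      { \"fullyQualifiedName\": \"" + objectName + "." + col + "\",\n")
                  .append("        \"name\": \"" + col + "\", \"label\": \"" + col + "\", \"type\": \"Text\" }");
        }
        return "{\n" +
            "  \"fileFormat\": { \"charsetName\": \"UTF-8\", \"fieldsDelimitedBy\": \",\",\n" +
            "                    \"fieldsEnclosedBy\": \"\\\"\", \"numberOfLinesToIgnore\": 1 },\n" +
            "  \"objects\": [ {\n" +
            "    \"fullyQualifiedName\": \"" + objectName + "\",\n" +
            "    \"name\": \"" + objectName + "\", \"label\": \"" + objectName + "\",\n" +
            "    \"fields\": [\n" + fields + "\n    ]\n" +
            "  } ]\n" +
            "}";
    }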
