Java Streams API

Introduction to Java Streams

Java Streams provide a powerful, functional way to process data collections. They simplify data manipulation tasks by handling sequences of elements either in parallel or sequentially, reducing the need for verbose, boilerplate code. By leveraging the functional programming paradigm, streams allow for more readable, maintainable, and efficient code.

A functional approach means focusing on what needs to be done with the data rather than how to perform the operations. This shifts the emphasis from explicit control flow to declarative data transformations.

Java Streams support various operations such as filtering, mapping, reducing, and many others that can be executed in a chain, enhancing the expressiveness and flexibility of code.

// Example: Filtering and Mapping Names
List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
List<String> result = names.stream()
                           .filter(name -> name.startsWith("A"))
                           .map(String::toUpperCase)
                           .collect(Collectors.toList());
// Result: ["ANGELA"]

This example demonstrates how a stream can filter a list to include only names that start with 'A' and then map those names to uppercase. The code is concise, readable, and avoids the need for explicit loops and conditionals.
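
For comparison, the same result can be produced with an explicit loop. The sketch below puts both versions side by side; the class and method names (FilterMapComparison, withLoop, withStream) are assumptions made for this illustration and are not part of the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterMapComparison {
    // Imperative version: spells out how to iterate, test, and accumulate
    static List<String> withLoop(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.startsWith("A")) {
                result.add(name.toUpperCase());
            }
        }
        return result;
    }

    // Declarative version: states what is wanted and leaves iteration to the stream
    static List<String> withStream(List<String> names) {
        return names.stream()
                    .filter(name -> name.startsWith("A"))
                    .map(String::toUpperCase)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
        System.out.println(withLoop(names));   // [ANGELA]
        System.out.println(withStream(names)); // [ANGELA]
    }
}
```

Both methods return the same list; the stream version simply omits the bookkeeping (the result list, the loop, the if).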

Benefits of Using Java Streams

  • Conciseness: Reduces boilerplate code, making code shorter and easier to read.
  • Parallel Processing: Leverages multi-core processors to process data in parallel, potentially improving performance.
  • Immutability: Promotes immutable operations, reducing the risk of side effects and enhancing code safety.
  • Lazy Evaluation: Streams are evaluated lazily, meaning operations are only executed when necessary, optimizing performance.
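
As a rough illustration of the parallel-processing point, the sketch below sums the same range sequentially and in parallel. Whether the parallel form is actually faster depends on the data size, the work per element, and the machine, so this only shows that switching modes is a one-call change and that both forms give the same answer. The class and method names are assumptions for this example.

```java
import java.util.stream.LongStream;

class ParallelSumSketch {
    // Sums 1..n on the calling thread
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same pipeline, but parallel() lets the runtime split the range across threads
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println(sequentialSum(n)); // 500000500000
        System.out.println(parallelSum(n));   // 500000500000
    }
}
```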

Real-World Application of Streams

In real-world applications, streams can be used to simplify complex data processing tasks. For example, processing large datasets, performing transformations, and aggregating results can all be done concisely with streams. They are particularly useful in scenarios such as data analysis, machine learning pipelines, and handling real-time data streams.

// Real-World Example: Aggregating Employee Salaries
List<Employee> employees = getEmployees();
double totalSalary = employees.stream()
                              .filter(
                              employee -> employee.getDepartment().equals("Sales"))
                              .mapToDouble(Employee::getSalary)
                              .sum();
// Result: Total salary of employees in the Sales department

In this example, the stream filters employees by department and then maps their salaries to a double stream, which is subsequently summed up. This showcases how streams can be effectively used to aggregate and compute results from data collections.
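
The snippet above relies on an Employee type and a getEmployees() method that are not shown. A self-contained sketch might look like the following; the Employee class, its fields, and the sample values are all hypothetical and exist only to make the example runnable.

```java
import java.util.Arrays;
import java.util.List;

class SalaryAggregationSketch {
    // Hypothetical Employee type; the original text does not show its definition
    static class Employee {
        private final String name;
        private final String department;
        private final double salary;

        Employee(String name, String department, double salary) {
            this.name = name;
            this.department = department;
            this.salary = salary;
        }

        String getName() { return name; }
        String getDepartment() { return department; }
        double getSalary() { return salary; }
    }

    // Made-up sample data, standing in for getEmployees()
    static List<Employee> sampleEmployees() {
        return Arrays.asList(
            new Employee("Jim", "Sales", 50000.0),
            new Employee("Dwight", "Sales", 55000.0),
            new Employee("Angela", "Accounting", 48000.0));
    }

    // Filters by department, maps to a DoubleStream of salaries, and sums them
    static double totalSalary(List<Employee> employees, String department) {
        return employees.stream()
                        .filter(e -> department.equals(e.getDepartment()))
                        .mapToDouble(Employee::getSalary)
                        .sum();
    }

    public static void main(String[] args) {
        System.out.println(totalSalary(sampleEmployees(), "Sales")); // 105000.0
    }
}
```

Calling department.equals(e.getDepartment()) rather than the reverse is a small defensive choice: it avoids a NullPointerException if an employee has no department set.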

Key Characteristics of Java Streams

Java Streams have several key characteristics that make them powerful tools for data processing. These characteristics help developers write cleaner, more efficient, and more readable code.

  • Declarative Style: Emphasizes what to do rather than how to do it. This allows developers to focus on the desired result of data processing rather than the specific steps needed to achieve it.
// Declarative Style Example
List<String> longNames = names.stream()
                              .filter(name -> name.length() > 3)
                              .collect(Collectors.toList());
// Instead of using explicit loops and conditionals
  • Lazy Evaluation: Operations are not executed until a terminal operation is invoked, optimizing performance by avoiding unnecessary computations.

Lazy evaluation means that streams are processed on demand. No operation is performed until it's needed for a terminal operation. This optimizes the performance and allows for the building of complex pipelines without immediate computation overhead.

// Lazy Evaluation Example
List<String> processedNames = names.stream()
                                   .filter(name -> name.length() > 3)
                                   .peek(name -> System.out.println("Passed filter: " + name))
                                   .map(String::toUpperCase)
                                   .collect(Collectors.toList());
// Output is printed by peek() only once collect() runs the pipeline
  • Single Use: Streams are designed to be consumed once. Attempting to reuse a stream after it has been processed will result in an IllegalStateException.

Streams follow a "single-use" principle: once a terminal operation has consumed a stream, it cannot be reused. To traverse the data again, obtain a fresh stream from the source.

// Single Use Example
Stream<String> nameStream = names.stream();
nameStream.forEach(System.out::println);
nameStream.forEach(System.out::println); // Throws IllegalStateException
  • Non-Destructive Operations: Methods like filter(), map(), and sorted() do not modify the original data, preserving data integrity.

Stream operations are non-destructive, meaning the original data remains unchanged. This immutability makes streams safer and easier to work with, as there are no unintended side effects that can alter data.

// Non-Destructive Example
List<String> originalNames = new ArrayList<>(Arrays.asList("Michael", "Jim", "Pam"));
List<String> modifiedNames = originalNames.stream()
                                          .map(String::toUpperCase)
                                          .collect(Collectors.toList());
// 'originalNames' remains unchanged
  • Internal Iteration: Unlike collections that require explicit loops (external iteration), streams use internal iteration, handling the logic behind the scenes.

Streams abstract away the iteration logic, making the code more concise and readable. The iteration is handled internally, freeing developers from managing the loop counters and breaking conditions.

// Internal Iteration Example
names.stream().forEach(System.out::println);
// The iteration is handled internally, no need for 'for' or 'while' loops
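
Two of the characteristics above, lazy evaluation and single use, can be observed directly. The sketch below counts how often a filter predicate runs before and after the terminal operation, and checks that a second terminal operation on the same stream is rejected. The class and method names are assumptions for this illustration, and the counter is a side effect used only to make the behavior visible.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamCharacteristicsSketch {
    // Returns {predicate calls before the terminal operation, predicate calls after it}
    static int[] countFilterCalls(List<String> names) {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = names.stream()
                                       .filter(name -> {
                                           calls.incrementAndGet();
                                           return name.length() > 3;
                                       });
        int before = calls.get();              // pipeline is built, but nothing has run
        pipeline.collect(Collectors.toList()); // terminal operation executes the pipeline
        int after = calls.get();
        return new int[] {before, after};
    }

    // Returns true if a second terminal operation on the same stream throws
    static boolean secondUseFails(List<String> names) {
        Stream<String> stream = names.stream();
        stream.forEach(name -> { });           // first use consumes the stream
        try {
            stream.forEach(name -> { });       // second use is not allowed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
        int[] counts = countFilterCalls(names);
        System.out.println(counts[0] + " -> " + counts[1]); // 0 -> 5
        System.out.println(secondUseFails(names));          // true
    }
}
```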

Internal Workings of a Stream and Pipeline

Streams in Java process data using a pipeline model, which involves three main stages:

  • Source: Represents the data origin, such as a collection, an array, or an I/O channel. The source stage initiates the stream and provides the data elements that will be processed by the pipeline.

The source is the starting point of a stream. It can be any data structure that Java can iterate over, including collections (like lists and sets), arrays, or input/output channels (like files). The source establishes the type of data the stream will process.

// Example: Stream Source
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);  // Source: List collection
Stream<Integer> stream = numbers.stream();
  • Intermediate Operations: These operations transform a stream into another stream, allowing for various data manipulations. Examples include filter(), map(), and sorted(). Intermediate operations are always lazy; they are not executed until a terminal operation is invoked.

Intermediate operations allow you to build a pipeline of processing steps. They transform the data as it passes through the stream, allowing for filtering, mapping, and sorting. Because these operations are lazy, they do not perform any actual computation until the terminal operation is invoked.

// Example: Intermediate Operations
Stream<Integer> evenSquares = numbers.stream()          // Source
                                     .filter(n -> n % 2 == 0) // Intermediate
                                     .map(n -> n * n);   // Intermediate
// Nothing is printed or calculated until a terminal operation is called
  • Terminal Operation: An operation that produces a result or a side effect, such as collect(), forEach(), or reduce(). It triggers the execution of the entire pipeline, processing all elements as defined by the intermediate operations.

Terminal operations mark the end of the stream pipeline. They produce a result or a side effect and trigger the processing of all elements through the intermediate operations. Once a terminal operation is executed, the stream is considered "consumed" and can no longer be used.

// Example: Terminal Operation
List<Integer> result = evenSquares.collect(Collectors.toList()); // Terminal
// Result: [4, 16] - Stream pipeline execution triggered and completed

Putting It All Together: A Complete Stream Pipeline

A complete stream pipeline includes a source, zero or more intermediate operations, and a terminal operation. This example demonstrates how a stream processes data from start to finish.

// Complete Stream Pipeline Example
List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
List<String> processedNames = names.stream()                 // Source
                                   .filter(name -> name.length() > 3) // Intermediate
                                   .map(String::toUpperCase)         // Intermediate
                                   .sorted()                         // Intermediate
                                   .collect(Collectors.toList());    // Terminal
// Result: ["ANGELA", "DWIGHT", "MICHAEL"]

In this pipeline, the stream starts with a source (the list of names), applies several intermediate operations (filtering, mapping, sorting), and finishes with a terminal operation (collecting the results into a list). Each stage plays a crucial role in the processing pipeline.
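
To see how elements move through a pipeline, the sketch below records each call made by filter() and map(). It deliberately leaves out sorted(), which must see every element before passing any on, so that the element-at-a-time flow of a sequential stream stays visible. The class name and the logging approach are assumptions for this illustration; writing to a shared list from a lambda is done here only for tracing and should not be used with parallel streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineTraceSketch {
    // Runs filter -> map -> collect and returns the order in which the stages were invoked
    static List<String> trace(List<String> names) {
        List<String> log = new ArrayList<>();
        names.stream()
             .filter(name -> {
                 log.add("filter:" + name);
                 return name.length() > 3;
             })
             .map(name -> {
                 log.add("map:" + name);
                 return name.toUpperCase();
             })
             .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
        for (String entry : trace(names)) {
            System.out.println(entry);
        }
    }
}
```

For the five sample names the log reads filter:Michael, map:Michael, filter:Jim, filter:Pam, filter:Dwight, map:Dwight, filter:Angela, map:Angela. Each element passes through the whole chain before the next one is pulled from the source, and elements rejected by filter() never reach map().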

Core Operations of Java Streams

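The core operations supported by Java Streams are either intermediate or terminal. Short-circuiting, listed third below, is a property that some operations of either kind have rather than a separate kind of operation: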

  • Intermediate Operations: These operations, such as filter() and map(), return another stream and are lazy, meaning they are not executed until a terminal operation is called.

Intermediate operations are essential for building a stream pipeline. They transform the stream, filter data, or map elements to new values, but do not themselves trigger processing. Because these operations are lazy, they are evaluated only when a terminal operation is invoked, allowing for more efficient data processing.

// Example: Intermediate Operations
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> evenNumbers = numbers.stream()
                                   .filter(n -> n % 2 == 0)  // Intermediate
                                   .map(n -> n * n)          // Intermediate
                                   .collect(Collectors.toList()); // Terminal
// Result: [4, 16]
  • Terminal Operations: Operations like collect() and forEach() produce a result or a side effect and trigger the processing of the stream.

Terminal operations are the final steps in a stream pipeline. They initiate the processing of data and produce a result, whether it's a collection of results, a sum, or a side effect like printing to the console. Once a terminal operation is invoked, the stream is considered consumed and can no longer be used.

// Example: Terminal Operations
List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");

names.stream()
     .filter(name -> name.length() > 3)
     .forEach(System.out::println);  // Terminal
// Output: Michael, Dwight, Angela
  • Short-Circuit Operations: Operations such as findFirst() and anyMatch() can stop further processing as soon as a certain condition is met, improving efficiency.

Short-circuit operations are particularly useful for improving the efficiency of stream processing. They allow the pipeline to terminate early if a certain condition is met, preventing unnecessary computation on the rest of the elements.

// Example: Short-Circuit Operations
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);

boolean hasEven = numbers.stream()
                         .map(n -> n * n)             // Intermediate: map
                         .anyMatch(n -> n % 2 == 0);  // Short-Circuit: anyMatch
// Result: true
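
The early exit in the short-circuit example can be made visible by counting how many elements map() actually processes. This is a minimal sketch; the class and method names are assumptions, and the counter exists only for demonstration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ShortCircuitSketch {
    // Returns how many elements map() handled before anyMatch() found an even square
    static int mappedBeforeMatch(List<Integer> numbers) {
        AtomicInteger mapped = new AtomicInteger();
        numbers.stream()
               .map(n -> {
                   mapped.incrementAndGet();
                   return n * n;
               })
               .anyMatch(n -> n % 2 == 0); // stops at the first even square
        return mapped.get();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(mappedBeforeMatch(numbers)); // 2
    }
}
```

Only 1 and 2 are squared: 1 is odd, 4 is even, and the remaining four elements are never touched.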

Intermediate Operations

Intermediate operations return a new stream and are lazily executed, meaning they are only performed when a terminal operation is invoked. These operations can be either stateless or stateful:

  • filter(Predicate<T>): Filters elements that match a condition. This is a stateless operation because it processes each element independently without relying on the state from previous elements.

The filter() method allows for the exclusion of elements that do not meet a specified condition. Since each element is evaluated independently, the operation does not need to maintain any state about previous elements.

// Example: Filtering elements
List<String> filteredNames = names.stream()
                                  .filter(name -> name.startsWith("A"))
                                  .collect(Collectors.toList());
  • map(Function<T, R>): Transforms each element. This is a stateless operation because it applies the function to each element independently.

The map() method is used to apply a function to each element of the stream, transforming it into another element. It does not rely on any external state, making it a stateless operation.

// Example: Mapping elements to uppercase
List<String> upperCaseNames = names.stream()
                                   .map(String::toUpperCase)
                                   .collect(Collectors.toList());
  • sorted(Comparator<T>): Sorts elements in natural order or by a comparator. This is a stateful operation because it requires holding state about the entire input stream to sort all the elements.

The sorted() method sorts the elements of the stream according to natural order or a specified comparator. This operation is stateful because it needs to know the relative ordering of all elements, requiring it to hold onto them during processing.

// Example: Sorting elements
List<String> sortedNames = names.stream()
                                .sorted()
                                .collect(Collectors.toList());
  • distinct(): Removes duplicate elements from the stream. This is a stateful operation because it must remember the elements it has already seen to remove duplicates.

The distinct() method ensures that the stream contains only unique elements. As it needs to track which elements have been encountered, it requires maintaining state about the entire set of processed elements, making it stateful.

// Example: Removing duplicates
List<Integer> uniqueNumbers = numbers.stream()
                                     .distinct()
                                     .collect(Collectors.toList());
  • skip(long n): Skips the first n elements in the stream. This operation is useful for pagination or when you want to ignore a certain number of elements.

The skip() method discards the first n elements of a stream and passes the rest through. The Stream Javadoc classifies it as a stateful intermediate operation, because it must keep track of how many elements it has already seen.

// Example: Skipping elements
List<String> skippedNames = names.stream()
                                 .skip(2)
                                 .collect(Collectors.toList());
  • limit(long n): Limits the number of elements in the stream to n. This is useful for truncating a stream to a desired size.

The limit() method truncates the stream to at most n elements. The Stream Javadoc classifies it as a short-circuiting stateful intermediate operation: it counts the elements it has passed along and stops the pipeline once n have been emitted.

// Example: Limiting elements
List<String> limitedNames = names.stream()
                                 .limit(3)
                                 .collect(Collectors.toList());
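
Since skip() and limit() are described above as useful for pagination, here is a minimal sketch of how the two might be combined. The page() helper and its zero-based pageIndex parameter are assumptions for this example, not an established API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PaginationSketch {
    // Returns the requested page (zero-based); pages past the end come back empty
    static <T> List<T> page(List<T> items, int pageIndex, int pageSize) {
        return items.stream()
                    .skip((long) pageIndex * pageSize) // drop everything before this page
                    .limit(pageSize)                   // keep at most one page of elements
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
        System.out.println(page(names, 0, 2)); // [Michael, Jim]
        System.out.println(page(names, 1, 2)); // [Pam, Dwight]
        System.out.println(page(names, 2, 2)); // [Angela]
        System.out.println(page(names, 3, 2)); // []
    }
}
```

The cast to long before multiplying guards against int overflow for large page indexes.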

Terminal Operations

Terminal operations trigger the processing of the stream and produce a final result or side effect. Once a terminal operation is executed, the stream is considered consumed and cannot be reused.

  • collect(Collector<T, A, R>): Accumulates elements into a collection, such as a List, Set, or Map. This operation is commonly used to gather the processed stream elements into a different data structure.

The collect() method is a very versatile terminal operation that can be used to transform a stream into a collection or a summarized result. This makes it one of the most commonly used operations for converting the output of a stream back into a concrete collection.

// Example: Collecting elements to a list
List<String> collectedNames = names.stream()
                                   .collect(Collectors.toList());
  • forEach(Consumer<T>): Iterates over each element and performs a specified action. This is useful for executing a block of code on each element, such as printing them or updating a variable.

The forEach() method is often used for its side effects, like printing each element to the console. It is an easy way to iterate over elements and apply some action without modifying the original data structure.

// Example: Printing each element
names.stream().forEach(System.out::println);
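  • reduce(T identity, BinaryOperator<T>): Reduces the stream to a single value by combining elements with a binary operator, starting from an identity value. This operation is used to perform aggregation functions, such as summing numbers or concatenating strings. The one-argument overload, reduce(BinaryOperator<T>), returns an Optional<T> because the stream may be empty.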

The reduce() method is a powerful terminal operation that takes a binary operator and applies it repeatedly to combine elements into a single result. It is highly useful for operations like summing, multiplying, or finding the maximum or minimum value.

// Example: Reducing to sum
Integer sum = numbers.stream()
                     .reduce(0, Integer::sum);
  • count(): Returns the count of elements in the stream. This operation is particularly useful when you need to determine the number of elements that match a particular condition or to simply count the number of elements in a collection.

The count() method is a straightforward terminal operation that returns the number of elements in the stream. It is often used to determine how many elements meet a certain condition after applying a filter.

// Example: Counting elements
long count = names.stream()
                  .filter(name -> name.length() > 3)
                  .count();
// Result: 3
  • anyMatch(Predicate<T>): Returns true if any elements of the stream match the provided predicate. It is a short-circuiting terminal operation that stops processing as soon as a matching element is found.

The anyMatch() method is highly efficient for checking if at least one element meets a specific condition. It is especially useful in large datasets or streams where an early exit can save processing time.

// Example: Check if any names start with "M"
boolean hasNameStartingWithM = names.stream()
                                    .anyMatch(name -> name.startsWith("M"));
// Result: true
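
The collect() description mentions Set and Map targets, and the reduce() description mentions finding a maximum, but the examples above only show a List and a sum. The sketch below fills in those cases using standard collectors; the class and method names are assumptions for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class CollectorsSketch {
    // Groups names by their length into a Map
    static Map<Integer, List<String>> byLength(List<String> names) {
        return names.stream()
                    .collect(Collectors.groupingBy(String::length));
    }

    // Collects the distinct name lengths into a Set
    static Set<Integer> distinctLengths(List<String> names) {
        return names.stream()
                    .map(String::length)
                    .collect(Collectors.toSet());
    }

    // Uses the one-argument reduce() to find the longest length; empty input gives an empty Optional
    static Optional<Integer> longestLength(List<String> names) {
        return names.stream()
                    .map(String::length)
                    .reduce(Integer::max);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Jim", "Pam", "Dwight", "Angela");
        System.out.println(byLength(names).get(3));         // [Jim, Pam]
        System.out.println(byLength(names).get(6));         // [Dwight, Angela]
        System.out.println(distinctLengths(names).size());  // 3
        System.out.println(longestLength(names).get());     // 7
    }
}
```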

Short-Circuit Operations

Short-circuit operations provide efficiency by terminating the stream early when a specified condition is met. These operations are particularly useful in large datasets or infinite streams, as they can prevent unnecessary processing and optimize performance.

  • anyMatch(Predicate<T>): Returns true if any element matches the provided predicate. This is often used to check for the existence of an element that meets certain criteria.

The anyMatch() method evaluates the elements of the stream and stops once an element matching the condition is found. This operation is efficient for large datasets as it does not require processing all elements.

// Example: Check if any name starts with "A"
boolean hasNameStartingWithA = names.stream()
                                    .anyMatch(name -> name.startsWith("A"));
  • findFirst(): Returns an Optional describing the first element of the stream, or an empty Optional if the stream is empty. It takes no predicate; combine it with filter() to get the first element that matches a condition.

The findFirst() method respects encounter order, so on an ordered stream it always returns the first element that reaches it. Once that element is found, the remaining elements are not processed.

// Example: Find the first even number
Optional<Integer> firstEven = numbers.stream()
                                     .filter(n -> n % 2 == 0)
                                     .findFirst();
  • findAny(): Returns an Optional describing some element of the stream, with no guarantee about which one. Like findFirst(), it takes no predicate and is usually combined with filter(). The result may vary between runs on a parallel stream.

The findAny() method is free to return whichever matching element becomes available first. This makes it a good fit for parallel streams, where it avoids the coordination needed to honor encounter order and does not have to process the entire stream.

// Example: Find any name starting with "D"
Optional<String> anyNameWithD = names.parallelStream()
                                    .filter(name -> name.startsWith("D"))
                                    .findAny();
  • noneMatch(Predicate<T>): Returns true if no elements match the provided predicate. This operation stops at the first occurrence of a match and does not process the remaining elements.

The noneMatch() method evaluates the elements and returns true if none of them match the condition. It terminates as soon as a match is found, providing a short-circuiting behavior.

// Example: Check if no name starts with "Z"
boolean noNameStartingWithZ = names.stream()
                                   .noneMatch(name -> name.startsWith("Z"));
  • allMatch(Predicate<T>): Returns true if all elements match the provided predicate. This operation stops if any element does not match the condition.

The allMatch() method checks if all elements satisfy the given predicate. It stops processing as soon as it encounters an element that does not meet the condition.

// Example: Check if all names are longer than 3 characters
boolean allNamesLongerThan3 = names.stream()
                                   .allMatch(name -> name.length() > 3);
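
Short-circuiting matters most for the infinite streams mentioned at the start of this section, because without it the pipeline would never finish. The following sketch uses Stream.iterate() to build unbounded sequences and relies on findFirst() and limit() to terminate them. The class and method names are assumptions for this example.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteStreamSketch {
    // Walks 1, 4, 9, 16, ... and stops at the first square above the threshold
    static Optional<Integer> firstSquareAbove(int threshold) {
        return Stream.iterate(1, n -> n + 1)   // infinite: 1, 2, 3, ...
                     .map(n -> n * n)
                     .filter(square -> square > threshold)
                     .findFirst();             // short-circuiting terminal operation
    }

    // Takes the first 'count' even numbers from an infinite sequence
    static List<Integer> firstEvenNumbers(int count) {
        return Stream.iterate(2, n -> n + 2)   // infinite: 2, 4, 6, ...
                     .limit(count)             // short-circuiting intermediate operation
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstSquareAbove(50).get()); // 64
        System.out.println(firstEvenNumbers(5));        // [2, 4, 6, 8, 10]
    }
}
```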

Simplifying Nested Loops with Streams

A common nested-loop task is finding the maximum product of two elements in an array by comparing every pair. This section shows a more concise solution using Java Streams.

Stream Example: Finding Maximum Product

Using Java Streams, we can find the maximum product of two distinct elements in a single, concise operation. This approach leverages the power of the streams API to make the code more readable and maintainable.

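The stream operation below first converts the array to a stream, then sorts the elements in descending order, selects the top two elements, and finally computes their product using a reduction operation. Taking the two largest values gives the maximum product only when the array has at least two elements and contains no negative numbers; two large negative values can have a larger product than the two largest values.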

// Stream example: Finding maximum product
int maxProduct = Arrays.stream(nums)
    .boxed() // Convert int[] to Stream<Integer>
    .sorted(Comparator.reverseOrder()) // Sort elements in descending order
    .limit(2) // Take the top two elements
    .reduce(1, (a, b) -> a * b); // Calculate the product of the two elements

This stream operation is more concise than the equivalent nested loop and eliminates the need for managing loop indices and conditions explicitly.

Performance Considerations

The stream-based approach is shorter, but its performance depends on the input. Sorting costs O(n log n), and boxed() wraps every int in an Integer, which adds allocation overhead that the nested loop does not have.

In contrast, the nested loop approach requires O(n^2) time complexity due to its double iteration over the array. The stream approach reduces this by first sorting (O(n log n)) and then performing a constant time operation to find the product of the top two elements (O(1)).

While the performance benefits may not be significant for small arrays, the readability and maintainability of the code using streams are markedly improved.
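
The nested loop that this section refers to is not shown, so the sketch below reconstructs one plausible version and places it next to the stream version for comparison. The class and method names are assumptions for this illustration. It also shows the one case where the two disagree: arrays containing negative numbers.

```java
import java.util.Arrays;
import java.util.Comparator;

class MaxProductSketch {
    // Nested-loop version: checks every pair, so negative values are handled too
    static int maxProductLoop(int[] nums) {
        int best = Integer.MIN_VALUE;
        for (int i = 0; i < nums.length; i++) {
            for (int j = i + 1; j < nums.length; j++) {
                best = Math.max(best, nums[i] * nums[j]);
            }
        }
        return best;
    }

    // Stream version from the text: multiplies the two largest values
    static int maxProductStream(int[] nums) {
        return Arrays.stream(nums)
                     .boxed()
                     .sorted(Comparator.reverseOrder())
                     .limit(2)
                     .reduce(1, (a, b) -> a * b);
    }

    public static void main(String[] args) {
        int[] positives = {3, 4, 5, 2};
        System.out.println(maxProductLoop(positives));   // 20
        System.out.println(maxProductStream(positives)); // 20

        int[] withNegatives = {-10, -9, 1, 2};
        System.out.println(maxProductLoop(withNegatives));   // 90
        System.out.println(maxProductStream(withNegatives)); // 2
    }
}
```

For non-negative input the two agree. With negative values present, the pairwise loop finds (-10) * (-9) = 90, while the sort-and-take-two shortcut returns 2 * 1 = 2.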

Java Streams API Examples

Sample Data

  { high: 120, low: 100, open: 110, close: 115, startTime: "2023-08-25T10:00:00Z", endTime: "2023-08-25T10:01:00Z" },
  { high: 130, low: 105, open: 115, close: 125, startTime: "2023-08-25T10:01:00Z", endTime: "2023-08-25T10:02:00Z" },
  { high: 128, low: 102, open: 123, close: 107, startTime: "2023-08-25T10:02:00Z", endTime: "2023-08-25T10:03:00Z" },
  { high: 135, low: 110, open: 130, close: 132, startTime: "2023-08-25T10:03:00Z", endTime: "2023-08-25T10:04:00Z" },
  { high: 140, low: 115, open: 134, close: 138, startTime: "2023-08-25T10:04:00Z", endTime: "2023-08-25T10:05:00Z" }
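
The methods in the rest of this section take a List<Aggregates>, but the Aggregates type itself is not shown. One way to model the sample data in Java is sketched below. The class shape, constructor, and sampleData() helper are assumptions inferred from the field names above and from the getters the later methods call (getHigh, getStartTime, getEndTime).

```java
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;

class AggregatesSketch {
    // Hypothetical Aggregates type; only the fields in the sample data are modeled
    static class Aggregates {
        private final double high;
        private final double low;
        private final double open;
        private final double close;
        private final ZonedDateTime startTime;
        private final ZonedDateTime endTime;

        Aggregates(double high, double low, double open, double close,
                   String startTime, String endTime) {
            this.high = high;
            this.low = low;
            this.open = open;
            this.close = close;
            this.startTime = ZonedDateTime.parse(startTime); // ISO-8601 with 'Z' offset
            this.endTime = ZonedDateTime.parse(endTime);
        }

        double getHigh() { return high; }
        double getLow() { return low; }
        double getOpen() { return open; }
        double getClose() { return close; }
        ZonedDateTime getStartTime() { return startTime; }
        ZonedDateTime getEndTime() { return endTime; }
    }

    // The five sample rows listed above
    static List<Aggregates> sampleData() {
        return Arrays.asList(
            new Aggregates(120, 100, 110, 115, "2023-08-25T10:00:00Z", "2023-08-25T10:01:00Z"),
            new Aggregates(130, 105, 115, 125, "2023-08-25T10:01:00Z", "2023-08-25T10:02:00Z"),
            new Aggregates(128, 102, 123, 107, "2023-08-25T10:02:00Z", "2023-08-25T10:03:00Z"),
            new Aggregates(135, 110, 130, 132, "2023-08-25T10:03:00Z", "2023-08-25T10:04:00Z"),
            new Aggregates(140, 115, 134, 138, "2023-08-25T10:04:00Z", "2023-08-25T10:05:00Z"));
    }

    public static void main(String[] args) {
        List<Aggregates> data = sampleData();
        System.out.println(data.size()); // 5
        System.out.println(data.stream()
                               .mapToDouble(Aggregates::getHigh)
                               .max()
                               .orElse(Double.NaN)); // 140.0
    }
}
```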

Calculate High Price

private double calculateHighPrice(List<Aggregates> aggregates) {
    return aggregates.stream()  // Source: List of aggregates
            .mapToDouble(Aggregates::getHigh) // Intermediate Operation, Stateless
            .max() // Terminal operation (reduction): finds the highest value
            .orElse(Double.NaN); // Provides a default value if the stream is empty
}
// For Loop Equivalent
private double calculateHighPrice(List<Aggregates> aggregates) {
    double maxHigh = Double.NaN;
    for (Aggregates aggregate : aggregates) {
        if (Double.isNaN(maxHigh) || aggregate.getHigh() > maxHigh) {
            maxHigh = aggregate.getHigh();
        }
    }
    return maxHigh;
}

Expected High Price: 140

Calculate Start Time

private ZonedDateTime calculateStartTime(List<Aggregates> aggregates) {
    return aggregates.stream() // Source: list of aggregates
            .map(Aggregates::getStartTime) // Transforms each Agg to its 'startTime'
            // Terminal operation (reduction): must process all elements to find the minimum
            .min(ZonedDateTime::compareTo)
            .orElse(ZonedDateTime.now()); // Default value if stream is empty
}
// For Loop Equivalent
private ZonedDateTime calculateStartTime(List<Aggregates> aggregates) {
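    ZonedDateTime earliestStart = null;
    for (Aggregates aggregate : aggregates) {
        if (earliestStart == null || aggregate.getStartTime().isBefore(earliestStart)) {
            earliestStart = aggregate.getStartTime();
        }
    }
    // Fall back to the current time only when the list is empty, as the stream version does
    return earliestStart != null ? earliestStart : ZonedDateTime.now();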
}

Expected Start Time: 2023-08-25T10:00:00Z

Calculate Trigger Price Up

private double calculateTriggerPriceUp(List<Aggregates> aggregates) {
    return aggregates.stream() // Source: list of aggregates
            // Terminal operation: finds the aggregate with the latest 'endTime'
            .max(Comparator.comparing(Aggregates::getEndTime))
            .map(Aggregates::getHigh) // Optional.map: extracts 'high' if present
            .orElse(Double.NaN); // Provides a default value if stream is empty
}
// For Loop Equivalent
private double calculateTriggerPriceUp(List<Aggregates> aggregates) {
    if (aggregates.isEmpty()) return Double.NaN; // Match the stream version's default
    Aggregates latestAggregate = aggregates.get(0);
    for (Aggregates aggregate : aggregates) {
        if (aggregate.getEndTime().isAfter(latestAggregate.getEndTime())) {
            latestAggregate = aggregate;
        }
    }
    return latestAggregate.getHigh();
}

Expected Trigger Price Up: 140

Calculate Target Price Up

private double calculateTargetPriceUp(List<Aggregates> aggregates) {
    return aggregates.stream() // Source: Stream of aggregates
            // Stateful: sorts elements in reverse order by 'endTime'
            .sorted(Comparator.comparing(Aggregates::getEndTime).reversed())
            .skip(1) // Stateful: skips the first (latest) element
            .findFirst() // Terminal, short-circuit: finds first element after 'skip'
            .map(Aggregates::getHigh) // Optional Op: maps to 'high' value if present
            .orElse(Double.NaN); // Default value if Optional is empty
}
// For Loop Equivalent
private double calculateTargetPriceUp(List<Aggregates> aggregates) {
    if (aggregates.size() < 2) return Double.NaN;

    Aggregates latest = null;
    Aggregates secondLatest = null;

    for (Aggregates aggregate : aggregates) {
        if (latest == null || aggregate.getEndTime().isAfter(latest.getEndTime())) {
            secondLatest = latest;
            latest = aggregate;
        } else if (secondLatest == null ||
                    aggregate.getEndTime().isAfter(secondLatest.getEndTime())) {
            secondLatest = aggregate;
        }
    }

    return secondLatest != null ? secondLatest.getHigh() : Double.NaN;
}

Expected Target Price Up: 135
